|
@@ -1,161 +0,0 @@
|
|
|
-package com.ywt.biz.common.config.rocketmq;
|
|
|
-
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.rocketmq.client.producer.SendCallback;
|
|
|
-import org.apache.rocketmq.client.producer.SendResult;
|
|
|
-import org.apache.rocketmq.client.producer.SendStatus;
|
|
|
-import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
|
-import org.apache.rocketmq.spring.support.RocketMQHeaders;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
-import org.springframework.messaging.Message;
|
|
|
-import org.springframework.messaging.MessagingException;
|
|
|
-import org.springframework.messaging.support.MessageBuilder;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-
|
|
|
-
|
|
|
-/**
|
|
|
- * @author liyang
|
|
|
- * @version 1.0
|
|
|
- * @description: rocket生产者封装类
|
|
|
- * @date 2022/9/29 8:50
|
|
|
- */
|
|
|
-@Slf4j
|
|
|
-@Component
|
|
|
-@ConditionalOnProperty(value = "rocketmq.producer.enable",havingValue = "true")
|
|
|
-public class RocketAnalyzeProducer {
|
|
|
-
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private RocketMQTemplate rocketMQTemplate;
|
|
|
-
|
|
|
- /**
|
|
|
- * 构建目的地 String destination = topic + “:” + tag
|
|
|
- */
|
|
|
- public String buildDestination(String topic, String tag) {
|
|
|
- return topic + ":" + tag;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * 发送同步消息 带主题和标签
|
|
|
- */
|
|
|
- public <T extends BaseMqMessage> boolean send(String topic, String tag, T message) {
|
|
|
- // 注意分隔符
|
|
|
- return send(buildDestination(topic, tag), message);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- public <T extends BaseMqMessage> boolean send(String destination, T message) {
|
|
|
- // 设置业务键,此处根据公共的参数进行处理
|
|
|
- // 更多的其它基础业务处理...
|
|
|
- boolean result = true;
|
|
|
- try{
|
|
|
- log.info("消息目的:[{}],消息Key:{},消息内容:{},开始发送同步消息",destination,message.getKey(),message);
|
|
|
- Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
|
|
|
- SendResult sendResult = rocketMQTemplate.syncSend(destination, sendMessage);
|
|
|
- log.info("消息目的:[{}],消息Key:{},发送同步消息,发送结果[{}]", destination,message.getKey(), message, sendResult);
|
|
|
- if(SendStatus.SEND_OK.compareTo(sendResult.getSendStatus())!=0){
|
|
|
- log.info("消息目的:[{}],消息Key:{},发送同步消息失败,status:{}", destination,message.getKey(), message, sendResult.getSendStatus());
|
|
|
- result = false;
|
|
|
- }
|
|
|
- }catch (MessagingException e ){
|
|
|
- log.info("消息目的:[{}],消息Key:{},发送同步消息异常,reason:{}", destination,message.getKey(), message, e);
|
|
|
- result = false;
|
|
|
- }
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * 发送同步消息 带主题和标签
|
|
|
- */
|
|
|
-
|
|
|
- public <T extends BaseMqMessage> void asyncSend(String topic, String tag, T message, SendCallback sendCallBack) {
|
|
|
- // 注意分隔符
|
|
|
- asyncSend(buildDestination(topic, tag), message,sendCallBack);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * 异步发生消息自定义回调
|
|
|
- *
|
|
|
- * @param destination 目的地
|
|
|
- * @param message 发送信息字符串
|
|
|
- * @param sendCallBack 发送回调
|
|
|
- */
|
|
|
- public <T extends BaseMqMessage> void asyncSend(String destination, T message, SendCallback sendCallBack) {
|
|
|
- Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
|
|
|
- rocketMQTemplate.asyncSend(destination, sendMessage, sendCallBack);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * producer 异步方式发送数据 通用回调
|
|
|
- *
|
|
|
- * @param destination 目的名称
|
|
|
- * @param message producer发送的数据
|
|
|
- */
|
|
|
- public <T extends BaseMqMessage> void asyncSend(String destination, T message) {
|
|
|
- Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
|
|
|
- log.info("消息目的:[{}],消息Key:{},消息内容:{},开始发送异步消息",destination,message.getKey(),message);
|
|
|
- rocketMQTemplate.asyncSend(destination, sendMessage, new SendCallback() {
|
|
|
- @Override
|
|
|
- public void onSuccess(SendResult var1) {
|
|
|
- if(SendStatus.SEND_OK.compareTo(var1.getSendStatus())!=0){
|
|
|
- log.error("消息目的:[{}],消息Key:{},发送异步消息结果失败,status:{}", destination,message.getKey(), var1.getSendStatus());
|
|
|
- }else{
|
|
|
- log.info("消息目的:[{}],消息Key:{},发送异步消息状态成功",destination,message.getKey());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onException(Throwable var1) {
|
|
|
- log.info("消息目的:[{}],消息Key:{},消息内容:{},开始发送异步消息出现异常,reason:{}",destination,message.getKey(),message,var1.getMessage());
|
|
|
- }
|
|
|
-
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 延时消息
|
|
|
- * @param destination
|
|
|
- * @param message
|
|
|
- * @param delay
|
|
|
- * @param <T>
|
|
|
- */
|
|
|
- public<T extends BaseMqMessage> boolean delayConsumer(String destination, T message,Integer delay){
|
|
|
- boolean result =true;
|
|
|
- //延迟时间,单位秒
|
|
|
- try {
|
|
|
- log.info("消息目的:[{}],消息Key:{},消息内容:{},开始发送延迟消息",destination,message.getKey(),message);
|
|
|
- Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
|
|
|
- SendResult sendResult = rocketMQTemplate.syncSendDelayTimeSeconds(
|
|
|
- destination,
|
|
|
- sendMessage,
|
|
|
- delay);
|
|
|
- if(SendStatus.SEND_OK.compareTo(sendResult.getSendStatus())!=0){
|
|
|
- log.info("消息目的:[{}],消息Key:{},发送同步消息发送同步延迟消息成功,返回失败,status:{}", destination,message.getKey(), message, sendResult.getSendStatus());
|
|
|
- result = false;
|
|
|
- }else{
|
|
|
- log.info("消息目的:[{}],消息Key:{},发送同步延迟消息成功,返回成功,status:{}", destination,message.getKey(), message, sendResult.getSendStatus());
|
|
|
- }
|
|
|
- }catch (MessagingException e){
|
|
|
- result = false;
|
|
|
- log.info("消息目的:[{}],消息Key:{},发送同步消息异常,reason:{}", destination,message.getKey(), message, e);
|
|
|
- }
|
|
|
- return result ;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 延时消息
|
|
|
- * @param topic
|
|
|
- * @param tag
|
|
|
- * @param message
|
|
|
- * @param delay 数量 单位秒
|
|
|
- * @param <T>
|
|
|
- */
|
|
|
- public<T extends BaseMqMessage> boolean delayConsumer(String topic, String tag, T message,Integer delay){
|
|
|
- return delayConsumer(buildDestination(topic, tag),message,delay);
|
|
|
- }
|
|
|
-}
|