Browse Source

UPDATE-兼容消息队列

liyang1 6 months ago
parent
commit
07ee027a44

+ 1 - 1
README.md

@@ -68,4 +68,4 @@
             </dependency>
    ```
   
-## 参考文档
+## 参考文档

+ 1 - 0
pom.xml

@@ -114,6 +114,7 @@
 		<dependency>
 			<groupId>org.apache.rocketmq</groupId>
 			<artifactId>rocketmq-spring-boot-starter</artifactId>
+			<scope>provided</scope>
 		</dependency>
 
 

+ 44 - 36
src/main/java/com/ywt/biz/common/config/rocketmq/BaseMqMessageListener.java

@@ -1,17 +1,14 @@
 package com.ywt.biz.common.config.rocketmq;
 
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
+import com.ywt.biz.common.constant.MsgConstants;
+import com.ywt.biz.common.exception.YwtException;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import javax.annotation.Resource;
+import java.io.Serializable;
 import java.time.Instant;
-import java.util.Objects;
 
 /**
  * 抽象消息监听器,封装了所有公共处理业务,如
@@ -24,13 +21,16 @@ import java.util.Objects;
  * @author tianxincoord@163.com
  * @since 2022/4/17
  */
-public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
+public abstract class BaseMqMessageListener<T extends BaseMqMessage> implements Serializable {
     /**
      * 这里的日志记录器是哪个子类的就会被哪个子类的类进行初始化
      */
     protected final Logger logger = LoggerFactory.getLogger(this.getClass());
-    @Autowired
-    private RocketMQTemplate rocketMQTemplate;
+
+    private static  final String  Logger_prefix = "[{}]消费者收到消息[{}]";
+
+    @Autowired(required = false)
+    private RocketAnalyzeProducer rocketAnalyzeProducer;
 
     /**
      * 消息者名称
@@ -53,6 +53,7 @@ public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
      * @param message 待处理消息
      */
     protected abstract void overMaxRetryTimesMessage(T message);
+
     /**
      * 是否过滤消息,例如某些
      *
@@ -72,8 +73,8 @@ public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
 
     /**
      * 消费异常时是否抛出异常
-     *
-     * @return true: 抛出异常,false:消费异常(如果没有开启重试则消息会被自动ack)
+     * 若返回true,则由rocketmq机制自动重试
+     * false:消费异常(如果没有开启重试则消息会被自动ack)
      */
     protected abstract boolean isThrowException();
 
@@ -96,18 +97,17 @@ public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
     }
 
     /**
-     * 由父类来完成基础的日志和调配,下面的只是提供一个思路
+     * 由父类来完成基础的日志和调配
      */
     public void dispatchMessage(T message) {
-        //MDC.put(RocketMqSysConstant.TRACE_ID, message.T());
-        // 基础日志记录被父类处理了
-        logger.info("[{}]消费者收到消息[{}]", consumerName(), message);
+        logger.info(Logger_prefix, consumerName(), message);
         if (isFilter(message)) {
-            logger.info("消息不满足消费条件,已过滤");
+            logger.info(Logger_prefix+"消息不满足消费条件,已过滤", consumerName(), message);
             return;
         }
         // 超过最大重试次数时调用子类方法处理
         if (message.getRetryTimes() > maxRetryTimes()) {
+            logger.error(Logger_prefix+"超过重试次数消息", consumerName(), message);
             overMaxRetryTimesMessage(message);
             return;
         }
@@ -115,34 +115,42 @@ public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
             long start = Instant.now().toEpochMilli();
             handleMessage(message);
             long end = Instant.now().toEpochMilli();
-            logger.info("消息消费成功,耗时[{}ms]", (end - start));
+            logger.info(Logger_prefix+"消息消费成功,耗时[{}ms]", consumerName(), message, (end - start));
         } catch (Exception e) {
-            logger.error("消息消费异常", e);
+            if(e instanceof YwtException){
+                logger.error(Logger_prefix+" 业务异常,原因:{}", consumerName(), message, ((YwtException) e).getErrDetail());
+            }else{
+                logger.error(Logger_prefix+ "消息消费系统异常", consumerName(), message, e);
+            }
             // 是捕获异常还是抛出,由子类决定
             if (isThrowException()) {
+                // 如果设置抛异常, 将会在按照Rocketmq消费者端的机制进行重试
                 throw new RuntimeException(e);
             }
             if (isRetry()) {
-                // 获取子类RocketMQMessageListener注解拿到topic和tag
-               /* RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
-                if (Objects.nonNull(annotation)) {
-                    message.setSource(message.getSource() + "消息重试");
-                    message.setRetryTimes(message.getRetryTimes() + 1);
-                    SendResult sendResult;
-                    try {
-                        // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
-                        // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
-                        sendResult = rocketMQTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
-                    } catch (Exception ex) {
-                        throw new RuntimeException(ex);
-                    }
-                    // 发送失败的处理就是不进行ACK,由RocketMQ重试
-                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
-                        throw new RuntimeException("重试消息发送失败");
-                    }
-                }*/
+                handleRetry(message);
             }
         }
     }
+
+    protected void handleRetry(T message) {
+        // 获取子类RocketMQMessageListener注解拿到topic和tag
+        RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
+        if (annotation == null) {
+            return;
+        }
+        //重新构建消息体
+        String messageSource = message.getSource();
+        if (!messageSource.startsWith("retry")) {
+            message.setSource(MsgConstants.RETRY_PREFIX + messageSource);
+        }
+        message.setRetryTimes(message.getRetryTimes() + 1);
+
+        // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
+        boolean result = rocketAnalyzeProducer.delayConsumer(annotation.topic(), annotation.selectorExpression(), message, 10);
+        if (!result) {
+            throw new RuntimeException("重试消息发送失败");
+        }
+    }
 }
 

+ 2 - 7
src/main/java/com/ywt/biz/common/config/rocketmq/RocketAnalyzeProducer.java

@@ -155,12 +155,7 @@ public class RocketAnalyzeProducer {
      * @param delay 数量 单位秒
      * @param <T>
      */
-    public<T extends BaseMqMessage> void delayConsumer(String topic, String tag, T message,Integer delay){
-        //延迟时间,单位秒
-        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
-        rocketMQTemplate.syncSendDelayTimeSeconds(
-                buildDestination(topic, tag),
-                sendMessage,
-                delay);
+    public<T extends BaseMqMessage> boolean delayConsumer(String topic, String tag, T message,Integer delay){
+        return  delayConsumer(buildDestination(topic, tag),message,delay);
     }
 }

+ 0 - 15
src/main/java/com/ywt/biz/common/constant/KafkaTopicsConstants.java

@@ -1,15 +0,0 @@
-package com.ywt.biz.common.constant;
-
-/**
- * 用于 kafka 的主题定义规则
- * 规则:方法名-类名-模块-能力中心
- * 规则对应最新能力中心划分的方式来(倒序是为了方便引用)
- */
-public class KafkaTopicsConstants {
-
-    /**
-     * 我清除缓存
-     */
-    public static final String INVALIDATECACHE_SYSTEMSERVICEPROVIDER_RPC_INTERNETHOSPITAL = "invalidateCache.SystemServiceProvider.rpc.internethospital";
-
-}

+ 15 - 0
src/main/java/com/ywt/biz/common/constant/MsgConstants.java

@@ -0,0 +1,15 @@
+package com.ywt.biz.common.constant;
+
+/**
+ * 消息常量定义
+ */
+public class MsgConstants {
+
+    /**
+     * 我清除缓存
+     */
+   // public static final String INVALIDATECACHE_SYSTEMSERVICEPROVIDER_RPC_INTERNETHOSPITAL = "invalidateCache.SystemServiceProvider.rpc.internethospital";
+
+
+    public static final String  RETRY_PREFIX = "retry_msg";
+}