2 Commits 1574fad26a ... 4373e94ca6

Author SHA1 Message Date
  Walker 4373e94ca6 Merge branch 'master' of http://gogs.ywtinfo.com/ywt_rebuild_group/ywt-platform-outpatient-parent 1 year ago
  Walker faf2595c1d feature: add grpc callback 1 year ago

+ 4 - 0
ywt-platform-outpatient-rpc/pom.xml

@@ -22,6 +22,10 @@
             <groupId>com.ywt</groupId>
             <artifactId>ywt-platform-outpatient-sdk</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
 
     </dependencies>
 

+ 72 - 0
ywt-platform-outpatient-rpc/src/main/java/com/ywt/outpatient/rpc/mq/KafkaMsgListener.java

@@ -0,0 +1,72 @@
+package com.ywt.outpatient.rpc.mq;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.ywt.biz.common.util.Checker;
+import com.ywt.gapi.nat.PayCallbackRequest;
+import com.ywt.gapi.nat.RefundNatOrderCallbackRequest;
+import com.ywt.outpatient.rpc.provider.NatProvider;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author Walker
+ * Created on 2024/1/15
+ */
+
+@Component
+@ConditionalOnProperty(value = "spring.kafka.consumer.enable", havingValue = "true")
+public class KafkaMsgListener {
+    private final Logger logger = LoggerFactory.getLogger(KafkaMsgListener.class);
+
+    public static final String GROUP_NAME = "outpatient-rpc";
+
+    @Autowired
+    private NatProvider natProvider;
+
+
+    @KafkaListener(topics = "/com.ywt.gapi.nat.NatService/refundNatOrderCallback", groupId = GROUP_NAME)
+    public void refundNatOrderCallback(ConsumerRecord<String, String> record, Acknowledgment ack) {
+        logger.info("KafkaMsgListener#refundNatOrderCallback(topic={}, partition={}, value={}): 开始消费", record.topic(), record.partition(), record.value());
+        try {
+            boolean isValid = JSON.isValid(record.value());
+            if (isValid) {
+                JSONObject jsonObject = JSONObject.parseObject(record.value());
+                natProvider.refundNatOrderCallback(RefundNatOrderCallbackRequest.newBuilder()
+                        .setRefundNo(Checker.getStringValue(jsonObject.getString("refundNo")))
+                        .setRefundStatus(Checker.getIntegerValue(jsonObject.getIntValue("refundStatus")))
+                        .build());
+                logger.info("KafkaMsgListener#refundNatOrderCallback(topic={}, partition={}, value={}): finished consuming", record.topic(), record.partition(), record.value());
+            }
+        } catch (Exception e) {
+            logger.error("KafkaMsgListener#refundNatOrderCallback(topic={}, partition={}, value={}):\n {}", record.topic(), record.partition(), record.value(), e.getMessage(), e);
+        }
+        ack.acknowledge();
+    }
+
+    @KafkaListener(topics = "/com.ywt.gapi.nat.NatService/payCallback", groupId = GROUP_NAME)
+    public void payCallback(ConsumerRecord<String, String> record, Acknowledgment ack) {
+        logger.info("KafkaMsgListener#payCallback(topic={}, partition={}, value={}): 开始消费", record.topic(), record.partition(), record.value());
+        try {
+            boolean isValid = JSON.isValid(record.value());
+            if (isValid) {
+                JSONObject jsonObject = JSONObject.parseObject(record.value());
+                natProvider.payCallback(PayCallbackRequest.newBuilder()
+                        .setPaymentNo(Checker.getStringValue(jsonObject.getString("paymentNo")))
+                        .setPaymentChannel(Checker.getIntegerValue(jsonObject.getIntValue("paymentChannel")))
+                        .setBizId(Checker.getIntegerValue(jsonObject.getIntValue("bizId")))
+                        .build());
+                logger.info("KafkaMsgListener#payCallback(topic={}, partition={}, value={}): finished consuming", record.topic(), record.partition(), record.value());
+            }
+        } catch (Exception e) {
+            logger.error("KafkaMsgListener#payCallback(topic={}, partition={}, value={}):\n {}", record.topic(), record.partition(), record.value(), e.getMessage(), e);
+        }
+        ack.acknowledge();
+    }
+}
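
For context, below is a minimal producer-side sketch showing the kind of message KafkaMsgListener#payCallback expects. The topic name and the JSON fields (paymentNo, paymentChannel, bizId) come from the listener above; the PayCallbackPublisher class and the injected KafkaTemplate bean are hypothetical and are not part of this commit.

package com.ywt.outpatient.rpc.mq;

import com.alibaba.fastjson2.JSONObject;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical sketch (not part of this commit): publishes the JSON payload
// that KafkaMsgListener#payCallback parses on the consumer side.
public class PayCallbackPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate; // assumed to be configured elsewhere

    public PayCallbackPublisher(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(String paymentNo, int paymentChannel, int bizId) {
        JSONObject payload = new JSONObject();
        payload.put("paymentNo", paymentNo);
        payload.put("paymentChannel", paymentChannel);
        payload.put("bizId", bizId);
        // Topic name matches the @KafkaListener annotation above.
        kafkaTemplate.send("/com.ywt.gapi.nat.NatService/payCallback", payload.toJSONString());
    }
}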

+ 13 - 0
ywt-platform-outpatient-rpc/src/main/resources/application.yml

@@ -34,6 +34,19 @@ spring:
       poolName: HikariPool_center
       maxLifetime: 1800000
       connectionTestQuery: SELECT 1
+  kafka:
+    consumer: # consumer configuration
+      enable: true
+      group-id: ${spring.application.name} # default consumer group ID
+      enable-auto-commit: false # whether offsets are committed automatically
+      auto-commit-interval: 100  # auto-commit delay (how long after a message is received before the offset is committed)
+      # earliest: if a partition has a committed offset, consume from that offset; if not, consume from the beginning of the partition
+      # latest: if a partition has a committed offset, consume from that offset; if not, consume only records produced after the consumer starts
+      # none: if every partition has a committed offset, consume from those offsets; if any partition has no committed offset, throw an exception
+      auto-offset-reset: latest
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+
 # dubbo-related configuration
 dubbo:
   application:
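
Note (not part of this diff): the listener methods above take an Acknowledgment parameter while enable-auto-commit is false, which in Spring Kafka requires the listener container to run in a manual ack mode. A minimal sketch of how that is typically set via Spring Boot's standard property, assuming nothing else in the project already configures it:

spring:
  kafka:
    listener:
      ack-mode: manual_immediate   # required for the Acknowledgment parameter; "manual" also works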