瀏覽代碼

fix 修改mq,新增方法

daiyihua 1 年之前
父節點
當前提交
04f57e3502

+ 45 - 0
ywt-platform-outpatient-rpc/src/main/java/com/ywt/outpatient/rpc/mq/KafkaMsgListener.java

@@ -5,6 +5,8 @@ import com.alibaba.fastjson2.JSONObject;
 import com.ywt.biz.common.util.Checker;
 import com.ywt.gapi.nat.PayCallbackRequest;
 import com.ywt.gapi.nat.RefundNatOrderCallbackRequest;
+import com.ywt.gapi.third.cloudfilm.RefundCheckResultOrderCallbackRequest;
+import com.ywt.outpatient.rpc.provider.CloudfilmProvider;
 import com.ywt.outpatient.rpc.provider.NatProvider;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
@@ -30,6 +32,8 @@ public class KafkaMsgListener {
     @Autowired
     private NatProvider natProvider;
 
+    @Autowired
+    private CloudfilmProvider cloudfilmProvider;
 
     @KafkaListener(topics = "/com.ywt.gapi.nat.NatService/refundNatOrderCallback", groupId = GROUP_NAME)
     public void refundNatOrderCallback(ConsumerRecord<String, String> record, Acknowledgment ack) {
@@ -69,4 +73,45 @@ public class KafkaMsgListener {
         }
         ack.acknowledge();
     }
+
+
+    @KafkaListener(topics = "/com.ywt.gapi.third.cloudfilm.CloudfilmService/refundCheckResultOrderCallback", groupId = GROUP_NAME)
+    public void refundCheckResultOrderCallback(ConsumerRecord<String, String> record, Acknowledgment ack) {
+        logger.info("KafkaMsgListener#refundCheckResultOrderCallback(topic={}, partition={}, value={}): 开始消费", record.topic(), record.partition(), record.value());
+        try {
+            boolean isValid = JSON.isValid(record.value());
+            if (isValid) {
+                JSONObject jsonObject = JSONObject.parseObject(record.value());
+                cloudfilmProvider.refundCheckResultOrderCallback(RefundCheckResultOrderCallbackRequest.newBuilder()
+                        .setRefundStatus(Checker.getIntegerValue(jsonObject.getIntValue("refundStatus")))
+                        .setRefundNo(Checker.getStringValue(jsonObject.getString("refundNo")))
+                        .build());
+                logger.info("KafkaMsgListener#refundCheckResultOrderCallback(topic={}, partition={}, value={}): 消费完成", record.topic(), record.partition(), record.value());
+            }
+        } catch (Exception e) {
+            logger.error("KafkaMsgListener#refundCheckResultOrderCallback(topic={}, partition={}, value={}):\n {}", record.topic(), record.partition(), record.value(), e.getMessage(), e);
+        }
+        ack.acknowledge();
+    }
+
+
+    @KafkaListener(topics = "/com.ywt.gapi.third.cloudfilm.CloudfilmService/payCallback", groupId = GROUP_NAME)
+    public void cloudfilmServicePayCallback(ConsumerRecord<String, String> record, Acknowledgment ack) {
+        logger.info("KafkaMsgListener#cloudfilmServicePayCallback#payCallback(topic={}, partition={}, value={}): 开始消费", record.topic(), record.partition(), record.value());
+        try {
+            boolean isValid = JSON.isValid(record.value());
+            if (isValid) {
+                JSONObject jsonObject = JSONObject.parseObject(record.value());
+                cloudfilmProvider.payCallback(com.ywt.gapi.third.cloudfilm.PayCallbackRequest.newBuilder()
+                        .setPaymentNo(Checker.getStringValue(jsonObject.getString("paymentNo")))
+                        .setPaymentChannel(Checker.getIntegerValue(jsonObject.getIntValue("paymentChannel")))
+                        .setBizId(Checker.getIntegerValue(jsonObject.getIntValue("bizId")))
+                        .build());
+                logger.info("KafkaMsgListener#cloudfilmServicePayCallback#payCallback(topic={}, partition={}, value={}): 消费完成", record.topic(), record.partition(), record.value());
+            }
+        } catch (Exception e) {
+            logger.error("KafkaMsgListener#cloudfilmServicePayCallback#payCallback(topic={}, partition={}, value={}):\n {}", record.topic(), record.partition(), record.value(), e.getMessage(), e);
+        }
+        ack.acknowledge();
+    }
 }