|
@@ -0,0 +1,72 @@
|
|
|
+package com.ywt.outpatient.rpc.mq;
|
|
|
+
|
|
|
+import com.alibaba.fastjson2.JSON;
|
|
|
+import com.alibaba.fastjson2.JSONObject;
|
|
|
+import com.ywt.biz.common.util.Checker;
|
|
|
+import com.ywt.gapi.nat.PayCallbackRequest;
|
|
|
+import com.ywt.gapi.nat.RefundNatOrderCallbackRequest;
|
|
|
+import com.ywt.outpatient.rpc.provider.NatProvider;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
+import org.springframework.kafka.annotation.KafkaListener;
|
|
|
+import org.springframework.kafka.support.Acknowledgment;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author Walker
|
|
|
+ * Created on 2024/1/15
|
|
|
+ */
|
|
|
+
|
|
|
+@Component
|
|
|
+@ConditionalOnProperty(value = "spring.kafka.consumer.enable", havingValue = "true")
|
|
|
+public class KafkaMsgListener {
|
|
|
+ private final Logger logger = LoggerFactory.getLogger(KafkaMsgListener.class);
|
|
|
+
|
|
|
+ public static final String GROUP_NAME = "outpatient-rpc";
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private NatProvider natProvider;
|
|
|
+
|
|
|
+
|
|
|
+ @KafkaListener(topics = "/com.ywt.gapi.nat.NatService/refundNatOrderCallback", groupId = GROUP_NAME)
|
|
|
+ public void refundNatOrderCallback(ConsumerRecord<String, String> record, Acknowledgment ack) {
|
|
|
+ logger.info("KafkaMsgListener#refundNatOrderCallback(topic={}, partition={}, value={}): 开始消费", record.topic(), record.partition(), record.value());
|
|
|
+ try {
|
|
|
+ boolean isValid = JSON.isValid(record.value());
|
|
|
+ if (isValid) {
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(record.value());
|
|
|
+ natProvider.refundNatOrderCallback(RefundNatOrderCallbackRequest.newBuilder()
|
|
|
+ .setRefundNo(Checker.getStringValue(jsonObject.getString("refundNo")))
|
|
|
+ .setRefundStatus(Checker.getIntegerValue(jsonObject.getIntValue("refundStatus")))
|
|
|
+ .build());
|
|
|
+ logger.info("KafkaMsgListener#refundNatOrderCallback(topic={}, partition={}, value={}): 消费完成", record.topic(), record.partition(), record.value());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("KafkaMsgListener#refundNatOrderCallback(topic={}, partition={}, value={}):\n {}", record.topic(), record.partition(), record.value(), e.getMessage(), e);
|
|
|
+ }
|
|
|
+ ack.acknowledge();
|
|
|
+ }
|
|
|
+
|
|
|
+ @KafkaListener(topics = "/com.ywt.gapi.nat.NatService/payCallback", groupId = GROUP_NAME)
|
|
|
+ public void payCallback(ConsumerRecord<String, String> record, Acknowledgment ack) {
|
|
|
+ logger.info("KafkaMsgListener#payCallback(topic={}, partition={}, value={}): 开始消费", record.topic(), record.partition(), record.value());
|
|
|
+ try {
|
|
|
+ boolean isValid = JSON.isValid(record.value());
|
|
|
+ if (isValid) {
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(record.value());
|
|
|
+ natProvider.payCallback(PayCallbackRequest.newBuilder()
|
|
|
+ .setPaymentNo(Checker.getStringValue(jsonObject.getString("paymentNo")))
|
|
|
+ .setPaymentChannel(Checker.getIntegerValue(jsonObject.getIntValue("paymentChannel")))
|
|
|
+ .setBizId(Checker.getIntegerValue(jsonObject.getIntValue("bizId")))
|
|
|
+ .build());
|
|
|
+ logger.info("KafkaMsgListener#payCallback(topic={}, partition={}, value={}): 消费完成", record.topic(), record.partition(), record.value());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("KafkaMsgListener#payCallback(topic={}, partition={}, value={}):\n {}", record.topic(), record.partition(), record.value(), e.getMessage(), e);
|
|
|
+ }
|
|
|
+ ack.acknowledge();
|
|
|
+ }
|
|
|
+}
|