Explorar o código

UPDATE-添加kafka

liyang hai 1 ano
pai
achega
329b986293

+ 5 - 0
.idea/jarRepositories.xml

@@ -7,6 +7,11 @@
       <option name="url" value="https://repo.maven.apache.org/maven2" />
     </remote-repository>
     <remote-repository>
+      <option name="id" value="nexus" />
+      <option name="name" value="Team Nexus Repository" />
+      <option name="url" value="http://nexus.ywtinfo.com/repository/maven-public/" />
+    </remote-repository>
+    <remote-repository>
       <option name="id" value="central" />
       <option name="name" value="Maven Central repository" />
       <option name="url" value="https://repo1.maven.org/maven2" />

+ 0 - 1
.idea/misc.xml

@@ -1,4 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="ExternalStorageConfigurationManager" enabled="true" />
   <component name="MavenProjectsManager">

+ 6 - 7
pom.xml

@@ -74,11 +74,6 @@
 		</dependency>
 
 		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-		</dependency>
-
-		<dependency>
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-lang3</artifactId>
 		</dependency>
@@ -156,9 +151,13 @@
 			<artifactId>pinyin4j</artifactId>
 			<version>2.5.1</version>
 		</dependency>
+<!--		<dependency>-->
+<!--			<groupId>com.google.guava</groupId>-->
+<!--			<artifactId>guava</artifactId>-->
+<!--		</dependency>-->
 		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
+			<groupId>org.springframework.kafka</groupId>
+			<artifactId>spring-kafka</artifactId>
 		</dependency>
 
 	</dependencies>

+ 1 - 1
src/main/java/com/ywt/biz/common/config/db/YwtLog.java

@@ -49,7 +49,7 @@ public class YwtLog {
                 .properties(new HibernateProperties()
                         .determineHibernateProperties(
                                 jpaProperties.getProperties(), new HibernateSettings()))
-                .packages("com.ywt.rpc.domain.entities_log") //设置实体类所在位置
+                .packages("com.ywt.domain.entity.log") //设置实体类所在位置
                 .persistenceUnit("ywtLogPersistenceUnit")
                 .build();
     }

+ 1 - 1
src/main/java/com/ywt/biz/common/config/db/Ywtcenter.java

@@ -50,7 +50,7 @@ public class Ywtcenter {
                 .properties(new HibernateProperties()
                         .determineHibernateProperties(
                                 jpaProperties.getProperties(), new HibernateSettings()))
-                .packages("com.ywt.rpc.domain.entities") //设置实体类所在位置
+                .packages("com.ywt.domain.entity.center") //设置实体类所在位置
                 .persistenceUnit("ywtcenterPersistenceUnit")
                 .build();
     }

+ 79 - 0
src/main/java/com/ywt/biz/common/config/kafka/KafkaAnalyzeProducer.java

@@ -0,0 +1,79 @@
+package com.ywt.biz.common.config.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.FailureCallback;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+import org.springframework.util.concurrent.SuccessCallback;
+
+import javax.validation.constraints.NotNull;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * @author liyang
+ * @version 1.0
+ * @description: 生产者封装类
+ * @date 2022/9/29 8:50
+ */
+@Slf4j
+@Component
+@ConditionalOnProperty(value = "spring.kafka.producer.enable",havingValue = "true")
+public class KafkaAnalyzeProducer {
+    @Autowired
+    private KafkaTemplate<String, Object> kafkaTemplate;
+
+    /**
+     * 异步发送数据到kafka 自定义回调
+     *
+     * @param topic        topic名称
+     * @param message      发送信息字符串
+     * @param sendCallBack 发送回调
+     */
+    public void send(String topic, String message, SendCallBack sendCallBack) {
+
+        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topic, message);
+        //发送成功后回调
+        SuccessCallback<Object> successCallback = result -> sendCallBack.sendSuccessCallBack(topic, message);
+        //发送失败回调
+        FailureCallback failureCallback = ex -> sendCallBack.sendFailCallBack(topic, message, ex);
+        listenableFuture.addCallback(successCallback, failureCallback);
+    }
+
+
+    /**
+     * producer 同步方式发送数据 10S 超时
+     *
+     * @param topic   topic名称
+     * @param message producer发送的数据
+     */
+    public void sendAsynchronize(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
+        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * producer 异步方式发送数据 通用回调
+     *
+     * @param topic   topic名称
+     * @param message producer发送的数据
+     */
+    public void sendSynchronize(String topic, String message) {
+        kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<Object>() {
+            @Override
+            public void onFailure(@NotNull Throwable throwable) {
+                log.error("----事件kafka记录解析完成放入topic:{},发送失败{}", topic, message, throwable);
+            }
+
+            @Override
+            public void onSuccess(Object o) {
+                log.info("----事件kafka记录解析完成放入topic:{},发送成功:{}", topic, message);
+            }
+        });
+    }
+}

+ 21 - 0
src/main/java/com/ywt/biz/common/config/kafka/SendCallBack.java

@@ -0,0 +1,21 @@
+package com.ywt.biz.common.config.kafka;
+
+/**
+ * 发送回调处理
+ */
+public interface SendCallBack {
+    /**
+     * 生产成功回调
+     * @param topic topic
+     * @param msg 信息字符串
+     */
+    void sendSuccessCallBack(String topic,String msg);
+
+    /**
+     * 生产失败回调
+     * @param topic topic
+     * @param msg 信息字符串
+     * @param ex 异常
+     */
+    void sendFailCallBack(String topic,String msg,Throwable ex) ;
+}