|
@@ -6,6 +6,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
|
import org.springframework.kafka.support.SendResult;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
import org.springframework.util.concurrent.FailureCallback;
|
|
|
import org.springframework.util.concurrent.ListenableFuture;
|
|
|
import org.springframework.util.concurrent.ListenableFutureCallback;
|
|
@@ -54,6 +55,9 @@ public class KafkaAnalyzeProducer {
|
|
|
* @param message producer发送的数据
|
|
|
*/
|
|
|
public void sendAsynchronize(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
|
|
|
+ if(StringUtils.hasText(topic)&&topic.contains("/")){
|
|
|
+ topic = topic.replace("/","_");
|
|
|
+ }
|
|
|
kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
|
|
|
}
|
|
|
|
|
@@ -64,15 +68,19 @@ public class KafkaAnalyzeProducer {
|
|
|
* @param message producer发送的数据
|
|
|
*/
|
|
|
public void sendSynchronize(String topic, String message) {
|
|
|
+ if(StringUtils.hasText(topic)&&topic.contains("/")){
|
|
|
+ topic = topic.replace("/","_");
|
|
|
+ }
|
|
|
+ String finalTopic = topic;
|
|
|
kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<Object>() {
|
|
|
@Override
|
|
|
public void onFailure(@NotNull Throwable throwable) {
|
|
|
- log.error("----事件kafka记录解析完成放入topic:{},发送失败{}", topic, message, throwable);
|
|
|
+ log.error("----事件kafka记录解析完成放入topic:{},发送失败{}", finalTopic, message, throwable);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void onSuccess(Object o) {
|
|
|
- log.info("----事件kafka记录解析完成放入topic:{},发送成功:{}", topic, message);
|
|
|
+ log.info("----事件kafka记录解析完成放入topic:{},发送成功:{}", finalTopic, message);
|
|
|
}
|
|
|
});
|
|
|
}
|