|
@@ -56,7 +56,7 @@ public class KafkaAnalyzeProducer {
|
|
|
*/
|
|
|
public void sendAsynchronize(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
|
|
|
if(StringUtils.hasText(topic)&&topic.contains("/")){
|
|
|
- topic = topic.replace("/","_");
|
|
|
+ topic = topic.replaceAll("/","_");
|
|
|
}
|
|
|
kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
|
|
|
}
|
|
@@ -69,7 +69,7 @@ public class KafkaAnalyzeProducer {
|
|
|
*/
|
|
|
public void sendSynchronize(String topic, String message) {
|
|
|
if(StringUtils.hasText(topic)&&topic.contains("/")){
|
|
|
- topic = topic.replace("/","_");
|
|
|
+ topic = topic.replaceAll("/","_");
|
|
|
}
|
|
|
String finalTopic = topic;
|
|
|
kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<Object>() {
|