This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e865155e [INLONG-7026][Manager] Fixed the problem of not checked before creating the kafka topic (#7027)
2e865155e is described below

commit 2e865155e236fdeb59cb8a8154c063e5d05b3a99
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Thu Dec 22 20:22:08 2022 +0800

    [INLONG-7026][Manager] Fixed the problem of not checked before creating the kafka topic (#7027)
---
 .../inlong/manager/service/resource/queue/kafka/KafkaOperator.java   | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index 6691b55d0..7e626ec10 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -49,6 +49,11 @@ public class KafkaOperator {
         NewTopic topic = new NewTopic(topicName,
                 inlongKafkaInfo.getNumPartitions(),
                 inlongKafkaInfo.getReplicationFactor());
+        // Topic will be returned if it exists, and created if it does not exist
+        if (topicIsExists(kafkaClusterInfo, topicName)) {
+            LOGGER.warn("kafka topic={} already exists", topicName);
+            return;
+        }
         CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(topic));
         // To prevent the client from disconnecting too quickly and causing the Topic to not be created successfully
         Thread.sleep(500);

Reply via email to