This is an automated email from the ASF dual-hosted git repository. pacinogong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 2e865155e [INLONG-7026][Manager] Fixed the problem of not checked before creating the kafka topic (#7027)
2e865155e is described below

commit 2e865155e236fdeb59cb8a8154c063e5d05b3a99
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Thu Dec 22 20:22:08 2022 +0800

    [INLONG-7026][Manager] Fixed the problem of not checked before creating the kafka topic (#7027)
---
 .../inlong/manager/service/resource/queue/kafka/KafkaOperator.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index 6691b55d0..7e626ec10 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -49,6 +49,11 @@ public class KafkaOperator {
             NewTopic topic = new NewTopic(topicName, inlongKafkaInfo.getNumPartitions(), inlongKafkaInfo.getReplicationFactor());
+            // Topic will be returned if it exists, and created if it does not exist
+            if (topicIsExists(kafkaClusterInfo, topicName)) {
+                LOGGER.warn("kafka topic={} already exists", topicName);
+                return;
+            }
             CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(topic));
             // To prevent the client from disconnecting too quickly and causing the Topic to not be created successfully
             Thread.sleep(500);