This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d2c510ee5 [INLONG-6802][DataProxy][Manager] Adapt Apache Kafka as 
cache cluster (#6803)
d2c510ee5 is described below

commit d2c510ee507a85352a07b69a6c78a1709852fd84
Author: woofyzhao <490467...@qq.com>
AuthorDate: Fri Dec 9 16:59:34 2022 +0800

    [INLONG-6802][DataProxy][Manager] Adapt Apache Kafka as cache cluster 
(#6803)
---
 .../apache/inlong/common/constant/Constants.java    |  3 +++
 inlong-dataproxy/bin/dataproxy-start.sh             |  7 ++++++-
 .../conf/{dataproxy-pulsar.conf => dataproxy.conf}  |  0
 .../inlong/dataproxy/sink/common/SinkContext.java   |  1 +
 .../dataproxy/sink/mq/MessageQueueZoneProducer.java |  4 +---
 .../dataproxy/sink/mq/kafka/KafkaHandler.java       | 21 +++++++++++++++++++--
 .../dataproxy/sink/mq/pulsar/PulsarHandler.java     |  2 ++
 .../inlong/dataproxy/sink/mq/tube/TubeHandler.java  |  1 +
 .../dataproxy/source/ServerMessageHandler.java      |  2 +-
 .../inlong/manager/pojo/cluster/ClusterRequest.java |  4 ++--
 .../manager/pojo/cluster/kafka/KafkaClusterDTO.java |  9 +++++++++
 .../pojo/cluster/pulsar/PulsarClusterDTO.java       |  5 +++++
 .../manager/pojo/cluster/tubemq/TubeClusterDTO.java |  3 +++
 .../service/cluster/InlongClusterServiceImpl.java   | 18 +++++++++++++++++-
 .../service/cluster/KafkaClusterOperator.java       |  1 +
 .../service/cluster/PulsarClusterOperator.java      |  1 +
 .../queue/kafka/KafkaResourceOperators.java         |  9 ++++++++-
 17 files changed, 80 insertions(+), 11 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java 
b/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
index 0c2714ad0..983ac3988 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
@@ -26,4 +26,7 @@ public class Constants {
 
     public static final int RESULT_FAIL = 1;
 
+    // default kafka topic is {groupId}.{streamId}
+    public static final String DEFAULT_KAFKA_TOPIC_FORMAT = "%s.%s";
+
 }
diff --git a/inlong-dataproxy/bin/dataproxy-start.sh 
b/inlong-dataproxy/bin/dataproxy-start.sh
index a0de3894c..9bc2cfc30 100755
--- a/inlong-dataproxy/bin/dataproxy-start.sh
+++ b/inlong-dataproxy/bin/dataproxy-start.sh
@@ -47,6 +47,10 @@ if [ -n "$1" ]; then
 fi
 
 CONFIG_FILE="dataproxy-${MQ_TYPE}.conf"
+if [ "${MQ_TYPE}" == "pulsar" ] || [ "${MQ_TYPE}" == "kafka" ]; then
+  CONFIG_FILE="dataproxy.conf"
+fi
+
 CONFIG_FILE_WITH_COFING_PATH="conf/${CONFIG_FILE}"
 CONFIG_FILE_WITH_PATH="${basedir}/${CONFIG_FILE}"
 
@@ -54,4 +58,5 @@ if [ -f "$CONFIG_FILE_WITH_PATH" ]; then
   nohup bash +x bin/dataproxy-ng agent --conf conf/ -f 
"${CONFIG_FILE_WITH_COFING_PATH}" -n agent1 --no-reload-conf  > /dev/null 2>&1 &
 else
    error "${CONFIG_FILE_WITH_PATH} is not exist! start failed!" 1
-fi
\ No newline at end of file
+fi
+
diff --git a/inlong-dataproxy/conf/dataproxy-pulsar.conf 
b/inlong-dataproxy/conf/dataproxy.conf
similarity index 100%
rename from inlong-dataproxy/conf/dataproxy-pulsar.conf
rename to inlong-dataproxy/conf/dataproxy.conf
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index 11298296e..a28493bad 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -221,6 +221,7 @@ public class SinkContext {
     public MessageQueueHandler createMessageQueueHandler(CacheClusterConfig 
config) {
         String strHandlerClass = 
config.getParams().getOrDefault(KEY_MESSAGE_QUEUE_HANDLER,
                 PulsarHandler.class.getName());
+        LOG.info("mq handler class = {}", strHandlerClass);
         try {
             Class<?> handlerClass = ClassUtils.getClass(strHandlerClass);
             Object handlerObject = 
handlerClass.getDeclaredConstructor().newInstance();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
index 5c221f892..e8bf16a8e 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
@@ -108,9 +108,7 @@ public class MessageQueueZoneProducer {
     public void reload() {
         try {
             // stop deleted cluster
-            deletingClusterList.forEach(item -> {
-                item.stop();
-            });
+            deletingClusterList.forEach(MessageQueueClusterProducer::stop);
             deletingClusterList.clear();
             // update cluster list
             List<CacheClusterConfig> configList = 
this.context.getCacheHolder().getConfigList();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index 7d030393d..c0cde55d8 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.dataproxy.sink.mq.kafka;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
+import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
 import org.apache.inlong.dataproxy.sink.common.EventHandler;
@@ -45,6 +47,7 @@ import java.util.Properties;
 public class KafkaHandler implements MessageQueueHandler {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(KafkaHandler.class);
+    public static final String KEY_NAMESPACE = "namespace";
 
     private CacheClusterConfig config;
     private MessageQueueZoneSinkContext sinkContext;
@@ -92,6 +95,7 @@ public class KafkaHandler implements MessageQueueHandler {
     public void stop() {
         // kafka producer
         this.producer.close();
+        LOG.info("kafka handler stopped");
     }
 
     /**
@@ -109,12 +113,14 @@ public class KafkaHandler implements MessageQueueHandler {
                 sinkContext.getDispatchQueue().release(event.getSize());
                 return false;
             }
-            String topic = idConfig.getTopicName();
-            if (topic == null) {
+            String baseTopic = idConfig.getTopicName();
+            if (baseTopic == null) {
                 sinkContext.addSendResultMetric(event, event.getUid(), false, 
0);
                 sinkContext.getDispatchQueue().release(event.getSize());
                 return false;
             }
+            String topic = getProducerTopic(baseTopic, idConfig);
+
             // metric
             sinkContext.addSendMetric(event, topic);
             // create producer failed
@@ -138,6 +144,17 @@ public class KafkaHandler implements MessageQueueHandler {
         }
     }
 
+    /**
+     * Returns {namespace}.{baseTopic} when the id config's namespace param is non-empty, otherwise baseTopic.
+     */
+    private String getProducerTopic(String baseTopic, IdTopicConfig config) {
+        String namespace = config.getParams().get(KEY_NAMESPACE);
+        if (StringUtils.isNotEmpty(namespace)) {
+            return String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, 
namespace, baseTopic);
+        }
+        return baseTopic;
+    }
+
     /**
      * sendProfileV1
      */
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 49eccce4f..f34e195ab 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -150,6 +150,7 @@ public class PulsarHandler implements MessageQueueHandler {
         } catch (Throwable e) {
             LOG.error(e.getMessage(), e);
         }
+        LOG.info("pulsar handler started");
     }
 
     /**
@@ -169,6 +170,7 @@ public class PulsarHandler implements MessageQueueHandler {
         } catch (PulsarClientException e) {
             LOG.error(e.getMessage(), e);
         }
+        LOG.info("pulsar handler stopped");
     }
 
     /**
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 1b1471dbb..225c488c8 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -147,6 +147,7 @@ public class TubeHandler implements MessageQueueHandler {
                 LOG.error(e.getMessage(), e);
             }
         }
+        LOG.info("tube handler stopped");
     }
 
     /**
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 31c0b4ee8..920268af1 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -437,7 +437,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                 } else {
                     
commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
                             
DataProxyErrCode.UNCONFIGURED_GROUPID_OR_STREAMID.getErrCodeStr());
-                    logger.debug("Topic for message is null , inlongGroupId = 
{}, inlongStreamId = {}",
+                    logger.error("Topic for message is null , inlongGroupId = 
{}, inlongStreamId = {}",
                             groupId, streamId);
                     return false;
                 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
index 37cf3378f..90f07a315 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
@@ -47,7 +47,7 @@ public abstract class ClusterRequest {
     private String name;
 
     @NotBlank(message = "cluster type cannot be blank")
-    @ApiModelProperty(value = "Cluster type, including TUBEMQ, PULSAR, 
DATAPROXY, etc.")
+    @ApiModelProperty(value = "Cluster type, including TUBEMQ, PULSAR, KAFKA, 
DATAPROXY, etc.")
     private String type;
 
     @ApiModelProperty(value = "Cluster url")
@@ -58,7 +58,7 @@ public abstract class ClusterRequest {
     private String clusterTags;
 
     @ApiModelProperty(value = "Extension tag")
-    private String extTag;
+    private String extTag = "default=true";
 
     @ApiModelProperty(value = "Cluster token")
     private String token;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
index 620385a1d..9f5e25ca4 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.manager.pojo.cluster.kafka;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -33,9 +35,16 @@ import javax.validation.constraints.NotNull;
 @Data
 @Builder
 @NoArgsConstructor
+@AllArgsConstructor
 @ApiModel("Kafka cluster info")
 public class KafkaClusterDTO {
 
+    @Builder.Default
+    private String messageQueueHandler = 
"org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler";
+
+    @JsonProperty("bootstrap.servers")
+    private String bootstrapServers;
+
     /**
      * Get the dto instance from the request
      */
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
index d1c93a629..d80d3da2b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
@@ -46,6 +46,11 @@ public class PulsarClusterDTO {
     @Builder.Default
     private String tenant = "public";
 
+    @Builder.Default
+    private String messageQueueHandler = 
"org.apache.inlong.dataproxy.sink.mq.pulsar.PulsarHandler";
+
+    private String serviceUrl;
+
     /**
      * Get the dto instance from the request
      */
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
index 811f850df..3006e8302 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
@@ -45,6 +45,9 @@ public class TubeClusterDTO {
     @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080", notes = 
"TubeMQ master RPC URL is the 'url' field of the cluster")
     private String masterWebUrl;
 
+    @Builder.Default
+    private String messageQueueHandler = 
"org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler";
+
     /**
      * Get the dto instance from the JSON string.
      */
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 5cafede6f..aa6aeb9e7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
@@ -843,13 +844,28 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
                 topicConfig.setInlongGroupId(groupId);
                 topicConfig.setTopic(mqResource);
                 topicList.add(topicConfig);
+            } else if (MQType.KAFKA.equals(mqType)) {
+                List<InlongStreamBriefInfo> streamList = 
streamMapper.selectBriefList(groupId);
+                for (InlongStreamBriefInfo streamInfo : streamList) {
+                    String streamId = streamInfo.getInlongStreamId();
+                    String topic = streamInfo.getMqResource();
+                    if (topic.equals(streamId)) {
+                        // the default mq resource (stream id) is not 
sufficient to discriminate different kafka topics
+                        topic = 
String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+                                mqResource, streamInfo.getMqResource());
+                    }
+                    DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                    topicConfig.setInlongGroupId(groupId + "/" + streamId);
+                    topicConfig.setTopic(topic);
+                    topicList.add(topicConfig);
+                }
             }
         }
 
         // get mq cluster info
         LOGGER.debug("GetDPConfig: begin to get mq clusters by tags={}", 
clusterTagList);
         List<MQClusterInfo> mqSet = new ArrayList<>();
-        List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, 
ClusterType.PULSAR);
+        List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, 
ClusterType.PULSAR, ClusterType.KAFKA);
         ClusterPageRequest pageRequest = ClusterPageRequest.builder()
                 .typeList(typeList)
                 .clusterTagList(clusterTagList)
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
index 4578f9e39..8c4eb37b7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
@@ -78,6 +78,7 @@ public class KafkaClusterOperator extends 
AbstractClusterOperator {
         CommonBeanUtils.copyProperties(kafkaRequest, targetEntity, true);
         try {
             KafkaClusterDTO dto = KafkaClusterDTO.getFromRequest(kafkaRequest);
+            dto.setBootstrapServers(kafkaRequest.getUrl());
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
             LOGGER.info("success to set entity for kafka cluster");
         } catch (Exception e) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index c9ef2d562..3f7770d40 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -78,6 +78,7 @@ public class PulsarClusterOperator extends 
AbstractClusterOperator {
         CommonBeanUtils.copyProperties(pulsarRequest, targetEntity, true);
         try {
             PulsarClusterDTO dto = 
PulsarClusterDTO.getFromRequest(pulsarRequest);
+            dto.setServiceUrl(request.getUrl());
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
             LOGGER.info("success to set entity for pulsar cluster");
         } catch (Exception e) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 058694a05..21b0c5bdc 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.resource.queue.kafka;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -107,7 +108,13 @@ public class KafkaResourceOperators implements 
QueueResourceOperator {
         try {
             InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
             // create kafka topic
-            this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource());
+            String topicName = streamInfo.getMqResource();
+            if (topicName.equals(streamId)) {
+                // the default mq resource (stream id) is not sufficient to 
discriminate different kafka topics
+                topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+                        inlongKafkaInfo.getMqResource(), 
streamInfo.getMqResource());
+            }
+            this.createKafkaTopic(inlongKafkaInfo, topicName);
         } catch (Exception e) {
             String msg = String.format("failed to create kafka topic for 
groupId=%s, streamId=%s", groupId, streamId);
             log.error(msg, e);

Reply via email to