This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ab74bf549 [INLONG-7325][Manager] Format the topic name and data separator for Kafka (#7326)
ab74bf549 is described below

commit ab74bf549dcf4e1837a38e28979875f769473c04
Author: haifxu <xhf1208357...@gmail.com>
AuthorDate: Tue Feb 7 15:54:34 2023 +0800

    [INLONG-7325][Manager] Format the topic name and data separator for Kafka (#7326)
---
 .../resource/queue/kafka/KafkaResourceOperators.java  |  9 ++++++++-
 .../service/source/kafka/KafkaSourceOperator.java     | 19 +++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 1f7180d3b..f8d43f423 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.resource.queue.kafka;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
@@ -134,7 +135,13 @@ public class KafkaResourceOperators implements 
QueueResourceOperator {
         log.info("begin to delete kafka resource for groupId={} streamId={}", 
groupId, streamId);
 
         try {
-            this.deleteKafkaTopic(groupInfo, streamInfo.getMqResource());
+            String topicName = streamInfo.getMqResource();
+            if (StringUtils.isBlank(topicName) || topicName.equals(streamId)) {
+                // the default mq resource (stream id) is not sufficient to 
discriminate different kafka topics
+                topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+                        groupInfo.getMqResource(), streamInfo.getMqResource());
+            }
+            this.deleteKafkaTopic(groupInfo, topicName);
             log.info("success to delete kafka topic for groupId={}, 
streamId={}", groupId, streamId);
         } catch (Exception e) {
             String msg = String.format("failed to delete kafka topic for 
groupId=%s, streamId=%s", groupId, streamId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 97cedf6a2..4338a588f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
@@ -113,6 +114,13 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
             kafkaSource.setBootstrapServers(bootstrapServers);
             kafkaSource.setTopic(streamInfo.getMqResource());
             String serializationType = 
DataTypeEnum.forType(streamInfo.getDataType()).getType();
+            String topicName = streamInfo.getMqResource();
+            if (StringUtils.isBlank(topicName) || topicName.equals(streamId)) {
+                // the default mq resource (stream id) is not sufficient to 
discriminate different kafka topics
+                topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+                        groupInfo.getMqResource(), streamInfo.getMqResource());
+            }
+            kafkaSource.setTopic(topicName);
             kafkaSource.setSerializationType(serializationType);
             kafkaSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
 
@@ -126,6 +134,17 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
                 }
             }
 
+            // if the SerializationType is still null, set it to the CSV
+            if (StringUtils.isBlank(kafkaSource.getSerializationType())) {
+                kafkaSource.setSerializationType(DataTypeEnum.CSV.getType());
+            }
+            if 
(DataTypeEnum.CSV.getType().equalsIgnoreCase(kafkaSource.getSerializationType()))
 {
+                kafkaSource.setDataSeparator(streamInfo.getDataSeparator());
+                if (StringUtils.isBlank(kafkaSource.getDataSeparator())) {
+                    kafkaSource.setDataSeparator(String.valueOf((int) ','));
+                }
+            }
+
             
kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
 
             kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());

Reply via email to