This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ab74bf549 [INLONG-7325][Manager] Format the topic name and data separator for Kafka (#7326) ab74bf549 is described below commit ab74bf549dcf4e1837a38e28979875f769473c04 Author: haifxu <xhf1208357...@gmail.com> AuthorDate: Tue Feb 7 15:54:34 2023 +0800 [INLONG-7325][Manager] Format the topic name and data separator for Kafka (#7326) --- .../resource/queue/kafka/KafkaResourceOperators.java | 9 ++++++++- .../service/source/kafka/KafkaSourceOperator.java | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java index 1f7180d3b..f8d43f423 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.service.resource.queue.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.inlong.common.constant.Constants; import org.apache.inlong.common.constant.MQType; import org.apache.inlong.manager.common.enums.ClusterType; @@ -134,7 +135,13 @@ public class KafkaResourceOperators implements QueueResourceOperator { log.info("begin to delete kafka resource for groupId={} streamId={}", groupId, streamId); try { - this.deleteKafkaTopic(groupInfo, streamInfo.getMqResource()); + String topicName = streamInfo.getMqResource(); + if (StringUtils.isBlank(topicName) || topicName.equals(streamId)) { + // the default mq resource (stream id) is not sufficient to discriminate different kafka topics + topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, + groupInfo.getMqResource(), streamInfo.getMqResource()); + } + this.deleteKafkaTopic(groupInfo, topicName); log.info("success to delete kafka topic for groupId={}, streamId={}", groupId, streamId); } catch (Exception e) { String msg = String.format("failed to delete kafka topic for groupId=%s, streamId=%s", groupId, streamId); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java index 97cedf6a2..4338a588f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.common.constant.Constants; import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ClusterType; @@ -113,6 +114,13 @@ public class KafkaSourceOperator extends AbstractSourceOperator { kafkaSource.setBootstrapServers(bootstrapServers); kafkaSource.setTopic(streamInfo.getMqResource()); String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType(); + String topicName = streamInfo.getMqResource(); + if (StringUtils.isBlank(topicName) || topicName.equals(streamId)) { + // the default mq resource (stream id) is not sufficient to discriminate different kafka topics + topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, + groupInfo.getMqResource(), streamInfo.getMqResource()); + } + kafkaSource.setTopic(topicName); kafkaSource.setSerializationType(serializationType); kafkaSource.setIgnoreParseError(streamInfo.getIgnoreParseError()); @@ -126,6 +134,17 @@ public class KafkaSourceOperator extends AbstractSourceOperator { } } + // if the SerializationType is still null, set it to the CSV + if (StringUtils.isBlank(kafkaSource.getSerializationType())) { + kafkaSource.setSerializationType(DataTypeEnum.CSV.getType()); + } + if (DataTypeEnum.CSV.getType().equalsIgnoreCase(kafkaSource.getSerializationType())) { + kafkaSource.setDataSeparator(streamInfo.getDataSeparator()); + if (StringUtils.isBlank(kafkaSource.getDataSeparator())) { + kafkaSource.setDataSeparator(String.valueOf((int) ',')); + } + } + kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg()); kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());