This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 0064f3fc5b [INLONG-10655][Sort] Kafka and Pulsar Sink support parse stream separator (#10671) 0064f3fc5b is described below commit 0064f3fc5bbaaba166330ca4d12abe372e7fe7d9 Author: vernedeng <verned...@apache.org> AuthorDate: Thu Jul 18 21:57:48 2024 +0800 [INLONG-10655][Sort] Kafka and Pulsar Sink support parse stream separator (#10671) --- .../metrics/SortConfigMetricReporter.java | 54 ++++++++++++++++------ .../sort/standalone/sink/kafka/KafkaIdConfig.java | 8 ++++ .../standalone/sink/pulsar/PulsarIdConfig.java | 9 +++- 3 files changed, 57 insertions(+), 14 deletions(-) diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java index 001759d8ea..da730cef84 100644 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java @@ -130,30 +130,58 @@ public class SortConfigMetricReporter { Collection<String> intersection = CollectionUtils.intersection(fromTaskConfig.keySet(), fromSortTaskConfig.keySet()); List<IdConfig> diff = intersection.stream() - .filter(k -> !fromTaskConfig.get(k).equals(fromSortTaskConfig.get(k))) + .filter(k -> { + IdConfig fromTask = fromTaskConfig.get(k); + IdConfig fromSortTask = fromSortTaskConfig.get(k); + if (fromTask.equals(fromSortTask)) { + return false; + } + log.warn("find different id config, fromTaskConfig={}, fromSortTaskConfig={}", fromTask, + fromSortTask); + return true; + }) .map(fromSortTaskConfig::get) .collect(Collectors.toList()); + // report diff diff.forEach(idConfig -> { listeners.forEach(listener -> listener.reportClusterDiff(sortClusterName, sortTaskName, idConfig.getInlongGroupId(), idConfig.getInlongStreamId())); }); + log.warn("different id config size = {}", diff.size()); // report miss in sort cluster config - fromTaskConfig.forEach((k, v) -> { - if (!intersection.contains(k)) { - listeners.forEach(listener -> listener.reportMissInSortClusterConfig(sortClusterName, sortTaskName, - v.getInlongGroupId(), v.getInlongStreamId())); - } - }); + List<String> missInSortClusterConfig = fromTaskConfig.entrySet().stream() + .filter(entry -> { + String k = entry.getKey(); + IdConfig v = entry.getValue(); + if (!intersection.contains(k)) { + listeners.forEach(listener -> listener.reportMissInSortClusterConfig(sortClusterName, + sortTaskName, v.getInlongGroupId(), v.getInlongStreamId())); + return true; + } + return false; + }) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); // report miss in sort config - fromSortTaskConfig.forEach((k, v) -> { - if (!intersection.contains(k)) { - listeners.forEach(listener -> listener.reportMissInSortConfig(sortClusterName, sortTaskName, - v.getInlongGroupId(), v.getInlongStreamId())); - } - }); + List<String> missInSortConfig = fromSortTaskConfig.entrySet().stream() + .filter(entry -> { + String k = entry.getKey(); + IdConfig v = entry.getValue(); + if (!intersection.contains(k)) { + listeners.forEach(listener -> listener.reportMissInSortConfig(sortClusterName, + sortTaskName, v.getInlongGroupId(), v.getInlongStreamId())); + return true; + } + return false; + }) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + + log.warn("report cluster diff, intersection={}, missInSortClusterConfig={}, missInSortConfig={}", + intersection, missInSortClusterConfig, missInSortConfig); } public static void reportSourceDiff( diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java index ecbd8ad6f3..a1c01bdb73 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java @@ -19,6 +19,8 @@ package org.apache.inlong.sort.standalone.sink.kafka; import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig; +import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig; +import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig; import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig; import org.apache.inlong.sort.standalone.config.pojo.IdConfig; import org.apache.inlong.sort.standalone.config.pojo.InlongId; @@ -60,6 +62,11 @@ public class KafkaIdConfig extends IdConfig { public static KafkaIdConfig create(DataFlowConfig dataFlowConfig) { KafkaSinkConfig sinkConfig = (KafkaSinkConfig) dataFlowConfig.getSinkConfig(); + DataTypeConfig dataTypeConfig = dataFlowConfig.getSourceConfig().getDataTypeConfig(); + String separator = DEFAULT_SEPARATOR; + if (dataTypeConfig instanceof CsvConfig) { + separator = String.valueOf(((CsvConfig) dataTypeConfig).getDelimiter()); + } return KafkaIdConfig.builder() .inlongGroupId(dataFlowConfig.getInlongGroupId()) @@ -67,6 +74,7 @@ public class KafkaIdConfig extends IdConfig { .uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(), dataFlowConfig.getInlongStreamId())) .topic(sinkConfig.getTopicName()) .dataType(DataTypeEnum.TEXT) + .separator(separator) .build(); } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java index c8fc0c33b2..7d4f30f752 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java @@ -19,6 +19,8 @@ package org.apache.inlong.sort.standalone.sink.pulsar; import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig; +import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig; +import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig; import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig; import org.apache.inlong.sort.standalone.config.pojo.IdConfig; import org.apache.inlong.sort.standalone.config.pojo.InlongId; @@ -62,13 +64,18 @@ public class PulsarIdConfig extends IdConfig { public static PulsarIdConfig create(DataFlowConfig dataFlowConfig) { PulsarSinkConfig sinkConfig = (PulsarSinkConfig) dataFlowConfig.getSinkConfig(); - + DataTypeConfig dataTypeConfig = dataFlowConfig.getSourceConfig().getDataTypeConfig(); + String separator = DEFAULT_SEPARATOR; + if (dataTypeConfig instanceof CsvConfig) { + separator = String.valueOf(((CsvConfig) dataTypeConfig).getDelimiter()); + } return PulsarIdConfig.builder() .inlongGroupId(dataFlowConfig.getInlongGroupId()) .inlongStreamId(dataFlowConfig.getInlongStreamId()) .uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(), dataFlowConfig.getInlongStreamId())) .topic(sinkConfig.getTopic()) .dataType(DataTypeEnum.TEXT) + .separator(separator) .build(); }