This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0064f3fc5b [INLONG-10655][Sort] Kafka and Pulsar Sink support parse stream separator (#10671)
0064f3fc5b is described below

commit 0064f3fc5bbaaba166330ca4d12abe372e7fe7d9
Author: vernedeng <verned...@apache.org>
AuthorDate: Thu Jul 18 21:57:48 2024 +0800

    [INLONG-10655][Sort] Kafka and Pulsar Sink support parse stream separator (#10671)
---
 .../metrics/SortConfigMetricReporter.java          | 54 ++++++++++++++++------
 .../sort/standalone/sink/kafka/KafkaIdConfig.java  |  8 ++++
 .../standalone/sink/pulsar/PulsarIdConfig.java     |  9 +++-
 3 files changed, 57 insertions(+), 14 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
index 001759d8ea..da730cef84 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
@@ -130,30 +130,58 @@ public class SortConfigMetricReporter {
         Collection<String> intersection = 
CollectionUtils.intersection(fromTaskConfig.keySet(),
                 fromSortTaskConfig.keySet());
         List<IdConfig> diff = intersection.stream()
-                .filter(k -> 
!fromTaskConfig.get(k).equals(fromSortTaskConfig.get(k)))
+                .filter(k -> {
+                    IdConfig fromTask = fromTaskConfig.get(k);
+                    IdConfig fromSortTask = fromSortTaskConfig.get(k);
+                    if (fromTask.equals(fromSortTask)) {
+                        return false;
+                    }
+                    log.warn("find different id config, fromTaskConfig={}, fromSortTaskConfig={}", fromTask,
+                            fromSortTask);
+                    return true;
+                })
                 .map(fromSortTaskConfig::get)
                 .collect(Collectors.toList());
+
         // report diff
         diff.forEach(idConfig -> {
             listeners.forEach(listener -> 
listener.reportClusterDiff(sortClusterName, sortTaskName,
                     idConfig.getInlongGroupId(), 
idConfig.getInlongStreamId()));
         });
+        log.warn("different id config size = {}", diff.size());
 
         // report miss in sort cluster config
-        fromTaskConfig.forEach((k, v) -> {
-            if (!intersection.contains(k)) {
-                listeners.forEach(listener -> 
listener.reportMissInSortClusterConfig(sortClusterName, sortTaskName,
-                        v.getInlongGroupId(), v.getInlongStreamId()));
-            }
-        });
+        List<String> missInSortClusterConfig = 
fromTaskConfig.entrySet().stream()
+                .filter(entry -> {
+                    String k = entry.getKey();
+                    IdConfig v = entry.getValue();
+                    if (!intersection.contains(k)) {
+                        listeners.forEach(listener -> 
listener.reportMissInSortClusterConfig(sortClusterName,
+                                sortTaskName, v.getInlongGroupId(), 
v.getInlongStreamId()));
+                        return true;
+                    }
+                    return false;
+                })
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toList());
 
         // report miss in sort config
-        fromSortTaskConfig.forEach((k, v) -> {
-            if (!intersection.contains(k)) {
-                listeners.forEach(listener -> 
listener.reportMissInSortConfig(sortClusterName, sortTaskName,
-                        v.getInlongGroupId(), v.getInlongStreamId()));
-            }
-        });
+        List<String> missInSortConfig = fromSortTaskConfig.entrySet().stream()
+                .filter(entry -> {
+                    String k = entry.getKey();
+                    IdConfig v = entry.getValue();
+                    if (!intersection.contains(k)) {
+                        listeners.forEach(listener -> 
listener.reportMissInSortConfig(sortClusterName,
+                                sortTaskName, v.getInlongGroupId(), 
v.getInlongStreamId()));
+                        return true;
+                    }
+                    return false;
+                })
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toList());
+
+        log.warn("report cluster diff, intersection={}, missInSortClusterConfig={}, missInSortConfig={}",
+                intersection, missInSortClusterConfig, missInSortConfig);
     }
 
     public static void reportSourceDiff(
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index ecbd8ad6f3..a1c01bdb73 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sort.standalone.sink.kafka;
 
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
 import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
@@ -60,6 +62,11 @@ public class KafkaIdConfig extends IdConfig {
 
     public static KafkaIdConfig create(DataFlowConfig dataFlowConfig) {
         KafkaSinkConfig sinkConfig = (KafkaSinkConfig) 
dataFlowConfig.getSinkConfig();
+        DataTypeConfig dataTypeConfig = 
dataFlowConfig.getSourceConfig().getDataTypeConfig();
+        String separator = DEFAULT_SEPARATOR;
+        if (dataTypeConfig instanceof CsvConfig) {
+            separator = String.valueOf(((CsvConfig) 
dataTypeConfig).getDelimiter());
+        }
 
         return KafkaIdConfig.builder()
                 .inlongGroupId(dataFlowConfig.getInlongGroupId())
@@ -67,6 +74,7 @@ public class KafkaIdConfig extends IdConfig {
                 .uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(), 
dataFlowConfig.getInlongStreamId()))
                 .topic(sinkConfig.getTopicName())
                 .dataType(DataTypeEnum.TEXT)
+                .separator(separator)
                 .build();
     }
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index c8fc0c33b2..7d4f30f752 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sort.standalone.sink.pulsar;
 
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig;
 import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
@@ -62,13 +64,18 @@ public class PulsarIdConfig extends IdConfig {
 
     public static PulsarIdConfig create(DataFlowConfig dataFlowConfig) {
         PulsarSinkConfig sinkConfig = (PulsarSinkConfig) 
dataFlowConfig.getSinkConfig();
-
+        DataTypeConfig dataTypeConfig = 
dataFlowConfig.getSourceConfig().getDataTypeConfig();
+        String separator = DEFAULT_SEPARATOR;
+        if (dataTypeConfig instanceof CsvConfig) {
+            separator = String.valueOf(((CsvConfig) 
dataTypeConfig).getDelimiter());
+        }
         return PulsarIdConfig.builder()
                 .inlongGroupId(dataFlowConfig.getInlongGroupId())
                 .inlongStreamId(dataFlowConfig.getInlongStreamId())
                 .uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(), 
dataFlowConfig.getInlongStreamId()))
                 .topic(sinkConfig.getTopic())
                 .dataType(DataTypeEnum.TEXT)
+                .separator(separator)
                 .build();
 
     }

Reply via email to