This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eec67bc72 [INLONG-10529][Sort] PulsarSink support switch metadata 
acquire mode (#10554)
8eec67bc72 is described below

commit 8eec67bc7218bd1a5303de9c6eee967363a4927d
Author: vernedeng <verned...@apache.org>
AuthorDate: Tue Jul 2 19:02:09 2024 +0800

    [INLONG-10529][Sort] PulsarSink support switch metadata acquire mode 
(#10554)
---
 .../sink/pulsar/PulsarFederationSinkContext.java   | 54 ++++++++++++++++------
 .../standalone/sink/pulsar/PulsarIdConfig.java     | 13 ++++++
 2 files changed, 53 insertions(+), 14 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index 770f028da0..a31d9f90ba 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -20,8 +20,10 @@ package org.apache.inlong.sort.standalone.sink.pulsar;
 import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
 import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
@@ -36,6 +38,7 @@ import org.slf4j.Logger;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -53,36 +56,59 @@ public class PulsarFederationSinkContext extends 
SinkContext {
 
     public void reload() {
         try {
-            TaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
-            if (newSortTaskConfig == null) {
+            TaskConfig newTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
+            SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            if (newTaskConfig == null && newSortTaskConfig == null) {
                 LOG.error("newSortTaskConfig is null.");
                 return;
             }
-            if (this.taskConfig != null && 
this.taskConfig.equals(newSortTaskConfig)) {
+            if ((this.taskConfig != null && 
this.taskConfig.equals(newTaskConfig))
+                    && (this.sortTaskConfig != null && 
this.sortTaskConfig.equals(newSortTaskConfig))) {
                 LOG.info("Same sortTaskConfig, do nothing.");
                 return;
             }
-            this.taskConfig = newSortTaskConfig;
 
-            PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) 
newSortTaskConfig.getNodeConfig();
+            PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) 
newTaskConfig.getNodeConfig();
             if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > 
pulsarNodeConfig.getVersion()) {
                 this.pulsarNodeConfig = requestNodeConfig;
             }
+            this.taskConfig = newTaskConfig;
+            this.sortTaskConfig = newSortTaskConfig;
 
-            this.idConfigMap = this.taskConfig.getClusterTagConfigs()
-                    .stream()
-                    .map(ClusterTagConfig::getDataFlowConfigs)
-                    .flatMap(Collection::stream)
-                    .map(PulsarIdConfig::create)
-                    .collect(Collectors.toMap(
-                            config -> 
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
-                            v -> v,
-                            (v1, v2) -> v1));
+            Map<String, PulsarIdConfig> fromTaskConfig = 
fromTaskConfig(taskConfig);
+            Map<String, PulsarIdConfig> fromSortTaskConfig = 
fromSortTaskConfig(sortTaskConfig);
+            idConfigMap = unifiedConfiguration ? fromTaskConfig : 
fromSortTaskConfig;
         } catch (Throwable e) {
             LOG.error(e.getMessage(), e);
         }
     }
 
+    public Map<String, PulsarIdConfig> fromTaskConfig(TaskConfig taskConfig) {
+        return taskConfig.getClusterTagConfigs()
+                .stream()
+                .map(ClusterTagConfig::getDataFlowConfigs)
+                .flatMap(Collection::stream)
+                .map(PulsarIdConfig::create)
+                .collect(Collectors.toMap(
+                        config -> 
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+                        v -> v,
+                        (v1, v2) -> v1));
+    }
+
+    public Map<String, PulsarIdConfig> fromSortTaskConfig(SortTaskConfig 
sortTaskConfig) {
+        Map<String, PulsarIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
+        List<Map<String, String>> idList = sortTaskConfig.getIdParams();
+        for (Map<String, String> idParam : idList) {
+            try {
+                PulsarIdConfig idConfig = new PulsarIdConfig(idParam);
+                newIdConfigMap.put(idConfig.getUid(), idConfig);
+            } catch (Exception e) {
+                LOG.error("fail to parse pulsar id config", e);
+            }
+        }
+        return newIdConfigMap;
+    }
+
     public String getTopic(String uid) {
         PulsarIdConfig idConfig = this.idConfigMap.get(uid);
         if (idConfig == null) {
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index cb6c651982..69b042506d 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -21,12 +21,15 @@ import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.utils.Constants;
 
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.util.Map;
+
 @Data
 @Builder
 @NoArgsConstructor
@@ -46,6 +49,16 @@ public class PulsarIdConfig {
     private String topic;
     private DataTypeEnum dataType = DataTypeEnum.TEXT;
 
+    public PulsarIdConfig(Map<String, String> idParam) {
+        this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
+        this.inlongStreamId = idParam.getOrDefault(Constants.INLONG_STREAM_ID, 
DEFAULT_INLONG_STREAM);
+        this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+        this.separator = idParam.getOrDefault(PulsarIdConfig.KEY_SEPARATOR, 
PulsarIdConfig.DEFAULT_SEPARATOR);
+        this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
+        this.dataType = DataTypeEnum
+                .convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE, 
DataTypeEnum.TEXT.getType()));
+    }
+
     public static PulsarIdConfig create(DataFlowConfig dataFlowConfig) {
         PulsarSinkConfig sinkConfig = (PulsarSinkConfig) 
dataFlowConfig.getSinkConfig();
 

Reply via email to