This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d77048497a [INLONG-10213][SDK] SortSDK support unified sort 
configuration (#10219)
d77048497a is described below

commit d77048497a66a07bba7c7dd7d2b56450973bd841
Author: vernedeng <verned...@apache.org>
AuthorDate: Wed May 15 16:40:17 2024 +0800

    [INLONG-10213][SDK] SortSDK support unified sort configuration (#10219)
    
    * [INLONG-10213][SDK] SortSDK support unified sort configuration
---
 .../common/pojo/sort/dataflow/DataFlowConfig.java  |  1 +
 .../common/pojo/sort/mq/PulsarClusterConfig.java   |  3 +
 .../listener/queue/ClusterConfigListener.java      |  1 +
 .../resource/sort/DefaultSortConfigOperator.java   |  1 +
 .../config/holder/v2/SortConfigHolder.java         | 37 ++++++++-
 .../loader/SortConfigQueryConsumeConfig.java       | 90 ++++++++++++++++++++++
 6 files changed, 130 insertions(+), 3 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
index a7bb1c36a3..a089bdc3f5 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
@@ -35,6 +35,7 @@ public class DataFlowConfig implements Serializable {
 
     private String dataflowId;
     private Integer version;
+    private String auditTag;
     private String inlongGroupId;
     private String inlongStreamId;
     private SourceConfig sourceConfig;
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java
index 1bb503b7a4..071c93f9c1 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java
@@ -30,4 +30,7 @@ public class PulsarClusterConfig extends MqClusterConfig {
 
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private String serviceUrl;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private String token;
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java
index 3a09c4ecea..8c0aa037ae 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java
@@ -84,6 +84,7 @@ public class ClusterConfigListener implements 
ClusterOperateListener {
                             CommonBeanUtils.copyProperties(pulsarCluster, 
PulsarClusterConfig::new);
                     
pulsarClusterConfig.setVersion(String.valueOf(pulsarCluster.getVersion()));
                     
pulsarClusterConfig.setClusterName(pulsarCluster.getName());
+                    pulsarClusterConfig.setServiceUrl(pulsarCluster.getUrl());
                     list.add(pulsarClusterConfig);
                 }
                 
clusterConfigEntity.setConfigParams(objectMapper.writeValueAsString(list));
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 5c51c1abcf..670201c357 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -160,6 +160,7 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
         return DataFlowConfig.builder()
                 .dataflowId(String.valueOf(sink.getId()))
                 .sourceConfig(getSourceConfig(groupInfo, streamInfo, sink))
+                .auditTag(String.valueOf(sink.getId()))
                 .sinkConfig(getSinkConfig(sink))
                 .inlongGroupId(groupInfo.getInlongGroupId())
                 .inlongStreamId(streamInfo.getInlongStreamId())
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index ef9106cc97..eb2bf9bf92 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -17,22 +17,29 @@
 
 package org.apache.inlong.sort.standalone.config.holder.v2;
 
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
 import org.apache.inlong.common.pojo.sort.SortConfig;
 import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
 import 
org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader;
 import 
org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortClusterConfigLoader;
 import org.apache.inlong.sort.standalone.config.loader.v2.SortConfigLoader;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 
+import java.util.Collection;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
 
@@ -45,9 +52,10 @@ public class SortConfigHolder {
     private Timer reloadTimer;
     private SortConfigLoader loader;
     private SortConfig config;
+    private Map<String, Map<String, String>> auditTagMap;
 
     private SortConfigHolder() {
-
+        this.auditTagMap = new HashMap<>();
     }
 
     private static SortConfigHolder get() {
@@ -110,9 +118,21 @@ public class SortConfigHolder {
     private void reload() {
         try {
             SortConfig newConfig = this.loader.load();
-            if (newConfig != null) {
-                this.config = newConfig;
+            if (newConfig == null) {
+                return;
             }
+
+            // <SortTaskName, <InlongId, AuditTag>>
+            this.auditTagMap = newConfig.getTasks()
+                    .stream()
+                    .collect(Collectors.toMap(SortTaskConfig::getSortTaskName,
+                            v -> v.getClusters()
+                                    .stream()
+                                    .map(SortClusterConfig::getDataFlowConfigs)
+                                    .flatMap(Collection::stream)
+                                    .collect(Collectors.toMap(flow -> 
InlongId.generateUid(flow.getInlongGroupId(),
+                                            flow.getInlongStreamId()),
+                                            DataFlowConfig::getAuditTag))));
         } catch (Throwable e) {
             log.error("failed to reload sort config", e);
         }
@@ -133,4 +153,15 @@ public class SortConfigHolder {
         }
         return null;
     }
+
+    public static String getAuditTag(
+            String sortTaskName,
+            String inlongGroupId,
+            String inlongStreamId) {
+        Map<String, String> taskMap = get().auditTagMap.get(sortTaskName);
+        if (taskMap == null) {
+            return null;
+        }
+        return taskMap.get(InlongId.generateUid(inlongGroupId, 
inlongStreamId));
+    }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
new file mode 100644
index 0000000000..3f893041a6
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
+import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
+import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class SortConfigQueryConsumeConfig implements QueryConsumeConfig {
+
+    private List<InLongTopic> subscribedTopic = new ArrayList<>();
+
+    public void reload() {
+
+    }
+
+    @Override
+    public ConsumeConfig queryCurrentConsumeConfig(String sortTaskId) {
+        SortTaskConfig taskConfig = SortConfigHolder.getTaskConfig(sortTaskId);
+        List<InLongTopic> topics = taskConfig.getClusters()
+                .stream()
+                .map(this::parseTopics)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+
+        return new ConsumeConfig(topics);
+    }
+
+    public List<InLongTopic> parseTopics(SortClusterConfig clusterConfig) {
+        List<InLongTopic> topics = new ArrayList<>();
+        List<MqClusterConfig> mqClusterConfigs = 
clusterConfig.getMqClusterConfigs();
+        List<DataFlowConfig> dataFlowConfigs = 
clusterConfig.getDataFlowConfigs();
+        for (MqClusterConfig mq : mqClusterConfigs) {
+            for (DataFlowConfig flow : dataFlowConfigs) {
+                InLongTopic topic = new InLongTopic();
+                topic.setInLongCluster(this.parseCacheZone(mq));
+                topic.setTopic(flow.getSourceConfig().getTopic());
+                // only supports pulsar now
+                topic.setTopicType(InlongTopicTypeEnum.PULSAR.getName());
+                topic.setProperties(flow.getProperties() != null ? 
flow.getProperties() : new HashMap<>());
+                topics.add(topic);
+            }
+        }
+        return topics;
+    }
+
+    public CacheZoneCluster parseCacheZone(MqClusterConfig mqClusterConfig) {
+        PulsarClusterConfig pulsarClusterConfig = (PulsarClusterConfig) 
mqClusterConfig;
+        return new CacheZoneCluster(pulsarClusterConfig.getClusterName(),
+                pulsarClusterConfig.getServiceUrl(), 
pulsarClusterConfig.getToken());
+    }
+
+    @Override
+    public void configure(ClientContext context) {
+
+    }
+}

Reply via email to