This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f1395eb0d8 [INLONG-9297][Manager] Support configuring multiple sink 
types of tasks under a single stream (#9298)
f1395eb0d8 is described below

commit f1395eb0d8bd9f6b4d5f4fd3fa322422f2c27aaa
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Nov 28 20:11:20 2023 +0800

    [INLONG-9297][Manager] Support configuring multiple sink types of tasks 
under a single stream (#9298)
---
 .../inlong/manager/common/consts/SinkType.java     |  61 +++++++-
 .../consts/{StreamType.java => SortType.java}      |  13 +-
 .../inlong/manager/common/consts/StreamType.java   |  13 ++
 .../{StreamType.java => SupportSortType.java}      |  23 +--
 .../inlong/manager/common/enums/GroupStatus.java   |   3 +-
 .../plugin/listener/DeleteSortListener.java        |  90 ++++++-----
 .../plugin/listener/RestartSortListener.java       | 123 ++++++++-------
 .../plugin/listener/StartupSortListener.java       | 125 ++++++++-------
 .../plugin/listener/StartupStreamListener.java     | 123 ++++++++++++++-
 .../plugin/listener/SuspendSortListener.java       |  93 ++++++-----
 .../manager/plugin/poller/SortStatusPoller.java    |  31 ++--
 .../inlong/manager/pojo/sort/SortStatusInfo.java   |   3 +
 .../manager/service/core/impl/SortServiceImpl.java |  12 +-
 .../service/group/InlongGroupServiceImpl.java      |  12 +-
 .../service/listener/sort/SortConfigListener.java  |  17 +-
 .../listener/sort/StreamSortConfigListener.java    |  31 ++--
 .../resource/sort/DefaultSortConfigOperator.java   | 173 +++++++++++----------
 .../service/resource/sort/SortConfigOperator.java  |   8 +-
 .../resource/sort/SortConfigOperatorFactory.java   |  13 +-
 .../resources/application-unit-test.properties     |   2 +
 .../manager/workflow/plugin/sort/SortPoller.java   |   6 +-
 21 files changed, 637 insertions(+), 338 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index f0b215ac30..1d53c71fc2 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -17,26 +17,85 @@
 
 package org.apache.inlong.manager.common.consts;
 
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
 /**
  * Constants of sink type.
  */
 public class SinkType extends StreamType {
 
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String HIVE = "HIVE";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String CLICKHOUSE = "CLICKHOUSE";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String HBASE = "HBASE";
-    public static final String ELASTICSEARCH = "ES";
+
+    @SupportSortType(sortType = SortType.SORT_STANDALONE)
+    public static final String ELASTICSEARCH = "ELASTICSEARCH";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String HDFS = "HDFS";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String GREENPLUM = "GREENPLUM";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String MYSQL = "MYSQL";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String DORIS = "DORIS";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String STARROCKS = "STARROCKS";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String KUDU = "KUDU";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String REDIS = "REDIS";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
+    public static final String TUBEMQ = "TUBEMQ";
+
     /**
      * Tencent cloud log service
      * Details: <a href="https://www.tencentcloud.com/products/cls">CLS</a>
      */
+    @SupportSortType(sortType = SortType.SORT_STANDALONE)
     public static final String CLS = "CLS";
+
+    public static final Set<String> SORT_FLINK_SINK = new HashSet<>();
+
+    public static final Set<String> SORT_STANDALONE_SINK = new HashSet<>();
+
+    static {
+        SinkType obj = new SinkType();
+        Class<? extends SinkType> clazz = obj.getClass();
+        Field[] fields = clazz.getFields();
+        for (Field field : fields) {
+            if (field.isAnnotationPresent(SupportSortType.class)) {
+                SupportSortType annotation = 
field.getAnnotation(SupportSortType.class);
+                if (Objects.equals(annotation.sortType(), 
SortType.SORT_STANDALONE)) {
+                    SORT_STANDALONE_SINK.add(field.getName());
+                } else {
+                    SORT_FLINK_SINK.add(field.getName());
+                }
+            }
+        }
+    }
+
+    public static boolean containSortFlinkSink(List<String> sinkTypes) {
+        return sinkTypes.stream().anyMatch(SORT_FLINK_SINK::contains);
+    }
+
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SortType.java
similarity index 63%
copy from 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
copy to 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SortType.java
index afbd57bc50..e40c2922e8 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SortType.java
@@ -18,16 +18,11 @@
 package org.apache.inlong.manager.common.consts;
 
 /**
- * Constant for stream types, indicating that both StreamSource and StreamSink 
support these types.
+ * Sort task type, including sort flink and sort standalone
  */
-public class StreamType {
+public enum SortType {
 
-    public static final String KAFKA = "KAFKA";
-    public static final String HUDI = "HUDI";
-    public static final String POSTGRESQL = "POSTGRESQL";
-    public static final String SQLSERVER = "SQLSERVER";
-    public static final String ORACLE = "ORACLE";
-    public static final String PULSAR = "PULSAR";
-    public static final String ICEBERG = "ICEBERG";
+    SORT_FLINK,
 
+    SORT_STANDALONE
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
index afbd57bc50..f8f70dfe19 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
@@ -22,12 +22,25 @@ package org.apache.inlong.manager.common.consts;
  */
 public class StreamType {
 
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String KAFKA = "KAFKA";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String HUDI = "HUDI";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String POSTGRESQL = "POSTGRESQL";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String SQLSERVER = "SQLSERVER";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String ORACLE = "ORACLE";
+
+    @SupportSortType(sortType = SortType.SORT_STANDALONE)
     public static final String PULSAR = "PULSAR";
+
+    @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String ICEBERG = "ICEBERG";
 
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SupportSortType.java
similarity index 58%
copy from 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
copy to 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SupportSortType.java
index afbd57bc50..00c9048aac 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SupportSortType.java
@@ -17,17 +17,22 @@
 
 package org.apache.inlong.manager.common.consts;
 
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
 /**
- * Constant for stream types, indicating that both StreamSource and StreamSink 
support these types.
+ * This annotation is used to indicate the type of inbound task used for 
inbound operations, including sort flink and
+ * sort standalone. On the user's SinkType class field, this annotation is 
used to identify which type of sort task each
+ * SinkType uses.
  */
-public class StreamType {
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Target({ElementType.FIELD})
+public @interface SupportSortType {
 
-    public static final String KAFKA = "KAFKA";
-    public static final String HUDI = "HUDI";
-    public static final String POSTGRESQL = "POSTGRESQL";
-    public static final String SQLSERVER = "SQLSERVER";
-    public static final String ORACLE = "ORACLE";
-    public static final String PULSAR = "PULSAR";
-    public static final String ICEBERG = "ICEBERG";
+    SortType sortType();
 
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index d0bb62f7bf..79e89ad53b 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -75,7 +75,8 @@ public enum GroupStatus {
         GROUP_STATE_AUTOMATON.put(CONFIGURATION_OFFLINE, 
Sets.newHashSet(CONFIGURATION_OFFLINE, CONFIG_ONLINE_ING,
                 CONFIG_DELETING));
 
-        GROUP_STATE_AUTOMATON.put(CONFIG_ONLINE_ING, 
Sets.newHashSet(CONFIG_ONLINE_ING, CONFIG_FAILED));
+        GROUP_STATE_AUTOMATON.put(CONFIG_ONLINE_ING,
+                Sets.newHashSet(CONFIG_ONLINE_ING, CONFIG_FAILED, 
CONFIG_SUCCESSFUL));
 
         GROUP_STATE_AUTOMATON.put(CONFIG_DELETING, 
Sets.newHashSet(CONFIG_DELETING, CONFIG_DELETED, CONFIG_FAILED));
         GROUP_STATE_AUTOMATON.put(CONFIG_DELETED, 
Sets.newHashSet(CONFIG_DELETED));
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index d7f33fd77d..a7918c640a 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -24,8 +24,9 @@ import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -34,6 +35,7 @@ import 
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobStatus;
 
@@ -84,46 +86,54 @@ public class DeleteSortListener implements 
SortOperateListener {
         }
 
         GroupResourceProcessForm groupResourceProcessForm = 
(GroupResourceProcessForm) processForm;
-        InlongGroupInfo inlongGroupInfo = 
groupResourceProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
-        log.info("inlong group ext info: {}", extList);
-
-        Map<String, String> kvConf = new HashMap<>();
-        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), 
groupExtInfo.getKeyValue()));
-        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isNotEmpty(sortExt)) {
-            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                    });
-            kvConf.putAll(result);
-        }
-
-        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
-        if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for 
groupId=%s", groupId);
-            return ListenerResult.fail(message);
-        }
-
-        FlinkInfo flinkInfo = new FlinkInfo();
-        flinkInfo.setJobId(jobId);
-        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
-        flinkInfo.setEndpoint(sortUrl);
-
-        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-        try {
-            flinkOperation.delete(flinkInfo);
-            log.info("job delete success for jobId={}", jobId);
-        } catch (Exception e) {
-            flinkInfo.setException(true);
-            flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+        List<InlongStreamInfo> streamInfos = 
groupResourceProcessForm.getStreamInfos();
+        for (InlongStreamInfo streamInfo : streamInfos) {
+            List<StreamSink> sinkList = streamInfo.getSinkList();
+            if (CollectionUtils.isEmpty(sinkList)) {
+                continue;
+            }
+            List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+            log.info("stream sink ext info: {}", extList);
+
+            Map<String, String> kvConf = new HashMap<>();
+            extList.forEach(v -> kvConf.put(v.getKeyName(), v.getKeyValue()));
+            String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+            if (StringUtils.isNotEmpty(sortExt)) {
+                Map<String, String> result = 
JsonUtils.OBJECT_MAPPER.convertValue(
+                        JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
+                        });
+                kvConf.putAll(result);
+            }
+
+            String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
+            if (StringUtils.isBlank(jobId)) {
+                String message = String.format("sort job id is empty for 
groupId=%s, streamId=%s", groupId,
+                        streamInfo.getInlongStreamId());
+                return ListenerResult.fail(message);
+            }
+
+            FlinkInfo flinkInfo = new FlinkInfo();
+            flinkInfo.setJobId(jobId);
+            String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+            flinkInfo.setEndpoint(sortUrl);
+
+            FlinkService flinkService = new 
FlinkService(flinkInfo.getEndpoint());
+            FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+            try {
+                flinkOperation.delete(flinkInfo);
+                log.info("job delete success for jobId={}", jobId);
+            } catch (Exception e) {
+                flinkInfo.setException(true);
+                flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+                flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
+
+                String message = String.format("delete sort failed for 
groupId=%s, streamId=%s", groupId,
+                        streamInfo.getInlongStreamId());
+                log.error(message, e);
+                return ListenerResult.fail(message + ": " + e.getMessage());
+            }
             flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
-
-            String message = String.format("delete sort failed for 
groupId=%s", groupId);
-            log.error(message, e);
-            return ListenerResult.fail(message + ": " + e.getMessage());
         }
-        flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
         return ListenerResult.success();
     }
 
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index c8a223e0af..66bf88b149 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -25,8 +25,9 @@ import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -35,6 +36,7 @@ import 
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobStatus;
 
@@ -85,66 +87,79 @@ public class RestartSortListener implements 
SortOperateListener {
         }
 
         GroupResourceProcessForm groupResourceProcessForm = 
(GroupResourceProcessForm) processForm;
-        InlongGroupInfo inlongGroupInfo = 
groupResourceProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
-        log.info("inlong group ext info: {}", extList);
-
-        Map<String, String> kvConf = new HashMap<>();
-        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), 
groupExtInfo.getKeyValue()));
-        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isNotEmpty(sortExt)) {
-            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                    });
-            kvConf.putAll(result);
-        }
-
-        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
-        if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for groupId 
[%s]", groupId);
-            return ListenerResult.fail(message);
-        }
-        String dataflow = kvConf.get(InlongConstants.DATAFLOW);
-        if (StringUtils.isEmpty(dataflow)) {
-            String message = String.format("dataflow is empty for groupId 
[%s]", groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
-        }
 
-        FlinkInfo flinkInfo = new FlinkInfo();
-        String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
-        flinkInfo.setJobName(jobName);
-        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
-        flinkInfo.setEndpoint(sortUrl);
-
-        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-        try {
-            flinkOperation.genPath(flinkInfo, dataflow);
-            // todo Currently, savepoint is not being used to restart, but 
will be improved in the future
-            flinkOperation.start(flinkInfo);
-            log.info("job restart success for [{}]", jobId);
-        } catch (Exception e) {
-            flinkInfo.setException(true);
-            flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+        List<InlongStreamInfo> streamInfos = 
groupResourceProcessForm.getStreamInfos();
+        for (InlongStreamInfo streamInfo : streamInfos) {
+            List<StreamSink> sinkList = streamInfo.getSinkList();
+            if (CollectionUtils.isEmpty(sinkList)) {
+                continue;
+            }
+            List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+            log.info("stream ext info: {}", extList);
+
+            Map<String, String> kvConf = new HashMap<>();
+            extList.forEach(v -> kvConf.put(v.getKeyName(), v.getKeyValue()));
+            String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+            if (StringUtils.isNotEmpty(sortExt)) {
+                Map<String, String> result = 
JsonUtils.OBJECT_MAPPER.convertValue(
+                        JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
+                        });
+                kvConf.putAll(result);
+            }
+
+            String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
+            if (StringUtils.isBlank(jobId)) {
+                String message = String.format("sort job id is empty for 
groupId [%s], streamId [%s]", groupId,
+                        streamInfo.getInlongStreamId());
+                return ListenerResult.fail(message);
+            }
+            String dataflow = kvConf.get(InlongConstants.DATAFLOW);
+            if (StringUtils.isEmpty(dataflow)) {
+                String message = String.format("dataflow is empty for groupId 
[%s], streamId [%s]", groupId,
+                        streamInfo.getInlongStreamId());
+                log.error(message);
+                return ListenerResult.fail(message);
+            }
+
+            FlinkInfo flinkInfo = new FlinkInfo();
+            String jobName = 
Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + 
streamInfo.getInlongStreamId();
+            flinkInfo.setJobName(jobName);
+            String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+            flinkInfo.setEndpoint(sortUrl);
+
+            FlinkService flinkService = new 
FlinkService(flinkInfo.getEndpoint());
+            FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+            try {
+                flinkOperation.genPath(flinkInfo, dataflow);
+                // todo Currently, savepoint is not being used to restart, but 
will be improved in the future
+                flinkOperation.start(flinkInfo);
+                log.info("job restart success for groupId = {}, streamId = {} 
jobId = {}", groupId,
+                        streamInfo.getInlongStreamId(), jobId);
+            } catch (Exception e) {
+                flinkInfo.setException(true);
+                flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+                flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
+
+                String message = String.format("restart sort failed for 
groupId [%s], streamId [%s] ", groupId,
+                        streamInfo.getInlongStreamId());
+                log.error(message, e);
+                return ListenerResult.fail(message + e.getMessage());
+            }
+            extList.forEach(groupExtInfo -> 
kvConf.remove(InlongConstants.SORT_JOB_ID));
+            saveInfo(streamInfo, InlongConstants.SORT_JOB_ID, 
flinkInfo.getJobId(), extList);
             flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
-
-            String message = String.format("restart sort failed for groupId 
[%s] ", groupId);
-            log.error(message, e);
-            return ListenerResult.fail(message + e.getMessage());
         }
-        extList.forEach(groupExtInfo -> 
kvConf.remove(InlongConstants.SORT_JOB_ID));
-        saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), 
extList);
-        flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
         return ListenerResult.success();
     }
 
     /**
-     * Save ext info into list.
+     * Save stream ext info into list.
      */
-    private void saveInfo(String inlongGroupId, String keyName, String 
keyValue, List<InlongGroupExtInfo> extInfoList) {
-        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
-        extInfo.setInlongGroupId(inlongGroupId);
+    private void saveInfo(InlongStreamInfo streamInfo, String keyName, String 
keyValue,
+            List<InlongStreamExtInfo> extInfoList) {
+        InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
+        extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
+        extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
         extInfo.setKeyName(keyName);
         extInfo.setKeyValue(keyValue);
         extInfoList.add(extInfo);
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index b65fb40ea6..0dc3cf08f3 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -25,8 +26,8 @@ import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
@@ -36,9 +37,11 @@ import 
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobStatus;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -71,7 +74,7 @@ public class StartupSortListener implements 
SortOperateListener {
         }
 
         log.info("add startup group listener for groupId [{}]", groupId);
-        return true;
+        return 
InlongConstants.DATASYNC_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
     }
 
     @Override
@@ -94,65 +97,77 @@ public class StartupSortListener implements 
SortOperateListener {
             return ListenerResult.success();
         }
 
-        InlongGroupInfo inlongGroupInfo = groupResourceForm.getGroupInfo();
-        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
-        log.info("inlong group ext info: {}", extList);
-
-        Map<String, String> kvConf = extList.stream().filter(v -> 
StringUtils.isNotEmpty(v.getKeyName())
-                && 
StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap(
-                        InlongGroupExtInfo::getKeyName,
-                        InlongGroupExtInfo::getKeyValue));
-        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isNotEmpty(sortExt)) {
-            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                    });
-            kvConf.putAll(result);
-        }
-
-        String dataflow = kvConf.get(InlongConstants.DATAFLOW);
-        if (StringUtils.isEmpty(dataflow)) {
-            String message = String.format("dataflow is empty for groupId 
[%s]", groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
-        }
-
-        FlinkInfo flinkInfo = new FlinkInfo();
-
-        String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
-        flinkInfo.setJobName(jobName);
-        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
-        flinkInfo.setEndpoint(sortUrl);
-        flinkInfo.setInlongStreamInfoList(groupResourceForm.getStreamInfos());
-
-        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-
-        try {
-            flinkOperation.genPath(flinkInfo, dataflow);
-            flinkOperation.start(flinkInfo);
-            log.info("job submit success, jobId is [{}]", 
flinkInfo.getJobId());
-        } catch (Exception e) {
-            flinkInfo.setException(true);
-            flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+        for (InlongStreamInfo streamInfo : streamInfos) {
+            List<StreamSink> sinkList = streamInfo.getSinkList();
+            List<String> sinkTypes = 
sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
+            if (CollectionUtils.isEmpty(sinkList) || 
!SinkType.containSortFlinkSink(sinkTypes)) {
+                return ListenerResult.success();
+            }
+
+            List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+            log.info("stream ext info: {}", extList);
+            Map<String, String> kvConf = extList.stream().filter(v -> 
StringUtils.isNotEmpty(v.getKeyName())
+                    && 
StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap(
+                            InlongStreamExtInfo::getKeyName,
+                            InlongStreamExtInfo::getKeyValue));
+
+            String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+            if (StringUtils.isNotEmpty(sortExt)) {
+                Map<String, String> result = 
JsonUtils.OBJECT_MAPPER.convertValue(
+                        JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
+                        });
+                kvConf.putAll(result);
+            }
+
+            String dataflow = kvConf.get(InlongConstants.DATAFLOW);
+            if (StringUtils.isEmpty(dataflow)) {
+                String message = String.format("dataflow is empty for groupId 
[%s], streamId [%s]", groupId,
+                        streamInfo.getInlongStreamId());
+                log.error(message);
+                return ListenerResult.fail(message);
+            }
+
+            FlinkInfo flinkInfo = new FlinkInfo();
+
+            String jobName = 
Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + 
streamInfo.getInlongStreamId();
+            flinkInfo.setJobName(jobName);
+            String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+            flinkInfo.setEndpoint(sortUrl);
+            
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
+
+            FlinkService flinkService = new 
FlinkService(flinkInfo.getEndpoint());
+            FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+
+            try {
+                flinkOperation.genPath(flinkInfo, dataflow);
+                flinkOperation.start(flinkInfo);
+                log.info("job submit success for groupId = {}, streamId = {}, 
jobId = {}", groupId,
+                        streamInfo.getInlongStreamId(), flinkInfo.getJobId());
+            } catch (Exception e) {
+                flinkInfo.setException(true);
+                flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+                flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
+
+                String message = String.format("startup sort failed for 
groupId [%s], streamId [%s]", groupId,
+                        streamInfo.getInlongStreamId());
+                log.error(message, e);
+                return ListenerResult.fail(message + e.getMessage());
+            }
+
+            saveInfo(streamInfo, InlongConstants.SORT_JOB_ID, 
flinkInfo.getJobId(), extList);
             flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
-
-            String message = String.format("startup sort failed for groupId 
[%s] ", groupId);
-            log.error(message, e);
-            return ListenerResult.fail(message + e.getMessage());
         }
-
-        saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), 
extList);
-        flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
         return ListenerResult.success();
     }
 
     /**
-     * Save ext info into list.
+     * Save stream ext info into list.
      */
-    private void saveInfo(String inlongGroupId, String keyName, String 
keyValue, List<InlongGroupExtInfo> extInfoList) {
-        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
-        extInfo.setInlongGroupId(inlongGroupId);
+    private void saveInfo(InlongStreamInfo streamInfo, String keyName, String 
keyValue,
+            List<InlongStreamExtInfo> extInfoList) {
+        InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
+        extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
+        extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
         extInfo.setKeyName(keyName);
         extInfo.setKeyValue(keyValue);
         extInfoList.add(extInfo);
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index 1f2a8fabef..b3a341c37d 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -17,12 +17,36 @@
 
 package org.apache.inlong.manager.plugin.listener;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
+import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
 /**
  * Listener for startup the Sort task for InlongStream
@@ -41,13 +65,108 @@ public class StartupStreamListener implements 
SortOperateListener {
      */
     @Override
     public boolean accept(WorkflowContext workflowContext) {
-        log.info("not need to start the sort task for InlongStream");
-        return false;
+        // Only stream-level resource process forms are handled by this listener.
+        final ProcessForm form = workflowContext.getProcessForm();
+        final String groupId = form.getInlongGroupId();
+        if (!(form instanceof StreamResourceProcessForm)) {
+            log.info("not add startup stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+            return false;
+        }
+
+        final StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form;
+        final String streamId = streamForm.getStreamInfo().getInlongStreamId();
+        // The sort job is only started as part of an INIT operation.
+        final boolean initOperate = streamForm.getGroupOperateType() == GroupOperateType.INIT;
+        if (!initOperate) {
+            log.info("not add startup stream listener, as the operate was not INIT for groupId [{}] streamId [{}]",
+                    groupId, streamId);
+            return false;
+        }
+
+        log.info("add startup stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+        // Final decision: accept only when the group mode equals STANDARD_MODE.
+        return InlongConstants.STANDARD_MODE.equals(streamForm.getGroupInfo().getInlongGroupMode());
     }
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
+        /*
+         * Starts the Sort (Flink) job for the single stream carried by the process form.
+         *
+         * Steps: skip when the stream has no sink handled by Sort on Flink; build the key/value
+         * configuration from the stream ext list (merged with the JSON stored under SORT_PROPERTIES);
+         * submit the job described by the DATAFLOW entry to the endpoint stored under SORT_URL;
+         * append the resulting job id to the ext list under SORT_JOB_ID and poll for RUNNING.
+         *
+         * Returns ListenerResult.success() when skipped or started, ListenerResult.fail(..) when the
+         * dataflow is missing or job submission throws.
+         */
+        ProcessForm processForm = context.getProcessForm();
+        StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+        InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+        List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+        log.info("inlong stream :{} ext info: {}", streamId, extList);
+
+        // Check for a null/empty sink list BEFORE streaming over it, otherwise a stream without
+        // sinks fails with a NullPointerException instead of being skipped.
+        List<StreamSink> sinkList = streamInfo.getSinkList();
+        List<String> sinkTypes = CollectionUtils.isEmpty(sinkList)
+                ? Collections.<String>emptyList()
+                : sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
+        if (sinkTypes.isEmpty() || !SinkType.containSortFlinkSink(sinkTypes)) {
+            log.warn("not any sink configured for group {} and stream {}, skip launching sort job", groupId, streamId);
+            return ListenerResult.success();
+        }
+
+        // Entries with an empty key or value are ignored. On a repeated key the later entry wins,
+        // which avoids the IllegalStateException thrown by the two-argument toMap collector.
+        Map<String, String> kvConf = extList.stream()
+                .filter(v -> StringUtils.isNotEmpty(v.getKeyName()) && StringUtils.isNotEmpty(v.getKeyValue()))
+                .collect(Collectors.toMap(
+                        InlongStreamExtInfo::getKeyName,
+                        InlongStreamExtInfo::getKeyValue,
+                        (previous, latest) -> latest));
+
+        // The sort properties are stored as a JSON object; flatten them into the same map.
+        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
+        }
+
+        String dataflow = kvConf.get(InlongConstants.DATAFLOW);
+        if (StringUtils.isEmpty(dataflow)) {
+            String message = String.format("dataflow is empty for groupId [%s], streamId [%s]", groupId, streamId);
+            log.error(message);
+            return ListenerResult.fail(message);
+        }
+
+        FlinkInfo flinkInfo = new FlinkInfo();
+
+        // One job per stream: the generated job name is suffixed with the stream id.
+        String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + streamId;
+        flinkInfo.setJobName(jobName);
+        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+        flinkInfo.setEndpoint(sortUrl);
+        flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
+
+        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
+        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+
+        try {
+            flinkOperation.genPath(flinkInfo, dataflow);
+            flinkOperation.start(flinkInfo);
+            log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId,
+                    streamId, flinkInfo.getJobId());
+        } catch (Exception e) {
+            // Record the failure on the FlinkInfo before polling, then report it to the workflow.
+            flinkInfo.setException(true);
+            flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+            flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
+
+            String message = String.format("startup sort failed for groupId [%s], streamId [%s]", groupId, streamId);
+            log.error(message, e);
+            return ListenerResult.fail(message + e.getMessage());
+        }
+
+        saveInfo(streamInfo, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), extList);
+        flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
         return ListenerResult.success();
     }
 
+    /**
+     * Creates a stream ext entry for the given key/value pair, tagged with the group id and
+     * stream id of {@code streamInfo}, and appends it to {@code extInfoList}.
+     */
+    private void saveInfo(InlongStreamInfo streamInfo, String keyName, String keyValue,
+            List<InlongStreamExtInfo> extInfoList) {
+        final InlongStreamExtInfo entry = new InlongStreamExtInfo();
+        entry.setKeyName(keyName);
+        entry.setKeyValue(keyValue);
+        entry.setInlongStreamId(streamInfo.getInlongStreamId());
+        entry.setInlongGroupId(streamInfo.getInlongGroupId());
+        extInfoList.add(entry);
+    }
+
 }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index affe9bbd22..06a76e1bf7 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -24,8 +24,9 @@ import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -34,6 +35,7 @@ import 
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobStatus;
 
@@ -84,47 +86,56 @@ public class SuspendSortListener implements 
SortOperateListener {
         }
 
         GroupResourceProcessForm groupResourceProcessForm = 
(GroupResourceProcessForm) processForm;
-        InlongGroupInfo inlongGroupInfo = 
groupResourceProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
-        log.info("inlong group ext info: {}", extList);
-
-        Map<String, String> kvConf = new HashMap<>();
-        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), 
groupExtInfo.getKeyValue()));
-        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isNotEmpty(sortExt)) {
-            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                    });
-            kvConf.putAll(result);
-        }
-
-        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
-        if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for groupId 
[%s]", groupId);
-            return ListenerResult.fail(message);
-        }
-
-        FlinkInfo flinkInfo = new FlinkInfo();
-        flinkInfo.setJobId(jobId);
-        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
-        flinkInfo.setEndpoint(sortUrl);
-
-        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-        try {
-            // todo Currently, savepoint is not being used to stop, but will 
be improved in the future
-            flinkOperation.delete(flinkInfo);
-            log.info("job suspend success for [{}]", jobId);
-        } catch (Exception e) {
-            flinkInfo.setException(true);
-            flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+        List<InlongStreamInfo> streamInfos = 
groupResourceProcessForm.getStreamInfos();
+        for (InlongStreamInfo streamInfo : streamInfos) {
+            List<StreamSink> sinkList = streamInfo.getSinkList();
+            if (CollectionUtils.isEmpty(sinkList)) {
+                continue;
+            }
+            List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+            log.info("stream ext info: {}", extList);
+
+            Map<String, String> kvConf = new HashMap<>();
+            extList.forEach(v -> kvConf.put(v.getKeyName(), v.getKeyValue()));
+            String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+            if (StringUtils.isNotEmpty(sortExt)) {
+                Map<String, String> result = 
JsonUtils.OBJECT_MAPPER.convertValue(
+                        JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
+                        });
+                kvConf.putAll(result);
+            }
+
+            String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
+            if (StringUtils.isBlank(jobId)) {
+                String message = String.format("sort job id is empty for 
groupId [%s]", groupId,
+                        streamInfo.getInlongStreamId());
+                return ListenerResult.fail(message);
+            }
+
+            FlinkInfo flinkInfo = new FlinkInfo();
+            flinkInfo.setJobId(jobId);
+            String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+            flinkInfo.setEndpoint(sortUrl);
+
+            FlinkService flinkService = new 
FlinkService(flinkInfo.getEndpoint());
+            FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+            try {
+                // todo Currently, savepoint is not being used to stop, but 
will be improved in the future
+                flinkOperation.delete(flinkInfo);
+                log.info("job suspend success for groupId = {}, streamId ={}, 
jobId = {}", groupId,
+                        streamInfo.getInlongStreamId(), jobId);
+            } catch (Exception e) {
+                flinkInfo.setException(true);
+                flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+                flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
+
+                String message = String.format("suspend sort failed for 
groupId [%s], streamId [%s]", groupId,
+                        streamInfo.getInlongStreamId());
+                log.error(message, e);
+                return ListenerResult.fail(message + e.getMessage());
+            }
             flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
-
-            String message = String.format("suspend sort failed for groupId 
[%s] ", groupId);
-            log.error(message, e);
-            return ListenerResult.fail(message + e.getMessage());
         }
-        flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
         return ListenerResult.success();
 
     }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
index 0f689c0e3a..84509b8a0e 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
@@ -21,9 +21,9 @@ import 
org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SortStatus;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -66,22 +66,21 @@ public class SortStatusPoller implements SortPoller {
     }
 
     @Override
-    public List<SortStatusInfo> pollSortStatus(List<InlongGroupInfo> 
groupInfos, String credentials) {
-        log.debug("begin to poll sort status for inlong groups");
-        if (CollectionUtils.isEmpty(groupInfos)) {
-            log.debug("end to poll sort status, as the inlong groups is 
empty");
+    public List<SortStatusInfo> pollSortStatus(List<InlongStreamInfo> 
streamInfos, String credentials) {
+        log.debug("begin to poll sort status for stream");
+        if (CollectionUtils.isEmpty(streamInfos)) {
+            log.debug("end to poll sort status, as the stream list is empty");
             return Collections.emptyList();
         }
 
-        List<SortStatusInfo> statusInfos = new ArrayList<>(groupInfos.size());
-        for (InlongGroupInfo groupInfo : groupInfos) {
-            String groupId = groupInfo.getInlongGroupId();
+        List<SortStatusInfo> statusInfos = new ArrayList<>(streamInfos.size());
+        for (InlongStreamInfo streamInfo : streamInfos) {
             try {
-                List<InlongGroupExtInfo> extList = groupInfo.getExtList();
-                log.debug("inlong group {} ext info: {}", groupId, extList);
+                List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+                log.debug("stream {} ext info: {}", 
streamInfo.getInlongStreamId(), extList);
 
                 Map<String, String> kvConf = new HashMap<>();
-                extList.forEach(groupExtInfo -> 
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
+                extList.forEach(v -> kvConf.put(v.getKeyName(), 
v.getKeyValue()));
                 String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
                 if (StringUtils.isNotEmpty(sortExt)) {
                     Map<String, String> result = 
JsonUtils.OBJECT_MAPPER.convertValue(
@@ -91,7 +90,10 @@ public class SortStatusPoller implements SortPoller {
                 }
 
                 String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
-                SortStatusInfo statusInfo = 
SortStatusInfo.builder().inlongGroupId(groupId).build();
+                SortStatusInfo statusInfo = SortStatusInfo.builder()
+                        .inlongGroupId(streamInfo.getInlongGroupId())
+                        .inlongStreamId(streamInfo.getInlongStreamId())
+                        .build();
                 if (StringUtils.isBlank(jobId)) {
                     statusInfo.setSortStatus(SortStatus.NOT_EXISTS);
                     statusInfos.add(statusInfo);
@@ -104,7 +106,8 @@ public class SortStatusPoller implements SortPoller {
                         
JOB_SORT_STATUS_MAP.getOrDefault(flinkService.getJobStatus(jobId), 
SortStatus.UNKNOWN));
                 statusInfos.add(statusInfo);
             } catch (Exception e) {
-                log.error("polling sort status failed for groupId=" + groupId, 
e);
+                log.error("polling sort status failed for groupId=" + 
streamInfo.getInlongGroupId() + " streamId="
+                        + streamInfo.getInlongStreamId(), e);
             }
         }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
index 6fd04ba855..7950c9cc1c 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
@@ -41,6 +41,9 @@ public class SortStatusInfo {
     @ApiModelProperty(value = "Inlong group id")
     private String inlongGroupId;
 
+    @ApiModelProperty(value = "Inlong stream id")
+    private String inlongStreamId;
+
     @ApiModelProperty(value = "Sort status info")
     private SortStatus sortStatus;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 437e80c035..0072a877f1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -26,10 +26,12 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
 import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.core.SortClusterService;
 import org.apache.inlong.manager.service.core.SortService;
 import org.apache.inlong.manager.service.core.SortSourceService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
 import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
 
@@ -38,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -58,6 +61,8 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
     private SortClusterService sortClusterService;
     @Autowired
     private InlongGroupService groupService;
+    @Autowired
+    private InlongStreamService streamService;
 
     /**
      * The plugin poller will be initialed after the application starts.
@@ -92,8 +97,11 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
                     })
                     .filter(Objects::nonNull)
                     .collect(Collectors.toList());
-
-            List<SortStatusInfo> statusInfos = 
sortPoller.pollSortStatus(groupInfoList, request.getCredentials());
+            List<InlongStreamInfo> streamInfos = new ArrayList<>();
+            groupInfoList.forEach(groupInfo -> {
+                
streamInfos.addAll(streamService.list(groupInfo.getInlongGroupId()));
+            });
+            List<SortStatusInfo> statusInfos = 
sortPoller.pollSortStatus(streamInfos, request.getCredentials());
             log.debug("success to list sort status for request={}, result={}", 
request, statusInfos);
             return statusInfos;
         } catch (Exception e) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 4e994f8be1..62f9ee7da3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -29,9 +29,11 @@ import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
@@ -107,6 +109,8 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
     @Autowired
     private StreamSourceEntityMapper streamSourceMapper;
     @Autowired
+    private InlongStreamExtEntityMapper streamExtMapper;
+    @Autowired
     private InlongClusterService clusterService;
 
     @Autowired
@@ -211,7 +215,8 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         List<InlongGroupExtEntity> extEntityList = 
groupExtMapper.selectByGroupId(groupId);
         List<InlongGroupExtInfo> extList = 
CommonBeanUtils.copyListProperties(extEntityList, InlongGroupExtInfo::new);
         groupInfo.setExtList(extList);
-        BaseSortConf sortConf = buildSortConfig(extList);
+        List<InlongStreamExtEntity> streamExtEntities = 
streamExtMapper.selectByRelatedId(groupId, null);
+        BaseSortConf sortConf = buildSortConfig(streamExtEntities);
         groupInfo.setSortConf(sortConf);
 
         LOGGER.debug("success to get inlong group for groupId={}", groupId);
@@ -232,7 +237,8 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         List<InlongGroupExtEntity> extEntityList = 
groupExtMapper.selectByGroupId(groupId);
         List<InlongGroupExtInfo> extList = 
CommonBeanUtils.copyListProperties(extEntityList, InlongGroupExtInfo::new);
         groupInfo.setExtList(extList);
-        BaseSortConf sortConf = buildSortConfig(extList);
+        List<InlongStreamExtEntity> streamExtEntities = 
streamExtMapper.selectByRelatedId(groupId, null);
+        BaseSortConf sortConf = buildSortConfig(streamExtEntities);
         groupInfo.setSortConf(sortConf);
         return groupInfo;
     }
@@ -595,7 +601,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         return true;
     }
 
-    private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
+    private BaseSortConf buildSortConfig(List<InlongStreamExtEntity> extInfos) 
{
         Map<String, String> extMap = new HashMap<>();
         extInfos.forEach(extInfo -> extMap.put(extInfo.getKeyName(), 
extInfo.getKeyValue()));
         String type = extMap.get(InlongConstants.SORT_TYPE);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index b8d776c6e7..ebed9e72e3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -17,11 +17,13 @@
 
 package org.apache.inlong.manager.service.listener.sort;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
@@ -41,6 +43,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Event listener of build the Sort config,
@@ -121,8 +124,18 @@ public class SortConfigListener implements SortOperateListener {
         }
 
         try {
-            SortConfigOperator operator = 
operatorFactory.getInstance(groupInfo.getEnableZookeeper());
-            operator.buildConfig(groupInfo, streamInfos, false);
+            for (InlongStreamInfo streamInfo : streamInfos) {
+                List<StreamSink> sinkList = streamInfo.getSinkList();
+                if (CollectionUtils.isEmpty(sinkList)) {
+                    continue;
+                }
+                List<String> sinkTypeList = 
sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
+                List<SortConfigOperator> operatorList = 
operatorFactory.getInstance(sinkTypeList);
+                for (SortConfigOperator operator : operatorList) {
+                    operator.buildConfig(groupInfo, streamInfo,
+                            
InlongConstants.STANDARD_MODE.equals(groupInfo.getInlongGroupMode()));
+                }
+            }
         } catch (Exception e) {
             String msg = String.format("failed to build sort config for 
groupId=%s, ", groupId);
             LOGGER.error(msg + "streamInfos=" + streamInfos, e);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
index 39ac951535..070f71c5a6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.listener.sort;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.TaskEvent;
@@ -41,8 +42,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Event listener of build the Sort config for one inlong stream,
@@ -85,6 +86,17 @@ public class StreamSortConfigListener implements SortOperateListener {
         InlongStreamInfo streamInfo = form.getStreamInfo();
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
+        // Read the current information
+        InlongGroupInfo groupInfo = groupService.get(groupId);
+        if (groupInfo == null) {
+            String msg = "inlong group not found with groupId=" + groupId;
+            LOGGER.error(msg);
+            throw new WorkflowListenerException(msg);
+        }
+        form.setGroupInfo(groupInfo);
+        form.setStreamInfo(streamService.get(groupId, streamId));
+        groupInfo = form.getGroupInfo();
+        streamInfo = form.getStreamInfo();
         LOGGER.info("begin to build sort config for groupId={}, streamId={}", 
groupId, streamId);
 
         GroupOperateType operateType = form.getGroupOperateType();
@@ -94,7 +106,6 @@ public class StreamSortConfigListener implements SortOperateListener {
             return ListenerResult.success();
         }
 
-        InlongGroupInfo groupInfo = groupService.get(groupId);
         GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
         Preconditions.expectTrue(GroupStatus.CONFIG_FAILED != groupStatus,
                 String.format("group status=%s not support start stream for 
groupId=%s", groupStatus, groupId));
@@ -103,17 +114,19 @@ public class StreamSortConfigListener implements SortOperateListener {
             LOGGER.warn("not build sort config for groupId={}, streamId={}, as 
not found any sinks", groupId, streamId);
             return ListenerResult.success();
         }
-        // Read the current information
-        form.setGroupInfo(groupInfo);
-        form.setStreamInfo(streamService.get(groupId, streamId));
 
-        List<InlongStreamInfo> streamInfos = 
Collections.singletonList(streamInfo);
         try {
-            SortConfigOperator operator = 
operatorFactory.getInstance(groupInfo.getEnableZookeeper());
-            operator.buildConfig(groupInfo, streamInfos, true);
+
+            List<String> sinkTypeList = 
streamSinks.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
+            List<SortConfigOperator> operatorList = 
operatorFactory.getInstance(sinkTypeList);
+            for (SortConfigOperator operator : operatorList) {
+                operator.buildConfig(groupInfo, streamInfo,
+                        
InlongConstants.SYNC_SEND.equals(groupInfo.getInlongGroupMode()));
+            }
+
         } catch (Exception e) {
             String msg = String.format("failed to build sort config for 
groupId=%s, streamId=%s, ", groupId, streamId);
-            LOGGER.error(msg + "streamInfos=" + streamInfos, e);
+            LOGGER.error(msg + "streamInfo=" + streamInfo, e);
             throw new WorkflowListenerException(msg + e.getMessage());
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 11c4e3d6f6..08bc53cbf0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -18,13 +18,15 @@
 package org.apache.inlong.manager.service.resource.sort;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.sort.node.NodeFactory;
 import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
 import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
 import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.transform.TransformResponse;
@@ -48,6 +50,7 @@ import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -78,111 +81,120 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
     private AuditService auditService;
 
     @Override
-    public Boolean accept(Integer enableZk) {
-        return InlongConstants.DISABLE_ZK.equals(enableZk);
+    public Boolean accept(List<String> sinkTypeList) {
+        for (String sinkType : sinkTypeList) {
+            if (SinkType.SORT_FLINK_SINK.contains(sinkType)) {
+                return true;
+            }
+        }
+        return false;
     }
 
     @Override
-    public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> 
streamInfos, boolean isStream)
+    public void buildConfig(InlongGroupInfo groupInfo, InlongStreamInfo 
streamInfo, boolean isStream)
             throws Exception {
         if (isStream) {
             LOGGER.warn("no need to build sort config for stream process when 
disable zk");
             return;
         }
-        if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
-            LOGGER.warn("no need to build sort config as the group is null or 
streams is empty when disable zk");
+        if (groupInfo == null || streamInfo == null) {
+            LOGGER.warn("no need to build sort config as the group is null or 
stream is empty when disable zk");
             return;
         }
-
-        GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfos);
+        List<StreamSink> sinkList = new ArrayList<>();
+        for (StreamSink sink : streamInfo.getSinkList()) {
+            if (SinkType.SORT_FLINK_SINK.contains(sink.getSinkType())) {
+                sinkList.add(sink);
+            }
+        }
+        if (CollectionUtils.isEmpty(sinkList)) {
+            return;
+        }
+        GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfo, 
sinkList);
         String dataflow = OBJECT_MAPPER.writeValueAsString(sortConfigInfo);
-        this.addToGroupExt(groupInfo, dataflow);
+        this.addToStreamExt(streamInfo, dataflow);
 
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("success to build sort config, isStream={}, 
dataflow={}", isStream, dataflow);
         }
     }
 
-    private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, 
List<InlongStreamInfo> streamInfoList) {
+    private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, InlongStreamInfo 
inlongStreamInfo,
+            List<StreamSink> sinkInfos) {
+        String streamId = inlongStreamInfo.getInlongStreamId();
         // get source info
-        Map<String, List<StreamSource>> sourceMap = 
sourceService.getSourcesMap(groupInfo, streamInfoList);
-        // get sink info
-        Map<String, List<StreamSink>> sinkMap = 
sinkService.getSinksMap(groupInfo, streamInfoList);
-        List<TransformResponse> transformList = 
transformService.listTransform(groupInfo.getInlongGroupId(), null);
+        Map<String, List<StreamSource>> sourceMap = 
sourceService.getSourcesMap(groupInfo,
+                Collections.singletonList(inlongStreamInfo));
+        List<TransformResponse> transformList = 
transformService.listTransform(groupInfo.getInlongGroupId(), streamId);
         Map<String, List<TransformResponse>> transformMap = 
transformList.stream()
                 
.collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, 
HashMap::new,
                         Collectors.toCollection(ArrayList::new)));
 
         List<StreamInfo> sortStreamInfos = new ArrayList<>();
-        for (InlongStreamInfo inlongStream : streamInfoList) {
-            String streamId = inlongStream.getInlongStreamId();
-            Map<String, StreamField> fieldMap = new HashMap<>();
-            inlongStream.getSourceList().forEach(
-                    source -> parseConstantFieldMap(source.getSourceName(), 
source.getFieldList(), fieldMap));
+        Map<String, StreamField> fieldMap = new HashMap<>();
+        inlongStreamInfo.getSourceList().forEach(
+                source -> parseConstantFieldMap(source.getSourceName(), 
source.getFieldList(), fieldMap));
+
+        List<TransformResponse> transformResponses = 
transformMap.get(streamId);
+        if (CollectionUtils.isNotEmpty(transformResponses)) {
+            transformResponses.forEach(
+                    trans -> parseConstantFieldMap(trans.getTransformName(), 
trans.getFieldList(), fieldMap));
+        }
 
-            List<TransformResponse> transformResponses = 
transformMap.get(streamId);
-            if (CollectionUtils.isNotEmpty(transformResponses)) {
-                transformResponses.forEach(
-                        trans -> 
parseConstantFieldMap(trans.getTransformName(), trans.getFieldList(), 
fieldMap));
-            }
+        // build a stream info from the nodes and relations
+        List<StreamSource> sources = sourceMap.get(streamId);
+        for (StreamSink sinkInfo : sinkInfos) {
+            CommonBeanUtils.copyProperties(inlongStreamInfo, sinkInfo, true);
+            addAuditId(sinkInfo.getProperties(), sinkInfo.getSinkType(), true);
+        }
 
-            // build a stream info from the nodes and relations
-            List<StreamSource> sources = sourceMap.get(streamId);
-            List<StreamSink> sinks = sinkMap.get(streamId);
+        for (StreamSource source : sources) {
+            source.setFieldList(inlongStreamInfo.getFieldList());
+        }
+        List<NodeRelation> relations;
 
-            for (StreamSink sink : sinks) {
-                addAuditId(sink.getProperties(), sink.getSinkType(), true);
-            }
-            for (StreamSource source : sources) {
-                source.setFieldList(inlongStream.getFieldList());
+        if 
(InlongConstants.STANDARD_MODE.equals(groupInfo.getInlongGroupMode())) {
+            if (CollectionUtils.isNotEmpty(transformResponses)) {
+                relations = 
NodeRelationUtils.createNodeRelations(inlongStreamInfo);
+                // in standard mode(include Data Ingestion and 
Synchronization), replace upstream source node and
+                // transform input fields node to MQ node (which is InLong 
stream id)
+                String mqNodeName = sources.get(0).getSourceName();
+                Set<String> nodeNameSet = getInputNodeNames(sources, 
transformResponses);
+                adjustTransformField(transformResponses, nodeNameSet, 
mqNodeName);
+                adjustNodeRelations(relations, nodeNameSet, mqNodeName);
+            } else {
+                relations = NodeRelationUtils.createNodeRelations(sources, 
sinkInfos);
             }
-            List<NodeRelation> relations;
-
-            if 
(InlongConstants.STANDARD_MODE.equals(groupInfo.getInlongGroupMode())) {
-                if (CollectionUtils.isNotEmpty(transformResponses)) {
-                    relations = 
NodeRelationUtils.createNodeRelations(inlongStream);
-                    // in standard mode(include Data Ingestion and 
Synchronization), replace upstream source node and
-                    // transform input fields node to MQ node (which is InLong 
stream id)
-                    String mqNodeName = sources.get(0).getSourceName();
-                    Set<String> nodeNameSet = getInputNodeNames(sources, 
transformResponses);
-                    adjustTransformField(transformResponses, nodeNameSet, 
mqNodeName);
-                    adjustNodeRelations(relations, nodeNameSet, mqNodeName);
-                } else {
-                    relations = NodeRelationUtils.createNodeRelations(sources, 
sinks);
-                }
 
-                if (sources.size() == sinks.size()) {
-                    for (int i = 0; i < sinks.size(); i++) {
-                        addAuditId(sources.get(i).getProperties(), 
sinks.get(i).getSinkType(), false);
-                    }
-                }
+            for (int i = 0; i < sources.size(); i++) {
+                addAuditId(sources.get(i).getProperties(), 
sinkInfos.get(0).getSinkType(), false);
+            }
+        } else {
+            if (CollectionUtils.isNotEmpty(transformResponses)) {
+                List<String> sourcesNames = 
sources.stream().map(StreamSource::getSourceName)
+                        .collect(Collectors.toList());
+                List<String> transFormNames = 
transformResponses.stream().map(TransformResponse::getTransformName)
+                        .collect(Collectors.toList());
+                relations = 
Arrays.asList(NodeRelationUtils.createNodeRelation(sourcesNames, 
transFormNames),
+                        NodeRelationUtils.createNodeRelation(transFormNames,
+                                
sinkInfos.stream().map(StreamSink::getSinkName).collect(Collectors.toList())));
             } else {
-                if (CollectionUtils.isNotEmpty(transformResponses)) {
-                    List<String> sourcesNames = 
sources.stream().map(StreamSource::getSourceName)
-                            .collect(Collectors.toList());
-                    List<String> transFormNames = 
transformResponses.stream().map(TransformResponse::getTransformName)
-                            .collect(Collectors.toList());
-                    List<String> sinkNames = 
sinks.stream().map(StreamSink::getSinkName).collect(Collectors.toList());
-                    relations = 
Arrays.asList(NodeRelationUtils.createNodeRelation(sourcesNames, 
transFormNames),
-                            
NodeRelationUtils.createNodeRelation(transFormNames, sinkNames));
-                } else {
-                    relations = NodeRelationUtils.createNodeRelations(sources, 
sinks);
-                }
+                relations = NodeRelationUtils.createNodeRelations(sources, 
sinkInfos);
+            }
 
-                for (StreamSource source : sources) {
-                    addAuditId(source.getProperties(), source.getSourceType(), 
false);
-                }
+            for (StreamSource source : sources) {
+                addAuditId(source.getProperties(), source.getSourceType(), 
false);
             }
+        }
 
-            // create extract-transform-load nodes
-            List<Node> nodes = this.createNodes(sources, transformResponses, 
sinks, fieldMap);
+        // create extract-transform-load nodes
+        List<Node> nodes = this.createNodes(sources, transformResponses, 
sinkInfos, fieldMap);
 
-            StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
-            sortStreamInfos.add(streamInfo);
+        StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
+        sortStreamInfos.add(streamInfo);
 
-            // rebuild joinerNode relation
-            NodeRelationUtils.optimizeNodeRelation(streamInfo, 
transformResponses);
-        }
+        // rebuild joinerNode relation
+        NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponses);
 
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
@@ -265,20 +277,21 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
     }
 
     /**
-     * Add config into inlong group ext info
+     * Add config into inlong stream ext info
      */
-    private void addToGroupExt(InlongGroupInfo groupInfo, String value) {
-        if (groupInfo.getExtList() == null) {
-            groupInfo.setExtList(new ArrayList<>());
+    private void addToStreamExt(InlongStreamInfo streamInfo, String value) {
+        if (streamInfo.getExtList() == null) {
+            streamInfo.setExtList(new ArrayList<>());
         }
 
-        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
-        extInfo.setInlongGroupId(groupInfo.getInlongGroupId());
+        InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
+        extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
+        extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
         extInfo.setKeyName(InlongConstants.DATAFLOW);
         extInfo.setKeyValue(value);
 
-        groupInfo.getExtList().removeIf(ext -> 
extInfo.getKeyName().equals(ext.getKeyName()));
-        groupInfo.getExtList().add(extInfo);
+        streamInfo.getExtList().removeIf(ext -> 
extInfo.getKeyName().equals(ext.getKeyName()));
+        streamInfo.getExtList().add(extInfo);
     }
 
     private void addAuditId(Map<String, Object> properties, String type, 
boolean isSent) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java
index ce0e31f589..bd4f3ee712 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java
@@ -30,17 +30,17 @@ public interface SortConfigOperator {
     /**
      * Determines whether the current instance matches the specified type.
      *
-     * @param enableZk is the inlong group enable the ZooKeeper, 1: enable, 0: 
disable
+     * @param sinkTypeList sink type list
      */
-    Boolean accept(Integer enableZk);
+    Boolean accept(List<String> sinkTypeList);
 
     /**
      * Build Sort config.
      *
      * @param groupInfo inlong group info
-     * @param streamInfos inlong stream info list
+     * @param streamInfo inlong stream info
      * @param isStream is the config built for inlong stream
      */
-    void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> 
streamInfos, boolean isStream) throws Exception;
+    void buildConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, 
boolean isStream) throws Exception;
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java
index 95a9c5c39f..baa9708f68 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java
@@ -17,12 +17,11 @@
 
 package org.apache.inlong.manager.service.resource.sort;
 
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Factory for {@link SortConfigOperator}.
@@ -36,14 +35,10 @@ public class SortConfigOperatorFactory {
     /**
      * Get a Sort config operator instance.
      *
-     * @param enableZk is the inlong group enable the ZooKeeper, 1: enable, 0: 
disable
+     * @param sinkTypeList sink type
      */
-    public SortConfigOperator getInstance(Integer enableZk) {
-        return operatorList.stream()
-                .filter(inst -> inst.accept(enableZk))
-                .findFirst()
-                .orElseThrow(() -> new BusinessException("not found any 
instance of SortConfigOperator when enableZk="
-                        + enableZk));
+    public List<SortConfigOperator> getInstance(List<String> sinkTypeList) {
+        return operatorList.stream().filter(inst -> 
inst.accept(sinkTypeList)).collect(Collectors.toList());
     }
 
 }
diff --git a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
index aa9155e735..17e8ccd2ae 100644
--- a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
+++ b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
@@ -70,3 +70,5 @@ common.http-client.connectionRequestTimeout=3000
 
 # tencent cloud log service endpoint, The Operator cls resource by it
 cls.manager.endpoint=127.0.0.1
+
+
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
index 68dd242850..a602e0e979 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.manager.workflow.plugin.sort;
 
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 
 import java.util.List;
 
@@ -30,11 +30,11 @@ public interface SortPoller {
     /**
      * Poll the Sort status infos by the given inlong groups
      *
-     * @param groupInfos inlong group infos
+     * @param streamInfos stream sink infos
      * @param credentials credential info
      * @return list of Sort status infos
      * @throws Exception any exception if occurred
      */
-    List<SortStatusInfo> pollSortStatus(List<InlongGroupInfo> groupInfos, 
String credentials) throws Exception;
+    List<SortStatusInfo> pollSortStatus(List<InlongStreamInfo> streamInfos, 
String credentials) throws Exception;
 
 }

Reply via email to