This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 632a477ee7 [INLONG-9192][Manager] Flat Sort Cluster types (#9193)
632a477ee7 is described below

commit 632a477ee73b0a98910b14310b42b7b19938a8b0
Author: vernedeng <verned...@apache.org>
AuthorDate: Mon Nov 6 17:18:09 2023 +0800

    [INLONG-9192][Manager] Flat Sort Cluster types (#9193)
---
 .../inlong/manager/common/consts/SinkType.java     |   2 +-
 .../inlong/manager/common/enums/ClusterType.java   |  11 ++-
 .../dao/mapper/InlongClusterEntityMapper.java      |   2 -
 .../mappers/InlongClusterEntityMapper.xml          |  10 --
 .../cls/SortClsClusterInfo.java}                   |  19 ++--
 .../cls/SortClsClusterRequest.java}                |  21 ++--
 .../es/SortEsClusterInfo.java}                     |  21 ++--
 .../es/SortEsClusterRequest.java}                  |  14 +--
 .../pulsar/SortPulsarClusterInfo.java}             |  20 ++--
 .../pulsar/SortPulsarClusterRequest.java}          |  17 ++--
 .../inlong/manager/pojo/sink/cls/ClsSink.java      |   4 -
 .../manager/pojo/sink/es/ElasticsearchSink.java    |   4 +-
 .../manager/pojo/sink/pulsar/PulsarSink.java       |   4 -
 .../BaseSortClusterDTO.java}                       |  31 ++----
 .../BaseSortClusterInfo.java}                      |  27 ++----
 .../BaseSortClusterRequest.java}                   |  14 +--
 .../cluster/ElasticsearchClusterOperator.java      |  93 ------------------
 .../service/cluster/InlongClusterOperator.java     |   4 +-
 .../service/cluster/SortClusterOperator.java       | 106 +++++++++++++++++++++
 .../cluster/SortStandaloneClusterOperator.java     |  81 ----------------
 .../AbstractStandaloneSinkResourceOperator.java    |   5 +-
 .../service/cluster/InlongClusterServiceTest.java  |  29 ++----
 .../resource/sink/StandaloneAutoAssignTest.java    |  12 +--
 .../sort-connectors/tubemq/pom.xml                 |   5 +
 24 files changed, 204 insertions(+), 352 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index 965e809f51..f0b215ac30 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -25,7 +25,7 @@ public class SinkType extends StreamType {
     public static final String HIVE = "HIVE";
     public static final String CLICKHOUSE = "CLICKHOUSE";
     public static final String HBASE = "HBASE";
-    public static final String ELASTICSEARCH = "ELASTICSEARCH";
+    public static final String ELASTICSEARCH = "ES";
     public static final String HDFS = "HDFS";
     public static final String GREENPLUM = "GREENPLUM";
     public static final String MYSQL = "MYSQL";
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
index f6a33ff71e..6d010d25cc 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
@@ -32,8 +32,10 @@ public class ClusterType {
     public static final String PULSAR = "PULSAR";
     public static final String DATAPROXY = "DATAPROXY";
     public static final String KAFKA = "KAFKA";
-    public static final String ELASTICSEARCH = "ELASTICSEARCH";
-    public static final String SORTSTANDALONE = "SORTSTANDALONE";
+
+    public static final String SORT_ES = "SORT_ES";
+    public static final String SORT_CLS = "SORT_CLS";
+    public static final String SORT_PULSAR = "SORT_PULSAR";
 
     private static final Set<String> TYPE_SET = new HashSet<String>() {
 
@@ -43,8 +45,9 @@ public class ClusterType {
             add(ClusterType.PULSAR);
             add(ClusterType.DATAPROXY);
             add(ClusterType.KAFKA);
-            add(ClusterType.ELASTICSEARCH);
-            add(ClusterType.SORTSTANDALONE);
+            add(ClusterType.SORT_ES);
+            add(ClusterType.SORT_CLS);
+            add(ClusterType.SORT_PULSAR);
         }
     };
 
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
index a0d71abc5e..ce2644691d 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
@@ -64,6 +64,4 @@ public interface InlongClusterEntityMapper {
 
     int deleteByPrimaryKey(Integer id);
 
-    List<InlongClusterEntity> selectStandaloneClusterByType(@Param("sinkType") String sinkType);
-
 }
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
index 77f0eb3769..f554faf64a 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
@@ -189,16 +189,6 @@
         from inlong_cluster
         where is_deleted = 0
     </select>
-    <select id="selectStandaloneClusterByType" resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
-        select
-        <include refid="Base_Column_List"/>
-        from inlong_cluster
-        <where>
-            type = 'SORTSTANDALONE'
-            and find_in_set(#{sinkType, jdbcType=VARCHAR}, ext_tag)
-            and is_deleted = 0
-        </where>
-    </select>
 
     <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
         update inlong_cluster
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java
similarity index 72%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java
index 51e9536bb3..96eedc4667 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java
@@ -15,28 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.cluster.es;
+package org.apache.inlong.manager.pojo.cluster.sort.cls;
 
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo;
 
 import io.swagger.annotations.ApiModel;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
-/**
- * Inlong cluster request for Elasticsearch
- */
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = ClusterType.ELASTICSEARCH)
-@ApiModel("Inlong cluster request for Elasticsearch")
-public class ElasticsearchClusterRequest extends ClusterRequest {
+@JsonTypeDefine(value = ClusterType.SORT_CLS)
+@ApiModel("Inlong cluster info for SortCls")
+public class SortClsClusterInfo extends BaseSortClusterInfo {
 
-    public ElasticsearchClusterRequest() {
-        this.setType(ClusterType.ELASTICSEARCH);
+    public SortClsClusterInfo() {
+        this.setType(ClusterType.SORT_CLS);
     }
-}
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java
similarity index 66%
rename from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java
rename to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java
index fd8c10e0d0..5cf261c932 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java
@@ -15,32 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.cluster.sortstandalone;
+package org.apache.inlong.manager.pojo.cluster.sort.cls;
 
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest;
 
 import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
-import java.util.Set;
-
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = ClusterType.SORTSTANDALONE)
-@ApiModel("Inlong cluster request for SortStandalone")
-public class SortStandaloneClusterRequest extends ClusterRequest {
-
-    @ApiModelProperty(value = "Supported sink types")
-    private Set<String> supportedSinkTypes;
+@JsonTypeDefine(value = ClusterType.SORT_CLS)
+@ApiModel("Inlong cluster request for SortCls")
+public class SortClsClusterRequest extends BaseSortClusterRequest {
 
-    public SortStandaloneClusterRequest() {
-        this.setType(ClusterType.SORTSTANDALONE);
+    public SortClsClusterRequest() {
+        this.setType(ClusterType.SORT_CLS);
     }
-
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java
similarity index 65%
rename from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java
rename to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java
index fa56998169..15eb6ef4f0 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java
@@ -15,37 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.cluster.es;
+package org.apache.inlong.manager.pojo.cluster.sort.es;
 
 import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo;
 
 import io.swagger.annotations.ApiModel;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import lombok.experimental.SuperBuilder;
 
 /**
  * Elasticsearch cluster info
  */
 @Data
-@SuperBuilder
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = ClusterType.ELASTICSEARCH)
+@JsonTypeDefine(value = ClusterType.SORT_ES)
 @ApiModel("Inlong cluster info for Elasticsearch")
-public class ElasticsearchClusterInfo extends ClusterInfo {
+public class SortEsClusterInfo extends BaseSortClusterInfo {
 
-    public ElasticsearchClusterInfo() {
-        this.setType(ClusterType.ELASTICSEARCH);
-    }
-
-    @Override
-    public ClusterRequest genRequest() {
-        return CommonBeanUtils.copyProperties(this, ElasticsearchClusterRequest::new);
+    public SortEsClusterInfo() {
+        this.setType(ClusterType.SORT_ES);
     }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java
similarity index 75%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java
index 51e9536bb3..b51e0cd844 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.cluster.es;
+package org.apache.inlong.manager.pojo.cluster.sort.es;
 
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest;
 
 import io.swagger.annotations.ApiModel;
 import lombok.Data;
@@ -32,11 +32,11 @@ import lombok.ToString;
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = ClusterType.ELASTICSEARCH)
-@ApiModel("Inlong cluster request for Elasticsearch")
-public class ElasticsearchClusterRequest extends ClusterRequest {
+@JsonTypeDefine(value = ClusterType.SORT_ES)
+@ApiModel("Inlong cluster request for SortEs")
+public class SortEsClusterRequest extends BaseSortClusterRequest {
 
-    public ElasticsearchClusterRequest() {
-        this.setType(ClusterType.ELASTICSEARCH);
+    public SortEsClusterRequest() {
+        this.setType(ClusterType.SORT_ES);
     }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java
similarity index 72%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java
index 51e9536bb3..0eaffe866a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java
@@ -15,28 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.cluster.es;
+package org.apache.inlong.manager.pojo.cluster.sort.pulsar;
 
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo;
 
 import io.swagger.annotations.ApiModel;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
-/**
- * Inlong cluster request for Elasticsearch
- */
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = ClusterType.ELASTICSEARCH)
-@ApiModel("Inlong cluster request for Elasticsearch")
-public class ElasticsearchClusterRequest extends ClusterRequest {
+@JsonTypeDefine(value = ClusterType.SORT_PULSAR)
+@ApiModel("Inlong cluster info for SortPulsar")
+public class SortPulsarClusterInfo extends BaseSortClusterInfo {
 
-    public ElasticsearchClusterRequest() {
-        this.setType(ClusterType.ELASTICSEARCH);
+    public SortPulsarClusterInfo() {
+        this.setType(ClusterType.SORT_PULSAR);
     }
-}
+
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java
similarity index 72%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java
index 51e9536bb3..f8be4ad614 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java
@@ -15,28 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.cluster.es;
+package org.apache.inlong.manager.pojo.cluster.sort.pulsar;
 
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest;
 
 import io.swagger.annotations.ApiModel;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
-/**
- * Inlong cluster request for Elasticsearch
- */
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = ClusterType.ELASTICSEARCH)
-@ApiModel("Inlong cluster request for Elasticsearch")
-public class ElasticsearchClusterRequest extends ClusterRequest {
+@JsonTypeDefine(value = ClusterType.SORT_PULSAR)
+@ApiModel("Inlong cluster request for SortPulsar")
+public class SortPulsarClusterRequest extends BaseSortClusterRequest {
 
-    public ElasticsearchClusterRequest() {
-        this.setType(ClusterType.ELASTICSEARCH);
+    public SortPulsarClusterRequest() {
+        this.setType(ClusterType.SORT_PULSAR);
     }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
index 1668da82b8..275a242838 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
@@ -25,18 +25,14 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import lombok.experimental.SuperBuilder;
 
 /**
  * Cloud log service sink info
  */
 @Data
-@SuperBuilder
-@AllArgsConstructor
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
 @ApiModel(value = "Cloud log service sink info")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java
index 27a10b3cf5..5f92dddbeb 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java
@@ -25,7 +25,6 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -35,11 +34,10 @@ import lombok.experimental.SuperBuilder;
  * Elasticsearch sink info
  */
 @Data
-@SuperBuilder
-@AllArgsConstructor
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
 @ApiModel(value = "Elasticsearch sink info")
+@SuperBuilder
 @JsonTypeDefine(value = SinkType.ELASTICSEARCH)
 public class ElasticsearchSink extends StreamSink {
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java
index a29cd33460..cebb5f8066 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java
@@ -25,18 +25,14 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import lombok.experimental.SuperBuilder;
 
 /**
  * Pulsar sink info
  */
 @Data
-@SuperBuilder
-@AllArgsConstructor
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
 @ApiModel(value = "Pulsar sink info")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterDTO.java
similarity index 54%
rename from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java
rename to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterDTO.java
index 64c201e75f..6d70512e7d 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterDTO.java
@@ -15,49 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.cluster.es;
+package org.apache.inlong.manager.pojo.sort;
 
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
 
 import io.swagger.annotations.ApiModel;
-import lombok.Builder;
 import lombok.Data;
-import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
 
 import javax.validation.constraints.NotNull;
 
 /**
- * Elasticsearch cluster info
+ * Base sort cluster DTO
  */
 @Data
-@Builder
-@NoArgsConstructor
-@ApiModel("Elasticsearch cluster info")
-public class ElasticsearchClusterDTO {
+@ApiModel("Base sort cluster info")
+public class BaseSortClusterDTO {
 
     /**
      * Get the dto instance from the request
      */
-    public static ElasticsearchClusterDTO getFromRequest(ElasticsearchClusterRequest request, String extParams) {
-        ElasticsearchClusterDTO dto = StringUtils.isNotBlank(extParams)
-                ? ElasticsearchClusterDTO.getFromJson(extParams)
-                : new ElasticsearchClusterDTO();
+    public static BaseSortClusterDTO getFromRequest(BaseSortClusterRequest request, String extParams) throws Exception {
+        BaseSortClusterDTO dto = StringUtils.isNotBlank(extParams)
+                ? BaseSortClusterDTO.getFromJson(extParams)
+                : new BaseSortClusterDTO();
         return CommonBeanUtils.copyProperties(request, dto, true);
     }
 
     /**
      * Get the dto instance from the JSON string.
      */
-    public static ElasticsearchClusterDTO getFromJson(@NotNull String extParams) {
-        try {
-            return JsonUtils.parseObject(extParams, ElasticsearchClusterDTO.class);
-        } catch (Exception e) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
-                    String.format("parse extParams of Elasticsearch Cluster failure: %s", e.getMessage()));
-        }
+    public static BaseSortClusterDTO getFromJson(@NotNull String extParams) {
+        return JsonUtils.parseObject(extParams, BaseSortClusterDTO.class);
     }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterInfo.java
similarity index 58%
rename from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java
rename to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterInfo.java
index ea14d2a56b..c33ec2d1d0 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterInfo.java
@@ -15,38 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.cluster.sortstandalone;
+package org.apache.inlong.manager.pojo.sort;
 
-import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 
 import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
-import java.util.Set;
-
+/**
+ * Inlong base sort cluster info
+ */
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = ClusterType.SORTSTANDALONE)
-@ApiModel("Inlong cluster info for SortStandalone")
-public class SortStandaloneClusterInfo extends ClusterInfo {
-
-    @ApiModelProperty(value = "Supported sink types")
-    private Set<String> supportedSinkTypes;
-
-    public SortStandaloneClusterInfo() {
-        this.setType(ClusterType.SORTSTANDALONE);
-    }
+@ApiModel("Inlong base sort cluster info")
+public class BaseSortClusterInfo extends ClusterInfo {
 
     @Override
-    public ClusterRequest genRequest() {
-        return CommonBeanUtils.copyProperties(this, SortStandaloneClusterRequest::new);
+    public BaseSortClusterRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, BaseSortClusterRequest::new);
     }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterRequest.java
similarity index 68%
rename from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
rename to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterRequest.java
index 51e9536bb3..1edec5dc5c 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterRequest.java
@@ -15,10 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.cluster.es;
+package org.apache.inlong.manager.pojo.sort;
 
-import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 
 import io.swagger.annotations.ApiModel;
@@ -27,16 +25,12 @@ import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
 /**
- * Inlong cluster request for Elasticsearch
+ * Inlong base sort cluster request
  */
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = ClusterType.ELASTICSEARCH)
-@ApiModel("Inlong cluster request for Elasticsearch")
-public class ElasticsearchClusterRequest extends ClusterRequest {
+@ApiModel("Inlong base sort cluster request")
+public class BaseSortClusterRequest extends ClusterRequest {
 
-    public ElasticsearchClusterRequest() {
-        this.setType(ClusterType.ELASTICSEARCH);
-    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java
deleted file mode 100644
index 9d28208098..0000000000
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.cluster;
-
-import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
-import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
-import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterDTO;
-import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterRequest;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Elasticsearch cluster operator.
- */
-@Service
-public class ElasticsearchClusterOperator extends AbstractClusterOperator {
-
-    @Autowired
-    private ObjectMapper mapper;
-
-    @Override
-    public Boolean accept(String clusterType) {
-        return getClusterType().equals(clusterType);
-    }
-
-    @Override
-    public String getClusterType() {
-        return ClusterType.ELASTICSEARCH;
-    }
-
-    @Override
-    protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) {
-        ElasticsearchClusterRequest esRequest = (ElasticsearchClusterRequest) request;
-        CommonBeanUtils.copyProperties(esRequest, targetEntity, true);
-        try {
-            ElasticsearchClusterDTO dto =
-                    ElasticsearchClusterDTO.getFromRequest(esRequest, targetEntity.getExtParams());
-            targetEntity.setExtParams(mapper.writeValueAsString(dto));
-        } catch (Exception e) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
-                    String.format("serialize extParams of Elasticsearch Cluster failure: %s", e.getMessage()));
-        }
-    }
-
-    @Override
-    public ClusterInfo getFromEntity(InlongClusterEntity entity) {
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
-        }
-        ElasticsearchClusterInfo info = new ElasticsearchClusterInfo();
-        CommonBeanUtils.copyProperties(entity, info);
-        if (StringUtils.isNotBlank(entity.getExtParams())) {
-            ElasticsearchClusterDTO dto = ElasticsearchClusterDTO.getFromJson(entity.getExtParams());
-            CommonBeanUtils.copyProperties(dto, info);
-        }
-        return info;
-    }
-
-    @Override
-    public Object getClusterInfo(InlongClusterEntity entity) {
-        ElasticsearchClusterInfo elasticsearchClusterInfo = (ElasticsearchClusterInfo) this.getFromEntity(entity);
-        Map<String, String> map = new HashMap<>();
-        map.put("url", elasticsearchClusterInfo.getUrl());
-        return map;
-    }
-}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java
index 516f894328..db9fb0b336 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java
@@ -36,7 +36,9 @@ public interface InlongClusterOperator {
      *
      * @return cluster type string
      */
-    String getClusterType();
+    default String getClusterType() {
+        return null;
+    }
 
     /**
      * Save the inlong cluster info.
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java
new file mode 100644
index 0000000000..dbb889565e
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster;
+
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.sort.es.SortEsClusterInfo;
+import 
org.apache.inlong.manager.pojo.cluster.sort.pulsar.SortPulsarClusterInfo;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterDTO;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@Service
+public class SortClusterOperator extends AbstractClusterOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SortClusterOperator.class);
+
+    private static final Set<String> SORT_CLUSTER_SET = new HashSet<String>() {
+
+        {
+            add(ClusterType.SORT_CLS);
+            add(ClusterType.SORT_PULSAR);
+            add(ClusterType.SORT_ES);
+        }
+    };
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String clusterType) {
+        return SORT_CLUSTER_SET.contains(clusterType);
+    }
+
+    @Override
+    protected void setTargetEntity(ClusterRequest request, InlongClusterEntity 
targetEntity) {
+        BaseSortClusterRequest clusterRequest = (BaseSortClusterRequest) 
request;
+        CommonBeanUtils.copyProperties(clusterRequest, targetEntity, true);
+        try {
+            BaseSortClusterDTO dto = 
BaseSortClusterDTO.getFromRequest(clusterRequest, targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            LOGGER.info("success to set entity for sort cluster");
+        } catch (Exception e) {
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+        }
+    }
+
+    @Override
+    public ClusterInfo getFromEntity(InlongClusterEntity entity) {
+        Preconditions.expectNotNull(entity, 
ErrorCodeEnum.CLUSTER_NOT_FOUND.getMessage());
+
+        ClusterInfo sortClusterInfo;
+        switch (entity.getType()) {
+            case ClusterType.SORT_CLS:
+                sortClusterInfo = new SortClsClusterInfo();
+                break;
+            case ClusterType.SORT_PULSAR:
+                sortClusterInfo = new SortPulsarClusterInfo();
+                break;
+            case ClusterType.SORT_ES:
+                sortClusterInfo = new SortEsClusterInfo();
+                break;
+            default:
+                throw new BusinessException("unsupported cluster type " + 
entity.getType());
+        }
+
+        CommonBeanUtils.copyProperties(entity, sortClusterInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            BaseSortClusterDTO dto = 
BaseSortClusterDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, sortClusterInfo);
+        }
+        return sortClusterInfo;
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
deleted file mode 100644
index 3cf12837c2..0000000000
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.cluster;
-
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
-import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
-import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo;
-import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Sets;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.stereotype.Service;
-
-import java.util.Set;
-
-@Slf4j
-@Service
-public class SortStandaloneClusterOperator extends AbstractClusterOperator {
-
-    @Override
-    protected void setTargetEntity(ClusterRequest request, InlongClusterEntity 
targetEntity) {
-        SortStandaloneClusterRequest standaloneRequest = 
(SortStandaloneClusterRequest) request;
-        CommonBeanUtils.copyProperties(standaloneRequest, targetEntity, true);
-        Set<String> supportedTypes = standaloneRequest.getSupportedSinkTypes();
-        if (CollectionUtils.isNotEmpty(supportedTypes)) {
-            String extTag = 
Joiner.on(InlongConstants.COMMA).join(supportedTypes);
-            targetEntity.setExtTag(extTag);
-        }
-    }
-
-    @Override
-    public Boolean accept(String clusterType) {
-        return getClusterType().equals(clusterType);
-    }
-
-    @Override
-    public String getClusterType() {
-        return ClusterType.SORTSTANDALONE;
-    }
-
-    @Override
-    public ClusterInfo getFromEntity(InlongClusterEntity entity) {
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
-        }
-
-        SortStandaloneClusterInfo clusterInfo = new 
SortStandaloneClusterInfo();
-        CommonBeanUtils.copyProperties(entity, clusterInfo);
-        String extTag = entity.getExtTag();
-        if (StringUtils.isNotBlank(extTag)) {
-            Set<String> supportedTypes = 
Sets.newHashSet(extTag.split(InlongConstants.COMMA));
-            clusterInfo.setSupportedSinkTypes(supportedTypes);
-        }
-        return clusterInfo;
-    }
-
-}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
index a7e3fb9e62..61187e6f8d 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
@@ -46,6 +46,8 @@ public abstract class AbstractStandaloneSinkResourceOperator 
implements SinkReso
     @Autowired
     private InlongGroupEntityMapper groupEntityMapper;
 
+    private static final String SORT_PREFIX = "SORT_";
+
     private Random rand = new Random();
 
     @VisibleForTesting
@@ -77,8 +79,9 @@ public abstract class AbstractStandaloneSinkResourceOperator 
implements SinkReso
 
     private String assignFromRelated(String sinkType, String groupId) {
         InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId);
+        String sortClusterType = SORT_PREFIX.concat(sinkType);
         List<InlongClusterEntity> clusters = clusterEntityMapper
-                .selectStandaloneClusterByType(sinkType).stream()
+                .selectByKey(null, null, sortClusterType).stream()
                 .filter(cluster -> checkCluster(cluster.getClusterTags(), 
group.getInlongClusterTag()))
                 .collect(Collectors.toList());
 
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 39106fccda..353a244dfb 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -32,8 +32,8 @@ import 
org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
 import 
org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
-import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo;
-import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest;
+import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterRequest;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
@@ -45,9 +45,7 @@ import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 /**
  * Inlong cluster service test for {@link InlongClusterService}
@@ -59,11 +57,10 @@ public class InlongClusterServiceTest extends 
ServiceBaseTest {
     @Autowired
     private HeartbeatManager heartbeatManager;
 
-    public Integer saveStandaloneCluster(String clusterTag, String 
clusterName, Set<String> supportedSinkTypes) {
-        SortStandaloneClusterRequest request = new 
SortStandaloneClusterRequest();
+    public Integer saveStandaloneCluster(String clusterTag, String 
clusterName) {
+        SortClsClusterRequest request = new SortClsClusterRequest();
         request.setClusterTags(clusterTag);
         request.setName(clusterName);
-        request.setSupportedSinkTypes(supportedSinkTypes);
         request.setInCharges(GLOBAL_OPERATOR);
         return clusterService.save(request, GLOBAL_OPERATOR);
     }
@@ -329,24 +326,12 @@ public class InlongClusterServiceTest extends 
ServiceBaseTest {
     public void testStandaloneCluster() {
         String clusterTag = "standalone_cluster";
         String clusterName = "test_standalone";
-        String type1 = "type1";
-        String type2 = "type2";
-        String type3 = "type3";
-        Set<String> supportedType = new HashSet<>();
-        supportedType.add(type1);
-        supportedType.add(type2);
-        supportedType.add(type3);
-
-        Integer id = this.saveStandaloneCluster(clusterTag, clusterName, 
supportedType);
+
+        Integer id = this.saveStandaloneCluster(clusterTag, clusterName);
         Assertions.assertNotNull(id);
 
         ClusterInfo info = clusterService.get(id, GLOBAL_OPERATOR);
-        Assertions.assertInstanceOf(SortStandaloneClusterInfo.class, info);
-
-        Set<String> types = ((SortStandaloneClusterInfo) 
info).getSupportedSinkTypes();
-        Assertions.assertTrue(types.contains(type1));
-        Assertions.assertTrue(types.contains(type2));
-        Assertions.assertTrue(types.contains(type3));
+        Assertions.assertInstanceOf(SortClsClusterInfo.class, info);
     }
 
     @Test
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java
index 48a1f00399..ea9a068314 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java
@@ -21,20 +21,18 @@ import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
-import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest;
+import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 
-import com.google.common.collect.Sets;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.List;
-import java.util.Set;
 
 public class StandaloneAutoAssignTest extends ServiceBaseTest {
 
@@ -55,8 +53,7 @@ public class StandaloneAutoAssignTest extends ServiceBaseTest 
{
         Integer id = saveClsSink(groupInfo.getInlongGroupId(), 
streamInfo.getInlongStreamId());
 
         String clusterName = "clsCluster";
-        Set<String> types = Sets.newHashSet(SinkType.CLS, 
SinkType.ELASTICSEARCH);
-        saveStandaloneCluster(groupInfo.getInlongClusterTag(), clusterName, 
types);
+        saveStandaloneCluster(groupInfo.getInlongClusterTag(), clusterName);
 
         List<SinkInfo> sinkInfos = 
sinkEntityMapper.selectAllConfig(groupInfo.getInlongGroupId(), null);
         Assertions.assertEquals(1, sinkInfos.size());
@@ -81,11 +78,10 @@ public class StandaloneAutoAssignTest extends 
ServiceBaseTest {
         return clsSinkEntity.getId();
     }
 
-    public Integer saveStandaloneCluster(String clusterTag, String 
clusterName, Set<String> supportedSinkTypes) {
-        SortStandaloneClusterRequest request = new 
SortStandaloneClusterRequest();
+    public Integer saveStandaloneCluster(String clusterTag, String 
clusterName) {
+        SortClsClusterRequest request = new SortClsClusterRequest();
         request.setClusterTags(clusterTag);
         request.setName(clusterName);
-        request.setSupportedSinkTypes(supportedSinkTypes);
         request.setInCharges(GLOBAL_OPERATOR);
         return clusterService.save(request, GLOBAL_OPERATOR);
     }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
index ba5cce64a9..aec6e19919 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
@@ -47,6 +47,11 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
     </dependencies>
 

Reply via email to