This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 632a477ee7 [INLONG-9192][Manager] Flat Sort Cluster types (#9193) 632a477ee7 is described below commit 632a477ee73b0a98910b14310b42b7b19938a8b0 Author: vernedeng <verned...@apache.org> AuthorDate: Mon Nov 6 17:18:09 2023 +0800 [INLONG-9192][Manager] Flat Sort Cluster types (#9193) --- .../inlong/manager/common/consts/SinkType.java | 2 +- .../inlong/manager/common/enums/ClusterType.java | 11 ++- .../dao/mapper/InlongClusterEntityMapper.java | 2 - .../mappers/InlongClusterEntityMapper.xml | 10 -- .../cls/SortClsClusterInfo.java} | 19 ++-- .../cls/SortClsClusterRequest.java} | 21 ++-- .../es/SortEsClusterInfo.java} | 21 ++-- .../es/SortEsClusterRequest.java} | 14 +-- .../pulsar/SortPulsarClusterInfo.java} | 20 ++-- .../pulsar/SortPulsarClusterRequest.java} | 17 ++-- .../inlong/manager/pojo/sink/cls/ClsSink.java | 4 - .../manager/pojo/sink/es/ElasticsearchSink.java | 4 +- .../manager/pojo/sink/pulsar/PulsarSink.java | 4 - .../BaseSortClusterDTO.java} | 31 ++---- .../BaseSortClusterInfo.java} | 27 ++---- .../BaseSortClusterRequest.java} | 14 +-- .../cluster/ElasticsearchClusterOperator.java | 93 ------------------ .../service/cluster/InlongClusterOperator.java | 4 +- .../service/cluster/SortClusterOperator.java | 106 +++++++++++++++++++++ .../cluster/SortStandaloneClusterOperator.java | 81 ---------------- .../AbstractStandaloneSinkResourceOperator.java | 5 +- .../service/cluster/InlongClusterServiceTest.java | 29 ++---- .../resource/sink/StandaloneAutoAssignTest.java | 12 +-- .../sort-connectors/tubemq/pom.xml | 5 + 24 files changed, 204 insertions(+), 352 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java index 965e809f51..f0b215ac30 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java @@ -25,7 +25,7 @@ public class SinkType extends StreamType { public static final String HIVE = "HIVE"; public static final String CLICKHOUSE = "CLICKHOUSE"; public static final String HBASE = "HBASE"; - public static final String ELASTICSEARCH = "ELASTICSEARCH"; + public static final String ELASTICSEARCH = "ES"; public static final String HDFS = "HDFS"; public static final String GREENPLUM = "GREENPLUM"; public static final String MYSQL = "MYSQL"; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java index f6a33ff71e..6d010d25cc 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java @@ -32,8 +32,10 @@ public class ClusterType { public static final String PULSAR = "PULSAR"; public static final String DATAPROXY = "DATAPROXY"; public static final String KAFKA = "KAFKA"; - public static final String ELASTICSEARCH = "ELASTICSEARCH"; - public static final String SORTSTANDALONE = "SORTSTANDALONE"; + + public static final String SORT_ES = "SORT_ES"; + public static final String SORT_CLS = "SORT_CLS"; + public static final String SORT_PULSAR = "SORT_PULSAR"; private static final Set<String> TYPE_SET = new HashSet<String>() { @@ -43,8 +45,9 @@ public class ClusterType { add(ClusterType.PULSAR); add(ClusterType.DATAPROXY); add(ClusterType.KAFKA); - add(ClusterType.ELASTICSEARCH); - add(ClusterType.SORTSTANDALONE); + add(ClusterType.SORT_ES); + add(ClusterType.SORT_CLS); + add(ClusterType.SORT_PULSAR); } }; diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java index a0d71abc5e..ce2644691d 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java @@ -64,6 +64,4 @@ public interface InlongClusterEntityMapper { int deleteByPrimaryKey(Integer id); - List<InlongClusterEntity> selectStandaloneClusterByType(@Param("sinkType") String sinkType); - } diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml index 77f0eb3769..f554faf64a 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml @@ -189,16 +189,6 @@ from inlong_cluster where is_deleted = 0 </select> - <select id="selectStandaloneClusterByType" resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity"> - select - <include refid="Base_Column_List"/> - from inlong_cluster - <where> - type = 'SORTSTANDALONE' - and find_in_set(#{sinkType, jdbcType=VARCHAR}, ext_tag) - and is_deleted = 0 - </where> - </select> <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity"> update inlong_cluster diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java similarity index 72% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java index 51e9536bb3..96eedc4667 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java @@ -15,28 +15,25 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.cluster.sort.cls; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.JsonTypeDefine; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo; import io.swagger.annotations.ApiModel; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -/** - * Inlong cluster request for Elasticsearch - */ @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.ELASTICSEARCH) -@ApiModel("Inlong cluster request for Elasticsearch") -public class ElasticsearchClusterRequest extends ClusterRequest { +@JsonTypeDefine(value = ClusterType.SORT_CLS) +@ApiModel("Inlong cluster info for SortCls") +public class SortClsClusterInfo extends BaseSortClusterInfo { - public ElasticsearchClusterRequest() { - this.setType(ClusterType.ELASTICSEARCH); + public SortClsClusterInfo() { + this.setType(ClusterType.SORT_CLS); } -} +} \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java similarity index 66% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java index fd8c10e0d0..5cf261c932 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java @@ -15,32 +15,25 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.sortstandalone; +package org.apache.inlong.manager.pojo.cluster.sort.cls; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.JsonTypeDefine; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest; import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import java.util.Set; - @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.SORTSTANDALONE) -@ApiModel("Inlong cluster request for SortStandalone") -public class SortStandaloneClusterRequest extends ClusterRequest { - - @ApiModelProperty(value = "Supported sink types") - private Set<String> supportedSinkTypes; +@JsonTypeDefine(value = ClusterType.SORT_CLS) +@ApiModel("Inlong cluster request for SortCls") +public class SortClsClusterRequest extends BaseSortClusterRequest { - public SortStandaloneClusterRequest() { - this.setType(ClusterType.SORTSTANDALONE); + public SortClsClusterRequest() { + this.setType(ClusterType.SORT_CLS); } - } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java similarity index 65% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java index fa56998169..15eb6ef4f0 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java @@ -15,37 +15,28 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.cluster.sort.es; import org.apache.inlong.manager.common.enums.ClusterType; -import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonTypeDefine; -import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo; import io.swagger.annotations.ApiModel; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import lombok.experimental.SuperBuilder; /** * Elasticsearch cluster info */ @Data -@SuperBuilder @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.ELASTICSEARCH) +@JsonTypeDefine(value = ClusterType.SORT_ES) @ApiModel("Inlong cluster info for Elasticsearch") -public class ElasticsearchClusterInfo extends ClusterInfo { +public class SortEsClusterInfo extends BaseSortClusterInfo { - public ElasticsearchClusterInfo() { - this.setType(ClusterType.ELASTICSEARCH); - } - - @Override - public ClusterRequest genRequest() { - return CommonBeanUtils.copyProperties(this, ElasticsearchClusterRequest::new); + public SortEsClusterInfo() { + this.setType(ClusterType.SORT_ES); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java similarity index 75% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java index 51e9536bb3..b51e0cd844 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.cluster.sort.es; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.JsonTypeDefine; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest; import io.swagger.annotations.ApiModel; import lombok.Data; @@ -32,11 +32,11 @@ import lombok.ToString; @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.ELASTICSEARCH) -@ApiModel("Inlong cluster request for Elasticsearch") -public class ElasticsearchClusterRequest extends ClusterRequest { +@JsonTypeDefine(value = ClusterType.SORT_ES) +@ApiModel("Inlong cluster request for SortEs") +public class SortEsClusterRequest extends BaseSortClusterRequest { - public ElasticsearchClusterRequest() { - this.setType(ClusterType.ELASTICSEARCH); + public SortEsClusterRequest() { + this.setType(ClusterType.SORT_ES); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java similarity index 72% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java index 51e9536bb3..0eaffe866a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java @@ -15,28 +15,26 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.cluster.sort.pulsar; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.JsonTypeDefine; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo; import io.swagger.annotations.ApiModel; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -/** - * Inlong cluster request for Elasticsearch - */ @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.ELASTICSEARCH) -@ApiModel("Inlong cluster request for Elasticsearch") -public class ElasticsearchClusterRequest extends ClusterRequest { +@JsonTypeDefine(value = ClusterType.SORT_PULSAR) +@ApiModel("Inlong cluster info for SortPulsar") +public class SortPulsarClusterInfo extends BaseSortClusterInfo { - public ElasticsearchClusterRequest() { - this.setType(ClusterType.ELASTICSEARCH); + public SortPulsarClusterInfo() { + this.setType(ClusterType.SORT_PULSAR); } -} + +} \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java similarity index 72% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java index 51e9536bb3..f8be4ad614 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java @@ -15,28 +15,25 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.cluster.sort.pulsar; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.JsonTypeDefine; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest; import io.swagger.annotations.ApiModel; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -/** - * Inlong cluster request for Elasticsearch - */ @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.ELASTICSEARCH) -@ApiModel("Inlong cluster request for Elasticsearch") -public class ElasticsearchClusterRequest extends ClusterRequest { +@JsonTypeDefine(value = ClusterType.SORT_PULSAR) +@ApiModel("Inlong cluster request for SortPulsar") +public class SortPulsarClusterRequest extends BaseSortClusterRequest { - public ElasticsearchClusterRequest() { - this.setType(ClusterType.ELASTICSEARCH); + public SortPulsarClusterRequest() { + this.setType(ClusterType.SORT_PULSAR); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java index 1668da82b8..275a242838 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java @@ -25,18 +25,14 @@ import org.apache.inlong.manager.pojo.sink.StreamSink; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import lombok.experimental.SuperBuilder; /** * Cloud log service sink info */ @Data -@SuperBuilder -@AllArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) @ApiModel(value = "Cloud log service sink info") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java index 27a10b3cf5..5f92dddbeb 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java @@ -25,7 +25,6 @@ import org.apache.inlong.manager.pojo.sink.StreamSink; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -35,11 +34,10 @@ import lombok.experimental.SuperBuilder; * Elasticsearch sink info */ @Data -@SuperBuilder -@AllArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) @ApiModel(value = "Elasticsearch sink info") +@SuperBuilder @JsonTypeDefine(value = SinkType.ELASTICSEARCH) public class ElasticsearchSink extends StreamSink { diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java index a29cd33460..cebb5f8066 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java @@ -25,18 +25,14 @@ import org.apache.inlong.manager.pojo.sink.StreamSink; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import lombok.experimental.SuperBuilder; /** * Pulsar sink info */ @Data -@SuperBuilder -@AllArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) @ApiModel(value = "Pulsar sink info") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterDTO.java similarity index 54% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterDTO.java index 64c201e75f..6d70512e7d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterDTO.java @@ -15,49 +15,38 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.sort; -import org.apache.inlong.manager.common.enums.ErrorCodeEnum; -import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; import io.swagger.annotations.ApiModel; -import lombok.Builder; import lombok.Data; -import lombok.NoArgsConstructor; import org.apache.commons.lang3.StringUtils; import javax.validation.constraints.NotNull; /** - * Elasticsearch cluster info + * Base sort cluster DTO */ @Data -@Builder -@NoArgsConstructor -@ApiModel("Elasticsearch cluster info") -public class ElasticsearchClusterDTO { +@ApiModel("Base sort cluster info") +public class BaseSortClusterDTO { /** * Get the dto instance from the request */ - public static ElasticsearchClusterDTO getFromRequest(ElasticsearchClusterRequest request, String extParams) { - ElasticsearchClusterDTO dto = StringUtils.isNotBlank(extParams) - ? ElasticsearchClusterDTO.getFromJson(extParams) - : new ElasticsearchClusterDTO(); + public static BaseSortClusterDTO getFromRequest(BaseSortClusterRequest request, String extParams) throws Exception { + BaseSortClusterDTO dto = StringUtils.isNotBlank(extParams) + ? BaseSortClusterDTO.getFromJson(extParams) + : new BaseSortClusterDTO(); return CommonBeanUtils.copyProperties(request, dto, true); } /** * Get the dto instance from the JSON string. */ - public static ElasticsearchClusterDTO getFromJson(@NotNull String extParams) { - try { - return JsonUtils.parseObject(extParams, ElasticsearchClusterDTO.class); - } catch (Exception e) { - throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, - String.format("parse extParams of Elasticsearch Cluster failure: %s", e.getMessage())); - } + public static BaseSortClusterDTO getFromJson(@NotNull String extParams) { + return JsonUtils.parseObject(extParams, BaseSortClusterDTO.class); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterInfo.java similarity index 58% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterInfo.java index ea14d2a56b..c33ec2d1d0 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterInfo.java @@ -15,38 +15,27 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.sortstandalone; +package org.apache.inlong.manager.pojo.sort; -import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.common.util.JsonTypeDefine; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import java.util.Set; - +/** + * Inlong base sort cluster info + */ @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.SORTSTANDALONE) -@ApiModel("Inlong cluster info for SortStandalone") -public class SortStandaloneClusterInfo extends ClusterInfo { - - @ApiModelProperty(value = "Supported sink types") - private Set<String> supportedSinkTypes; - - public SortStandaloneClusterInfo() { - this.setType(ClusterType.SORTSTANDALONE); - } +@ApiModel("Inlong base sort cluster info") +public class BaseSortClusterInfo extends ClusterInfo { @Override - public ClusterRequest genRequest() { - return CommonBeanUtils.copyProperties(this, SortStandaloneClusterRequest::new); + public BaseSortClusterRequest genRequest() { + return CommonBeanUtils.copyProperties(this, BaseSortClusterRequest::new); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterRequest.java similarity index 68% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterRequest.java index 51e9536bb3..1edec5dc5c 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterRequest.java @@ -15,10 +15,8 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.sort; -import org.apache.inlong.manager.common.enums.ClusterType; -import org.apache.inlong.manager.common.util.JsonTypeDefine; import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import io.swagger.annotations.ApiModel; @@ -27,16 +25,12 @@ import lombok.EqualsAndHashCode; import lombok.ToString; /** - * Inlong cluster request for Elasticsearch + * Inlong base sort cluster request */ @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.ELASTICSEARCH) -@ApiModel("Inlong cluster request for Elasticsearch") -public class ElasticsearchClusterRequest extends ClusterRequest { +@ApiModel("Inlong base sort cluster request") +public class BaseSortClusterRequest extends ClusterRequest { - public ElasticsearchClusterRequest() { - this.setType(ClusterType.ELASTICSEARCH); - } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java deleted file mode 100644 index 9d28208098..0000000000 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.service.cluster; - -import org.apache.inlong.manager.common.enums.ClusterType; -import org.apache.inlong.manager.common.enums.ErrorCodeEnum; -import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.dao.entity.InlongClusterEntity; -import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; -import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterDTO; -import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterInfo; -import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterRequest; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.HashMap; -import java.util.Map; - -/** - * Elasticsearch cluster operator. - */ -@Service -public class ElasticsearchClusterOperator extends AbstractClusterOperator { - - @Autowired - private ObjectMapper mapper; - - @Override - public Boolean accept(String clusterType) { - return getClusterType().equals(clusterType); - } - - @Override - public String getClusterType() { - return ClusterType.ELASTICSEARCH; - } - - @Override - protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { - ElasticsearchClusterRequest esRequest = (ElasticsearchClusterRequest) request; - CommonBeanUtils.copyProperties(esRequest, targetEntity, true); - try { - ElasticsearchClusterDTO dto = - ElasticsearchClusterDTO.getFromRequest(esRequest, targetEntity.getExtParams()); - targetEntity.setExtParams(mapper.writeValueAsString(dto)); - } catch (Exception e) { - throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, - String.format("serialize extParams of Elasticsearch Cluster failure: %s", e.getMessage())); - } - } - - @Override - public ClusterInfo getFromEntity(InlongClusterEntity entity) { - if (entity == null) { - throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND); - } - ElasticsearchClusterInfo info = new ElasticsearchClusterInfo(); - CommonBeanUtils.copyProperties(entity, info); - if (StringUtils.isNotBlank(entity.getExtParams())) { - ElasticsearchClusterDTO dto = ElasticsearchClusterDTO.getFromJson(entity.getExtParams()); - CommonBeanUtils.copyProperties(dto, info); - } - return info; - } - - @Override - public Object getClusterInfo(InlongClusterEntity entity) { - ElasticsearchClusterInfo elasticsearchClusterInfo = (ElasticsearchClusterInfo) this.getFromEntity(entity); - Map<String, String> map = new HashMap<>(); - map.put("url", elasticsearchClusterInfo.getUrl()); - return map; - } -} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java index 516f894328..db9fb0b336 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java @@ -36,7 +36,9 @@ public interface InlongClusterOperator { * * @return cluster type string */ - String getClusterType(); + default String getClusterType() { + return null; + } /** * Save the inlong cluster info. diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java new file mode 100644 index 0000000000..dbb889565e --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.cluster; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.dao.entity.InlongClusterEntity; +import org.apache.inlong.manager.pojo.cluster.ClusterInfo; +import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.es.SortEsClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.pulsar.SortPulsarClusterInfo; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterDTO; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.HashSet; +import java.util.Set; + +@Service +public class SortClusterOperator extends AbstractClusterOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(SortClusterOperator.class); + + private static final Set<String> SORT_CLUSTER_SET = new HashSet<String>() { + + { + add(ClusterType.SORT_CLS); + add(ClusterType.SORT_PULSAR); + add(ClusterType.SORT_ES); + } + }; + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String clusterType) { + return SORT_CLUSTER_SET.contains(clusterType); + } + + @Override + protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { + BaseSortClusterRequest clusterRequest = (BaseSortClusterRequest) request; + CommonBeanUtils.copyProperties(clusterRequest, targetEntity, true); + try { + BaseSortClusterDTO dto = BaseSortClusterDTO.getFromRequest(clusterRequest, targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + LOGGER.info("success to set entity for sort cluster"); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } + + @Override + public ClusterInfo getFromEntity(InlongClusterEntity entity) { + Preconditions.expectNotNull(entity, ErrorCodeEnum.CLUSTER_NOT_FOUND.getMessage()); + + ClusterInfo sortClusterInfo; + switch (entity.getType()) { + case ClusterType.SORT_CLS: + sortClusterInfo = new SortClsClusterInfo(); + break; + case ClusterType.SORT_PULSAR: + sortClusterInfo = new SortPulsarClusterInfo(); + break; + case ClusterType.SORT_ES: + sortClusterInfo = new SortEsClusterInfo(); + break; + default: + throw new BusinessException("unsupported cluster type " + entity.getType()); + } + + CommonBeanUtils.copyProperties(entity, sortClusterInfo); + if (StringUtils.isNotBlank(entity.getExtParams())) { + BaseSortClusterDTO dto = BaseSortClusterDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, sortClusterInfo); + } + return sortClusterInfo; + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java deleted file mode 100644 index 3cf12837c2..0000000000 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.service.cluster; - -import org.apache.inlong.manager.common.consts.InlongConstants; -import org.apache.inlong.manager.common.enums.ClusterType; -import org.apache.inlong.manager.common.enums.ErrorCodeEnum; -import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.dao.entity.InlongClusterEntity; -import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest; - -import com.google.common.base.Joiner; -import com.google.common.collect.Sets; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.springframework.stereotype.Service; - -import java.util.Set; - -@Slf4j -@Service -public class SortStandaloneClusterOperator extends AbstractClusterOperator { - - @Override - protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { - SortStandaloneClusterRequest standaloneRequest = (SortStandaloneClusterRequest) request; - CommonBeanUtils.copyProperties(standaloneRequest, targetEntity, true); - Set<String> supportedTypes = standaloneRequest.getSupportedSinkTypes(); - if (CollectionUtils.isNotEmpty(supportedTypes)) { - String extTag = Joiner.on(InlongConstants.COMMA).join(supportedTypes); - targetEntity.setExtTag(extTag); - } - } - - @Override - public Boolean accept(String clusterType) { - return getClusterType().equals(clusterType); - } - - @Override - public String getClusterType() { - return ClusterType.SORTSTANDALONE; - } - - @Override - public ClusterInfo getFromEntity(InlongClusterEntity entity) { - if (entity == null) { - throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND); - } - - SortStandaloneClusterInfo clusterInfo = new SortStandaloneClusterInfo(); - CommonBeanUtils.copyProperties(entity, clusterInfo); - String extTag = entity.getExtTag(); - if (StringUtils.isNotBlank(extTag)) { - Set<String> supportedTypes = Sets.newHashSet(extTag.split(InlongConstants.COMMA)); - clusterInfo.setSupportedSinkTypes(supportedTypes); - } - return clusterInfo; - } - -} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java index a7e3fb9e62..61187e6f8d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java @@ -46,6 +46,8 @@ public abstract class AbstractStandaloneSinkResourceOperator implements SinkReso @Autowired private InlongGroupEntityMapper groupEntityMapper; + private static final String SORT_PREFIX = "SORT_"; + private Random rand = new Random(); @VisibleForTesting @@ -77,8 +79,9 @@ public abstract class AbstractStandaloneSinkResourceOperator implements SinkReso private String assignFromRelated(String sinkType, String groupId) { InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId); + String sortClusterType = SORT_PREFIX.concat(sinkType); List<InlongClusterEntity> clusters = clusterEntityMapper - .selectStandaloneClusterByType(sinkType).stream() + .selectByKey(null, null, sortClusterType).stream() .filter(cluster -> checkCluster(cluster.getClusterTags(), group.getInlongClusterTag())) .collect(Collectors.toList()); diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java index 39106fccda..353a244dfb 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java @@ -32,8 +32,8 @@ import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; import org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterRequest; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterRequest; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; @@ -45,9 +45,7 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import java.util.Comparator; -import java.util.HashSet; import java.util.List; -import java.util.Set; /** * Inlong cluster service test for {@link InlongClusterService} @@ -59,11 +57,10 @@ public class InlongClusterServiceTest extends ServiceBaseTest { @Autowired private HeartbeatManager heartbeatManager; - public Integer saveStandaloneCluster(String clusterTag, String clusterName, Set<String> supportedSinkTypes) { - SortStandaloneClusterRequest request = new SortStandaloneClusterRequest(); + public Integer saveStandaloneCluster(String clusterTag, String clusterName) { + SortClsClusterRequest request = new SortClsClusterRequest(); request.setClusterTags(clusterTag); request.setName(clusterName); - request.setSupportedSinkTypes(supportedSinkTypes); request.setInCharges(GLOBAL_OPERATOR); return clusterService.save(request, GLOBAL_OPERATOR); } @@ -329,24 +326,12 @@ public class InlongClusterServiceTest extends ServiceBaseTest { public void testStandaloneCluster() { String clusterTag = "standalone_cluster"; String clusterName = "test_standalone"; - String type1 = "type1"; - String type2 = "type2"; - String type3 = "type3"; - Set<String> supportedType = new HashSet<>(); - supportedType.add(type1); - supportedType.add(type2); - supportedType.add(type3); - - Integer id = this.saveStandaloneCluster(clusterTag, clusterName, supportedType); + + Integer id = this.saveStandaloneCluster(clusterTag, clusterName); Assertions.assertNotNull(id); ClusterInfo info = clusterService.get(id, GLOBAL_OPERATOR); - Assertions.assertInstanceOf(SortStandaloneClusterInfo.class, info); - - Set<String> types = ((SortStandaloneClusterInfo) info).getSupportedSinkTypes(); - Assertions.assertTrue(types.contains(type1)); - Assertions.assertTrue(types.contains(type2)); - Assertions.assertTrue(types.contains(type3)); + Assertions.assertInstanceOf(SortClsClusterInfo.class, info); } @Test diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java index 48a1f00399..ea9a068314 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java @@ -21,20 +21,18 @@ import org.apache.inlong.common.constant.MQType; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterRequest; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sink.SinkInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.service.ServiceBaseTest; import org.apache.inlong.manager.service.cluster.InlongClusterService; -import com.google.common.collect.Sets; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import java.util.List; -import java.util.Set; public class StandaloneAutoAssignTest extends ServiceBaseTest { @@ -55,8 +53,7 @@ public class StandaloneAutoAssignTest extends ServiceBaseTest { Integer id = saveClsSink(groupInfo.getInlongGroupId(), streamInfo.getInlongStreamId()); String clusterName = "clsCluster"; - Set<String> types = Sets.newHashSet(SinkType.CLS, SinkType.ELASTICSEARCH); - saveStandaloneCluster(groupInfo.getInlongClusterTag(), clusterName, types); + saveStandaloneCluster(groupInfo.getInlongClusterTag(), clusterName); List<SinkInfo> sinkInfos = sinkEntityMapper.selectAllConfig(groupInfo.getInlongGroupId(), null); Assertions.assertEquals(1, sinkInfos.size()); @@ -81,11 +78,10 @@ public class StandaloneAutoAssignTest extends ServiceBaseTest { return clsSinkEntity.getId(); } - public Integer saveStandaloneCluster(String clusterTag, String clusterName, Set<String> supportedSinkTypes) { - SortStandaloneClusterRequest request = new SortStandaloneClusterRequest(); + public Integer saveStandaloneCluster(String clusterTag, String clusterName) { + SortClsClusterRequest request = new SortClsClusterRequest(); request.setClusterTags(clusterTag); request.setName(clusterName); - request.setSupportedSinkTypes(supportedSinkTypes); request.setInCharges(GLOBAL_OPERATOR); return clusterService.save(request, GLOBAL_OPERATOR); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml index ba5cce64a9..aec6e19919 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml @@ -47,6 +47,11 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-base</artifactId> + <version>${project.version}</version> + </dependency> </dependencies>