This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 9853017ccf [INLONG-9839][Manager] Optimize the auto assign logic of SortStandalone cluster (#9840) 9853017ccf is described below commit 9853017ccfcc0aad130d9231feb41d7451ed388b Author: vernedeng <verned...@apache.org> AuthorDate: Tue Mar 19 18:52:46 2024 +0800 [INLONG-9839][Manager] Optimize the auto assign logic of SortStandalone cluster (#9840) --- .../inlong/manager/dao/mapper/StreamSinkEntityMapper.java | 2 +- .../src/main/resources/mappers/StreamSinkEntityMapper.xml | 12 +++++++++--- .../sink/AbstractStandaloneSinkResourceOperator.java | 12 ++++++------ 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java index 0687972c0b..f997f06460 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java @@ -161,6 +161,6 @@ public interface StreamSinkEntityMapper { */ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList); - String selectAssignedCluster(@Param("dataNodeName") String dataNodeName); + String selectAssignedCluster(@Param("dataNodeName") String dataNodeName, @Param("clusterTag") String clusterTag); } diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml index 2cbc14e1dd..a7bf8ac3c7 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml @@ -394,10 +394,16 @@ </select> <select id="selectAssignedCluster" resultType="java.lang.String"> select inlong_cluster_name - from stream_sink + from stream_sink s + left join inlong_cluster c + on s.inlong_cluster_name = c.name <where> - data_node_name = #{dataNodeName, jdbcType=VARCHAR} - and is_deleted = 0 + s.data_node_name = #{dataNodeName, jdbcType=VARCHAR} + <if test="clusterTag != null and clusterTag != ''"> + and find_in_set(#{clusterTag, jdbcType=VARCHAR}, c.cluster_tags) + </if> + and s.is_deleted = 0 + and c.is_deleted = 0 </where> group by inlong_cluster_name order by count(*) asc diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java index af16bdec28..24aeeb9224 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java @@ -100,17 +100,17 @@ public abstract class AbstractStandaloneSinkResourceOperator implements SinkReso } private String assignOneCluster(SinkInfo sinkInfo) { + InlongGroupEntity group = groupEntityMapper.selectByGroupId(sinkInfo.getInlongGroupId()); return StringUtils - .firstNonBlank(assignFromExist(sinkInfo.getDataNodeName()), - assignFromRelated(sinkInfo.getSinkType(), sinkInfo.getInlongGroupId())); + .firstNonBlank(assignFromExist(sinkInfo.getDataNodeName(), group.getInlongClusterTag()), + assignFromRelated(sinkInfo.getSinkType(), group)); } - private String assignFromExist(String dataNodeName) { - return sinkEntityMapper.selectAssignedCluster(dataNodeName); + private String assignFromExist(String dataNodeName, String clusterTag) { + return sinkEntityMapper.selectAssignedCluster(dataNodeName, clusterTag); } - private String assignFromRelated(String sinkType, String groupId) { - InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId); + private String assignFromRelated(String sinkType, InlongGroupEntity group) { String sortClusterType = SinkType.relatedSortClusterType(sinkType); if (StringUtils.isBlank(sortClusterType)) { log.error("find no relate sort cluster type for sink type={}", sinkType);