This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9853017ccf [INLONG-9839][Manager] Optimize the auto assign logic of 
SortStandalone cluster (#9840)
9853017ccf is described below

commit 9853017ccfcc0aad130d9231feb41d7451ed388b
Author: vernedeng <verned...@apache.org>
AuthorDate: Tue Mar 19 18:52:46 2024 +0800

    [INLONG-9839][Manager] Optimize the auto assign logic of SortStandalone 
cluster (#9840)
---
 .../inlong/manager/dao/mapper/StreamSinkEntityMapper.java    |  2 +-
 .../src/main/resources/mappers/StreamSinkEntityMapper.xml    | 12 +++++++++---
 .../sink/AbstractStandaloneSinkResourceOperator.java         | 12 ++++++------
 3 files changed, 16 insertions(+), 10 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 0687972c0b..f997f06460 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -161,6 +161,6 @@ public interface StreamSinkEntityMapper {
      */
     int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
 
-    String selectAssignedCluster(@Param("dataNodeName") String dataNodeName);
+    String selectAssignedCluster(@Param("dataNodeName") String dataNodeName, 
@Param("clusterTag") String clusterTag);
 
 }
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index 2cbc14e1dd..a7bf8ac3c7 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -394,10 +394,16 @@
     </select>
     <select id="selectAssignedCluster" resultType="java.lang.String">
         select inlong_cluster_name
-        from stream_sink
+        from stream_sink s
+        left join inlong_cluster c
+        on s.inlong_cluster_name = c.name
         <where>
-            data_node_name = #{dataNodeName, jdbcType=VARCHAR}
-            and is_deleted = 0
+            s.data_node_name = #{dataNodeName, jdbcType=VARCHAR}
+            <if test="clusterTag != null and clusterTag != ''">
+                and find_in_set(#{clusterTag, jdbcType=VARCHAR}, 
c.cluster_tags)
+            </if>
+            and s.is_deleted = 0
+            and c.is_deleted = 0
         </where>
         group by inlong_cluster_name
         order by count(*) asc
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
index af16bdec28..24aeeb9224 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
@@ -100,17 +100,17 @@ public abstract class 
AbstractStandaloneSinkResourceOperator implements SinkReso
     }
 
     private String assignOneCluster(SinkInfo sinkInfo) {
+        InlongGroupEntity group = 
groupEntityMapper.selectByGroupId(sinkInfo.getInlongGroupId());
         return StringUtils
-                .firstNonBlank(assignFromExist(sinkInfo.getDataNodeName()),
-                        assignFromRelated(sinkInfo.getSinkType(), 
sinkInfo.getInlongGroupId()));
+                .firstNonBlank(assignFromExist(sinkInfo.getDataNodeName(), 
group.getInlongClusterTag()),
+                        assignFromRelated(sinkInfo.getSinkType(), group));
     }
 
-    private String assignFromExist(String dataNodeName) {
-        return sinkEntityMapper.selectAssignedCluster(dataNodeName);
+    private String assignFromExist(String dataNodeName, String clusterTag) {
+        return sinkEntityMapper.selectAssignedCluster(dataNodeName, 
clusterTag);
     }
 
-    private String assignFromRelated(String sinkType, String groupId) {
-        InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId);
+    private String assignFromRelated(String sinkType, InlongGroupEntity group) 
{
         String sortClusterType = SinkType.relatedSortClusterType(sinkType);
         if (StringUtils.isBlank(sortClusterType)) {
             log.error("find no relate sort cluster type for sink type={}", 
sinkType);

Reply via email to