This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 97e12593e2 [INLONG-10233][Manager] Report heartbeat removal port restriction (#10234)
97e12593e2 is described below

commit 97e12593e2b622b2f0ff59f7b98b1bf1d1f8c57f
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Mon May 20 09:40:29 2024 +0800

    [INLONG-10233][Manager] Report heartbeat removal port restriction (#10234)
---
 .../mappers/InlongClusterNodeEntityMapper.xml      |  7 ++++-
 .../service/heartbeat/HeartbeatManager.java        | 33 +++++++++++++++-------
 2 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index e3d54df45a..d0016a28e3 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -95,7 +95,12 @@
         and parent_id = #{parentId,jdbcType=INTEGER}
         and type = #{type, jdbcType=VARCHAR}
         and ip = #{ip, jdbcType=VARCHAR}
-        and port = #{port, jdbcType=INTEGER}
+        <if test="port != null and port != ''">
+            and port = #{port, jdbcType=INTEGER}
+        </if>
+        <if test="port == null or port == ''">
+            and port is null
+        </if>
         <if test="protocolType != null and protocolType != ''">
             and protocol_type = #{protocolType, jdbcType=VARCHAR}
         </if>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 53466f13cc..fd64f17a64 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -165,30 +165,39 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         }
 
         // protocolType may be null, and the protocolTypes' length may be less than ports' length
-        String[] ports = heartbeat.getPort().split(InlongConstants.COMMA);
         String[] ips = heartbeat.getIp().split(InlongConstants.COMMA);
+        String port = heartbeat.getPort();
+        String[] ports = null;
+        if (StringUtils.isNotBlank(port)) {
+            ports = port.split(InlongConstants.COMMA);
+            if (ports.length < ips.length) {
+                ports = null;
+            }
+        }
         String[] reportSourceTypes = null;
-        if (StringUtils.isNotBlank(heartbeat.getReportSourceType()) && 
ports.length > 1) {
+        if (StringUtils.isNotBlank(heartbeat.getReportSourceType()) && 
ips.length > 1) {
             reportSourceTypes = 
heartbeat.getReportSourceType().split(InlongConstants.COMMA);
-            if (reportSourceTypes.length < ports.length) {
+            if (reportSourceTypes.length < ips.length) {
                 reportSourceTypes = null;
             }
         }
         String protocolType = heartbeat.getProtocolType();
         String[] protocolTypes = null;
-        if (StringUtils.isNotBlank(protocolType) && ports.length > 1) {
+        if (StringUtils.isNotBlank(protocolType) && ips.length > 1) {
             protocolTypes = protocolType.split(InlongConstants.COMMA);
-            if (protocolTypes.length < ports.length) {
+            if (protocolTypes.length < ips.length) {
                 protocolTypes = null;
             }
         }
 
         int handlerNum = 0;
-        for (int i = 0; i < ports.length; i++) {
+        for (int i = 0; i < ips.length; i++) {
             // deep clone the heartbeat
             HeartbeatMsg heartbeatMsg = 
JsonUtils.parseObject(JsonUtils.toJsonByte(heartbeat), HeartbeatMsg.class);
             assert heartbeatMsg != null;
-            heartbeatMsg.setPort(ports[i].trim());
+            if (StringUtils.isNotBlank(port)) {
+                heartbeatMsg.setPort(ports[i].trim());
+            }
             heartbeatMsg.setIp(ips[i].trim());
             if (reportSourceTypes != null) {
                 heartbeatMsg.setReportSourceType(reportSourceTypes[i].trim());
@@ -228,7 +237,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         }
 
         // if the heartbeat already exists, or does not exist but insert/update success, then put it into the cache
-        if (lastHeartbeat == null || handlerNum == ports.length) {
+        if (lastHeartbeat == null || handlerNum == ips.length) {
             heartbeatCache.put(componentHeartbeat, heartbeat);
         }
     }
@@ -292,7 +301,9 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         nodeRequest.setParentId(clusterInfo.getId());
         nodeRequest.setType(heartbeat.getComponentType());
         nodeRequest.setIp(heartbeat.getIp());
-        nodeRequest.setPort(Integer.valueOf(heartbeat.getPort()));
+        if (StringUtils.isNotBlank(heartbeat.getPort())) {
+            nodeRequest.setPort(Integer.valueOf(heartbeat.getPort()));
+        }
         nodeRequest.setProtocolType(heartbeat.getProtocolType());
         return clusterNodeMapper.selectByUniqueKey(nodeRequest);
     }
@@ -302,7 +313,9 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         clusterNode.setParentId(clusterInfo.getId());
         clusterNode.setType(heartbeat.getComponentType());
         clusterNode.setIp(heartbeat.getIp());
-        clusterNode.setPort(Integer.valueOf(heartbeat.getPort()));
+        if (StringUtils.isNotBlank(heartbeat.getPort())) {
+            clusterNode.setPort(Integer.valueOf(heartbeat.getPort()));
+        }
         clusterNode.setProtocolType(heartbeat.getProtocolType());
         clusterNode.setNodeLoad(heartbeat.getLoad());
         clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());

Reply via email to