This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 97e12593e2 [INLONG-10233][Manager] Report heartbeat removal port restriction (#10234) 97e12593e2 is described below commit 97e12593e2b622b2f0ff59f7b98b1bf1d1f8c57f Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Mon May 20 09:40:29 2024 +0800 [INLONG-10233][Manager] Report heartbeat removal port restriction (#10234) --- .../mappers/InlongClusterNodeEntityMapper.xml | 7 ++++- .../service/heartbeat/HeartbeatManager.java | 33 +++++++++++++++------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml index e3d54df45a..d0016a28e3 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml @@ -95,7 +95,12 @@ and parent_id = #{parentId,jdbcType=INTEGER} and type = #{type, jdbcType=VARCHAR} and ip = #{ip, jdbcType=VARCHAR} - and port = #{port, jdbcType=INTEGER} + <if test="port != null and port != ''"> + and port = #{port, jdbcType=INTEGER} + </if> + <if test="port == null or port == ''"> + and port is null + </if> <if test="protocolType != null and protocolType != ''"> and protocol_type = #{protocolType, jdbcType=VARCHAR} </if> diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java index 53466f13cc..fd64f17a64 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java @@ -165,30 +165,39 @@ public class HeartbeatManager implements AbstractHeartbeatManager { } // protocolType may be null, and the protocolTypes' length may be less than ports' length - String[] ports = heartbeat.getPort().split(InlongConstants.COMMA); String[] ips = heartbeat.getIp().split(InlongConstants.COMMA); + String port = heartbeat.getPort(); + String[] ports = null; + if (StringUtils.isNotBlank(port)) { + ports = port.split(InlongConstants.COMMA); + if (ports.length < ips.length) { + ports = null; + } + } String[] reportSourceTypes = null; - if (StringUtils.isNotBlank(heartbeat.getReportSourceType()) && ports.length > 1) { + if (StringUtils.isNotBlank(heartbeat.getReportSourceType()) && ips.length > 1) { reportSourceTypes = heartbeat.getReportSourceType().split(InlongConstants.COMMA); - if (reportSourceTypes.length < ports.length) { + if (reportSourceTypes.length < ips.length) { reportSourceTypes = null; } } String protocolType = heartbeat.getProtocolType(); String[] protocolTypes = null; - if (StringUtils.isNotBlank(protocolType) && ports.length > 1) { + if (StringUtils.isNotBlank(protocolType) && ips.length > 1) { protocolTypes = protocolType.split(InlongConstants.COMMA); - if (protocolTypes.length < ports.length) { + if (protocolTypes.length < ips.length) { protocolTypes = null; } } int handlerNum = 0; - for (int i = 0; i < ports.length; i++) { + for (int i = 0; i < ips.length; i++) { // deep clone the heartbeat HeartbeatMsg heartbeatMsg = JsonUtils.parseObject(JsonUtils.toJsonByte(heartbeat), HeartbeatMsg.class); assert heartbeatMsg != null; - heartbeatMsg.setPort(ports[i].trim()); + if (StringUtils.isNotBlank(port)) { + heartbeatMsg.setPort(ports[i].trim()); + } heartbeatMsg.setIp(ips[i].trim()); if (reportSourceTypes != null) { heartbeatMsg.setReportSourceType(reportSourceTypes[i].trim()); @@ -228,7 +237,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager { } // if the heartbeat already exists, or does not exist but insert/update success, then put it into the cache - if (lastHeartbeat == null || handlerNum == ports.length) { + if (lastHeartbeat == null || handlerNum == ips.length) { heartbeatCache.put(componentHeartbeat, heartbeat); } } @@ -292,7 +301,9 @@ public class HeartbeatManager implements AbstractHeartbeatManager { nodeRequest.setParentId(clusterInfo.getId()); nodeRequest.setType(heartbeat.getComponentType()); nodeRequest.setIp(heartbeat.getIp()); - nodeRequest.setPort(Integer.valueOf(heartbeat.getPort())); + if (StringUtils.isNotBlank(heartbeat.getPort())) { + nodeRequest.setPort(Integer.valueOf(heartbeat.getPort())); + } nodeRequest.setProtocolType(heartbeat.getProtocolType()); return clusterNodeMapper.selectByUniqueKey(nodeRequest); } @@ -302,7 +313,9 @@ public class HeartbeatManager implements AbstractHeartbeatManager { clusterNode.setParentId(clusterInfo.getId()); clusterNode.setType(heartbeat.getComponentType()); clusterNode.setIp(heartbeat.getIp()); - clusterNode.setPort(Integer.valueOf(heartbeat.getPort())); + if (StringUtils.isNotBlank(heartbeat.getPort())) { + clusterNode.setPort(Integer.valueOf(heartbeat.getPort())); + } clusterNode.setProtocolType(heartbeat.getProtocolType()); clusterNode.setNodeLoad(heartbeat.getLoad()); clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());