This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 5dea04d65 [INLONG-7912][Manager] Only response DataProxy nodes in normal status (#7913) 5dea04d65 is described below commit 5dea04d6526bd77dccaf28c5d4cce847c028f154 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Fri May 5 21:25:23 2023 +0800 [INLONG-7912][Manager] Only response DataProxy nodes in normal status (#7913) --- .../service/cluster/InlongClusterServiceImpl.java | 6 +++++ .../cluster/node/AbstractClusterNodeOperator.java | 2 ++ .../service/cluster/InlongClusterServiceTest.java | 27 ++++++++++++++-------- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index be32bed28..d8fb62fbc 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -39,6 +39,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.NodeStatus; import org.apache.inlong.manager.common.enums.UserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; @@ -1168,6 +1169,11 @@ public class InlongClusterServiceImpl implements InlongClusterService { // TODO consider the data proxy load and re-balance List<DataProxyNodeInfo> nodeList = new ArrayList<>(); for (InlongClusterNodeEntity nodeEntity : nodeEntities) { + if (Objects.equals(nodeEntity.getStatus(), NodeStatus.HEARTBEAT_TIMEOUT.getStatus())) { + LOGGER.debug("dataproxy node was timeout, parentId={} ip={} port={}", nodeEntity.getParentId(), + nodeEntity.getIp(), nodeEntity.getPort()); + continue; + } DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo(); nodeInfo.setId(nodeEntity.getId()); nodeInfo.setIp(nodeEntity.getIp()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java index b32c9ca75..85200c0bf 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java @@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.cluster.node; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.enums.NodeStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity; @@ -49,6 +50,7 @@ public abstract class AbstractClusterNodeOperator implements InlongClusterNodeOp entity.setCreator(operator); entity.setModifier(operator); + entity.setStatus(NodeStatus.HEARTBEAT_TIMEOUT.getStatus()); clusterNodeMapper.insert(entity); return entity.getId(); diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java index c4dabbbee..36aa96e0d 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java @@ -174,12 +174,13 @@ public class InlongClusterServiceTest extends ServiceBaseTest { return clusterService.updateNode(request, GLOBAL_OPERATOR); } - private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type) { + private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type, + String protocolType) { HeartbeatMsg heartbeatMsg = new HeartbeatMsg(); heartbeatMsg.setIp(ip); heartbeatMsg.setPort(port); heartbeatMsg.setClusterTag("default_cluster"); - heartbeatMsg.setProtocolType(ProtocolType.HTTP); + heartbeatMsg.setProtocolType(protocolType); heartbeatMsg.setLoad(0xFFFF); heartbeatMsg.setComponentType(type); heartbeatMsg.setReportTime(System.currentTimeMillis()); @@ -330,14 +331,6 @@ public class InlongClusterServiceTest extends ServiceBaseTest { Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2, ProtocolType.TCP); Assertions.assertNotNull(nodeId2); - // report heartbeat - HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1), - ComponentTypeEnum.DataProxy.getType()); - heartbeatManager.reportHeartbeat(msg1); - HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2), - ComponentTypeEnum.DataProxy.getType()); - heartbeatManager.reportHeartbeat(msg2); - // create an inlong group which use the clusterTag String inlongGroupId = "test_cluster_tag_group"; InlongGroupInfo inlongGroup = super.createInlongGroup(inlongGroupId, MQType.PULSAR); @@ -345,6 +338,13 @@ public class InlongClusterServiceTest extends ServiceBaseTest { updateGroupInfo.setInlongClusterTag(clusterTag); groupService.update(updateGroupInfo.genRequest(), GLOBAL_OPERATOR); + // report heartbeat + HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1), + ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP); + heartbeatManager.reportHeartbeat(msg1); + HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2), + ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP); + heartbeatManager.reportHeartbeat(msg2); // get the data proxy nodes, the first port should is p1, second port is p2 DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.TCP); List<DataProxyNodeInfo> nodeInfoList = nodeResponse.getNodeList(); @@ -353,6 +353,13 @@ public class InlongClusterServiceTest extends ServiceBaseTest { Assertions.assertEquals(port1, nodeInfoList.get(0).getPort()); Assertions.assertEquals(port2, nodeInfoList.get(1).getPort()); + // report heartbeat + HeartbeatMsg msg3 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1), + ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP); + heartbeatManager.reportHeartbeat(msg3); + HeartbeatMsg msg4 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2), + ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP); + heartbeatManager.reportHeartbeat(msg4); nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.HTTP); nodeInfoList = nodeResponse.getNodeList(); nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));