This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dea04d65 [INLONG-7912][Manager] Only response DataProxy nodes in normal status (#7913)
5dea04d65 is described below

commit 5dea04d6526bd77dccaf28c5d4cce847c028f154
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Fri May 5 21:25:23 2023 +0800

    [INLONG-7912][Manager] Only response DataProxy nodes in normal status (#7913)
---
 .../service/cluster/InlongClusterServiceImpl.java  |  6 +++++
 .../cluster/node/AbstractClusterNodeOperator.java  |  2 ++
 .../service/cluster/InlongClusterServiceTest.java  | 27 ++++++++++++++--------
 3 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index be32bed28..d8fb62fbc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -39,6 +39,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.NodeStatus;
 import org.apache.inlong.manager.common.enums.UserTypeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -1168,6 +1169,11 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         // TODO consider the data proxy load and re-balance
         List<DataProxyNodeInfo> nodeList = new ArrayList<>();
         for (InlongClusterNodeEntity nodeEntity : nodeEntities) {
+            if (Objects.equals(nodeEntity.getStatus(), NodeStatus.HEARTBEAT_TIMEOUT.getStatus())) {
+                LOGGER.debug("dataproxy node was timeout, parentId={} ip={} port={}", nodeEntity.getParentId(),
+                        nodeEntity.getIp(), nodeEntity.getPort());
+                continue;
+            }
             DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo();
             nodeInfo.setId(nodeEntity.getId());
             nodeInfo.setIp(nodeEntity.getIp());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
index b32c9ca75..85200c0bf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.cluster.node;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.NodeStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
@@ -49,6 +50,7 @@ public abstract class AbstractClusterNodeOperator implements InlongClusterNodeOp
 
         entity.setCreator(operator);
         entity.setModifier(operator);
+        entity.setStatus(NodeStatus.HEARTBEAT_TIMEOUT.getStatus());
         clusterNodeMapper.insert(entity);
 
         return entity.getId();
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index c4dabbbee..36aa96e0d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -174,12 +174,13 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         return clusterService.updateNode(request, GLOBAL_OPERATOR);
     }
 
-    private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type) {
+    private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type,
+            String protocolType) {
         HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
         heartbeatMsg.setIp(ip);
         heartbeatMsg.setPort(port);
         heartbeatMsg.setClusterTag("default_cluster");
-        heartbeatMsg.setProtocolType(ProtocolType.HTTP);
+        heartbeatMsg.setProtocolType(protocolType);
         heartbeatMsg.setLoad(0xFFFF);
         heartbeatMsg.setComponentType(type);
         heartbeatMsg.setReportTime(System.currentTimeMillis());
@@ -330,14 +331,6 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2, ProtocolType.TCP);
         Assertions.assertNotNull(nodeId2);
 
-        // report heartbeat
-        HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1),
-                ComponentTypeEnum.DataProxy.getType());
-        heartbeatManager.reportHeartbeat(msg1);
-        HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2),
-                ComponentTypeEnum.DataProxy.getType());
-        heartbeatManager.reportHeartbeat(msg2);
-
         // create an inlong group which use the clusterTag
         String inlongGroupId = "test_cluster_tag_group";
         InlongGroupInfo inlongGroup = super.createInlongGroup(inlongGroupId, MQType.PULSAR);
@@ -345,6 +338,13 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         updateGroupInfo.setInlongClusterTag(clusterTag);
         groupService.update(updateGroupInfo.genRequest(), GLOBAL_OPERATOR);
 
+        // report heartbeat
+        HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1),
+                ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP);
+        heartbeatManager.reportHeartbeat(msg1);
+        HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2),
+                ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP);
+        heartbeatManager.reportHeartbeat(msg2);
         // get the data proxy nodes, the first port should is p1, second port is p2
         DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.TCP);
         List<DataProxyNodeInfo> nodeInfoList = nodeResponse.getNodeList();
@@ -353,6 +353,13 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(port1, nodeInfoList.get(0).getPort());
         Assertions.assertEquals(port2, nodeInfoList.get(1).getPort());
 
+        // report heartbeat
+        HeartbeatMsg msg3 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1),
+                ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP);
+        heartbeatManager.reportHeartbeat(msg3);
+        HeartbeatMsg msg4 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2),
+                ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP);
+        heartbeatManager.reportHeartbeat(msg4);
         nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.HTTP);
         nodeInfoList = nodeResponse.getNodeList();
         nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));

Reply via email to