This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 41c9f405db6bd029bc8472cc5a3fae86ae850986
Author: kipshi <48468934+kip...@users.noreply.github.com>
AuthorDate: Mon Jul 17 16:32:58 2023 +0800

    [INLONG-8539][Manager] Logically remove all sources when agent heartbeat 
contains no group message (#8540)
    
    (cherry picked from commit 72006de19148479600e5178670cef752ee79b285)
---
 .../dao/mapper/StreamSourceEntityMapper.java       | 11 ++++++++
 .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++++++++++
 .../service/heartbeat/HeartbeatServiceImpl.java    | 24 ++++++++++++++++-
 .../service/core/impl/AgentServiceTest.java        | 30 +++-------------------
 4 files changed, 52 insertions(+), 27 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 7e3594369e..805c73206a 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -193,6 +193,17 @@ public interface StreamSourceEntityMapper {
      */
     void updateStatusByDeleted();
 
+    /**
+     * Logically delete the stream sources of the given agent IP and change their status in the same update.
+     *
+     * @param agentIp IP of the agent cluster node whose stream sources are matched
+     * @param status the new status to set on the matched stream sources
+     * @param targetStatus the status a stream source must currently have to be matched;
+     *                     if null, the mapper XML applies no status filter
+     */
+    void logicalDeleteByAgentIp(@Param("agentIp") String agentIp, 
@Param("status") Integer status,
+            @Param("targetStatus") Integer targetStatus);
+
     /**
      * Physical delete stream sources by group id and stream id
      */
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 43d8045e03..700d740f2c 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -563,6 +563,20 @@
             and status not in (99, 201, 301)
         </where>
     </update>
+    <update id="logicalDeleteByAgentIp"><!-- Logical delete of the not-yet-deleted sources of one agent IP. -->
+        update stream_source
+        <set>
+            is_deleted = id,<!-- stores the row's own id; only rows with is_deleted = 0 are matched below -->
+            previous_status = status,<!-- keeps the old status before it is overwritten on the next line -->
+            status = #{status, jdbcType=INTEGER},
+            version = version + 1
+        </set>
+        where is_deleted = 0
+        and agent_ip = #{agentIp, jdbcType=VARCHAR}
+        <if test="targetStatus != null"><!-- optional filter: only rows currently in targetStatus -->
+            and status = #{targetStatus, jdbcType=INTEGER}
+        </if>
+    </update>
     <delete id="deleteByRelatedId">
         delete
         from stream_source
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
index d6f042f028..5b2cd5e46d 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.enums.ComponentTypeEnum;
 import org.apache.inlong.common.heartbeat.GroupHeartbeat;
 import org.apache.inlong.common.heartbeat.StreamHeartbeat;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -30,6 +31,7 @@ import 
org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity;
 import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
 import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.heartbeat.ComponentHeartbeatResponse;
 import org.apache.inlong.manager.pojo.heartbeat.GroupHeartbeatResponse;
@@ -64,6 +66,8 @@ public class HeartbeatServiceImpl implements HeartbeatService 
{
     @Lazy
     private HeartbeatManager heartbeatManager;
     @Autowired
+    private StreamSourceEntityMapper sourceMapper;
+    @Autowired
     private ComponentHeartbeatEntityMapper componentHeartbeatMapper;
     @Autowired
     private GroupHeartbeatEntityMapper groupHeartbeatMapper;
@@ -82,9 +86,10 @@ public class HeartbeatServiceImpl implements 
HeartbeatService {
         heartbeatManager.reportHeartbeat(request);
         ComponentTypeEnum componentType = 
ComponentTypeEnum.forType(request.getComponentType());
         switch (componentType) {
+            case Agent:
+                return updateAgentHeartbeatOpt(request);
             case Sort:
             case DataProxy:
-            case Agent:
             case Cache:
             case SDK:
                 return updateHeartbeatOpt(request);
@@ -220,6 +225,23 @@ public class HeartbeatServiceImpl implements 
HeartbeatService {
         }
     }
 
+    /**
+     * Update the heartbeat for an agent. If the request carries no group heartbeats and has a
+     * non-blank IP, first logically delete the stream sources of that agent IP that are in
+     * SOURCE_NORMAL status, setting their status to SOURCE_DISABLE.
+     *
+     * @param request heartbeat report request of the agent
+     * @return the result of {@code updateHeartbeatOpt} for the same request
+     */
+    private Boolean updateAgentHeartbeatOpt(HeartbeatReportRequest request) {
+        // No group heartbeats reported: logically delete this agent's SOURCE_NORMAL sources
+        if (CollectionUtils.isEmpty(request.getGroupHeartbeats()) && 
StringUtils.isNotBlank(request.getIp())) {
+            String agentIp = request.getIp();
+            sourceMapper.logicalDeleteByAgentIp(agentIp, 
SourceStatus.SOURCE_DISABLE.getCode(),
+                    SourceStatus.SOURCE_NORMAL.getCode());
+        }
+        return updateHeartbeatOpt(request);
+    }
+
     /**
      * Default implementation for updating heartbeat
      */
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index bfdbffba8c..6ac8682346 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -236,38 +236,16 @@ class AgentServiceTest extends ServiceBaseTest {
         // unbind group and mismatch
         bindGroup(false, "group1");
         TaskResult t1 = agent.pullTask();
-        Assertions.assertEquals(1, t1.getDataConfigs().size());
-        Assertions.assertEquals(1, t1.getDataConfigs().stream()
-                .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) == 
ManagerOpEnum.FROZEN.getType())
-                .collect(Collectors.toSet())
-                .size());
-        DataConfig d1 = t1.getDataConfigs().get(0);
-        Assertions.assertEquals(sourceId, d1.getTaskId());
+        Assertions.assertEquals(0, t1.getDataConfigs().size());
 
         // bind group and rematch
         bindGroup(true, "group1");
         TaskResult t2 = agent.pullTask();
-        Assertions.assertEquals(0, t2.getDataConfigs().size());
-        Assertions.assertEquals(0, t2.getDataConfigs().stream()
-                .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) == 
ManagerOpEnum.ACTIVE.getType())
-                .collect(Collectors.toSet())
-                .size());
-
-        // update group to config success
-        final String groupId = sourceService.listSource(groupStream.getLeft(), 
groupStream.getRight()).stream()
-                .filter(source -> source.getTemplateId() != null)
-                .findAny()
-                .get()
-                .getInlongGroupId();
-        groupMapper.updateStatus(groupId, 
GroupStatus.CONFIG_SUCCESSFUL.getCode(), GLOBAL_OPERATOR);
-        TaskResult t3 = agent.pullTask();
-        Assertions.assertEquals(1, t3.getDataConfigs().size());
-        Assertions.assertEquals(1, t3.getDataConfigs().stream()
-                .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) == 
ManagerOpEnum.ACTIVE.getType())
+        Assertions.assertEquals(1, t2.getDataConfigs().size());
+        Assertions.assertEquals(1, t2.getDataConfigs().stream()
+                .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) == 
ManagerOpEnum.ADD.getType())
                 .collect(Collectors.toSet())
                 .size());
-        DataConfig d3 = t3.getDataConfigs().get(0);
-        Assertions.assertEquals(sourceId, d3.getTaskId());
     }
 
     /**

Reply via email to