This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch branch-1.8 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 41c9f405db6bd029bc8472cc5a3fae86ae850986 Author: kipshi <48468934+kip...@users.noreply.github.com> AuthorDate: Mon Jul 17 16:32:58 2023 +0800 [INLONG-8539][Manager] Logically remove all sources when agent heartbeat contains no group message (#8540) (cherry picked from commit 72006de19148479600e5178670cef752ee79b285) --- .../dao/mapper/StreamSourceEntityMapper.java | 11 ++++++++ .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++++++++++ .../service/heartbeat/HeartbeatServiceImpl.java | 24 ++++++++++++++++- .../service/core/impl/AgentServiceTest.java | 30 +++------------------- 4 files changed, 52 insertions(+), 27 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java index 7e3594369e..805c73206a 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java @@ -193,6 +193,17 @@ public interface StreamSourceEntityMapper { */ void updateStatusByDeleted(); + /** + * Logical delete stream source by agentIp, change status at same time. + * + * @param agentIp ip of agent cluster node + * @param status status to change + * @param targetStatus status of stream source now + * + */ + void logicalDeleteByAgentIp(@Param("agentIp") String agentIp, @Param("status") Integer status, + @Param("targetStatus") Integer targetStatus); + /** * Physical delete stream sources by group id and stream id */ diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml index 43d8045e03..700d740f2c 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml @@ -563,6 +563,20 @@ and status not in (99, 201, 301) </where> </update> + <update id="logicalDeleteByAgentIp"> + update stream_source + <set> + is_deleted = id, + previous_status = status, + status = #{status, jdbcType=INTEGER}, + version = version + 1 + </set> + where is_deleted = 0 + and agent_ip = #{agentIp, jdbcType=VARCHAR} + <if test="targetStatus != null"> + and status = #{targetStatus, jdbcType=INTEGER} + </if> + </update> <delete id="deleteByRelatedId"> delete from stream_source diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java index d6f042f028..5b2cd5e46d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.inlong.common.enums.ComponentTypeEnum; import org.apache.inlong.common.heartbeat.GroupHeartbeat; import org.apache.inlong.common.heartbeat.StreamHeartbeat; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.enums.SourceStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; @@ -30,6 +31,7 @@ import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity; import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper; import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper; +import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.heartbeat.ComponentHeartbeatResponse; import org.apache.inlong.manager.pojo.heartbeat.GroupHeartbeatResponse; @@ -64,6 +66,8 @@ public class HeartbeatServiceImpl implements HeartbeatService { @Lazy private HeartbeatManager heartbeatManager; @Autowired + private StreamSourceEntityMapper sourceMapper; + @Autowired private ComponentHeartbeatEntityMapper componentHeartbeatMapper; @Autowired private GroupHeartbeatEntityMapper groupHeartbeatMapper; @@ -82,9 +86,10 @@ public class HeartbeatServiceImpl implements HeartbeatService { heartbeatManager.reportHeartbeat(request); ComponentTypeEnum componentType = ComponentTypeEnum.forType(request.getComponentType()); switch (componentType) { + case Agent: + return updateAgentHeartbeatOpt(request); case Sort: case DataProxy: - case Agent: case Cache: case SDK: return updateHeartbeatOpt(request); @@ -220,6 +225,23 @@ public class HeartbeatServiceImpl implements HeartbeatService { } } + /** + * Update heartbeatMsg for agent , if groupMsg is empty, then logically remove all stream source related. + * If type of stream_source is file, change status from heartbeat_timeout + * + * @param request + * @return + */ + private Boolean updateAgentHeartbeatOpt(HeartbeatReportRequest request) { + // If heartbeatMsg not contain any group ,just delete + if (CollectionUtils.isEmpty(request.getGroupHeartbeats()) && StringUtils.isNotBlank(request.getIp())) { + String agentIp = request.getIp(); + sourceMapper.logicalDeleteByAgentIp(agentIp, SourceStatus.SOURCE_DISABLE.getCode(), + SourceStatus.SOURCE_NORMAL.getCode()); + } + return updateHeartbeatOpt(request); + } + /** * Default implementation for updating heartbeat */ diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java index bfdbffba8c..6ac8682346 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java @@ -236,38 +236,16 @@ class AgentServiceTest extends ServiceBaseTest { // unbind group and mismatch bindGroup(false, "group1"); TaskResult t1 = agent.pullTask(); - Assertions.assertEquals(1, t1.getDataConfigs().size()); - Assertions.assertEquals(1, t1.getDataConfigs().stream() - .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) == ManagerOpEnum.FROZEN.getType()) - .collect(Collectors.toSet()) - .size()); - DataConfig d1 = t1.getDataConfigs().get(0); - Assertions.assertEquals(sourceId, d1.getTaskId()); + Assertions.assertEquals(0, t1.getDataConfigs().size()); // bind group and rematch bindGroup(true, "group1"); TaskResult t2 = agent.pullTask(); - Assertions.assertEquals(0, t2.getDataConfigs().size()); - Assertions.assertEquals(0, t2.getDataConfigs().stream() - .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) == ManagerOpEnum.ACTIVE.getType()) - .collect(Collectors.toSet()) - .size()); - - // update group to config success - final String groupId = sourceService.listSource(groupStream.getLeft(), groupStream.getRight()).stream() - .filter(source -> source.getTemplateId() != null) - .findAny() - .get() - .getInlongGroupId(); - groupMapper.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), GLOBAL_OPERATOR); - TaskResult t3 = agent.pullTask(); - Assertions.assertEquals(1, t3.getDataConfigs().size()); - Assertions.assertEquals(1, t3.getDataConfigs().stream() - .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) == ManagerOpEnum.ACTIVE.getType()) + Assertions.assertEquals(1, t2.getDataConfigs().size()); + Assertions.assertEquals(1, t2.getDataConfigs().stream() + .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) == ManagerOpEnum.ADD.getType()) .collect(Collectors.toSet()) .size()); - DataConfig d3 = t3.getDataConfigs().get(0); - Assertions.assertEquals(sourceId, d3.getTaskId()); } /**