This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 17cb04edc9 [INLONG-11142][Manager] Fix the problem of data add task not scheduled for cleaning (#11143) 17cb04edc9 is described below commit 17cb04edc909e7525dfd6927b6eaecb733b5c584 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Thu Sep 19 17:23:52 2024 +0800 [INLONG-11142][Manager] Fix the problem of data add task not scheduled for cleaning (#11143) --- .../manager/dao/mapper/StreamSourceEntityMapper.java | 2 ++ .../main/resources/mappers/StreamSourceEntityMapper.xml | 10 ++++++++++ .../inlong/manager/service/core/impl/AgentServiceImpl.java | 14 +++++++++++++- .../manager/service/source/file/FileSourceOperator.java | 3 ++- 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java index 7340d182c3..a05f15dda7 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java @@ -67,6 +67,8 @@ public interface StreamSourceEntityMapper { */ int selectDataAddTaskCount(@Param("groupId") String groupId, @Param("streamId") String streamId); + List<StreamSourceEntity> selectByByTimeout(@Param("retentionDays") Integer retentionDays); + /** * Paging query source list based on conditions */ diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml index 732a5cfd56..c68f20e167 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml @@ -133,6 +133,16 @@ and task_map_id is not NULL </where> </select> + <select id="selectByByTimeout" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> + select + <include refid="Base_Column_List"/> + from stream_source + <where> + is_deleted = 0 + and task_map_id is not null + and modify_time <= DATE_ADD(NOW(), INTERVAL -#{retentionDays, jdbcType=INTEGER} DAY) + </where> + </select> <select id="selectByCondition" parameterType="org.apache.inlong.manager.pojo.source.SourcePageRequest" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 595b53576b..5f7958a3e1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -67,6 +67,7 @@ import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO; import org.apache.inlong.manager.pojo.module.ModuleDTO; +import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.service.cluster.node.AgentClusterNodeOperator; @@ -229,8 +230,19 @@ public class AgentServiceImpl implements AgentService { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(factory); executor.scheduleWithFixedDelay(() -> { try { + List<StreamSourceEntity> needDeletedList = sourceMapper.selectByByTimeout(retentionDays); sourceMapper.logicalDeleteByTimeout(retentionDays); - LOGGER.info("clean sub task successfully"); + if (CollectionUtils.isNotEmpty(needDeletedList)) { + for (StreamSourceEntity sourceEntity : needDeletedList) { + LOGGER.info("begin to clean sub task for source={}", sourceEntity); + StreamSourceOperator sourceOperator = + operatorFactory.getInstance(sourceEntity.getSourceType()); + SourceRequest request = + CommonBeanUtils.copyProperties(sourceEntity, SourceRequest::new, true); + sourceOperator.updateAgentTaskConfig(request, sourceEntity.getModifier()); + LOGGER.info("success to clean sub task successfully, ={}", sourceEntity.getId()); + } + } } catch (Throwable t) { LOGGER.error("clean sub task error", t); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java index 502808602b..60db39e546 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java @@ -124,7 +124,8 @@ public class FileSourceOperator extends AbstractSourceOperator { StreamSourceEntity dataAddTaskEntity = CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new); dataAddTaskEntity.setId(null); - dataAddTaskEntity.setSourceName(sourceEntity.getSourceName() + "-" + (dataAddTaskSize + 1)); + dataAddTaskEntity.setSourceName( + sourceEntity.getSourceName() + "-" + (dataAddTaskSize + 1) + "-" + sourceEntity.getId()); dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto)); dataAddTaskEntity.setTaskMapId(sourceEntity.getId()); Integer id = sourceMapper.insert(dataAddTaskEntity);