This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 17cb04edc9 [INLONG-11142][Manager] Fix the problem of data add task not scheduled for cleaning (#11143)
17cb04edc9 is described below

commit 17cb04edc909e7525dfd6927b6eaecb733b5c584
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Thu Sep 19 17:23:52 2024 +0800

    [INLONG-11142][Manager] Fix the problem of data add task not scheduled for cleaning (#11143)
---
 .../manager/dao/mapper/StreamSourceEntityMapper.java       |  2 ++
 .../main/resources/mappers/StreamSourceEntityMapper.xml    | 10 ++++++++++
 .../inlong/manager/service/core/impl/AgentServiceImpl.java | 14 +++++++++++++-
 .../manager/service/source/file/FileSourceOperator.java    |  3 ++-
 4 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 7340d182c3..a05f15dda7 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -67,6 +67,8 @@ public interface StreamSourceEntityMapper {
      */
     int selectDataAddTaskCount(@Param("groupId") String groupId, 
@Param("streamId") String streamId);
 
+    List<StreamSourceEntity> selectByByTimeout(@Param("retentionDays") Integer 
retentionDays);
+
     /**
      * Paging query source list based on conditions
      */
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 732a5cfd56..c68f20e167 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -133,6 +133,16 @@
             and task_map_id is not NULL
         </where>
     </select>
+    <select id="selectByByTimeout" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_source
+        <where>
+            is_deleted = 0
+            and task_map_id is not null
+            and modify_time &lt;= DATE_ADD(NOW(), INTERVAL -#{retentionDays, 
jdbcType=INTEGER} DAY)
+        </where>
+    </select>
     <select id="selectByCondition"
             
parameterType="org.apache.inlong.manager.pojo.source.SourcePageRequest"
             
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 595b53576b..5f7958a3e1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -67,6 +67,7 @@ import 
org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.module.ModuleDTO;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.cluster.node.AgentClusterNodeOperator;
@@ -229,8 +230,19 @@ public class AgentServiceImpl implements AgentService {
             ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(factory);
             executor.scheduleWithFixedDelay(() -> {
                 try {
+                    List<StreamSourceEntity> needDeletedList = 
sourceMapper.selectByByTimeout(retentionDays);
                     sourceMapper.logicalDeleteByTimeout(retentionDays);
-                    LOGGER.info("clean sub task successfully");
+                    if (CollectionUtils.isNotEmpty(needDeletedList)) {
+                        for (StreamSourceEntity sourceEntity : 
needDeletedList) {
+                            LOGGER.info("begin to clean sub task for 
source={}", sourceEntity);
+                            StreamSourceOperator sourceOperator =
+                                    
operatorFactory.getInstance(sourceEntity.getSourceType());
+                            SourceRequest request =
+                                    
CommonBeanUtils.copyProperties(sourceEntity, SourceRequest::new, true);
+                            sourceOperator.updateAgentTaskConfig(request, 
sourceEntity.getModifier());
+                            LOGGER.info("success to clean sub task 
successfully, ={}", sourceEntity.getId());
+                        }
+                    }
                 } catch (Throwable t) {
                     LOGGER.error("clean sub task error", t);
                 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index 502808602b..60db39e546 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -124,7 +124,8 @@ public class FileSourceOperator extends 
AbstractSourceOperator {
             StreamSourceEntity dataAddTaskEntity =
                     CommonBeanUtils.copyProperties(sourceEntity, 
StreamSourceEntity::new);
             dataAddTaskEntity.setId(null);
-            dataAddTaskEntity.setSourceName(sourceEntity.getSourceName() + "-" 
+ (dataAddTaskSize + 1));
+            dataAddTaskEntity.setSourceName(
+                    sourceEntity.getSourceName() + "-" + (dataAddTaskSize + 1) 
+ "-" + sourceEntity.getId());
             
dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto));
             dataAddTaskEntity.setTaskMapId(sourceEntity.getId());
             Integer id = sourceMapper.insert(dataAddTaskEntity);

Reply via email to