This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e42f5ec276 [INLONG-9873][Manager] Support adding data add tasks for 
file collection (#9874)
e42f5ec276 is described below

commit e42f5ec27688089eb50ae48695c8fe39684eefb7
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Thu Mar 28 21:42:59 2024 +0800

    [INLONG-9873][Manager] Support adding data add tasks for file collection 
(#9874)
    
    * [INLONG-9873][Manager] Support adding supplementary recording tasks for 
file collection
    
    * [INLONG-9873][Manager] Fix error
    
    * [INLONG-9873][Manager] Fix error
    
    * [INLONG-9873][Manager] Fix error
    
    * [INLONG-9873][Manager] Fix error
    
    * [INLONG-9873][Manager] Fix error
    
    * [INLONG-9873][Manager] Fix code style
    
    * [INLONG-9873][Manager] Fix error
    
    * [INLONG-9873][Manager] Fix error
    
    * [INLONG-9873][Manager] Fix error
    
    * [INLONG-9873][Manager] Fix error
---
 .../manager/client/api/InlongGroupContext.java     |  2 +-
 .../manager/dao/entity/StreamSourceEntity.java     |  4 +-
 .../dao/mapper/StreamSourceEntityMapper.java       | 15 +++++---
 .../resources/mappers/StreamSourceEntityMapper.xml | 37 +++++++++++-------
 .../{SubSourceDTO.java => DataAddTaskDTO.java}     | 16 ++++----
 .../manager/pojo/source/DataAddTaskRequest.java    | 44 ++++++++++++++++++++++
 .../inlong/manager/pojo/source/SourceRequest.java  |  4 +-
 .../inlong/manager/pojo/source/StreamSource.java   |  8 ++--
 .../pojo/source/file/FileDataAddTaskRequest.java}  | 27 ++++++++++---
 .../manager/pojo/source/pulsar/PulsarSource.java   |  3 ++
 .../pojo/source/pulsar/PulsarSourceDTO.java        |  3 ++
 .../service/core/impl/AgentServiceImpl.java        | 33 +++++++++++++---
 .../source/AbstractSourceOperateListener.java      |  2 +-
 .../listener/source/SourceRestartListener.java     |  2 +-
 .../listener/source/SourceStopListener.java        |  2 +-
 .../service/source/AbstractSourceOperator.java     |  7 ++++
 .../service/source/StreamSourceOperator.java       | 10 +++++
 .../service/source/StreamSourceService.java        | 10 +++++
 .../service/source/StreamSourceServiceImpl.java    | 15 +++++++-
 .../service/source/file/FileSourceOperator.java    | 43 +++++++++++++++++++--
 .../manager/service/task/DataCleansingTask.java    |  8 ++--
 .../service/task/DeleteStreamSourceTask.java       |  2 +-
 .../service/core/impl/AgentServiceTest.java        |  8 ++--
 .../main/resources/h2/apache_inlong_manager.sql    |  4 +-
 .../manager-web/sql/apache_inlong_manager.sql      |  4 +-
 inlong-manager/manager-web/sql/changes-1.12.0.sql  |  3 ++
 .../web/controller/StreamSourceController.java     |  7 ++++
 .../src/main/resources/application-dev.properties  | 21 +++++++----
 .../src/main/resources/application-prod.properties | 21 +++++++----
 .../src/main/resources/application-test.properties | 21 +++++++----
 30 files changed, 294 insertions(+), 92 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index 8600ecdafa..fe2c5ed04c 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -102,7 +102,7 @@ public class InlongGroupContext implements Serializable {
                     StreamSource source = entry.getValue();
                     // when template id is null it is considered as normal 
source other than template source
                     // sub sources are filtered because they are already 
collected in template source's sub source list
-                    if (source != null && source.getTemplateId() == null) {
+                    if (source != null && source.getTaskMapId() == null) {
                         groupSources.add(source);
                     }
                 }
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index 4b28525333..b577fd48da 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -37,7 +37,7 @@ public class StreamSourceEntity implements Serializable {
     private String inlongStreamId;
     private String sourceType;
     private String sourceName;
-    private Integer templateId;
+    private Integer taskMapId;
     private String agentIp;
     private String uuid;
 
@@ -74,7 +74,7 @@ public class StreamSourceEntity implements Serializable {
                 + ", inlongStreamId='" + inlongStreamId + '\''
                 + ", sourceType='" + sourceType + '\''
                 + ", sourceName='" + sourceName + '\''
-                + ", templateId=" + templateId
+                + ", templateId=" + taskMapId
                 + ", agentIp='" + agentIp + '\''
                 + ", uuid='" + uuid + '\''
                 + ", dataNodeName='" + dataNodeName + '\''
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index ca984fb04b..53d28bbbab 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -43,13 +43,13 @@ public interface StreamSourceEntityMapper {
     StreamSourceEntity selectForAgentTask(Integer id);
 
     /**
-     * Select one sub source by template id and agent ip.
+     * Select one data add task by task map id and agent ip.
      *
-     * @param templateId template id
+     * @param taskMapId template id
      * @param agentIp agent ip
      * @return stream source info
      */
-    StreamSourceEntity selectOneByTemplatedIdAndAgentIp(@Param("templateId") 
Integer templateId,
+    StreamSourceEntity selectOneByTaskMapIdAndAgentIp(@Param("taskMapId") 
Integer taskMapId,
             @Param("agentIp") String agentIp);
 
     /**
@@ -111,9 +111,9 @@ public interface StreamSourceEntityMapper {
     List<StreamSourceEntity> selectByGroupIds(@Param("groupIdList") 
List<String> groupIdList);
 
     /**
-     * Select all sub sources by template id
+     * Select all data add task by task map id
      */
-    List<StreamSourceEntity> selectByTemplateId(@Param("templateId") Integer 
templateId);
+    List<StreamSourceEntity> selectByTaskMapId(@Param("taskMapId") Integer 
taskMapId);
 
     /**
      * Get the distinct source type from the given groupId and streamId
@@ -190,6 +190,11 @@ public interface StreamSourceEntityMapper {
      */
     void updateStatusByDeleted();
 
+    /**
+     * Logic delete the data add task by modifiy time
+     */
+    void logicalDeleteByTimeout(@Param("retentionDays") Integer retentionDays);
+
     int logicalDeleteByRelatedId(@Param("groupId") String groupId, 
@Param("streamId") String streamId,
             @Param("status") Integer status);
 
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 59d97d8a7f..c1101ed2f6 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -26,7 +26,7 @@
         <result column="inlong_stream_id" jdbcType="VARCHAR" 
property="inlongStreamId"/>
         <result column="source_type" jdbcType="VARCHAR" property="sourceType"/>
         <result column="source_name" jdbcType="VARCHAR" property="sourceName"/>
-        <result column="template_id" jdbcType="INTEGER" property="templateId"/>
+        <result column="task_map_id" jdbcType="INTEGER" property="taskMapId"/>
         <result column="agent_ip" jdbcType="VARCHAR" property="agentIp"/>
         <result column="uuid" jdbcType="VARCHAR" property="uuid"/>
         <result column="data_node_name" jdbcType="VARCHAR" 
property="dataNodeName"/>
@@ -47,7 +47,7 @@
         <result column="modify_time" jdbcType="TIMESTAMP" 
property="modifyTime"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id, inlong_group_id, inlong_stream_id, source_type, source_name, 
template_id, agent_ip, uuid,
+        id, inlong_group_id, inlong_stream_id, source_type, source_name, 
task_map_id, agent_ip, uuid,
         data_node_name, inlong_cluster_name, inlong_cluster_node_group, 
serialization_type, snapshot, report_time,
         data_time_zone, ext_params, version, status, previous_status, 
is_deleted, creator, modifier, create_time, modify_time
     </sql>
@@ -55,13 +55,13 @@
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
             
parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         insert into stream_source (inlong_group_id, inlong_stream_id,
-                                   source_type, source_name, template_id, 
agent_ip,
+                                   source_type, source_name, task_map_id, 
agent_ip,
                                    uuid, data_node_name, inlong_cluster_name, 
inlong_cluster_node_group,
                                    serialization_type, snapshot, report_time,
                                    data_time_zone, ext_params, status,
                                    previous_status, creator, modifier)
         values (#{inlongGroupId,jdbcType=VARCHAR}, 
#{inlongStreamId,jdbcType=VARCHAR},
-                #{sourceType,jdbcType=VARCHAR}, 
#{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER},
+                #{sourceType,jdbcType=VARCHAR}, 
#{sourceName,jdbcType=VARCHAR}, #{taskMapId,jdbcType=INTEGER},
                 #{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR}, 
#{dataNodeName,jdbcType=VARCHAR},
                 #{inlongClusterName,jdbcType=VARCHAR}, 
#{inlongClusterNodeGroup,jdbcType=VARCHAR},
                 #{serializationType,jdbcType=VARCHAR}, 
#{snapshot,jdbcType=LONGVARCHAR},#{modifyTime,jdbcType=TIMESTAMP},
@@ -76,11 +76,11 @@
         where id = #{id,jdbcType=INTEGER}
         and is_deleted = 0
     </select>
-    <select id="selectByTemplateId" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+    <select id="selectByTaskMapId" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
         from stream_source
-        where template_id = #{templateId,jdbcType=INTEGER}
+        where task_map_id = #{taskMapId,jdbcType=INTEGER}
         and is_deleted = 0
     </select>
     <select id="selectByIdForUpdate" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
@@ -98,11 +98,11 @@
         where id = #{id,jdbcType=INTEGER}
         for update
     </select>
-    <select id="selectOneByTemplatedIdAndAgentIp" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+    <select id="selectOneByTaskMapIdAndAgentIp" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
         from stream_source
-        where template_id = #{templateId,jdbcType=INTEGER}
+        where task_map_id = #{taskMapId,jdbcType=INTEGER}
         and agent_ip = #{agentIp, jdbcType=VARCHAR}
         and is_deleted = 0
         limit 1
@@ -159,7 +159,7 @@
                     #{status}
                 </foreach>
             </if>
-            and template_id is NULL
+            and task_map_id is NULL
         </where>
         <choose>
             <when test="request.orderField != null and request.orderField != 
'' and request.orderType != null and request.orderType != ''">
@@ -271,7 +271,7 @@
                 </foreach>
             </if>
             and agent_ip is NULL
-            and template_id is NULL
+            and task_map_id is NULL
             and inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
         </where>
     </select>
@@ -303,7 +303,7 @@
                     #{item}
                 </foreach>
             </if>
-            and template_id is NULL
+            and task_map_id is NULL
         </where>
     </select>
     <select id="selectSourceType" resultType="java.lang.String">
@@ -549,7 +549,18 @@
             and status not in (99, 201, 301)
         </where>
     </update>
-
+    <update id="logicalDeleteByTimeout">
+        update stream_source
+        <set>
+            is_deleted = id,
+            status = 99
+        </set>
+        <where>
+            is_deleted = 0
+            and task_map_id is not null
+            and modify_time &lt;= DATE_ADD(NOW(), INTERVAL -#{retentionDays, 
jdbcType=INTEGER} DAY)
+        </where>
+    </update>
     <update id="logicalDeleteByRelatedId">
         update stream_source
         <set>
@@ -586,7 +597,7 @@
         </set>
         where is_deleted = 0
         and agent_ip = #{agentIp, jdbcType=VARCHAR}
-        and template_id is not null
+        and task_map_id is not null
         <if test="targetStatus != null">
             and status = #{targetStatus, jdbcType=INTEGER}
         </if>
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskDTO.java
similarity index 79%
rename from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
rename to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskDTO.java
index 50d46d6e8a..544627db34 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskDTO.java
@@ -30,29 +30,29 @@ import lombok.NoArgsConstructor;
 import javax.validation.constraints.NotNull;
 
 /**
- * Sub source information data per agent
+ * Data add task information
  */
 @Builder
 @AllArgsConstructor
 @NoArgsConstructor
 @Data
-public class SubSourceDTO {
+public class DataAddTaskDTO {
 
     @ApiModelProperty("stream source id")
     private Integer id;
 
-    @ApiModelProperty("Template source id this sub source belongs to")
-    private Integer templateId;
+    @ApiModelProperty("Main source id this data add task belongs to")
+    private Integer taskMapId;
 
-    @ApiModelProperty("Agent ip of sub source")
+    @ApiModelProperty("Agent ip of data add task")
     private String agentIp;
 
-    @ApiModelProperty("Status of sub source")
+    @ApiModelProperty("Status of data add task")
     private Integer status;
 
-    public static SubSourceDTO getFromJson(@NotNull String extParams) {
+    public static DataAddTaskDTO getFromJson(@NotNull String extParams) {
         try {
-            return JsonUtils.parseObject(extParams, SubSourceDTO.class);
+            return JsonUtils.parseObject(extParams, DataAddTaskDTO.class);
         } catch (Exception e) {
             throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
new file mode 100644
index 0000000000..521551825a
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import org.hibernate.validator.constraints.Length;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * Data add task information
+ */
+@Data
+@ApiModel("Data add task request")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = 
"sourceType")
+public class DataAddTaskRequest {
+
+    @ApiModelProperty(value = "Source ID")
+    private Integer sourceId;
+
+    @ApiModelProperty("Source type, including: FILE, KAFKA, etc.")
+    @NotBlank(message = "sourceType cannot be blank")
+    @Length(min = 1, max = 20, message = "length must be between 1 and 20")
+    private String sourceType;
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 0bcfade77b..6a31f9039f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -118,7 +118,7 @@ public class SourceRequest {
     private Map<String, Object> properties = new LinkedHashMap<>();
 
     @JsonIgnore
-    @ApiModelProperty("Sub source information of existing agents")
-    private List<SubSourceDTO> subSourceList;
+    @ApiModelProperty("Data add task information of existing agents")
+    private List<DataAddTaskDTO> dataAddTaskList;
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
index 9793189d26..cc56ecb6a7 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
@@ -111,11 +111,11 @@ public abstract class StreamSource extends StreamNode {
     @ApiModelProperty("Properties for source")
     private Map<String, Object> properties = new LinkedHashMap<>();
 
-    @ApiModelProperty("Null if not a sub source")
-    private Integer templateId;
+    @ApiModelProperty("Null if not a data add task")
+    private Integer taskMapId;
 
-    @ApiModelProperty("Sub source information of existing agents")
-    private List<SubSourceDTO> subSourceList;
+    @ApiModelProperty("Data add task information of existing agents")
+    private List<DataAddTaskDTO> dataAddTaskList;
 
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value, true as default")
     private Boolean ignoreParseError;
diff --git a/inlong-manager/manager-web/sql/changes-1.12.0.sql 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
similarity index 51%
copy from inlong-manager/manager-web/sql/changes-1.12.0.sql
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
index 82e8af66e5..bcf292c1f3 100644
--- a/inlong-manager/manager-web/sql/changes-1.12.0.sql
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
@@ -15,14 +15,29 @@
  * limitations under the License.
  */
 
--- This is the SQL change file from version 1.9.0 to the current version 
1.10.0.
--- When upgrading to version 1.10.0, please execute those SQLs in the DB (such 
as MySQL) used by the Manager module.
+package org.apache.inlong.manager.pojo.source.file;
 
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
 
-USE `apache_inlong_manager`;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 
-ALTER TABLE `stream_source` ADD COLUMN  `data_time_zone` varchar(256) DEFAULT 
NULL COMMENT 'Data time zone';
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = SourceType.FILE)
+@ApiModel(value = "File data add task request")
+public class FileDataAddTaskRequest extends DataAddTaskRequest {
 
+    @ApiModelProperty("Start time")
+    private Long startTime;
 
+    @ApiModelProperty("End time")
+    private Long endTime;
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
index cdab4d59cf..8c14d8118b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
@@ -88,6 +88,9 @@ public class PulsarSource extends StreamSource {
     @Builder.Default
     private String wrapType = MessageWrapType.INLONG_MSG_V0.getName();
 
+    @ApiModelProperty("Reset subscription time")
+    private Long resetTime;
+
     public PulsarSource() {
         this.setSourceType(SourceType.PULSAR);
     }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
index 6fc1751d72..6c0ba66208 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
@@ -80,6 +80,9 @@ public class PulsarSourceDTO {
     @ApiModelProperty(value = "The message body wrap  wrap type, including: 
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
     private String wrapType;
 
+    @ApiModelProperty("Reset subscription time")
+    private Long resetTime;
+
     @ApiModelProperty("Properties for Pulsar")
     private Map<String, Object> properties;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 953df33e96..3fa86b02fe 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -136,10 +136,17 @@ public class AgentServiceImpl implements AgentService {
     private Integer beforeSeconds;
     @Value("${source.update.interval:60}")
     private Integer updateTaskInterval;
-    @Value("${source.cleansing.enabled:false}")
+    @Value("${source.clean.enabled:false}")
     private Boolean sourceCleanEnabled;
-    @Value("${source.cleansing.interval:600}")
+    @Value("${source.clean.interval.seconds:600}")
     private Integer cleanInterval;
+    @Value("${add.task.clean.enabled:false}")
+    private Boolean dataAddTaskCleanEnabled;
+    @Value("${add.task.clean.interval.seconds:10}")
+    private Integer dataAddTaskCleanInterval;
+    @Value("${add.task.retention.days:7}")
+    private Integer retentionDays;
+
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
     @Autowired
@@ -202,6 +209,22 @@ public class AgentServiceImpl implements AgentService {
             }, 0, cleanInterval, TimeUnit.SECONDS);
             LOGGER.info("clean task started successfully");
         }
+        if (dataAddTaskCleanEnabled) {
+            ThreadFactory factory = new ThreadFactoryBuilder()
+                    .setNameFormat("scheduled-subSource-deleted-%d")
+                    .setDaemon(true)
+                    .build();
+            ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(factory);
+            executor.scheduleWithFixedDelay(() -> {
+                try {
+                    sourceMapper.logicalDeleteByTimeout(retentionDays);
+                    LOGGER.info("clean sub task successfully");
+                } catch (Throwable t) {
+                    LOGGER.error("clean sub task error", t);
+                }
+            }, 0, dataAddTaskCleanInterval, TimeUnit.SECONDS);
+            LOGGER.info("clean sub task started successfully");
+        }
     }
 
     @Override
@@ -441,7 +464,7 @@ public class AgentServiceImpl implements AgentService {
 
     /**
      * Add subtasks to template tasks.
-     * (Template task are agent_ip is null and template_id is null)
+     * (Template task are agent_ip is null and task_map_id is null)
      */
     private void preProcessTemplateFileTask(TaskRequest taskRequest) {
         List<Integer> needCopiedStatusList = 
Arrays.asList(SourceStatus.TO_BE_ISSUED_ADD.getCode(),
@@ -463,7 +486,7 @@ public class AgentServiceImpl implements AgentService {
                     if (groupEntity != null && 
noNeedAddTask.contains(GroupStatus.forCode(groupEntity.getStatus()))) {
                         return;
                     }
-                    StreamSourceEntity subSource = 
sourceMapper.selectOneByTemplatedIdAndAgentIp(sourceEntity.getId(),
+                    StreamSourceEntity subSource = 
sourceMapper.selectOneByTaskMapIdAndAgentIp(sourceEntity.getId(),
                             agentIp);
                     if (subSource == null) {
                         InlongClusterNodeEntity clusterNodeEntity = 
selectByIpAndCluster(agentClusterName, agentIp);
@@ -474,7 +497,7 @@ public class AgentServiceImpl implements AgentService {
                                     
CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
                             
fileEntity.setSourceName(fileEntity.getSourceName() + "-"
                                     + 
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
-                            fileEntity.setTemplateId(sourceEntity.getId());
+                            fileEntity.setTaskMapId(sourceEntity.getId());
                             fileEntity.setAgentIp(agentIp);
                             
fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
                             // create new sub source task
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index cb955decd9..ccefd5cf10 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -106,7 +106,7 @@ public abstract class AbstractSourceOperateListener 
implements SourceOperateList
             // template sources are filtered and processed in corresponding 
subclass listeners
             if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == 
SourceStatus.SOURCE_STOP
                     || sourceStatus == SourceStatus.HEARTBEAT_TIMEOUT
-                    || 
CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
+                    || 
CollectionUtils.isNotEmpty(streamSource.getDataAddTaskList())) {
                 return true;
             } else if (sourceStatus == SourceStatus.SOURCE_FAILED || 
sourceStatus == SourceStatus.SOURCE_DISABLE) {
                 return false;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
index 7575d8c25a..238b6bc1bc 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
@@ -51,7 +51,7 @@ public class SourceRestartListener extends 
AbstractSourceOperateListener {
     public void operateStreamSource(SourceRequest sourceRequest, String 
operator) {
         // if a source has sub-sources, it is considered a template source.
         // template sources do not need to be restarted, its sub-sources will 
be processed in this method later.
-        if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) {
+        if (CollectionUtils.isNotEmpty(sourceRequest.getDataAddTaskList())) {
             return;
         }
         streamSourceService.restart(sourceRequest.getId(), operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
index e3636dcb89..e1328bf6e2 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
@@ -51,7 +51,7 @@ public class SourceStopListener extends 
AbstractSourceOperateListener {
     public void operateStreamSource(SourceRequest sourceRequest, String 
operator) {
         // if a source has sub-sources, it is considered a template source.
         // template sources do not need to be stopped, its sub-sources will be 
processed in this method later.
-        if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) {
+        if (CollectionUtils.isNotEmpty(sourceRequest.getDataAddTaskList())) {
             return;
         }
         streamSourceService.stop(sourceRequest.getId(), operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 00f85052fd..9428101c40 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -32,6 +32,7 @@ import 
org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -338,4 +339,10 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
     public void syncSourceFieldInfo(SourceRequest request, String operator) {
         LOGGER.info("not support sync source field info for type ={}", 
request.getSourceType());
     }
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ)
+    public Integer addDataAddTask(DataAddTaskRequest request, String operator) 
{
+        throw new BusinessException(String.format("not support data add task 
for type =%s", request.getSourceType()));
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index 5e7168879b..997f39b867 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -126,4 +127,13 @@ public interface StreamSourceOperator {
      */
     void syncSourceFieldInfo(SourceRequest request, String operator);
 
+    /**
+     * Save the data add task info.
+     *
+     * @param request request of data add task
+     * @param operator name of operator
+     * @return source id after saving
+     */
+    Integer addDataAddTask(DataAddTaskRequest request, String operator);
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 1bcb9f9966..0dd4decd28 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.source;
 
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
 import org.apache.inlong.manager.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -229,4 +230,13 @@ public interface StreamSourceService {
         return true;
     }
 
+    /**
+     * Save the data add task information
+     *
+     * @param request Source request.
+     * @param operator Operator's name.
+     * @return source id after saving.
+     */
+    Integer addDataAddTask(DataAddTaskRequest request, String operator);
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index a8a01224b8..2d3855b05e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
 import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
 import org.apache.inlong.manager.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -384,7 +385,7 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.expectNotNull(entity, 
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
                 ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-        boolean isTemplateSource = 
CollectionUtils.isNotEmpty(sourceMapper.selectByTemplateId(id));
+        boolean isTemplateSource = 
CollectionUtils.isNotEmpty(sourceMapper.selectByTaskMapId(id));
 
         // Check if it can be delete
         InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(entity.getInlongGroupId());
@@ -436,7 +437,7 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
                     String.format("InlongGroup does not exist with 
InlongGroupId=%s", entity.getInlongGroupId()));
         }
         // check record status
-        boolean isTemplateSource = 
CollectionUtils.isNotEmpty(sourceMapper.selectByTemplateId(id));
+        boolean isTemplateSource = 
CollectionUtils.isNotEmpty(sourceMapper.selectByTaskMapId(id));
         SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
         SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
         // if source is frozen|failed|new, or if it is a template source or 
auto push source, delete directly
@@ -629,4 +630,14 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         request.setInlongStreamId(entity.getInlongStreamId());
         request.setSourceName(entity.getSourceName());
     }
+
+    @Override
+    public Integer addDataAddTask(DataAddTaskRequest request, String operator) 
{
+        LOGGER.info("begin to add data add task info: {}", request);
+        StreamSourceEntity entity = 
sourceMapper.selectById(request.getSourceId());
+        StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(entity.getSourceType());
+        int id = sourceOperator.addDataAddTask(request, operator);
+        LOGGER.info("success to add data add task info: {}", request);
+        return id;
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index 05b22ae51c..5d4329c7ff 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -23,9 +23,11 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.source.DataAddTaskDTO;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.source.SubSourceDTO;
+import org.apache.inlong.manager.pojo.source.file.FileDataAddTaskRequest;
 import org.apache.inlong.manager.pojo.source.file.FileSource;
 import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
 import org.apache.inlong.manager.pojo.source.file.FileSourceRequest;
@@ -33,8 +35,13 @@ import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.service.source.AbstractSourceOperator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -45,6 +52,8 @@ import java.util.stream.Collectors;
 @Service
 public class FileSourceOperator extends AbstractSourceOperator {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileSourceOperator.class);
+
     @Autowired
     private ObjectMapper objectMapper;
 
@@ -88,14 +97,40 @@ public class FileSourceOperator extends 
AbstractSourceOperator {
         List<StreamField> sourceFields = super.getSourceFields(entity.getId());
         source.setFieldList(sourceFields);
 
-        List<StreamSourceEntity> subSourceList = 
sourceMapper.selectByTemplateId(entity.getId());
-        source.setSubSourceList(subSourceList.stream().map(subEntity -> 
SubSourceDTO.builder()
+        List<StreamSourceEntity> dataAddTaskList = 
sourceMapper.selectByTaskMapId(entity.getId());
+        source.setDataAddTaskList(dataAddTaskList.stream().map(subEntity -> 
DataAddTaskDTO.builder()
                 .id(subEntity.getId())
-                .templateId(entity.getId())
+                .taskMapId(entity.getId())
                 .agentIp(subEntity.getAgentIp())
                 .status(subEntity.getStatus()).build())
                 .collect(Collectors.toList()));
         return source;
     }
 
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ)
+    public Integer addDataAddTask(DataAddTaskRequest request, String operator) 
{
+        FileDataAddTaskRequest sourceRequest = (FileDataAddTaskRequest) 
request;
+        StreamSourceEntity sourceEntity = 
sourceMapper.selectById(request.getSourceId());
+        try {
+            List<StreamSourceEntity> dataAddTaskList = 
sourceMapper.selectByTaskMapId(sourceEntity.getId());
+            int dataAddTaskSize = CollectionUtils.isNotEmpty(dataAddTaskList) 
? dataAddTaskList.size() : 0;
+            FileSourceDTO dto = 
FileSourceDTO.getFromJson(sourceEntity.getExtParams());
+            dto.setStartTime(sourceRequest.getStartTime());
+            dto.setEndTime(sourceRequest.getEndTime());
+            dto.setRetry(true);
+            StreamSourceEntity dataAddTaskEntity =
+                    CommonBeanUtils.copyProperties(sourceEntity, 
StreamSourceEntity::new);
+            dataAddTaskEntity.setId(null);
+            dataAddTaskEntity.setSourceName(sourceEntity.getSourceName() + "-" 
+ (dataAddTaskSize + 1));
+            
dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            dataAddTaskEntity.setTaskMapId(sourceEntity.getId());
+            return sourceMapper.insert(dataAddTaskEntity);
+        } catch (Exception e) {
+            LOGGER.error("serialize extParams of File SourceDTO failure: ", e);
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of File SourceDTO 
failure: %s", e.getMessage()));
+        }
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
index 871700c5b3..2758c80ce8 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
@@ -59,13 +59,13 @@ public class DataCleansingTask extends TimerTask implements 
InitializingBean {
      */
     private static final int INITIAL_DELAY = 60;
 
-    @Value("${data.cleansing.enabled:false}")
+    @Value("${data.clean.enabled:false}")
     private Boolean enabled;
-    @Value("${data.cleansing.interval.seconds:1800}")
+    @Value("${data.clean.interval.seconds:1800}")
     private Integer interval;
-    @Value("${data.cleansing.before.days:10}")
+    @Value("${data.clean.before.days:10}")
     private Integer before;
-    @Value("${data.cleansing.batchSize:100}")
+    @Value("${data.clean.batchSize:100}")
     private Integer batchSize;
 
     @Autowired
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java
index 9d3a83dff9..5e4b4e724e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java
@@ -56,7 +56,7 @@ public class DeleteStreamSourceTask extends TimerTask 
implements InitializingBea
 
     @Value("${group.deleted.enabled:false}")
     private Boolean enabled;
-    @Value("${group.deleted.batchSize:100}")
+    @Value("${group.deleted.batch.size:100}")
     private Integer batchSize;
     @Value("${group.deleted.latest.hours:10}")
     private Integer latestHours;
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index 70034803e8..41acbdfd86 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -141,7 +141,7 @@ class AgentServiceTest extends ServiceBaseTest {
     public void suspendSource(String groupId, String streamId) {
         List<StreamSource> sources = sourceService.listSource(groupId, 
streamId);
         sources.stream()
-                .filter(source -> source.getTemplateId() != null)
+                .filter(source -> source.getTaskMapId() != null)
                 .forEach(source -> sourceService.stop(source.getId(), 
GLOBAL_OPERATOR));
         groupMapper.updateStatus(groupId, 
GroupStatus.CONFIGURATION_OFFLINE.getCode(), GLOBAL_OPERATOR);
         streamMapper.updateStatusByIdentifier(groupId, streamId, 
StreamStatus.SUSPENDED.getCode(), GLOBAL_OPERATOR);
@@ -153,7 +153,7 @@ class AgentServiceTest extends ServiceBaseTest {
     public void restartSource(String groupId, String streamId) {
         List<StreamSource> sources = sourceService.listSource(groupId, 
streamId);
         sources.stream()
-                .filter(source -> source.getTemplateId() != null)
+                .filter(source -> source.getTaskMapId() != null)
                 .forEach(source -> sourceService.restart(source.getId(), 
GLOBAL_OPERATOR));
         groupMapper.updateStatus(groupId, 
GroupStatus.CONFIG_SUCCESSFUL.getCode(), GLOBAL_OPERATOR);
         streamMapper.updateStatusByIdentifier(groupId, streamId, 
StreamStatus.RESTARTED.getCode(), GLOBAL_OPERATOR);
@@ -230,7 +230,7 @@ class AgentServiceTest extends ServiceBaseTest {
         agent.pullTask(); // report last success status
 
         final int sourceId = sourceService.listSource(groupStream.getLeft(), 
groupStream.getRight()).stream()
-                .filter(source -> source.getTemplateId() != null)
+                .filter(source -> source.getTaskMapId() != null)
                 .findAny()
                 .get()
                 .getId();
@@ -256,7 +256,7 @@ class AgentServiceTest extends ServiceBaseTest {
 
         // update group to config success
         final String groupId = sourceService.listSource(groupStream.getLeft(), 
groupStream.getRight()).stream()
-                .filter(source -> source.getTemplateId() != null)
+                .filter(source -> source.getTaskMapId() != null)
                 .findAny()
                 .get()
                 .getInlongGroupId();
diff --git 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index edd9abf88d..1a2d28ad2f 100644
--- 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -331,7 +331,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `inlong_stream_id`    varchar(256) NOT NULL COMMENT 'Inlong stream id',
     `source_name`         varchar(128) NOT NULL DEFAULT '' COMMENT 
'source_name',
     `source_type`         varchar(20)           DEFAULT '0' COMMENT 'Source 
type, including: FILE, DB, etc',
-    `template_id`         int(11)               DEFAULT NULL COMMENT 'Id of 
the template task this agent belongs to',
+    `task_map_id`         int(11)               DEFAULT NULL COMMENT 'Id of 
the task this agent belongs to',
     `agent_ip`            varchar(40)           DEFAULT NULL COMMENT 'Ip of 
the agent running the task, NULL if this is a template task',
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid 
of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node 
name, which links to data_node table',
@@ -354,7 +354,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, 
`source_name`, `is_deleted`),
     INDEX `source_status_index` (`status`, `is_deleted`),
     INDEX `source_agent_ip_index` (`agent_ip`, `is_deleted`),
-    INDEX `source_template_id_index` (`template_id`)
+    INDEX `source_task_map_id_index` (`task_map_id`)
 );
 
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 025b53190e..5451516614 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -351,7 +351,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `inlong_stream_id`    varchar(256) NOT NULL COMMENT 'Inlong stream id',
     `source_name`         varchar(128) NOT NULL DEFAULT '' COMMENT 
'source_name',
     `source_type`         varchar(20)           DEFAULT '0' COMMENT 'Source 
type, including: FILE, DB, etc',
-    `template_id`         int(11)               DEFAULT NULL COMMENT 'Id of 
the template task this agent belongs to',
+    `task_map_id`         int(11)               DEFAULT NULL COMMENT 'Id of 
the task this agent belongs to',
     `agent_ip`            varchar(40)           DEFAULT NULL COMMENT 'Ip of 
the agent running the task, NULL if this is a template task',
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid 
of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node 
name, which links to data_node table',
@@ -374,7 +374,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, 
`source_name`, `is_deleted`),
     INDEX `source_status_index` (`status`, `is_deleted`),
     INDEX `source_agent_ip_index` (`agent_ip`, `is_deleted`),
-    INDEX `source_template_id_index` (`template_id`)
+    INDEX `source_task_map_id_index` (`task_map_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
 
diff --git a/inlong-manager/manager-web/sql/changes-1.12.0.sql 
b/inlong-manager/manager-web/sql/changes-1.12.0.sql
index 82e8af66e5..a92011164f 100644
--- a/inlong-manager/manager-web/sql/changes-1.12.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.12.0.sql
@@ -24,5 +24,8 @@ SET FOREIGN_KEY_CHECKS = 0;
 USE `apache_inlong_manager`;
 
 ALTER TABLE `stream_source` ADD COLUMN  `data_time_zone` varchar(256) DEFAULT 
NULL COMMENT 'Data time zone';
+DROP INDEX `source_template_id_index` ON `stream_source`;
+CREATE INDEX source_task_map_id_index ON `stream_source` (`task_map_id`);
 
+ALTER TABLE `stream_source` CHANGE template_id task_map_id int(11) DEFAULT 
NULL COMMENT 'Id of the task this agent belongs to';
 
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index f0cbbd56da..8e7645f993 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -23,6 +23,7 @@ import 
org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
 import org.apache.inlong.manager.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -118,4 +119,10 @@ public class StreamSourceController {
                 sourceService.forceDelete(inlongGroupId, inlongStreamId, 
LoginUserUtils.getLoginUser().getName()));
     }
 
+    @RequestMapping(value = "/source/addDataAddTask", method = 
RequestMethod.POST)
+    @ApiOperation(value = "Add supplementary recording task for stream source")
+    public Response<Integer> addSub(@RequestBody DataAddTaskRequest request) {
+        return Response.success(sourceService.addDataAddTask(request, 
LoginUserUtils.getLoginUser().getName()));
+    }
+
 }
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-dev.properties 
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 0bcebdc06e..8a9032d5ec 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -73,15 +73,15 @@ audit.ck.username=default
 # ClickHouse password
 audit.ck.password=
 
-# Database cleansing
+# Database clean
 # If turned on, logically deleted data will be collected and permanently 
deleted periodically
-data.cleansing.enabled=false
+data.clean.enabled=false
 # The interval (in seconds) between the end of one execution and the start of 
the next, default is 1800s (0.5 hour)
-data.cleansing.interval.seconds=1800
+data.clean.interval.seconds=1800
 # Select the data whose latest modify time is some days before, default is 10 
days
-data.cleansing.before.days=10
+data.clean.before.days=10
 # The maximum size of data to be deleted in batch, default is 100
-data.cleansing.batchSize=100
+data.clean.batchSize=100
 
 # Whether to use ZooKeeper to manage the Sort task config, default is false, 
which means not using ZooKeeper
 sort.enable.zookeeper=false
@@ -97,14 +97,19 @@ source.update.enabled=false
 source.update.before.seconds=60
 source.update.interval=60
 
+# If turned on, regularly clear expired data add tasks
+add.task.clean.enabled=false
+add.task.clean.interval.seconds=10
+add.task.retention.days=7
+
 # If turned on, tasks in the incorrect state are periodically deleted
-source.cleansing.enabled=false
-source.cleansing.interval=600
+source.clean.enabled=false
+source.clean.interval.seconds=600
 
 # Select the InlongGroupIds whose latest modification time is within how many 
hours, the default is 10 hours
 group.deleted.latest.hours=10
 # The maximum size when querying InlongGroupIds in batches, those 
InlongGroupIds will be used to delete the related StreamSources.
-group.deleted.batchSize=100
+group.deleted.batch.size=100
 # If turned on, the groups could be deleted periodically.
 group.deleted.enabled=false
 
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-prod.properties 
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index a9c55b39b3..835822bf84 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -72,15 +72,15 @@ audit.ck.username=default
 # ClickHouse password
 audit.ck.password=
 
-# Database cleansing
+# Database clean
 # If turned on, logically deleted data will be collected and permanently 
deleted periodically
-data.cleansing.enabled=false
+data.clean.enabled=false
 # The interval (in seconds) between the end of one execution and the start of 
the next, default is 1800s (0.5 hour)
-data.cleansing.interval.seconds=1800
+data.clean.interval.seconds=1800
 # Select the data whose latest modify time is some days before, default is 10 
days
-data.cleansing.before.days=10
+data.clean.before.days=10
 # The maximum size of data to be deleted in batch, default is 100
-data.cleansing.batchSize=100
+data.clean.batchSize=100
 
 # Whether to use ZooKeeper to manage the Sort task config, default is false, 
which means not using ZooKeeper
 sort.enable.zookeeper=false
@@ -96,14 +96,19 @@ source.update.enabled=false
 source.update.before.seconds=60
 source.update.interval=60
 
+# If turned on, regularly clear expired data add tasks
+add.task.clean.enabled=false
+add.task.clean.interval.seconds=10
+add.task.retention.days=7
+
 # If turned on, tasks in the incorrect state are periodically deleted
-source.cleansing.enabled=false
-source.cleansing.interval=600
+source.clean.enabled=false
+source.clean.interval.seconds=600
 
 # Select the InlongGroupIds whose latest modification time is within how many 
hours, the default is 10 hours
 group.deleted.latest.hours=10
 # The maximum size when querying InlongGroupIds in batches, those 
InlongGroupIds will be used to delete the related StreamSources.
-group.deleted.batchSize=100
+group.deleted.batch.size=100
 # If turned on, the groups could be deleted periodically.
 group.deleted.enabled=false
 
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-test.properties 
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 0bcebdc06e..8a9032d5ec 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -73,15 +73,15 @@ audit.ck.username=default
 # ClickHouse password
 audit.ck.password=
 
-# Database cleansing
+# Database clean
 # If turned on, logically deleted data will be collected and permanently 
deleted periodically
-data.cleansing.enabled=false
+data.clean.enabled=false
 # The interval (in seconds) between the end of one execution and the start of 
the next, default is 1800s (0.5 hour)
-data.cleansing.interval.seconds=1800
+data.clean.interval.seconds=1800
 # Select the data whose latest modify time is some days before, default is 10 
days
-data.cleansing.before.days=10
+data.clean.before.days=10
 # The maximum size of data to be deleted in batch, default is 100
-data.cleansing.batchSize=100
+data.clean.batchSize=100
 
 # Whether to use ZooKeeper to manage the Sort task config, default is false, 
which means not using ZooKeeper
 sort.enable.zookeeper=false
@@ -97,14 +97,19 @@ source.update.enabled=false
 source.update.before.seconds=60
 source.update.interval=60
 
+# If turned on, regularly clear expired data add tasks
+add.task.clean.enabled=false
+add.task.clean.interval.seconds=10
+add.task.retention.days=7
+
 # If turned on, tasks in the incorrect state are periodically deleted
-source.cleansing.enabled=false
-source.cleansing.interval=600
+source.clean.enabled=false
+source.clean.interval.seconds=600
 
 # Select the InlongGroupIds whose latest modification time is within how many 
hours, the default is 10 hours
 group.deleted.latest.hours=10
 # The maximum size when querying InlongGroupIds in batches, those 
InlongGroupIds will be used to delete the related StreamSources.
-group.deleted.batchSize=100
+group.deleted.batch.size=100
 # If turned on, the groups could be deleted periodically.
 group.deleted.enabled=false
 

Reply via email to