This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e4d5007fc5 [INLONG-10601][Manager] Optimize the agent task 
configuration process (#10602)
e4d5007fc5 is described below

commit e4d5007fc56240f41f9233b440c6cde75942c69f
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Thu Jul 11 19:19:19 2024 +0800

    [INLONG-10601][Manager] Optimize the agent task configuration process 
(#10602)
---
 .../inlong/common/pojo/agent/AgentConfigInfo.java  |   1 +
 .../inlong/common/pojo/agent/TaskResult.java       |   1 +
 .../manager/dao/entity/AgentTaskConfigEntity.java  |  39 ++-
 .../dao/mapper/AgentTaskConfigEntityMapper.java    |  45 ++++
 .../mappers/AgentTaskConfigEntityMapper.xml        | 100 ++++++++
 .../{SortConfigLoader.java => ConfigLoader.java}   |  10 +-
 .../service/core/impl/AgentServiceImpl.java        | 136 +++++-----
 ...ConfigLoaderImpl.java => ConfigLoaderImpl.java} |  17 +-
 .../service/core/impl/SortClusterServiceImpl.java  |  16 +-
 .../manager/service/core/impl/SortServiceImpl.java |   6 +-
 .../service/core/impl/SortSourceServiceImpl.java   |   6 +-
 .../listener/StreamTaskListenerFactory.java        |   4 +
 .../listener/source/SourceStartListener.java       |  86 +++++++
 .../repository/DataProxyConfigRepository.java      |   8 +-
 .../service/source/AbstractSourceOperator.java     | 275 +++++++++++++++++++++
 .../service/source/StreamSourceOperator.java       |   8 +
 .../service/stream/TemplateServiceImpl.java        |   4 +-
 .../main/resources/h2/apache_inlong_manager.sql    |  20 ++
 .../manager-web/sql/apache_inlong_manager.sql      |  21 ++
 inlong-manager/manager-web/sql/changes-1.13.0.sql  |  21 ++
 20 files changed, 703 insertions(+), 121 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
index 2399c9657c..ff8cd4df3e 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
@@ -34,6 +34,7 @@ public class AgentConfigInfo {
     AgentResponseCode code;
     private String zkUrl;
     private AgentClusterInfo cluster;
+    private Integer version;
     private String md5;
 
     @Data
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
index 2fcbec919d..7970e345cb 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
@@ -36,6 +36,7 @@ public class TaskResult {
     private List<CmdConfig> cmdConfigs;
     private List<DataConfig> dataConfigs;
     private String md5;
+    private Integer version;
     AgentResponseCode code;
 
 }
\ No newline at end of file
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java
similarity index 58%
copy from 
inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
copy to 
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java
index 2399c9657c..cc8c3ab529 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java
@@ -15,36 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.pojo.agent;
+package org.apache.inlong.manager.dao.entity;
 
-import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
-import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.Date;
 
 /**
- * The Agent config info.
+ * Agent task config entity, including agent ip, cluster name, etc.
  */
 @Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class AgentConfigInfo {
+public class AgentTaskConfigEntity implements Serializable {
 
-    AgentResponseCode code;
-    private String zkUrl;
-    private AgentClusterInfo cluster;
-    private String md5;
+    private static final long serialVersionUID = 1L;
+    private Integer id;
+    private String clusterName;
+    private String agentIp;
 
-    @Data
-    @Builder
-    @NoArgsConstructor
-    @AllArgsConstructor
-    public static class AgentClusterInfo {
+    private String configParams;
 
-        private Integer parentId;
+    private String taskParams;
 
-        private String clusterName;
+    private Integer isDeleted;
+    private String creator;
+    private String modifier;
+    private Date createTime;
+    private Date modifyTime;
+    private Integer version;
 
-    }
 }
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AgentTaskConfigEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AgentTaskConfigEntityMapper.java
new file mode 100644
index 0000000000..d673a16eac
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AgentTaskConfigEntityMapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.inlong.manager.common.tenant.MultiTenantQuery;
+import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity;
+
+import org.apache.ibatis.annotations.Options;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.cursor.Cursor;
+import org.apache.ibatis.mapping.ResultSetType;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface AgentTaskConfigEntityMapper {
+
+    int insert(AgentTaskConfigEntity record);
+
+    AgentTaskConfigEntity selectByPrimaryKey(Integer id);
+
+    AgentTaskConfigEntity selectByIdentifier(@Param("agentIp") String agentIp,
+            @Param("clusterName") String clusterName);
+
+    int updateByIdSelective(AgentTaskConfigEntity record);
+
+    @MultiTenantQuery(with = false)
+    @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = 
Integer.MIN_VALUE)
+    Cursor<AgentTaskConfigEntity> selectAllAgentTaskConfigs();
+
+}
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml
new file mode 100644
index 0000000000..9a5f237c8d
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper 
namespace="org.apache.inlong.manager.dao.mapper.AgentTaskConfigEntityMapper">
+    <resultMap id="BaseResultMap" 
type="org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity">
+        <id column="id" jdbcType="INTEGER" property="id"/>
+        <result column="agent_ip" jdbcType="VARCHAR" property="agentIp"/>
+        <result column="cluster_name" jdbcType="VARCHAR" 
property="clusterName"/>
+        <result column="config_params" jdbcType="VARCHAR" 
property="configParams"/>
+        <result column="task_params" jdbcType="VARCHAR" property="taskParams"/>
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+        <result column="creator" jdbcType="VARCHAR" property="creator"/>
+        <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+        <result column="create_time" jdbcType="TIMESTAMP" 
property="createTime"/>
+        <result column="modify_time" jdbcType="TIMESTAMP" 
property="modifyTime"/>
+        <result column="version" jdbcType="INTEGER" property="version"/>
+    </resultMap>
+
+    <sql id="Base_Column_List">
+        id, agent_ip, cluster_name, config_params, task_params, is_deleted, 
creator, modifier, create_time, modify_time, version
+    </sql>
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+            
parameterType="org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity">
+        insert into agent_task_config (id, agent_ip, cluster_name,
+                                 config_params, task_params,
+                                 creator, modifier)
+        values (#{id, jdbcType=INTEGER}, #{agentIp, jdbcType=VARCHAR}, 
#{clusterName, jdbcType=VARCHAR},
+                #{configParams, jdbcType=VARCHAR}, #{taskParams, 
jdbcType=VARCHAR},
+                #{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR})
+    </insert>
+
+    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" 
resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from agent_task_config
+        where id = #{id,jdbcType=INTEGER}
+        and is_deleted = 0
+    </select>
+    <select id="selectByIdentifier" 
resultType="org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from agent_task_config
+        where agent_ip = #{agentIp,jdbcType=VARCHAR}
+        and cluster_name = #{clusterName, jdbcType=VARCHAR}
+        and is_deleted = 0
+    </select>
+    <select id="selectAllAgentTaskConfigs" 
resultType="org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from agent_task_config
+        <where>
+            and is_deleted = 0
+            and agent_ip is not null
+        </where>
+    </select>
+    <update id="updateByIdSelective" 
parameterType="org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity">
+        update agent_task_config
+        <set>
+            <if test="agentIp != null">
+                agent_ip = #{agentIp,jdbcType=VARCHAR},
+            </if>
+            <if test="clusterName != null">
+                cluster_name = #{clusterName,jdbcType=VARCHAR},
+            </if>
+            <if test="configParams != null">
+                config_params = #{configParams,jdbcType=VARCHAR},
+            </if>
+            <if test="taskParams != null">
+                task_params = #{taskParams,jdbcType=VARCHAR},
+            </if>
+            <if test="isDeleted != null">
+                is_deleted = #{isDeleted,jdbcType=INTEGER},
+            </if>
+            <if test="modifier != null">
+                modifier = #{modifier,jdbcType=VARCHAR},
+            </if>
+            version = #{version,jdbcType=INTEGER} + 1
+        </set>
+        where id = #{id,jdbcType=INTEGER}
+        and version = #{version,jdbcType=INTEGER}
+    </update>
+</mapper>
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java
similarity index 93%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java
index 2923e6f34d..a491838c68 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.core;
 
+import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity;
 import org.apache.inlong.manager.dao.entity.ClusterConfigEntity;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
@@ -35,7 +36,7 @@ import java.util.List;
 /**
  * Loader for sort service to load configs thought Cursor
  */
-public interface SortConfigLoader {
+public interface ConfigLoader {
 
     /**
      * Load all clusters by cursor
@@ -124,4 +125,11 @@ public interface SortConfigLoader {
      */
     List<ClusterConfigEntity> loadAllClusterConfigEntity();
 
+    /**
+     * Load all agent task config info
+     *
+     * @return List of agent task config info
+     */
+    List<AgentTaskConfigEntity> loadAllAgentTaskConfigEntity();
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 37a19beb14..f7d669151a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -47,6 +47,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -63,7 +64,6 @@ import 
org.apache.inlong.manager.dao.mapper.ModuleConfigEntityMapper;
 import org.apache.inlong.manager.dao.mapper.PackageConfigEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
-import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterInfo;
 import 
org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
 import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
@@ -73,6 +73,7 @@ import 
org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.AgentService;
+import org.apache.inlong.manager.service.core.ConfigLoader;
 import org.apache.inlong.manager.service.source.SourceOperatorFactory;
 import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
 import org.apache.inlong.manager.service.source.StreamSourceOperator;
@@ -107,9 +108,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -144,10 +147,8 @@ public class AgentServiceImpl implements AgentService {
             new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(),
             new CallerRunsPolicy());
 
-    @Getter
-    private LoadingCache<TaskRequest, TaskResult> taskCache;
-    @Getter
-    private LoadingCache<AgentConfigRequest, AgentConfigInfo> agentConfigCache;
+    private Map<String, TaskResult> taskConfigMap = new ConcurrentHashMap<>();
+    private Map<String, AgentConfigInfo> agentConfigMap = new 
ConcurrentHashMap<>();
 
     @Getter
     private LoadingCache<ConfigRequest, ConfigResult> moduleConfigCache;
@@ -191,6 +192,8 @@ public class AgentServiceImpl implements AgentService {
     private PackageConfigEntityMapper packageConfigEntityMapper;
     @Autowired
     private InlongClusterService clusterService;
+    @Autowired
+    private ConfigLoader configLoader;
 
     /**
      * Start the update task
@@ -201,12 +204,6 @@ public class AgentServiceImpl implements AgentService {
         // The expiry time of cluster info cache must be greater than 
taskCache cache
         // because the eviction handler needs to query cluster info cache
         long expireTime = 10 * 5;
-        taskCache = Caffeine.newBuilder()
-                .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
-                .build(this::fetchTask);
-        agentConfigCache = Caffeine.newBuilder()
-                .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
-                .build(this::fetchAgentConfig);
         LOGGER.debug("start to reload config for installer.");
         try {
             moduleConfigCache = Caffeine.newBuilder()
@@ -215,6 +212,12 @@ public class AgentServiceImpl implements AgentService {
         } catch (Throwable t) {
             LOGGER.error("fail to reload all config for installer ", t);
         }
+        try {
+            reload();
+            setReloadTimer();
+        } catch (Exception e) {
+            LOGGER.error("load agent task config failed", e);
+        }
         LOGGER.debug("end to reload config for installer");
         if (updateTaskTimeoutEnabled) {
             ThreadFactory factory = new ThreadFactoryBuilder()
@@ -266,6 +269,11 @@ public class AgentServiceImpl implements AgentService {
         }
     }
 
+    private void setReloadTimer() {
+        ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+        executorService.scheduleWithFixedDelay(this::reload, 60000L, 60000L, 
TimeUnit.MILLISECONDS);
+    }
+
     @Override
     public Boolean reportSnapshot(TaskSnapshotRequest request) {
         return snapshotOperator.snapshot(request);
@@ -293,6 +301,42 @@ public class AgentServiceImpl implements AgentService {
         }
     }
 
+    public void reload() {
+        LOGGER.debug("start to reload agent task config.");
+        try {
+            Map<String, TaskResult> newTaskConfigMap = new 
ConcurrentHashMap<>();
+            Map<String, AgentConfigInfo> newAgentConfigMap = new 
ConcurrentHashMap<>();
+            List<AgentTaskConfigEntity> agentTaskConfigEntityList = 
configLoader.loadAllAgentTaskConfigEntity();
+            agentTaskConfigEntityList.forEach(agentTaskConfigEntity -> {
+                try {
+                    String key = agentTaskConfigEntity.getAgentIp() + 
InlongConstants.UNDERSCORE
+                            + agentTaskConfigEntity.getClusterName();
+                    TaskResult taskResult = 
JsonUtils.parseObject(agentTaskConfigEntity.getTaskParams(),
+                            TaskResult.class);
+                    if (taskResult != null) {
+                        
taskResult.setVersion(agentTaskConfigEntity.getVersion());
+                        newTaskConfigMap.putIfAbsent(key, taskResult);
+                    }
+                    AgentConfigInfo agentConfigInfo = 
JsonUtils.parseObject(agentTaskConfigEntity.getConfigParams(),
+                            AgentConfigInfo.class);
+                    if (agentConfigInfo != null) {
+                        
agentConfigInfo.setVersion(agentTaskConfigEntity.getVersion());
+                        newAgentConfigMap.putIfAbsent(key, agentConfigInfo);
+                    }
+                } catch (Exception e) {
+                    LOGGER.error("failed to get agent task config for agent 
ip={}, cluster name={}",
+                            agentTaskConfigEntity.getAgentIp(), 
agentTaskConfigEntity.getClusterName());
+                }
+
+            });
+            taskConfigMap = newTaskConfigMap;
+            agentConfigMap = newAgentConfigMap;
+        } catch (Throwable t) {
+            LOGGER.error("failed to reload all agent task config", t);
+        }
+        LOGGER.debug("end to reload agent task config");
+    }
+
     /**
      * Update task status by command.
      *
@@ -337,7 +381,8 @@ public class AgentServiceImpl implements AgentService {
     @Override
     public AgentConfigInfo getAgentConfig(AgentConfigRequest request) {
         LOGGER.debug("begin to get agent config info for {}", request);
-        AgentConfigInfo agentConfigInfo = agentConfigCache.get(request);
+        String key = request.getIp() + InlongConstants.UNDERSCORE + 
request.getClusterName();
+        AgentConfigInfo agentConfigInfo = agentConfigMap.get(key);
         if (agentConfigInfo == null) {
             return null;
         }
@@ -370,7 +415,8 @@ public class AgentServiceImpl implements AgentService {
     @Override
     public TaskResult getExistTaskConfig(TaskRequest request) {
         LOGGER.debug("begin to get all exist task by request={}", request);
-        TaskResult taskResult = taskCache.get(request);
+        String key = request.getAgentIp() + InlongConstants.UNDERSCORE + 
request.getClusterName();
+        TaskResult taskResult = taskConfigMap.get(key);
         if (taskResult == null) {
             return null;
         }
@@ -825,70 +871,6 @@ public class AgentServiceImpl implements AgentService {
         return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
     }
 
-    private TaskResult fetchTask(TaskRequest request) {
-        final String clusterName = request.getClusterName();
-        final String ip = request.getAgentIp();
-        final String uuid = request.getUuid();
-        List<StreamSourceEntity> normalSourceEntities = 
sourceMapper.selectByStatusAndCluster(
-                
SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()),
-                clusterName, ip, uuid);
-        List<StreamSourceEntity> taskLists = new 
ArrayList<>(normalSourceEntities);
-        List<StreamSourceEntity> stopSourceEntities = 
sourceMapper.selectByStatusAndCluster(
-                
SourceStatus.STOP_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()),
-                clusterName, ip, uuid);
-        taskLists.addAll(stopSourceEntities);
-        LOGGER.debug("success to add task : {}", taskLists.size());
-        List<DataConfig> runningTaskConfig = Lists.newArrayList();
-        try {
-            List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
-            if (CollectionUtils.isEmpty(taskLists)) {
-                return 
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
-            }
-            for (StreamSourceEntity sourceEntity : taskLists) {
-                int op = getOp(sourceEntity.getStatus());
-                DataConfig dataConfig = getDataConfig(sourceEntity, op);
-                runningTaskConfig.add(dataConfig);
-            }
-            TaskResult taskResult = 
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
-            String md5 = DigestUtils.md5Hex(GSON.toJson(taskResult));
-            taskResult.setMd5(md5);
-            taskResult.setCode(AgentResponseCode.SUCCESS);
-            return taskResult;
-        } catch (Exception e) {
-            LOGGER.error("get all exist task failed:", e);
-            throw new BusinessException("get all exist task failed:" + 
e.getMessage());
-        }
-    }
-
-    private AgentConfigInfo fetchAgentConfig(AgentConfigRequest request) {
-        LOGGER.debug("begin to get agent config info for {}", request);
-        AgentConfigInfo agentConfigInfo = new AgentConfigInfo();
-        Set<String> tagSet = new HashSet<>(16);
-        
tagSet.addAll(Arrays.asList(request.getClusterTag().split(InlongConstants.COMMA)));
-        List<String> clusterTagList = new ArrayList<>(tagSet);
-        ClusterPageRequest pageRequest = ClusterPageRequest.builder()
-                .type(ClusterType.AGENT_ZK)
-                .clusterTagList(clusterTagList)
-                .build();
-        List<InlongClusterEntity> agentZkCluster = 
clusterMapper.selectByCondition(pageRequest);
-        if (CollectionUtils.isNotEmpty(agentZkCluster)) {
-            agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
-        }
-
-        AgentClusterInfo clusterInfo = (AgentClusterInfo) 
clusterService.getOne(
-                null, request.getClusterName(), ClusterType.AGENT);
-        agentConfigInfo.setCluster(AgentConfigInfo.AgentClusterInfo.builder()
-                .parentId(clusterInfo.getId())
-                .clusterName(clusterInfo.getName())
-                .build());
-        String jsonStr = GSON.toJson(agentConfigInfo);
-        String configMd5 = DigestUtils.md5Hex(jsonStr);
-        agentConfigInfo.setMd5(configMd5);
-        agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
-        LOGGER.debug("success to get agent config info for: {}, result: {}", 
request, agentConfigInfo);
-        return agentConfigInfo;
-    }
-
     private ConfigResult loadModuleConfigs(ConfigRequest request) {
         final String clusterName = request.getClusterName();
         final String ip = request.getLocalIp();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java
similarity index 90%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java
index 03b5f24977..649586bb21 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java
@@ -17,12 +17,14 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
+import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity;
 import org.apache.inlong.manager.dao.entity.ClusterConfigEntity;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
 import org.apache.inlong.manager.dao.entity.SortConfigEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.AgentTaskConfigEntityMapper;
 import org.apache.inlong.manager.dao.mapper.ClusterConfigEntityMapper;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
@@ -39,7 +41,7 @@ import 
org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
-import org.apache.inlong.manager.service.core.SortConfigLoader;
+import org.apache.inlong.manager.service.core.ConfigLoader;
 
 import org.apache.ibatis.cursor.Cursor;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -50,7 +52,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 @Service
-public class SortConfigLoaderImpl implements SortConfigLoader {
+public class ConfigLoaderImpl implements ConfigLoader {
 
     @Autowired
     private InlongClusterEntityMapper clusterEntityMapper;
@@ -72,6 +74,8 @@ public class SortConfigLoaderImpl implements SortConfigLoader 
{
     private SortConfigEntityMapper sortConfigEntityMapper;
     @Autowired
     private ClusterConfigEntityMapper clusterConfigEntityMapper;
+    @Autowired
+    private AgentTaskConfigEntityMapper agentTaskConfigEntityMapper;
 
     @Transactional
     @Override
@@ -180,4 +184,13 @@ public class SortConfigLoaderImpl implements 
SortConfigLoader {
         cursor.forEach(allClusterConfigs::add);
         return allClusterConfigs;
     }
+
+    @Transactional
+    @Override
+    public List<AgentTaskConfigEntity> loadAllAgentTaskConfigEntity() {
+        Cursor<AgentTaskConfigEntity> cursor = 
agentTaskConfigEntityMapper.selectAllAgentTaskConfigs();
+        List<AgentTaskConfigEntity> agentTaskConfigEntityList = new 
ArrayList<>();
+        cursor.forEach(agentTaskConfigEntityList::add);
+        return agentTaskConfigEntityList;
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index 24ee49f757..36328007e1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -26,8 +26,8 @@ import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
+import org.apache.inlong.manager.service.core.ConfigLoader;
 import org.apache.inlong.manager.service.core.SortClusterService;
-import org.apache.inlong.manager.service.core.SortConfigLoader;
 import org.apache.inlong.manager.service.node.DataNodeOperator;
 import org.apache.inlong.manager.service.node.DataNodeOperatorFactory;
 import org.apache.inlong.manager.service.sink.SinkOperatorFactory;
@@ -93,7 +93,7 @@ public class SortClusterServiceImpl implements 
SortClusterService {
     private long reloadInterval;
 
     @Autowired
-    private SortConfigLoader sortConfigLoader;
+    private ConfigLoader configLoader;
     @Autowired
     private SinkOperatorFactory sinkOperatorFactory;
     @Autowired
@@ -171,16 +171,16 @@ public class SortClusterServiceImpl implements 
SortClusterService {
 
     private void reloadAllClusterConfig() {
         // load all fields info
-        List<SortFieldInfo> fieldInfos = sortConfigLoader.loadAllFields();
+        List<SortFieldInfo> fieldInfos = configLoader.loadAllFields();
         fieldMap = new HashMap<>();
         fieldInfos.forEach(info -> {
             List<String> fields = fieldMap.computeIfAbsent(info.getSinkId(), k 
-> new ArrayList<>());
             fields.add(info.getFieldName());
         });
 
-        List<StreamSinkEntity> sinkEntities = 
sortConfigLoader.loadAllStreamSinkEntity();
+        List<StreamSinkEntity> sinkEntities = 
configLoader.loadAllStreamSinkEntity();
         // get all task under a given cluster, has been reduced into cluster 
and task.
-        List<SortTaskInfo> tasks = sortConfigLoader.loadAllTask();
+        List<SortTaskInfo> tasks = configLoader.loadAllTask();
         Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
                 .filter(dto -> StringUtils.isNotBlank(dto.getSortClusterName())
                         && StringUtils.isNotBlank(dto.getSortTaskName())
@@ -189,7 +189,7 @@ public class SortClusterServiceImpl implements 
SortClusterService {
                 
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
 
         // reload all streams
-        allStreams = sortConfigLoader.loadAllStreams()
+        allStreams = configLoader.loadAllStreams()
                 .stream()
                 
.collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId,
                         
Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info)));
@@ -202,7 +202,7 @@ public class SortClusterServiceImpl implements 
SortClusterService {
                 
.collect(Collectors.groupingBy(StreamSinkEntity::getSortTaskName));
 
         // get all data nodes and group by node name
-        List<DataNodeEntity> dataNodeEntities = 
sortConfigLoader.loadAllDataNodeEntity();
+        List<DataNodeEntity> dataNodeEntities = 
configLoader.loadAllDataNodeEntity();
         Map<String, DataNodeInfo> task2DataNodeMap = dataNodeEntities.stream()
                 .filter(entity -> StringUtils.isNotBlank(entity.getName()))
                 .map(entity -> {
@@ -300,6 +300,6 @@ public class SortClusterServiceImpl implements 
SortClusterService {
      */
     private void setReloadTimer() {
         ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
-        executorService.scheduleAtFixedRate(this::reload, reloadInterval, 
reloadInterval, TimeUnit.MILLISECONDS);
+        executorService.scheduleWithFixedDelay(this::reload, reloadInterval, 
reloadInterval, TimeUnit.MILLISECONDS);
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 2f87d3b92e..d01911acb9 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -42,8 +42,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
 import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.core.ConfigLoader;
 import org.apache.inlong.manager.service.core.SortClusterService;
-import org.apache.inlong.manager.service.core.SortConfigLoader;
 import org.apache.inlong.manager.service.core.SortService;
 import org.apache.inlong.manager.service.core.SortSourceService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
@@ -99,7 +99,7 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
     @Autowired
     private InlongStreamService streamService;
     @Autowired
-    private SortConfigLoader configLoader;
+    private ConfigLoader configLoader;
     @Autowired
     private DataNodeOperatorFactory dataNodeOperatorFactory;
     /**
@@ -152,7 +152,7 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
     private void setReloadTimer() {
         ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
         long reloadInterval = 60000L;
-        executorService.scheduleAtFixedRate(this::reload, reloadInterval, 
reloadInterval, TimeUnit.MILLISECONDS);
+        executorService.scheduleWithFixedDelay(this::reload, reloadInterval, 
reloadInterval, TimeUnit.MILLISECONDS);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index 247f270685..9d6fe9ceb5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -33,7 +33,7 @@ import 
org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo;
-import org.apache.inlong.manager.service.core.SortConfigLoader;
+import org.apache.inlong.manager.service.core.ConfigLoader;
 import org.apache.inlong.manager.service.core.SortSourceService;
 
 import com.google.gson.Gson;
@@ -111,7 +111,7 @@ public class SortSourceServiceImpl implements 
SortSourceService {
     private Map<String, Map<String, List<SortSourceStreamSinkInfo>>> 
streamSinkMap;
 
     @Autowired
-    private SortConfigLoader configLoader;
+    private ConfigLoader configLoader;
 
     @PostConstruct
     public void initialize() {
@@ -458,6 +458,6 @@ public class SortSourceServiceImpl implements 
SortSourceService {
     private void setReloadTimer() {
         ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
         long reloadInterval = 60000L;
-        executorService.scheduleAtFixedRate(this::reload, reloadInterval, 
reloadInterval, TimeUnit.MILLISECONDS);
+        executorService.scheduleWithFixedDelay(this::reload, reloadInterval, 
reloadInterval, TimeUnit.MILLISECONDS);
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
index 5069423c13..ba729202af 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.plugin.PluginBinder;
 import 
org.apache.inlong.manager.service.listener.queue.StreamQueueResourceListener;
 import 
org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener;
 import 
org.apache.inlong.manager.service.listener.sort.StreamSortConfigListener;
+import org.apache.inlong.manager.service.listener.source.SourceStartListener;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
 import org.apache.inlong.manager.workflow.definition.TaskListenerFactory;
@@ -60,6 +61,8 @@ public class StreamTaskListenerFactory implements 
PluginBinder, TaskListenerFact
     private StreamSortConfigListener streamSortConfigListener;
     @Autowired
     private StreamSinkResourceListener sinkResourceListener;
+    @Autowired
+    private SourceStartListener sourceStartListener;
 
     @PostConstruct
     public void init() {
@@ -70,6 +73,7 @@ public class StreamTaskListenerFactory implements 
PluginBinder, TaskListenerFact
         sortOperateListeners.add(streamSortConfigListener);
         sinkOperateListeners = new LinkedList<>();
         sinkOperateListeners.add(sinkResourceListener);
+        sourceOperateListeners.add(sourceStartListener);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStartListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStartListener.java
new file mode 100644
index 0000000000..2ee6f504ae
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStartListener.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.listener.source;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
+import org.apache.inlong.manager.service.source.SourceOperatorFactory;
+import org.apache.inlong.manager.service.source.StreamSourceOperator;
+import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * Source listener that updates the agent task config of every source under a stream.
+ * It reports {@link TaskEvent#COMPLETE} as its event and only accepts stream-level forms
+ * of a standard-mode group whose operate type is {@code INIT}.
+ */
+@Slf4j
+@Component
+public class SourceStartListener implements SourceOperateListener {
+
+    @Autowired
+    protected StreamSourceService streamSourceService;
+    @Autowired
+    private SourceOperatorFactory operatorFactory;
+
+    @Override
+    public String name() {
+        return getClass().getSimpleName();
+    }
+
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    /**
+     * Decide whether this listener handles the given workflow context.
+     *
+     * @param context workflow context holding the process form
+     * @return true only for a non-group form of a standard-mode group with operate type INIT
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        if (isGroupProcessForm(context)) {
+            return false;
+        }
+        StreamResourceProcessForm streamForm = (StreamResourceProcessForm) context.getProcessForm();
+        boolean standardMode =
+                InlongConstants.STANDARD_MODE.equals(streamForm.getGroupInfo().getInlongGroupMode());
+        return standardMode && streamForm.getGroupOperateType() == GroupOperateType.INIT;
+    }
+
+    /**
+     * Update the agent task config for each source listed under the stream of the process form.
+     *
+     * @param context workflow context holding the stream process form and the operator
+     * @return a success result once every source has been processed
+     * @throws Exception propagated from listing the sources or updating a config
+     */
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws Exception {
+        StreamResourceProcessForm streamForm = (StreamResourceProcessForm) context.getProcessForm();
+        String operator = context.getOperator();
+        InlongStreamInfo stream = streamForm.getStreamInfo();
+        final String groupId = stream.getInlongGroupId();
+        final String streamId = stream.getInlongStreamId();
+        log.info("begin to update agent task config for groupId={}, streamId={}", groupId, streamId);
+        for (StreamSource source : streamSourceService.listSource(groupId, streamId)) {
+            SourceRequest sourceRequest = source.genSourceRequest();
+            // each source type has its own operator; resolve it per source
+            operatorFactory.getInstance(sourceRequest.getSourceType())
+                    .updateAgentTaskConfig(sourceRequest, operator);
+        }
+        log.info("success to update agent task config for groupId={}, streamId={}", groupId, streamId);
+        return ListenerResult.success();
+    }
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index d4c951c209..d4087d8976 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -46,7 +46,7 @@ import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
 import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
 import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
-import org.apache.inlong.manager.service.core.SortConfigLoader;
+import org.apache.inlong.manager.service.core.ConfigLoader;
 
 import com.google.common.base.Splitter;
 import com.google.common.collect.Sets;
@@ -122,7 +122,7 @@ public class DataProxyConfigRepository implements 
IRepository {
     @Autowired
     private StreamSinkEntityMapper streamSinkMapper;
     @Autowired
-    private SortConfigLoader sortConfigLoader;
+    private ConfigLoader configLoader;
 
     @PostConstruct
     public void initialize() {
@@ -364,7 +364,7 @@ public class DataProxyConfigRepository implements 
IRepository {
         Map<String, Map<String, String>> groupParams = new HashMap<>();
         groupIdMap.forEach((k, v) -> groupParams.put(k, 
fromJsonToMap(v.getExtParams())));
         // reload inlong group ext
-        List<InlongGroupExtEntity> groupExtCursor = sortConfigLoader
+        List<InlongGroupExtEntity> groupExtCursor = configLoader
                 .loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG);
         groupExtCursor.forEach(v -> 
groupParams.computeIfAbsent(v.getInlongGroupId(), k -> new HashMap<>())
                 .put(ClusterSwitch.BACKUP_CLUSTER_TAG, v.getKeyValue()));
@@ -390,7 +390,7 @@ public class DataProxyConfigRepository implements 
IRepository {
             streamParams.put(k, params);
         });
         // reload inlong stream ext
-        List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
+        List<InlongStreamExtEntity> streamExtCursor = configLoader
                 .loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);
         streamExtCursor.forEach(v -> streamParams
                 .computeIfAbsent(getInlongId(v.getInlongGroupId(), 
v.getInlongStreamId()), k -> new HashMap<>())
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 39d3845209..2ecd79fb37 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -17,28 +17,58 @@
 
 package org.apache.inlong.manager.service.source;
 
+import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
+import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
+import org.apache.inlong.common.pojo.agent.AgentResponseCode;
+import org.apache.inlong.common.pojo.agent.CmdConfig;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+import org.apache.inlong.common.pojo.agent.TaskResult;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity;
+import org.apache.inlong.manager.dao.mapper.AgentTaskConfigEntityMapper;
+import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.service.node.DataNodeService;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.pagehelper.Page;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -48,8 +78,17 @@ import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.manager.common.consts.InlongConstants.DOT;
+import static 
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
 
 /**
  * Default operator of stream source.
@@ -57,6 +96,8 @@ import java.util.Objects;
 public abstract class AbstractSourceOperator implements StreamSourceOperator {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractSourceOperator.class);
+    private static final Gson GSON = new Gson();
+    private static final int MODULUS_100 = 100;
 
     @Autowired
     protected StreamSourceEntityMapper sourceMapper;
@@ -66,6 +107,18 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
     protected InlongStreamFieldEntityMapper streamFieldMapper;
     @Autowired
     protected DataNodeService dataNodeService;
+    @Autowired
+    private AgentTaskConfigEntityMapper agentTaskConfigEntityMapper;
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private InlongClusterEntityMapper clusterMapper;
+    @Autowired
+    private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;
+    @Autowired
+    private InlongStreamEntityMapper streamMapper;
+    @Autowired
+    private ObjectMapper objectMapper;
 
     /**
      * Getting the source type.
@@ -109,6 +162,9 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         if (request.getEnableSyncSchema()) {
             syncSourceFieldInfo(request, operator);
         }
+        if 
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+            updateAgentTaskConfig(request, operator);
+        }
         return entity.getId();
     }
 
@@ -207,6 +263,9 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         }
         updateFieldOpt(entity, request.getFieldList());
         LOGGER.debug("success to update source of type={}", 
request.getSourceType());
+        if 
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+            updateAgentTaskConfig(request, operator);
+        }
     }
 
     @Override
@@ -232,6 +291,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
                     curEntity.getVersion());
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
+        updateAgentTaskConfig(request, operator);
     }
 
     @Override
@@ -346,4 +406,219 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
     public Integer addDataAddTask(DataAddTaskRequest request, String operator) 
{
         throw new BusinessException(String.format("not support data add task 
for type =%s", request.getSourceType()));
     }
+
+    /**
+     * Rebuild the agent task config for the agent identified by the request's cluster name and agent IP,
+     * then insert it or update the record that already exists for that agent.
+     *
+     * <p>Nothing is done for auto-push sources, or when the cluster name or agent IP is blank.
+     * When no source in a normal or stop status is found for the agent, the existing record (if any)
+     * is marked as deleted and no new config is written.
+     *
+     * @param request source request providing the group id, stream id, cluster name, agent IP and uuid
+     * @param operator name of the operator, stored as creator of a new record and as modifier
+     * @throws BusinessException if building or saving the config fails for any reason
+     */
+    @Override
+    public void updateAgentTaskConfig(SourceRequest request, String operator) {
+        try {
+            if (SourceType.AUTO_PUSH.equals(request.getSourceType())) {
+                return;
+            }
+            final String clusterName = request.getInlongClusterName();
+            final String ip = request.getAgentIp();
+            final String uuid = request.getUuid();
+            if (StringUtils.isBlank(clusterName) || StringUtils.isBlank(ip)) {
+                LOGGER.warn("skip update agent task config where cluster name or ip is null for request={}", request);
+                return;
+            }
+            AgentTaskConfigEntity existEntity = agentTaskConfigEntityMapper.selectByIdentifier(ip, clusterName);
+            AgentTaskConfigEntity agentTaskConfigEntity = new AgentTaskConfigEntity();
+            if (existEntity != null) {
+                agentTaskConfigEntity = CommonBeanUtils.copyProperties(existEntity, AgentTaskConfigEntity::new, true);
+            }
+            // collect the sources in a normal status first, then those in a stop status
+            List<StreamSourceEntity> normalSourceEntities = sourceMapper.selectByStatusAndCluster(
+                    SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode)
+                            .collect(Collectors.toList()),
+                    clusterName, ip, uuid);
+            List<StreamSourceEntity> taskLists = new ArrayList<>(normalSourceEntities);
+            List<StreamSourceEntity> stopSourceEntities = sourceMapper.selectByStatusAndCluster(
+                    SourceStatus.STOP_STATUS_SET.stream().map(SourceStatus::getCode)
+                            .collect(Collectors.toList()),
+                    clusterName, ip, uuid);
+            taskLists.addAll(stopSourceEntities);
+            LOGGER.debug("success to add task : {}", taskLists.size());
+            if (CollectionUtils.isEmpty(taskLists)) {
+                // no task left for this agent: mark the stored record as deleted; without a stored
+                // record there is no id to update, so there is nothing to do
+                if (existEntity != null) {
+                    agentTaskConfigEntity.setIsDeleted(agentTaskConfigEntity.getId());
+                    agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
+                }
+                return;
+            }
+            // the cmd configs are only needed once we know a task result will be built
+            List<CmdConfig> cmdConfigs = sourceCmdConfigMapper.queryCmdByAgentIp(ip).stream()
+                    .map(cmd -> {
+                        CmdConfig cmdConfig = new CmdConfig();
+                        cmdConfig.setDataTime(cmd.getSpecifiedDataTime());
+                        cmdConfig.setOp(cmd.getCmdType());
+                        cmdConfig.setId(cmd.getId());
+                        cmdConfig.setTaskId(cmd.getTaskId());
+                        return cmdConfig;
+                    }).collect(Collectors.toList());
+            List<DataConfig> runningTaskConfig = Lists.newArrayList();
+            for (StreamSourceEntity sourceEntity : taskLists) {
+                // the operation code is taken from the last two decimal digits of the source status
+                int op = sourceEntity.getStatus() % MODULUS_100;
+                DataConfig dataConfig = getDataConfig(sourceEntity, op);
+                runningTaskConfig.add(dataConfig);
+            }
+            TaskResult taskResult =
+                    TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
+            // the md5 is computed before md5/code are set, so it covers the task content only
+            String md5 = DigestUtils.md5Hex(GSON.toJson(taskResult));
+            taskResult.setMd5(md5);
+            taskResult.setCode(AgentResponseCode.SUCCESS);
+            agentTaskConfigEntity.setAgentIp(ip);
+            agentTaskConfigEntity.setClusterName(clusterName);
+            agentTaskConfigEntity.setTaskParams(objectMapper.writeValueAsString(taskResult));
+
+            LOGGER.debug("begin to get agent config info for {}", request);
+            Set<String> tagSet = new HashSet<>(16);
+            InlongGroupEntity groupEntity =
+                    groupMapper.selectByGroupIdWithoutTenant(request.getInlongGroupId());
+            String clusterTag = groupEntity.getInlongClusterTag();
+            InlongClusterEntity agentClusterInfo = clusterMapper.selectByNameAndType(clusterName,
+                    ClusterType.AGENT);
+            AgentConfigInfo agentConfigInfo = AgentConfigInfo.builder()
+                    .cluster(AgentConfigInfo.AgentClusterInfo.builder()
+                            .parentId(agentClusterInfo.getId())
+                            .clusterName(agentClusterInfo.getName())
+                            .build())
+                    .build();
+            if (StringUtils.isNotBlank(clusterTag)) {
+                tagSet.addAll(Arrays.asList(clusterTag.split(InlongConstants.COMMA)));
+                List<String> clusterTagList = new ArrayList<>(tagSet);
+                ClusterPageRequest pageRequest = ClusterPageRequest.builder()
+                        .type(ClusterType.AGENT_ZK)
+                        .clusterTagList(clusterTagList)
+                        .build();
+                List<InlongClusterEntity> agentZkCluster = clusterMapper.selectByCondition(pageRequest);
+                if (CollectionUtils.isNotEmpty(agentZkCluster)) {
+                    agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
+                }
+            }
+
+            String jsonStr = GSON.toJson(agentConfigInfo);
+            String configMd5 = DigestUtils.md5Hex(jsonStr);
+            agentConfigInfo.setMd5(configMd5);
+            agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
+            agentTaskConfigEntity.setConfigParams(objectMapper.writeValueAsString(agentConfigInfo));
+            // record the modifier on the entity that is actually persisted below
+            agentTaskConfigEntity.setModifier(operator);
+            if (existEntity == null) {
+                agentTaskConfigEntity.setCreator(operator);
+                agentTaskConfigEntityMapper.insert(agentTaskConfigEntity);
+            } else {
+                agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
+            }
+            LOGGER.debug("success to update agent config info for: {}, result: {}", request, agentConfigInfo);
+        } catch (Exception e) {
+            String errMsg = String.format("update agent task config failed for groupId=%s, streamId=%s, ip=%s",
+                    request.getInlongGroupId(), request.getInlongStreamId(), request.getAgentIp());
+            // the cause is kept in the log; callers only see the summary message
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
+    /**
+     * Build the {@link DataConfig} describing one source entity for the agent.
+     *
+     * <p>Basic task fields are always copied from the source entity. Stream- and group-dependent
+     * settings (state, syncSend, predefined fields, report type, MQ clusters and topic) are only
+     * filled when both the group and the stream are found; otherwise syncSend is set to 0 and a
+     * warning is logged.
+     *
+     * @param entity source entity to convert
+     * @param op operation code, stored in the config as a string
+     * @return the populated data config
+     */
+    private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
+        DataConfig dataConfig = new DataConfig();
+        dataConfig.setIp(entity.getAgentIp());
+        dataConfig.setUuid(entity.getUuid());
+        dataConfig.setOp(String.valueOf(op));
+        dataConfig.setTaskId(entity.getId());
+        dataConfig.setTaskType(getTaskType(entity));
+        dataConfig.setTaskName(entity.getSourceName());
+        dataConfig.setSnapshot(entity.getSnapshot());
+        dataConfig.setTimeZone(entity.getDataTimeZone());
+        dataConfig.setVersion(entity.getVersion());
+
+        String groupId = entity.getInlongGroupId();
+        String streamId = entity.getInlongStreamId();
+        dataConfig.setInlongGroupId(groupId);
+        dataConfig.setInlongStreamId(streamId);
+
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupIdWithoutTenant(groupId);
+        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+        String extParams = getExtParams(entity);
+        if (groupEntity != null && streamEntity != null) {
+            dataConfig.setState(
+                    SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus()))
+                            ? TaskStateEnum.RUNNING.getType()
+                            : TaskStateEnum.FROZEN.getType());
+            dataConfig.setSyncSend(streamEntity.getSyncSend());
+            if (SourceType.FILE.equalsIgnoreCase(entity.getSourceType())) {
+                // the stream's data separator is parsed as a decimal character code
+                String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator()));
+                FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class);
+                if (Objects.nonNull(fileSourceDTO)) {
+                    fileSourceDTO.setDataSeparator(dataSeparator);
+                    dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion());
+                    fileSourceDTO.setDataContentStyle(streamEntity.getDataType());
+                    extParams = JsonUtils.toJsonString(fileSourceDTO);
+                }
+            }
+            InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
+            // Processing extParams
+            unpackExtParams(streamEntity.getExtParams(), streamInfo);
+            dataConfig.setPredefinedFields(streamInfo.getPredefinedFields());
+
+            int dataReportType = groupEntity.getDataReportType();
+            dataConfig.setDataReportType(dataReportType);
+            if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {
+                // add mq cluster setting
+                List<MQClusterInfo> mqSet = new ArrayList<>();
+                List<String> clusterTagList = Collections.singletonList(groupEntity.getInlongClusterTag());
+                ClusterPageRequest pageRequest = ClusterPageRequest.builder()
+                        .type(groupEntity.getMqType())
+                        .clusterTagList(clusterTagList)
+                        .build();
+                List<InlongClusterEntity> mqClusterList = clusterMapper.selectByCondition(pageRequest);
+                for (InlongClusterEntity cluster : mqClusterList) {
+                    MQClusterInfo clusterInfo = new MQClusterInfo();
+                    clusterInfo.setUrl(cluster.getUrl());
+                    clusterInfo.setToken(cluster.getToken());
+                    clusterInfo.setMqType(cluster.getType());
+                    clusterInfo.setParams(JsonUtils.parseObject(cluster.getExtParams(), HashMap.class));
+                    mqSet.add(clusterInfo);
+                }
+                dataConfig.setMqClusters(mqSet);
+
+                // add topic setting
+                String mqResource = groupEntity.getMqResource();
+                String mqType = groupEntity.getMqType();
+                if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
+                    // first get the tenant from the InlongGroup, and then get it from the PulsarCluster.
+                    InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
+                    String tenant = pulsarDTO.getPulsarTenant();
+                    if (StringUtils.isBlank(tenant)) {
+                        // If there are multiple Pulsar clusters, take the first one.
+                        // Note that the tenants in multiple Pulsar clusters must be identical.
+                        PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(
+                                mqClusterList.get(0).getExtParams());
+                        tenant = pulsarCluster.getPulsarTenant();
+                    }
+
+                    String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
+                            tenant, mqResource, streamEntity.getMqResource());
+                    DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                    topicConfig.setInlongGroupId(groupId + "/" + streamId);
+                    topicConfig.setTopic(topic);
+                    dataConfig.setTopicInfo(topicConfig);
+                } else if (MQType.TUBEMQ.equals(mqType)) {
+                    DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                    topicConfig.setInlongGroupId(groupId);
+                    topicConfig.setTopic(mqResource);
+                    dataConfig.setTopicInfo(topicConfig);
+                } else if (MQType.KAFKA.equals(mqType)) {
+                    DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                    topicConfig.setInlongGroupId(groupId);
+                    topicConfig.setTopic(groupEntity.getMqResource() + DOT + streamEntity.getMqResource());
+                    dataConfig.setTopicInfo(topicConfig);
+                }
+            }
+        } else {
+            // group or stream is missing: fall back to syncSend=0, as the warning states
+            dataConfig.setSyncSend(0);
+            LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId);
+        }
+        dataConfig.setExtParams(extParams);
+        return dataConfig;
+    }
+
+    /**
+     * Look up the task type code for a source entity in {@code SourceType.SOURCE_TASK_MAP}.
+     *
+     * @param sourceEntity source entity whose source type is used as the lookup key
+     * @return the type code of the mapped task type
+     * @throws BusinessException if the source type has no entry in the map
+     */
+    private int getTaskType(StreamSourceEntity sourceEntity) {
+        final String sourceType = sourceEntity.getSourceType();
+        TaskTypeEnum mappedType = SourceType.SOURCE_TASK_MAP.get(sourceType);
+        if (mappedType != null) {
+            return mappedType.getType();
+        }
+        throw new BusinessException("Unsupported task type for source type " + sourceType);
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index 997f39b867..e820140ae6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -136,4 +136,12 @@ public interface StreamSourceOperator {
      */
     Integer addDataAddTask(DataAddTaskRequest request, String operator);
 
+    /**
+     * Update the agent task config info for the given source request; nothing is returned to the caller.
+     *
+     * @param request source request (NOTE(review): which of its fields are read is decided by the implementation, not visible here)
+     * @param operator name of the operator (presumably the user performing the update; confirm against callers)
+     */
+    void updateAgentTaskConfig(SourceRequest request, String operator);
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java
index a76328ee4d..37953f1276 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java
@@ -116,7 +116,7 @@ public class TemplateServiceImpl implements TemplateService 
{
         TemplateEntity templateEntity = 
templateEntityMapper.selectByName(templateName);
         if (templateEntity == null) {
             LOGGER.error("inlong template not found by template name={}", 
templateName);
-            throw new BusinessException(ErrorCodeEnum.TEMPLATE_INFO_INCORRECT);
+            throw new BusinessException(ErrorCodeEnum.TEMPLATE_NOT_FOUND);
         }
 
         TemplateInfo templateInfo = 
CommonBeanUtils.copyProperties(templateEntity, TemplateInfo::new);
@@ -182,7 +182,7 @@ public class TemplateServiceImpl implements TemplateService 
{
         TemplateEntity templateEntity = 
templateEntityMapper.selectByName(templateName);
         if (templateEntity == null) {
             LOGGER.error("inlong template not found by template name={}", 
templateName);
-            throw new BusinessException(ErrorCodeEnum.TEMPLATE_INFO_INCORRECT);
+            throw new BusinessException(ErrorCodeEnum.TEMPLATE_NOT_FOUND);
         }
 
         if (templateEntity.getInCharges().contains(operator)) {
diff --git 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 8a0bcbf9c9..531a639d4a 100644
--- 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -901,6 +901,26 @@ CREATE TABLE IF NOT EXISTS `cluster_config`
     UNIQUE KEY `unique_clustert_config_sink_id` (`cluster_tag`, `is_deleted`)
 );
 
+-- ----------------------------
+-- Table structure for agent_task_config: agent config params and agent task config params, unique on (agent_ip, cluster_name, is_deleted)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `agent_task_config`
+(
+    `id`                  int(11)       NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `config_params`       text          DEFAULT NULL COMMENT 'The agent config 
params',
+    `task_params`         text          NOT NULL COMMENT 'The agent task 
config params',
+    `agent_ip`            varchar(128)  NOT NULL COMMENT 'agent ip',
+    `cluster_name`        varchar(128)  NOT NULL COMMENT 'Inlong cluster name',
+    `creator`             varchar(128)  DEFAULT NULL COMMENT 'Creator',
+    `modifier`            varchar(128)  DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`         datetime      NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Create time',
+    `modify_time`         datetime      NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `is_deleted`          int(11)       DEFAULT '0' COMMENT 'Whether to 
delete, 0 is not deleted, if greater than 0, delete',
+    `version`             int(11)       NOT NULL DEFAULT '1' COMMENT 'Version 
number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_agent_task_config_ip_cluster_name` (`agent_ip`, 
`cluster_name`, `is_deleted`)
+);
+
 -- ----------------------------
 -- Table structure for template
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index dfb6420dc9..430982df28 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -952,6 +952,27 @@ CREATE TABLE IF NOT EXISTS `cluster_config`
 ) ENGINE = InnoDB
     DEFAULT CHARSET = utf8mb4 COMMENT = 'cluster_config';
 
+-- ----------------------------
+-- Table structure for agent_task_config: agent config params and agent task config params, unique on (agent_ip, cluster_name, is_deleted)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `agent_task_config`
+(
+    `id`                  int(11)       NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `config_params`       text          DEFAULT NULL COMMENT 'The agent config 
params',
+    `task_params`         text          NOT NULL COMMENT 'The agent task 
config params',
+    `agent_ip`            varchar(128)  NOT NULL COMMENT 'agent ip',
+    `cluster_name`        varchar(128)  NOT NULL COMMENT 'Inlong cluster name',
+    `creator`             varchar(128)  DEFAULT NULL COMMENT 'Creator',
+    `modifier`            varchar(128)  DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`         datetime      NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Create time',
+    `modify_time`         datetime      NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `is_deleted`          int(11)       DEFAULT '0' COMMENT 'Whether to 
delete, 0 is not deleted, if greater than 0, delete',
+    `version`             int(11)       NOT NULL DEFAULT '1' COMMENT 'Version 
number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_agent_task_config_ip_cluster_name` (`agent_ip`, 
`cluster_name`, `is_deleted`)
+) ENGINE = InnoDB
+    DEFAULT CHARSET = utf8mb4 COMMENT = 'agent_task_config';
+
 -- ----------------------------
 -- Table structure for template
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/changes-1.13.0.sql 
b/inlong-manager/manager-web/sql/changes-1.13.0.sql
index 5000782285..166c98f199 100644
--- a/inlong-manager/manager-web/sql/changes-1.13.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.13.0.sql
@@ -118,3 +118,24 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
     ) ENGINE = InnoDB
     DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
 -- ----------------------------
+
+-- ----------------------------
+-- Table structure for agent_task_config: agent config params and agent task config params, unique on (agent_ip, cluster_name, is_deleted)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `agent_task_config`
+(
+    `id`                  int(11)       NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `config_params`       text          DEFAULT NULL COMMENT 'The agent config 
params',
+    `task_params`         text          NOT NULL COMMENT 'The agent task 
config params',
+    `agent_ip`            varchar(128)  NOT NULL COMMENT 'agent ip',
+    `cluster_name`        varchar(128)  NOT NULL COMMENT 'Inlong cluster name',
+    `creator`             varchar(128)  DEFAULT NULL COMMENT 'Creator',
+    `modifier`            varchar(128)  DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`         datetime      NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Create time',
+    `modify_time`         datetime      NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `is_deleted`          int(11)       DEFAULT '0' COMMENT 'Whether to 
delete, 0 is not deleted, if greater than 0, delete',
+    `version`             int(11)       NOT NULL DEFAULT '1' COMMENT 'Version 
number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_agent_task_config_ip_cluster_name` (`agent_ip`, 
`cluster_name`, `is_deleted`)
+) ENGINE = InnoDB
+    DEFAULT CHARSET = utf8mb4 COMMENT = 'agent_task_config';

Reply via email to