This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d66775d01 [INLONG-7987][Manager] Add heartbeat timeout status to the 
source (#7989)
d66775d01 is described below

commit d66775d015d6ad61e6891d11aba1bf4fbc27d4ca
Author: fuweng11 <[email protected]>
AuthorDate: Mon May 15 19:08:58 2023 +0800

    [INLONG-7987][Manager] Add heartbeat timeout status to the source (#7989)
    
    Co-authored-by: healchow <[email protected]>
---
 CHANGES.md                                         |  1 +
 .../manager/common/enums/SimpleSourceStatus.java   |  1 +
 .../inlong/manager/common/enums/SourceStatus.java  | 63 ++++++++++++++--------
 .../dao/mapper/InlongClusterNodeEntityMapper.java  |  5 ++
 .../dao/mapper/StreamSourceEntityMapper.java       | 22 ++++++++
 .../mappers/InlongClusterNodeEntityMapper.xml      | 18 ++++++-
 .../resources/mappers/StreamSourceEntityMapper.xml | 59 ++++++++++++++++++--
 .../manager/pojo/cluster/BindTagRequest.java       |  2 +-
 .../manager/pojo/cluster/ClusterTagRequest.java    |  2 +-
 .../inlong/manager/pojo/source/SourceRequest.java  |  4 +-
 .../service/cluster/InlongClusterServiceImpl.java  |  2 +-
 .../service/core/impl/AgentServiceImpl.java        | 63 +++++++++++++++++++++-
 .../service/heartbeat/HeartbeatManager.java        | 27 +++++++++-
 .../service/heartbeat/HeartbeatServiceImpl.java    |  2 +
 .../source/AbstractSourceOperateListener.java      |  6 ++-
 .../service/source/AbstractSourceOperator.java     | 11 ++--
 .../src/main/resources/application-dev.properties  |  5 ++
 .../src/main/resources/application-prod.properties |  5 ++
 .../src/main/resources/application-test.properties |  4 ++
 19 files changed, 262 insertions(+), 40 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 5e350802d..bac536234 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -50,6 +50,7 @@
 | [INLONG-8006](https://github.com/apache/inlong/issues/8006) | 
[Improve][Manager] Set displayname for the auto-registered cluster              
               |
 | [INLONG-7999](https://github.com/apache/inlong/issues/7999) | 
[Improve][Manager] Support PostgreSQL data node                                 
               |
 | [INLONG-7996](https://github.com/apache/inlong/issues/7996) | 
[Improve][Manager] Support issued kafka consumer group to sort                  
               |
+| [INLONG-7987](https://github.com/apache/inlong/issues/7987) | 
[Improve][Manager] Add a heartbeat timeout status to the source                 
               |
 | [INLONG-7981](https://github.com/apache/inlong/issues/7981) | [Bug][Manager] 
Failed to stop source correctly when suspend a group                            
|
 | [INLONG-7948](https://github.com/apache/inlong/issues/7948) | 
[Improve][Manager] Add user authentication when operate inlong consume          
               |
 | [INLONG-7946](https://github.com/apache/inlong/issues/7946) | 
[Improve][Manager] Add user authentication when bind clusterTag                 
               |
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
index e361daadc..7c72f916e 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
@@ -44,6 +44,7 @@ public enum SimpleSourceStatus {
             case SOURCE_STOP:
                 return FROZEN;
             case SOURCE_FAILED:
+            case HEARTBEAT_TIMEOUT:
                 return FAILED;
             case TO_BE_ISSUED_DELETE:
             case BEEN_ISSUED_DELETE:
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
index 99118bebb..306a9e60c 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
@@ -34,6 +34,7 @@ public enum SourceStatus {
     SOURCE_NORMAL(101, "normal"),
     SOURCE_FAILED(102, "failed"),
     SOURCE_STOP(104, "stop"),
+    HEARTBEAT_TIMEOUT(105, "heartbeat timeout"),
 
     // if not approved
     SOURCE_NEW(110, "new created"),
@@ -76,7 +77,7 @@ public enum SourceStatus {
      */
     public static final Set<Integer> ALLOWED_UPDATE = Sets.newHashSet(
             SOURCE_NEW.getCode(), SOURCE_FAILED.getCode(), 
SOURCE_STOP.getCode(),
-            SOURCE_NORMAL.getCode());
+            SOURCE_NORMAL.getCode(), HEARTBEAT_TIMEOUT.getCode());
 
     public static final Set<SourceStatus> TOBE_ISSUED_SET = Sets.newHashSet(
             TO_BE_ISSUED_ADD, TO_BE_ISSUED_DELETE, TO_BE_ISSUED_RETRY,
@@ -87,59 +88,75 @@ public enum SourceStatus {
 
     static {
         // new
-        SOURCE_STATE_AUTOMATON.put(SOURCE_NEW, Sets.newHashSet(SOURCE_DISABLE, 
SOURCE_NEW, TO_BE_ISSUED_ADD));
+        SOURCE_STATE_AUTOMATON.put(SOURCE_NEW,
+                Sets.newHashSet(SOURCE_DISABLE, SOURCE_NEW, TO_BE_ISSUED_ADD, 
HEARTBEAT_TIMEOUT));
 
         // normal
         SOURCE_STATE_AUTOMATON.put(SOURCE_NORMAL,
                 Sets.newHashSet(SOURCE_DISABLE, SOURCE_NORMAL, SOURCE_FAILED, 
TO_BE_ISSUED_DELETE,
                         TO_BE_ISSUED_RETRY, TO_BE_ISSUED_BACKTRACK, 
TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE,
-                        TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, 
TO_BE_ISSUED_MAKEUP));
+                        TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, 
TO_BE_ISSUED_MAKEUP, HEARTBEAT_TIMEOUT));
 
         // failed
-        SOURCE_STATE_AUTOMATON.put(SOURCE_FAILED, 
Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED, TO_BE_ISSUED_RETRY));
+        SOURCE_STATE_AUTOMATON.put(SOURCE_FAILED,
+                Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED, 
TO_BE_ISSUED_RETRY, HEARTBEAT_TIMEOUT));
 
         // frozen
-        SOURCE_STATE_AUTOMATON.put(SOURCE_STOP, 
Sets.newHashSet(SOURCE_DISABLE, SOURCE_STOP, TO_BE_ISSUED_ACTIVE));
+        SOURCE_STATE_AUTOMATON.put(SOURCE_STOP,
+                Sets.newHashSet(SOURCE_DISABLE, SOURCE_STOP, 
TO_BE_ISSUED_ACTIVE, HEARTBEAT_TIMEOUT));
 
         // [xxx] bo be issued
-        HashSet<SourceStatus> tobeAddNext = Sets.newHashSet(BEEN_ISSUED_ADD, 
SOURCE_DISABLE);
+        HashSet<SourceStatus> tobeAddNext = Sets.newHashSet(BEEN_ISSUED_ADD, 
SOURCE_DISABLE, HEARTBEAT_TIMEOUT);
         tobeAddNext.addAll(TOBE_ISSUED_SET);
         SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAddNext);
-        HashSet<SourceStatus> tobeDeleteNext = 
Sets.newHashSet(BEEN_ISSUED_DELETE);
+        HashSet<SourceStatus> tobeDeleteNext = 
Sets.newHashSet(BEEN_ISSUED_DELETE, HEARTBEAT_TIMEOUT);
         tobeDeleteNext.addAll(TOBE_ISSUED_SET);
         SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, 
Sets.newHashSet(tobeDeleteNext));
-        HashSet<SourceStatus> tobeRetryNext = 
Sets.newHashSet(BEEN_ISSUED_RETRY);
+        HashSet<SourceStatus> tobeRetryNext = 
Sets.newHashSet(BEEN_ISSUED_RETRY, HEARTBEAT_TIMEOUT);
         tobeRetryNext.addAll(TOBE_ISSUED_SET);
         SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, 
Sets.newHashSet(tobeRetryNext));
-        HashSet<SourceStatus> tobeBacktrackNext = 
Sets.newHashSet(BEEN_ISSUED_BACKTRACK);
+        HashSet<SourceStatus> tobeBacktrackNext = 
Sets.newHashSet(BEEN_ISSUED_BACKTRACK, HEARTBEAT_TIMEOUT);
         tobeBacktrackNext.addAll(TOBE_ISSUED_SET);
         SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, 
Sets.newHashSet(tobeBacktrackNext));
-        HashSet<SourceStatus> tobeFrozenNext = 
Sets.newHashSet(BEEN_ISSUED_STOP);
+        HashSet<SourceStatus> tobeFrozenNext = 
Sets.newHashSet(BEEN_ISSUED_STOP, HEARTBEAT_TIMEOUT);
         tobeFrozenNext.addAll(TOBE_ISSUED_SET);
         SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_STOP, 
Sets.newHashSet(tobeFrozenNext));
-        HashSet<SourceStatus> tobeActiveNext = 
Sets.newHashSet(BEEN_ISSUED_ACTIVE);
+        HashSet<SourceStatus> tobeActiveNext = 
Sets.newHashSet(BEEN_ISSUED_ACTIVE, HEARTBEAT_TIMEOUT);
         tobeActiveNext.addAll(TOBE_ISSUED_SET);
         SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, 
Sets.newHashSet(tobeActiveNext));
-        HashSet<SourceStatus> tobeCheckNext = 
Sets.newHashSet(BEEN_ISSUED_CHECK);
+        HashSet<SourceStatus> tobeCheckNext = 
Sets.newHashSet(BEEN_ISSUED_CHECK, HEARTBEAT_TIMEOUT);
         tobeCheckNext.addAll(TOBE_ISSUED_SET);
         SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, 
Sets.newHashSet(tobeCheckNext));
-        HashSet<SourceStatus> tobeRedoMetricNext = 
Sets.newHashSet(BEEN_ISSUED_REDO_METRIC);
+        HashSet<SourceStatus> tobeRedoMetricNext = 
Sets.newHashSet(BEEN_ISSUED_REDO_METRIC, HEARTBEAT_TIMEOUT);
         tobeRedoMetricNext.addAll(TOBE_ISSUED_SET);
         SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, 
Sets.newHashSet(tobeRedoMetricNext));
-        HashSet<SourceStatus> tobeMakeupNext = 
Sets.newHashSet(BEEN_ISSUED_MAKEUP);
+        HashSet<SourceStatus> tobeMakeupNext = 
Sets.newHashSet(BEEN_ISSUED_MAKEUP, HEARTBEAT_TIMEOUT);
         tobeMakeupNext.addAll(TOBE_ISSUED_SET);
         SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, 
Sets.newHashSet(tobeMakeupNext));
 
         // [xxx] been issued
-        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_STOP, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE,
+                Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, 
HEARTBEAT_TIMEOUT));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK,
+                Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, 
HEARTBEAT_TIMEOUT));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_STOP, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE,
+                Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, 
HEARTBEAT_TIMEOUT));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK, 
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC,
+                Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, 
HEARTBEAT_TIMEOUT));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP,
+                Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, 
HEARTBEAT_TIMEOUT));
+        SOURCE_STATE_AUTOMATON.put(HEARTBEAT_TIMEOUT,
+                Sets.newHashSet(SOURCE_DISABLE, SOURCE_NORMAL, SOURCE_FAILED, 
SOURCE_STOP, TO_BE_ISSUED_ADD,
+                        TO_BE_ISSUED_DELETE,
+                        TO_BE_ISSUED_RETRY, TO_BE_ISSUED_BACKTRACK, 
TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE,
+                        TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, 
TO_BE_ISSUED_MAKEUP, BEEN_ISSUED_ADD,
+                        BEEN_ISSUED_DELETE, BEEN_ISSUED_RETRY, 
BEEN_ISSUED_BACKTRACK, BEEN_ISSUED_STOP,
+                        BEEN_ISSUED_ACTIVE, BEEN_ISSUED_CHECK, 
BEEN_ISSUED_REDO_METRIC, BEEN_ISSUED_MAKEUP,
+                        HEARTBEAT_TIMEOUT));
     }
 
     private final Integer code;
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index 2c7537577..c85c83712 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -47,6 +47,11 @@ public interface InlongClusterNodeEntityMapper {
 
     int updateByIdSelective(InlongClusterNodeEntity record);
 
+    /**
+     * Update the status of the not-deleted node from `status` to `nextStatus` by the given id.
+     */
+    int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer 
nextStatus, @Param("status") Integer status);
+
     int deleteById(Integer id);
 
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 99a06ac34..6d8610ab3 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -125,6 +125,13 @@ public interface StreamSourceEntityMapper {
     List<Integer> 
selectNeedUpdateIdsByClusterAndDataNode(@Param("clusterName") String 
clusterName,
             @Param("nodeName") String nodeName, @Param("sourceType") String 
sourceType);
 
+    /**
+     * Query the ids of heartbeat-timeout sources by the given source types, agent ip and cluster name.
+     */
+    List<Integer> selectHeartbeatTimeoutIds(@Param("sourceTypeList") 
List<String> sourceTypeList,
+            @Param("agentIp") String agentIp,
+            @Param("clusterName") String clusterName);
+
     int updateByPrimaryKeySelective(StreamSourceEntity record);
 
     int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") 
String streamId,
@@ -164,6 +171,21 @@ public interface StreamSourceEntityMapper {
     void updateStatusByIds(@Param("idList") List<Integer> idList, 
@Param("status") Integer status,
             @Param("operator") String operator);
 
+    /**
+     * Roll back the sources in heartbeat timeout status (105) to their previous status
+     *
+     * @param idList source id list
+     * @param operator operator name
+     */
+    void rollbackTimeoutStatusByIds(@Param("idList") List<Integer> idList, 
@Param("operator") String operator);
+
+    /**
+     * Update not-deleted sources whose status is in 200-208 to heartbeat timeout status (105)
+     *
+     * @param beforeSeconds only sources last modified at least this many seconds ago are updated
+     */
+    void updateStatusToTimeout(@Param("beforeSeconds") Integer beforeSeconds);
+
     /**
      * Physical delete stream sources by group id and stream id
      */
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 2dd8b216f..7bf692fe5 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -201,7 +201,23 @@
         where id = #{id,jdbcType=INTEGER}
         and version = #{version,jdbcType=INTEGER}
     </update>
-
+    <update id="updateStatus">
+        update inlong_cluster_node
+        <set>
+            <if test="nextStatus != null">
+                status = #{nextStatus,jdbcType=INTEGER}
+            </if>
+        </set>
+        <where>
+            is_deleted = 0
+            <if test="status != null">
+                and status = #{status,jdbcType=INTEGER}
+            </if>
+            <if test="id != null">
+                and id = #{id,jdbcType=INTEGER}
+            </if>
+        </where>
+    </update>
     <delete id="deleteById" parameterType="java.lang.Integer">
         delete
         from inlong_cluster_node
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 7c2df0199..d29200b87 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -342,6 +342,24 @@
             </if>
         </where>
     </select>
+    <select id="selectHeartbeatTimeoutIds" resultType="java.lang.Integer">
+        select id
+        from stream_source
+        <where>
+            is_deleted = 0
+            and status = 105
+            <if test="sourceTypeList != null and sourceTypeList.size()>0">
+                and source_type in
+                <foreach item="item" index="index" collection="sourceTypeList" 
open="(" close=")" separator=",">
+                    #{item}
+                </foreach>
+            </if>
+            <if test="agentIp != null and agentIp != ''">
+                and agent_ip = #{agentIp, jdbcType=VARCHAR}
+            </if>
+            and inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
+        </where>
+    </select>
     <update id="updateByRelatedId">
         update stream_source
         <set>
@@ -484,9 +502,11 @@
         update stream_source
         <set>
             previous_status = status,
-            status          = #{status, jdbcType=INTEGER},
-            modifier        = #{operator, jdbcType=VARCHAR},
-            version         = version + 1
+            status = #{status, jdbcType=INTEGER},
+            <if test="operator != null and operator !=''">
+                modifier = #{operator, jdbcType=VARCHAR},
+            </if>
+            version = version + 1
         </set>
         <where>
             is_deleted = 0
@@ -498,6 +518,39 @@
             </if>
         </where>
     </update>
+    <update id="rollbackTimeoutStatusByIds">
+        update stream_source
+        <set>
+            status = previous_status,
+            previous_status = 105,
+            <if test="operator != null and operator !=''">
+                modifier = #{operator, jdbcType=VARCHAR},
+            </if>
+            version = version + 1
+        </set>
+        <where>
+            is_deleted = 0
+            and status = 105
+            <if test="idList != null and idList.size() > 0">
+                and id in
+                <foreach item="item" index="index" collection="idList" 
open="(" close=")" separator=",">
+                    #{item}
+                </foreach>
+            </if>
+        </where>
+    </update>
+    <update id="updateStatusToTimeout">
+        update stream_source
+        <set>
+            previous_status = status,
+            status = 105
+        </set>
+        <where>
+            is_deleted = 0
+            and modify_time &lt;= DATE_ADD(NOW(), INTERVAL -#{beforeSeconds, 
jdbcType=INTEGER} SECOND)
+            and status in (200, 201, 202, 203, 204, 205, 206, 207, 208)
+        </where>
+    </update>
 
     <delete id="deleteByRelatedId">
         delete
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
index b1aff06ae..0d6d5d68e 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
@@ -38,7 +38,7 @@ public class BindTagRequest {
     @ApiModelProperty(value = "Cluster tag")
     @NotBlank(message = "clusterTag cannot be blank")
     @Length(min = 1, max = 128, message = "length must be between 1 and 128")
-    @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports 
lowercase letters, numbers, '-', or '_'")
+    @Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports 
lowercase letters, numbers, '-', or '_'")
     private String clusterTag;
 
     @ApiModelProperty(value = "Cluster-ID list which needs to bind tag")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
index 305a90877..1548bafa0 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
@@ -43,7 +43,7 @@ public class ClusterTagRequest {
     @ApiModelProperty(value = "Cluster tag")
     @NotBlank(groups = SaveValidation.class, message = "clusterTag cannot be 
blank")
     @Length(min = 1, max = 128, message = "length must be between 1 and 128")
-    @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports 
lowercase letters, numbers, '-', or '_'")
+    @Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports 
lowercase letters, numbers, '-', or '_'")
     private String clusterTag;
 
     @ApiModelProperty(value = "Extended params")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 4b07ef0b3..d9306609c 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -67,7 +67,7 @@ public class SourceRequest {
     @ApiModelProperty("Source name, unique in one stream")
     @NotBlank(groups = SaveValidation.class, message = "sourceName cannot be 
blank")
     @Length(min = 1, max = 100, message = "sourceName length must be between 1 
and 100")
-    @Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "sourceName only 
supports lowercase letters, numbers, '-', or '_'")
+    @Pattern(regexp = "^[a-z0-9_.-]{1,100}$", message = "sourceName only 
supports lowercase letters, numbers, '-', or '_'")
     private String sourceName;
 
     @ApiModelProperty("Ip of the agent running the task")
@@ -85,7 +85,7 @@ public class SourceRequest {
 
     @ApiModelProperty("Inlong cluster node label for filtering stream source 
collect task")
     @Length(min = 1, max = 128, message = "length must be between 1 and 128")
-    @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports 
lowercase letters, numbers, '-', or '_'")
+    @Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports 
lowercase letters, numbers, '-', or '_'")
     private String inlongClusterNodeGroup;
 
     @ApiModelProperty("Data node name")
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index d8fb62fbc..7ba844f8a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -1240,7 +1240,7 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         clusterEntityList.forEach(e -> 
tagSet.addAll(Arrays.asList(e.getClusterTags().split(InlongConstants.COMMA))));
         List<String> clusterTagList = new ArrayList<>(tagSet);
         InlongGroupPageRequest groupRequest = InlongGroupPageRequest.builder()
-                .status(GroupStatus.CONFIG_SUCCESSFUL.getCode())
+                
.statusList(Arrays.asList(GroupStatus.CONFIG_SUCCESSFUL.getCode(), 
GroupStatus.RESTARTED.getCode()))
                 .clusterTagList(clusterTagList)
                 .build();
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index ff44bfb9c..1faf01561 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.core.impl;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gson.Gson;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -67,11 +68,13 @@ import org.elasticsearch.common.util.set.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -81,6 +84,11 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -98,7 +106,18 @@ public class AgentServiceImpl implements AgentService {
     private static final int MODULUS_100 = 100;
     private static final int TASK_FETCH_SIZE = 2;
     private static final Gson GSON = new Gson();
-
+    private final ExecutorService executorService = new ThreadPoolExecutor(
+            5,
+            10,
+            10L,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(100),
+            new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(),
+            new CallerRunsPolicy());
+    @Value("${source.update.enabled:false}")
+    private Boolean enabled;
+    @Value("${source.update.before.seconds:60}")
+    private Integer beforeSeconds;
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
     @Autowired
@@ -114,6 +133,18 @@ public class AgentServiceImpl implements AgentService {
     @Autowired
     private InlongClusterNodeEntityMapper clusterNodeMapper;
 
+    /**
+     * Start the update task
+     */
+    @PostConstruct
+    private void startHeartbeatTask() {
+        if (enabled) {
+            UpdateTaskRunnable taskRunnable = new UpdateTaskRunnable();
+            this.executorService.execute(taskRunnable);
+            LOGGER.info("update task status started successfully");
+        }
+    }
+
     @Override
     public Boolean reportSnapshot(TaskSnapshotRequest request) {
         return snapshotOperator.snapshot(request);
@@ -129,6 +160,8 @@ public class AgentServiceImpl implements AgentService {
             throw new BusinessException("agent request or agent ip was empty, 
just return");
         }
 
+        preTimeoutTasks(request);
+
         // Update task status, other tasks with status 20x will change to 30x 
in next request
         if (CollectionUtils.isEmpty(request.getCommandInfo())) {
             LOGGER.info("task result was empty in request: {}, just return", 
request);
@@ -422,6 +455,16 @@ public class AgentServiceImpl implements AgentService {
         });
     }
 
+    private void preTimeoutTasks(TaskRequest taskRequest) {
+        // If the agent report succeeds, restore the source status
+        List<Integer> needUpdateIds = 
sourceMapper.selectHeartbeatTimeoutIds(Lists.newArrayList(SourceType.FILE),
+                taskRequest.getAgentIp(), taskRequest.getClusterName());
+        // restore the previous status of all matched sources by agent ip, cluster name and source type
+        if (CollectionUtils.isNotEmpty(needUpdateIds)) {
+            sourceMapper.rollbackTimeoutStatusByIds(needUpdateIds, null);
+        }
+    }
+
     private InlongClusterNodeEntity selectByIpAndCluster(String clusterName, 
String ip) {
         InlongClusterEntity clusterEntity = 
clusterMapper.selectByNameAndType(clusterName, ClusterType.AGENT);
         if (clusterEntity == null) {
@@ -602,4 +645,22 @@ public class AgentServiceImpl implements AgentService {
         return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
     }
 
+    /**
+     * update task status when task timeout
+     */
+    private class UpdateTaskRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    sourceMapper.updateStatusToTimeout(beforeSeconds);
+                    Thread.sleep(beforeSeconds * 1000);
+                } catch (Throwable t) {
+                    LOGGER.error("update task status runnable error", t);
+                }
+            }
+        }
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 20ee092fd..8f2ec1b4e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -23,17 +23,21 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.RemovalCause;
 import com.github.benmanes.caffeine.cache.Scheduler;
 import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.NodeSrvStatus;
 import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterStatus;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.NodeStatus;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -42,23 +46,28 @@ import 
org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
 import org.apache.inlong.manager.service.cluster.InlongClusterOperator;
 import org.apache.inlong.manager.service.cluster.InlongClusterOperatorFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 @Slf4j
+@Lazy
 @Component
 public class HeartbeatManager implements AbstractHeartbeatManager {
 
@@ -75,6 +84,8 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
     private InlongClusterEntityMapper clusterMapper;
     @Autowired
     private InlongClusterNodeEntityMapper clusterNodeMapper;
+    @Autowired
+    private StreamSourceEntityMapper sourceMapper;
 
     /**
      * Check whether the configuration information carried in the heartbeat 
has been updated
@@ -88,11 +99,15 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
         if (oldHB == null) {
             return true;
         }
-        return oldHB.getNodeGroup() != newHB.getNodeGroup() || oldHB.getLoad() 
!= newHB.getLoad();
+        return !Objects.equals(oldHB.getNodeGroup(), newHB.getNodeGroup()) || 
!Objects.equals(oldHB.getLoad(),
+                newHB.getLoad());
     }
 
     @PostConstruct
     public void init() {
+        // When the manager restarts, set all nodes to the heartbeat timeout 
status
+        // and wait for each node to report its heartbeat again
+        clusterNodeMapper.updateStatus(null, 
NodeStatus.HEARTBEAT_TIMEOUT.getStatus(), NodeStatus.NORMAL.getStatus());
         long expireTime = heartbeatInterval() * 2L;
         Scheduler evictScheduler = 
Scheduler.forScheduledExecutorService(Executors.newSingleThreadScheduledExecutor());
         heartbeatCache = Caffeine.newBuilder()
@@ -162,6 +177,16 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
                     handlerNum += insertClusterNode(clusterInfo, heartbeatMsg, 
clusterInfo.getCreator());
                 } else {
                     handlerNum += updateClusterNode(clusterNode, heartbeatMsg);
+                    // If the agent report succeeds, restore the source status
+                    if (Objects.equals(clusterNode.getType(), 
ClusterType.AGENT)) {
+                        // Query the FILE sources to restore, matched by the 
agent ip and cluster name
+                        List<Integer> needUpdateIds = 
sourceMapper.selectHeartbeatTimeoutIds(
+                                Lists.newArrayList(SourceType.FILE), 
heartbeat.getIp(), heartbeat.getClusterName());
+                        // restore state for all source by ip and type
+                        if (CollectionUtils.isNotEmpty(needUpdateIds)) {
+                            
sourceMapper.rollbackTimeoutStatusByIds(needUpdateIds, null);
+                        }
+                    }
                 }
             }
         }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
index ec7071c98..052a818f6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
@@ -45,6 +45,7 @@ import 
org.apache.inlong.manager.pojo.heartbeat.HeartbeatReportRequest;
 import org.apache.inlong.manager.pojo.heartbeat.StreamHeartbeatResponse;
 import org.apache.inlong.manager.service.core.HeartbeatService;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
@@ -59,6 +60,7 @@ public class HeartbeatServiceImpl implements HeartbeatService 
{
     private static final Gson GSON = new Gson();
 
     @Autowired
+    @Lazy
     private HeartbeatManager heartbeatManager;
     @Autowired
     private ComponentHeartbeatEntityMapper componentHeartbeatMapper;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index 2e3d32283..fdad06090 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -62,7 +62,7 @@ public abstract class AbstractSourceOperateListener 
implements SourceOperateList
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
-        log.info("operate stream source for context={}", context);
+        log.info("operate stream source for groupId={}", 
context.getProcessForm().getInlongGroupId());
         InlongGroupInfo groupInfo = getGroupInfo(context.getProcessForm());
         final String groupId = groupInfo.getInlongGroupId();
         List<InlongStreamBriefInfo> streamResponses = 
streamService.listBriefWithSink(groupId);
@@ -104,6 +104,7 @@ public abstract class AbstractSourceOperateListener 
implements SourceOperateList
             SourceStatus sourceStatus = SourceStatus.forCode(status);
             // template sources are filtered and processed in corresponding 
subclass listeners
             if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == 
SourceStatus.SOURCE_STOP
+                    || sourceStatus == SourceStatus.HEARTBEAT_TIMEOUT
                     || 
CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
                 return true;
             } else if (sourceStatus == SourceStatus.SOURCE_FAILED || 
sourceStatus == SourceStatus.SOURCE_DISABLE) {
@@ -118,7 +119,8 @@ public abstract class AbstractSourceOperateListener 
implements SourceOperateList
         if (sourceStatus != SourceStatus.SOURCE_NORMAL
                 && sourceStatus != SourceStatus.SOURCE_STOP
                 && sourceStatus != SourceStatus.SOURCE_DISABLE
-                && sourceStatus != SourceStatus.SOURCE_FAILED) {
+                && sourceStatus != SourceStatus.SOURCE_FAILED
+                && sourceStatus != SourceStatus.HEARTBEAT_TIMEOUT) {
             log.error("stream source ={} cannot be operated for status={}", 
streamSource, sourceStatus);
             unOperatedSources.add(streamSource);
         }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index a6a25b5e5..6c3cf4442 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -167,26 +167,29 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         // setting updated parameters of stream source entity.
         setTargetEntity(request, entity);
         entity.setModifier(operator);
-
         entity.setPreviousStatus(entity.getStatus());
 
         // re-issue task if necessary
         if (InlongConstants.STANDARD_MODE.equals(groupMode)) {
+            SourceStatus sourceStatus = 
SourceStatus.forCode(entity.getStatus());
+            Integer nextStatus = entity.getStatus();
             if 
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
-                entity.setStatus(SourceStatus.TO_BE_ISSUED_RETRY.getCode());
+                nextStatus = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
             } else {
                 switch (SourceStatus.forCode(entity.getStatus())) {
                     case SOURCE_NORMAL:
-                        
entity.setStatus(SourceStatus.TO_BE_ISSUED_RETRY.getCode());
+                    case HEARTBEAT_TIMEOUT:
+                        nextStatus = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
                         break;
                     case SOURCE_FAILED:
-                        entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
+                        nextStatus = SourceStatus.SOURCE_NEW.getCode();
                         break;
                     default:
                         // others leave it be
                         break;
                 }
             }
+            entity.setStatus(nextStatus);
         }
 
         int rowCount = sourceMapper.updateByPrimaryKeySelective(entity);
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-dev.properties 
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 53dd907c1..5bd94df1d 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -85,3 +85,8 @@ data.cleansing.batchSize=100
 
 # Whether to use ZooKeeper to manage the Sort task config, default is false, 
which means not using ZooKeeper
 sort.enable.zookeeper=false
+
+# If turned on, the source status is changed synchronously when the agent 
heartbeat times out
+source.update.enabled=false
+source.update.before.seconds=60
+
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-prod.properties 
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index dfac2d818..c07e79f04 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -84,3 +84,8 @@ data.cleansing.batchSize=100
 
 # Whether to use ZooKeeper to manage the Sort task config, default is false, 
which means not using ZooKeeper
 sort.enable.zookeeper=false
+
+# If turned on, the source status is changed synchronously when the agent 
heartbeat times out
+source.update.enabled=false
+source.update.before.seconds=60
+
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-test.properties 
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 53dd907c1..13822969a 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -85,3 +85,7 @@ data.cleansing.batchSize=100
 
 # Whether to use ZooKeeper to manage the Sort task config, default is false, 
which means not using ZooKeeper
 sort.enable.zookeeper=false
+
+# If turned on, the source status is changed synchronously when the agent 
heartbeat times out
+source.update.enabled=false
+source.update.before.seconds=60


Reply via email to