This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d66775d01 [INLONG-7987][Manager] Add heartbeat timeout status to the source (#7989)
d66775d01 is described below
commit d66775d015d6ad61e6891d11aba1bf4fbc27d4ca
Author: fuweng11 <[email protected]>
AuthorDate: Mon May 15 19:08:58 2023 +0800
[INLONG-7987][Manager] Add heartbeat timeout status to the source (#7989)
Co-authored-by: healchow <[email protected]>
---
CHANGES.md | 1 +
.../manager/common/enums/SimpleSourceStatus.java | 1 +
.../inlong/manager/common/enums/SourceStatus.java | 63 ++++++++++++++--------
.../dao/mapper/InlongClusterNodeEntityMapper.java | 5 ++
.../dao/mapper/StreamSourceEntityMapper.java | 22 ++++++++
.../mappers/InlongClusterNodeEntityMapper.xml | 18 ++++++-
.../resources/mappers/StreamSourceEntityMapper.xml | 59 ++++++++++++++++++--
.../manager/pojo/cluster/BindTagRequest.java | 2 +-
.../manager/pojo/cluster/ClusterTagRequest.java | 2 +-
.../inlong/manager/pojo/source/SourceRequest.java | 4 +-
.../service/cluster/InlongClusterServiceImpl.java | 2 +-
.../service/core/impl/AgentServiceImpl.java | 63 +++++++++++++++++++++-
.../service/heartbeat/HeartbeatManager.java | 27 +++++++++-
.../service/heartbeat/HeartbeatServiceImpl.java | 2 +
.../source/AbstractSourceOperateListener.java | 6 ++-
.../service/source/AbstractSourceOperator.java | 11 ++--
.../src/main/resources/application-dev.properties | 5 ++
.../src/main/resources/application-prod.properties | 5 ++
.../src/main/resources/application-test.properties | 4 ++
19 files changed, 262 insertions(+), 40 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 5e350802d..bac536234 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -50,6 +50,7 @@
| [INLONG-8006](https://github.com/apache/inlong/issues/8006) | [Improve][Manager] Set displayname for the auto-registered cluster |
| [INLONG-7999](https://github.com/apache/inlong/issues/7999) | [Improve][Manager] Support PostgreSQL data node |
| [INLONG-7996](https://github.com/apache/inlong/issues/7996) | [Improve][Manager] Support issued kafka consumer group to sort |
+| [INLONG-7987](https://github.com/apache/inlong/issues/7987) | [Improve][Manager] Add a heartbeat timeout status to the source |
| [INLONG-7981](https://github.com/apache/inlong/issues/7981) | [Bug][Manager] Failed to stop source correctly when suspend a group |
| [INLONG-7948](https://github.com/apache/inlong/issues/7948) | [Improve][Manager] Add user authentication when operate inlong consume |
| [INLONG-7946](https://github.com/apache/inlong/issues/7946) | [Improve][Manager] Add user authentication when bind clusterTag |
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
index e361daadc..7c72f916e 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
@@ -44,6 +44,7 @@ public enum SimpleSourceStatus {
case SOURCE_STOP:
return FROZEN;
case SOURCE_FAILED:
+ case HEARTBEAT_TIMEOUT:
return FAILED;
case TO_BE_ISSUED_DELETE:
case BEEN_ISSUED_DELETE:
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
index 99118bebb..306a9e60c 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
@@ -34,6 +34,7 @@ public enum SourceStatus {
SOURCE_NORMAL(101, "normal"),
SOURCE_FAILED(102, "failed"),
SOURCE_STOP(104, "stop"),
+ HEARTBEAT_TIMEOUT(105, "heartbeat timeout"),
// if not approved
SOURCE_NEW(110, "new created"),
@@ -76,7 +77,7 @@ public enum SourceStatus {
*/
public static final Set<Integer> ALLOWED_UPDATE = Sets.newHashSet(
SOURCE_NEW.getCode(), SOURCE_FAILED.getCode(),
SOURCE_STOP.getCode(),
- SOURCE_NORMAL.getCode());
+ SOURCE_NORMAL.getCode(), HEARTBEAT_TIMEOUT.getCode());
public static final Set<SourceStatus> TOBE_ISSUED_SET = Sets.newHashSet(
TO_BE_ISSUED_ADD, TO_BE_ISSUED_DELETE, TO_BE_ISSUED_RETRY,
@@ -87,59 +88,75 @@ public enum SourceStatus {
static {
// new
- SOURCE_STATE_AUTOMATON.put(SOURCE_NEW, Sets.newHashSet(SOURCE_DISABLE,
SOURCE_NEW, TO_BE_ISSUED_ADD));
+ SOURCE_STATE_AUTOMATON.put(SOURCE_NEW,
+ Sets.newHashSet(SOURCE_DISABLE, SOURCE_NEW, TO_BE_ISSUED_ADD,
HEARTBEAT_TIMEOUT));
// normal
SOURCE_STATE_AUTOMATON.put(SOURCE_NORMAL,
Sets.newHashSet(SOURCE_DISABLE, SOURCE_NORMAL, SOURCE_FAILED,
TO_BE_ISSUED_DELETE,
TO_BE_ISSUED_RETRY, TO_BE_ISSUED_BACKTRACK,
TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE,
- TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC,
TO_BE_ISSUED_MAKEUP));
+ TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC,
TO_BE_ISSUED_MAKEUP, HEARTBEAT_TIMEOUT));
// failed
- SOURCE_STATE_AUTOMATON.put(SOURCE_FAILED,
Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED, TO_BE_ISSUED_RETRY));
+ SOURCE_STATE_AUTOMATON.put(SOURCE_FAILED,
+ Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED,
TO_BE_ISSUED_RETRY, HEARTBEAT_TIMEOUT));
// frozen
- SOURCE_STATE_AUTOMATON.put(SOURCE_STOP,
Sets.newHashSet(SOURCE_DISABLE, SOURCE_STOP, TO_BE_ISSUED_ACTIVE));
+ SOURCE_STATE_AUTOMATON.put(SOURCE_STOP,
+ Sets.newHashSet(SOURCE_DISABLE, SOURCE_STOP,
TO_BE_ISSUED_ACTIVE, HEARTBEAT_TIMEOUT));
// [xxx] bo be issued
- HashSet<SourceStatus> tobeAddNext = Sets.newHashSet(BEEN_ISSUED_ADD,
SOURCE_DISABLE);
+ HashSet<SourceStatus> tobeAddNext = Sets.newHashSet(BEEN_ISSUED_ADD,
SOURCE_DISABLE, HEARTBEAT_TIMEOUT);
tobeAddNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAddNext);
- HashSet<SourceStatus> tobeDeleteNext =
Sets.newHashSet(BEEN_ISSUED_DELETE);
+ HashSet<SourceStatus> tobeDeleteNext =
Sets.newHashSet(BEEN_ISSUED_DELETE, HEARTBEAT_TIMEOUT);
tobeDeleteNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE,
Sets.newHashSet(tobeDeleteNext));
- HashSet<SourceStatus> tobeRetryNext =
Sets.newHashSet(BEEN_ISSUED_RETRY);
+ HashSet<SourceStatus> tobeRetryNext =
Sets.newHashSet(BEEN_ISSUED_RETRY, HEARTBEAT_TIMEOUT);
tobeRetryNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY,
Sets.newHashSet(tobeRetryNext));
- HashSet<SourceStatus> tobeBacktrackNext =
Sets.newHashSet(BEEN_ISSUED_BACKTRACK);
+ HashSet<SourceStatus> tobeBacktrackNext =
Sets.newHashSet(BEEN_ISSUED_BACKTRACK, HEARTBEAT_TIMEOUT);
tobeBacktrackNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK,
Sets.newHashSet(tobeBacktrackNext));
- HashSet<SourceStatus> tobeFrozenNext =
Sets.newHashSet(BEEN_ISSUED_STOP);
+ HashSet<SourceStatus> tobeFrozenNext =
Sets.newHashSet(BEEN_ISSUED_STOP, HEARTBEAT_TIMEOUT);
tobeFrozenNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_STOP,
Sets.newHashSet(tobeFrozenNext));
- HashSet<SourceStatus> tobeActiveNext =
Sets.newHashSet(BEEN_ISSUED_ACTIVE);
+ HashSet<SourceStatus> tobeActiveNext =
Sets.newHashSet(BEEN_ISSUED_ACTIVE, HEARTBEAT_TIMEOUT);
tobeActiveNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE,
Sets.newHashSet(tobeActiveNext));
- HashSet<SourceStatus> tobeCheckNext =
Sets.newHashSet(BEEN_ISSUED_CHECK);
+ HashSet<SourceStatus> tobeCheckNext =
Sets.newHashSet(BEEN_ISSUED_CHECK, HEARTBEAT_TIMEOUT);
tobeCheckNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK,
Sets.newHashSet(tobeCheckNext));
- HashSet<SourceStatus> tobeRedoMetricNext =
Sets.newHashSet(BEEN_ISSUED_REDO_METRIC);
+ HashSet<SourceStatus> tobeRedoMetricNext =
Sets.newHashSet(BEEN_ISSUED_REDO_METRIC, HEARTBEAT_TIMEOUT);
tobeRedoMetricNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC,
Sets.newHashSet(tobeRedoMetricNext));
- HashSet<SourceStatus> tobeMakeupNext =
Sets.newHashSet(BEEN_ISSUED_MAKEUP);
+ HashSet<SourceStatus> tobeMakeupNext =
Sets.newHashSet(BEEN_ISSUED_MAKEUP, HEARTBEAT_TIMEOUT);
tobeMakeupNext.addAll(TOBE_ISSUED_SET);
SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP,
Sets.newHashSet(tobeMakeupNext));
// [xxx] been issued
- SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_STOP,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE,
+ Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED,
HEARTBEAT_TIMEOUT));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK,
+ Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED,
HEARTBEAT_TIMEOUT));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_STOP,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE,
+ Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED,
HEARTBEAT_TIMEOUT));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK,
Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED, HEARTBEAT_TIMEOUT));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC,
+ Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED,
HEARTBEAT_TIMEOUT));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP,
+ Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED,
HEARTBEAT_TIMEOUT));
+ SOURCE_STATE_AUTOMATON.put(HEARTBEAT_TIMEOUT,
+ Sets.newHashSet(SOURCE_DISABLE, SOURCE_NORMAL, SOURCE_FAILED,
SOURCE_STOP, TO_BE_ISSUED_ADD,
+ TO_BE_ISSUED_DELETE,
+ TO_BE_ISSUED_RETRY, TO_BE_ISSUED_BACKTRACK,
TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE,
+ TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC,
TO_BE_ISSUED_MAKEUP, BEEN_ISSUED_ADD,
+ BEEN_ISSUED_DELETE, BEEN_ISSUED_RETRY,
BEEN_ISSUED_BACKTRACK, BEEN_ISSUED_STOP,
+ BEEN_ISSUED_ACTIVE, BEEN_ISSUED_CHECK,
BEEN_ISSUED_REDO_METRIC, BEEN_ISSUED_MAKEUP,
+ HEARTBEAT_TIMEOUT));
}
private final Integer code;
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index 2c7537577..c85c83712 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -47,6 +47,11 @@ public interface InlongClusterNodeEntityMapper {
int updateByIdSelective(InlongClusterNodeEntity record);
+ /**
+ * Update the status to `nextStatus` by the given id.
+ */
+ int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer
nextStatus, @Param("status") Integer status);
+
int deleteById(Integer id);
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 99a06ac34..6d8610ab3 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -125,6 +125,13 @@ public interface StreamSourceEntityMapper {
List<Integer>
selectNeedUpdateIdsByClusterAndDataNode(@Param("clusterName") String
clusterName,
@Param("nodeName") String nodeName, @Param("sourceType") String
sourceType);
+ /**
+ * Query need update tasks by the given status list and type List.
+ */
+ List<Integer> selectHeartbeatTimeoutIds(@Param("sourceTypeList")
List<String> sourceTypeList,
+ @Param("agentIp") String agentIp,
+ @Param("clusterName") String clusterName);
+
int updateByPrimaryKeySelective(StreamSourceEntity record);
int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId")
String streamId,
@@ -164,6 +171,21 @@ public interface StreamSourceEntityMapper {
void updateStatusByIds(@Param("idList") List<Integer> idList,
@Param("status") Integer status,
@Param("operator") String operator);
+ /**
+ * Update the source status
+ *
+ * @param idList source id list
+ * @param operator operator name
+ */
+ void rollbackTimeoutStatusByIds(@Param("idList") List<Integer> idList,
@Param("operator") String operator);
+
+ /**
+ * Update the source status when it has been deleted
+ *
+ * @param beforeSeconds the modified time was beforeSeconds seconds ago
+ */
+ void updateStatusToTimeout(@Param("beforeSeconds") Integer beforeSeconds);
+
/**
* Physical delete stream sources by group id and stream id
*/
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 2dd8b216f..7bf692fe5 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -201,7 +201,23 @@
where id = #{id,jdbcType=INTEGER}
and version = #{version,jdbcType=INTEGER}
</update>
-
+ <update id="updateStatus">
+ update inlong_cluster_node
+ <set>
+ <if test="nextStatus != null">
+ status = #{nextStatus,jdbcType=INTEGER}
+ </if>
+ </set>
+ <where>
+ is_deleted = 0
+ <if test="status != null">
+ and status = #{status,jdbcType=INTEGER}
+ </if>
+ <if test="id != null">
+ and id = #{id,jdbcType=INTEGER}
+ </if>
+ </where>
+ </update>
<delete id="deleteById" parameterType="java.lang.Integer">
delete
from inlong_cluster_node
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 7c2df0199..d29200b87 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -342,6 +342,24 @@
</if>
</where>
</select>
+ <select id="selectHeartbeatTimeoutIds" resultType="java.lang.Integer">
+ select id
+ from stream_source
+ <where>
+ is_deleted = 0
+ and status = 105
+ <if test="sourceTypeList != null and sourceTypeList.size()>0">
+ and source_type in
+ <foreach item="item" index="index" collection="sourceTypeList"
open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </if>
+ <if test="agentIp != null and agentIp != ''">
+ and agent_ip = #{agentIp, jdbcType=VARCHAR}
+ </if>
+ and inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
+ </where>
+ </select>
<update id="updateByRelatedId">
update stream_source
<set>
@@ -484,9 +502,11 @@
update stream_source
<set>
previous_status = status,
- status = #{status, jdbcType=INTEGER},
- modifier = #{operator, jdbcType=VARCHAR},
- version = version + 1
+ status = #{status, jdbcType=INTEGER},
+ <if test="operator != null and operator !=''">
+ modifier = #{operator, jdbcType=VARCHAR},
+ </if>
+ version = version + 1
</set>
<where>
is_deleted = 0
@@ -498,6 +518,39 @@
</if>
</where>
</update>
+ <update id="rollbackTimeoutStatusByIds">
+ update stream_source
+ <set>
+ status = previous_status,
+ previous_status = 105,
+ <if test="operator != null and operator !=''">
+ modifier = #{operator, jdbcType=VARCHAR},
+ </if>
+ version = version + 1
+ </set>
+ <where>
+ is_deleted = 0
+ and status = 105
+ <if test="idList != null and idList.size() > 0">
+ and id in
+ <foreach item="item" index="index" collection="idList"
open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </if>
+ </where>
+ </update>
+ <update id="updateStatusToTimeout">
+ update stream_source
+ <set>
+ previous_status = status,
+ status = 105
+ </set>
+ <where>
+ is_deleted = 0
+ and modify_time <= DATE_ADD(NOW(), INTERVAL -#{beforeSeconds,
jdbcType=INTEGER} SECOND)
+ and status in (200, 201, 202, 203, 204, 205, 206, 207, 208)
+ </where>
+ </update>
<delete id="deleteByRelatedId">
delete
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
index b1aff06ae..0d6d5d68e 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
@@ -38,7 +38,7 @@ public class BindTagRequest {
@ApiModelProperty(value = "Cluster tag")
@NotBlank(message = "clusterTag cannot be blank")
@Length(min = 1, max = 128, message = "length must be between 1 and 128")
- @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports
lowercase letters, numbers, '-', or '_'")
+ @Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports
lowercase letters, numbers, '-', or '_'")
private String clusterTag;
@ApiModelProperty(value = "Cluster-ID list which needs to bind tag")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
index 305a90877..1548bafa0 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
@@ -43,7 +43,7 @@ public class ClusterTagRequest {
@ApiModelProperty(value = "Cluster tag")
@NotBlank(groups = SaveValidation.class, message = "clusterTag cannot be
blank")
@Length(min = 1, max = 128, message = "length must be between 1 and 128")
- @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports
lowercase letters, numbers, '-', or '_'")
+ @Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports
lowercase letters, numbers, '-', or '_'")
private String clusterTag;
@ApiModelProperty(value = "Extended params")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 4b07ef0b3..d9306609c 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -67,7 +67,7 @@ public class SourceRequest {
@ApiModelProperty("Source name, unique in one stream")
@NotBlank(groups = SaveValidation.class, message = "sourceName cannot be
blank")
@Length(min = 1, max = 100, message = "sourceName length must be between 1
and 100")
- @Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "sourceName only
supports lowercase letters, numbers, '-', or '_'")
+ @Pattern(regexp = "^[a-z0-9_.-]{1,100}$", message = "sourceName only
supports lowercase letters, numbers, '-', or '_'")
private String sourceName;
@ApiModelProperty("Ip of the agent running the task")
@@ -85,7 +85,7 @@ public class SourceRequest {
@ApiModelProperty("Inlong cluster node label for filtering stream source
collect task")
@Length(min = 1, max = 128, message = "length must be between 1 and 128")
- @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports
lowercase letters, numbers, '-', or '_'")
+ @Pattern(regexp = "^[a-z0-9_.-]{1,128}$", message = "only supports
lowercase letters, numbers, '-', or '_'")
private String inlongClusterNodeGroup;
@ApiModelProperty("Data node name")
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index d8fb62fbc..7ba844f8a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -1240,7 +1240,7 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
clusterEntityList.forEach(e ->
tagSet.addAll(Arrays.asList(e.getClusterTags().split(InlongConstants.COMMA))));
List<String> clusterTagList = new ArrayList<>(tagSet);
InlongGroupPageRequest groupRequest = InlongGroupPageRequest.builder()
- .status(GroupStatus.CONFIG_SUCCESSFUL.getCode())
+
.statusList(Arrays.asList(GroupStatus.CONFIG_SUCCESSFUL.getCode(),
GroupStatus.RESTARTED.getCode()))
.clusterTagList(clusterTagList)
.build();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index ff44bfb9c..1faf01561 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.core.impl;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
@@ -67,11 +68,13 @@ import org.elasticsearch.common.util.set.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
+import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -81,6 +84,11 @@ import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -98,7 +106,18 @@ public class AgentServiceImpl implements AgentService {
private static final int MODULUS_100 = 100;
private static final int TASK_FETCH_SIZE = 2;
private static final Gson GSON = new Gson();
-
+ private final ExecutorService executorService = new ThreadPoolExecutor(
+ 5,
+ 10,
+ 10L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(100),
+ new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(),
+ new CallerRunsPolicy());
+ @Value("${source.update.enabled:false}")
+ private Boolean enabled;
+ @Value("${source.update.before.seconds:60}")
+ private Integer beforeSeconds;
@Autowired
private StreamSourceEntityMapper sourceMapper;
@Autowired
@@ -114,6 +133,18 @@ public class AgentServiceImpl implements AgentService {
@Autowired
private InlongClusterNodeEntityMapper clusterNodeMapper;
+ /**
+ * Start the update task
+ */
+ @PostConstruct
+ private void startHeartbeatTask() {
+ if (enabled) {
+ UpdateTaskRunnable taskRunnable = new UpdateTaskRunnable();
+ this.executorService.execute(taskRunnable);
+ LOGGER.info("update task status started successfully");
+ }
+ }
+
@Override
public Boolean reportSnapshot(TaskSnapshotRequest request) {
return snapshotOperator.snapshot(request);
@@ -129,6 +160,8 @@ public class AgentServiceImpl implements AgentService {
throw new BusinessException("agent request or agent ip was empty,
just return");
}
+ preTimeoutTasks(request);
+
// Update task status, other tasks with status 20x will change to 30x
in next request
if (CollectionUtils.isEmpty(request.getCommandInfo())) {
LOGGER.info("task result was empty in request: {}, just return",
request);
@@ -422,6 +455,16 @@ public class AgentServiceImpl implements AgentService {
});
}
+ private void preTimeoutTasks(TaskRequest taskRequest) {
+ // If the agent report succeeds, restore the source status
+ List<Integer> needUpdateIds =
sourceMapper.selectHeartbeatTimeoutIds(Lists.newArrayList(SourceType.FILE),
+ taskRequest.getAgentIp(), taskRequest.getClusterName());
+ // restore state for all source by ip and type
+ if (CollectionUtils.isNotEmpty(needUpdateIds)) {
+ sourceMapper.rollbackTimeoutStatusByIds(needUpdateIds, null);
+ }
+ }
+
private InlongClusterNodeEntity selectByIpAndCluster(String clusterName,
String ip) {
InlongClusterEntity clusterEntity =
clusterMapper.selectByNameAndType(clusterName, ClusterType.AGENT);
if (clusterEntity == null) {
@@ -602,4 +645,22 @@ public class AgentServiceImpl implements AgentService {
return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
}
+ /**
+ * update task status when task timeout
+ */
+ private class UpdateTaskRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ sourceMapper.updateStatusToTimeout(beforeSeconds);
+ Thread.sleep(beforeSeconds * 1000);
+ } catch (Throwable t) {
+ LOGGER.error("update task status runnable error", t);
+ }
+ }
+ }
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 20ee092fd..8f2ec1b4e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -23,17 +23,21 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
import com.google.gson.Gson;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterStatus;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
@@ -42,23 +46,28 @@ import
org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
import org.apache.inlong.manager.service.cluster.InlongClusterOperator;
import org.apache.inlong.manager.service.cluster.InlongClusterOperatorFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
+@Lazy
@Component
public class HeartbeatManager implements AbstractHeartbeatManager {
@@ -75,6 +84,8 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
private InlongClusterEntityMapper clusterMapper;
@Autowired
private InlongClusterNodeEntityMapper clusterNodeMapper;
+ @Autowired
+ private StreamSourceEntityMapper sourceMapper;
/**
* Check whether the configuration information carried in the heartbeat
has been updated
@@ -88,11 +99,15 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
if (oldHB == null) {
return true;
}
- return oldHB.getNodeGroup() != newHB.getNodeGroup() || oldHB.getLoad()
!= newHB.getLoad();
+ return !Objects.equals(oldHB.getNodeGroup(), newHB.getNodeGroup()) ||
!Objects.equals(oldHB.getLoad(),
+ newHB.getLoad());
}
@PostConstruct
public void init() {
+ // When the manager restarts, set the heartbeat timeout state of all
nodes
+ // and wait for the heartbeat report of the corresponding node
+ clusterNodeMapper.updateStatus(null,
NodeStatus.HEARTBEAT_TIMEOUT.getStatus(), NodeStatus.NORMAL.getStatus());
long expireTime = heartbeatInterval() * 2L;
Scheduler evictScheduler =
Scheduler.forScheduledExecutorService(Executors.newSingleThreadScheduledExecutor());
heartbeatCache = Caffeine.newBuilder()
@@ -162,6 +177,16 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
handlerNum += insertClusterNode(clusterInfo, heartbeatMsg,
clusterInfo.getCreator());
} else {
handlerNum += updateClusterNode(clusterNode, heartbeatMsg);
+ // If the agent report succeeds, restore the source status
+ if (Objects.equals(clusterNode.getType(),
ClusterType.AGENT)) {
+ // If the agent report succeeds, restore the source
status
+ List<Integer> needUpdateIds =
sourceMapper.selectHeartbeatTimeoutIds(
+ Lists.newArrayList(SourceType.FILE),
heartbeat.getIp(), heartbeat.getClusterName());
+ // restore state for all source by ip and type
+ if (CollectionUtils.isNotEmpty(needUpdateIds)) {
+
sourceMapper.rollbackTimeoutStatusByIds(needUpdateIds, null);
+ }
+ }
}
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
index ec7071c98..052a818f6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
@@ -45,6 +45,7 @@ import
org.apache.inlong.manager.pojo.heartbeat.HeartbeatReportRequest;
import org.apache.inlong.manager.pojo.heartbeat.StreamHeartbeatResponse;
import org.apache.inlong.manager.service.core.HeartbeatService;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import java.util.List;
@@ -59,6 +60,7 @@ public class HeartbeatServiceImpl implements HeartbeatService
{
private static final Gson GSON = new Gson();
@Autowired
+ @Lazy
private HeartbeatManager heartbeatManager;
@Autowired
private ComponentHeartbeatEntityMapper componentHeartbeatMapper;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index 2e3d32283..fdad06090 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -62,7 +62,7 @@ public abstract class AbstractSourceOperateListener
implements SourceOperateList
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
- log.info("operate stream source for context={}", context);
+ log.info("operate stream source for groupId={}",
context.getProcessForm().getInlongGroupId());
InlongGroupInfo groupInfo = getGroupInfo(context.getProcessForm());
final String groupId = groupInfo.getInlongGroupId();
List<InlongStreamBriefInfo> streamResponses =
streamService.listBriefWithSink(groupId);
@@ -104,6 +104,7 @@ public abstract class AbstractSourceOperateListener
implements SourceOperateList
SourceStatus sourceStatus = SourceStatus.forCode(status);
// template sources are filtered and processed in corresponding
subclass listeners
if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus ==
SourceStatus.SOURCE_STOP
+ || sourceStatus == SourceStatus.HEARTBEAT_TIMEOUT
||
CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
return true;
} else if (sourceStatus == SourceStatus.SOURCE_FAILED ||
sourceStatus == SourceStatus.SOURCE_DISABLE) {
@@ -118,7 +119,8 @@ public abstract class AbstractSourceOperateListener
implements SourceOperateList
if (sourceStatus != SourceStatus.SOURCE_NORMAL
&& sourceStatus != SourceStatus.SOURCE_STOP
&& sourceStatus != SourceStatus.SOURCE_DISABLE
- && sourceStatus != SourceStatus.SOURCE_FAILED) {
+ && sourceStatus != SourceStatus.SOURCE_FAILED
+ && sourceStatus != SourceStatus.HEARTBEAT_TIMEOUT) {
log.error("stream source ={} cannot be operated for status={}",
streamSource, sourceStatus);
unOperatedSources.add(streamSource);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index a6a25b5e5..6c3cf4442 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -167,26 +167,29 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
// setting updated parameters of stream source entity.
setTargetEntity(request, entity);
entity.setModifier(operator);
-
entity.setPreviousStatus(entity.getStatus());
// re-issue task if necessary
if (InlongConstants.STANDARD_MODE.equals(groupMode)) {
+ SourceStatus sourceStatus =
SourceStatus.forCode(entity.getStatus());
+ Integer nextStatus = entity.getStatus();
if
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
- entity.setStatus(SourceStatus.TO_BE_ISSUED_RETRY.getCode());
+ nextStatus = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
} else {
switch (SourceStatus.forCode(entity.getStatus())) {
case SOURCE_NORMAL:
-
entity.setStatus(SourceStatus.TO_BE_ISSUED_RETRY.getCode());
+ case HEARTBEAT_TIMEOUT:
+ nextStatus = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
break;
case SOURCE_FAILED:
- entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
+ nextStatus = SourceStatus.SOURCE_NEW.getCode();
break;
default:
// others leave it be
break;
}
}
+ entity.setStatus(nextStatus);
}
int rowCount = sourceMapper.updateByPrimaryKeySelective(entity);
diff --git
a/inlong-manager/manager-web/src/main/resources/application-dev.properties
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 53dd907c1..5bd94df1d 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -85,3 +85,8 @@ data.cleansing.batchSize=100
# Whether to use ZooKeeper to manage the Sort task config, default is false,
which means not using ZooKeeper
sort.enable.zookeeper=false
+
+# If enabled, synchronously change the source status when the agent
heartbeat times out
+source.update.enabled=false
+source.update.before.seconds=60
+
diff --git
a/inlong-manager/manager-web/src/main/resources/application-prod.properties
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index dfac2d818..c07e79f04 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -84,3 +84,8 @@ data.cleansing.batchSize=100
# Whether to use ZooKeeper to manage the Sort task config, default is false,
which means not using ZooKeeper
sort.enable.zookeeper=false
+
+# If enabled, synchronously change the source status when the agent
heartbeat times out
+source.update.enabled=false
+source.update.before.seconds=60
+
diff --git
a/inlong-manager/manager-web/src/main/resources/application-test.properties
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 53dd907c1..13822969a 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -85,3 +85,7 @@ data.cleansing.batchSize=100
# Whether to use ZooKeeper to manage the Sort task config, default is false,
which means not using ZooKeeper
sort.enable.zookeeper=false
+
+# If enabled, synchronously change the source status when the agent
heartbeat times out
+source.update.enabled=false
+source.update.before.seconds=60