This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit db296b39df069afca8ac6d8755ffc67395e2262a Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Thu Jan 5 18:35:19 2023 +0800 [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134) --- .../inlong/agent/constant/AgentConstants.java | 2 +- .../apache/inlong/agent/core/HeartbeatManager.java | 8 +- inlong-agent/conf/agent.properties | 2 +- .../inlong/common/heartbeat/HeartbeatMsg.java | 4 +- .../inlong/manager/client/api/InlongClient.java | 9 --- .../manager/client/api/impl/InlongClientImpl.java | 6 -- .../api/inner/client/InlongClusterClient.java | 13 --- .../client/api/service/InlongClusterApi.java | 4 - .../manager/common/consts/AgentConstants.java} | 33 +------- .../dao/entity/InlongClusterNodeEntity.java | 1 - .../manager/dao/entity/StreamSourceEntity.java | 2 +- .../mappers/InlongClusterNodeEntityMapper.xml | 13 ++- .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++-- .../AgentClusterNodeBindGroupRequest.java} | 19 ++--- .../inlong/manager/pojo/source/SourceRequest.java | 4 +- .../service/cluster/InlongClusterService.java | 10 --- .../service/cluster/InlongClusterServiceImpl.java | 55 ------------- .../inlong/manager/service/core/AgentService.java | 8 ++ .../service/core/impl/AgentServiceImpl.java | 92 +++++++++++++++++++--- .../service/heartbeat/HeartbeatManager.java | 24 +++++- .../service/core/impl/AgentServiceTest.java | 74 ++++++++--------- .../inlong/manager/service/mocks/MockAgent.java | 12 +-- .../main/resources/h2/apache_inlong_manager.sql | 3 +- .../manager-web/sql/apache_inlong_manager.sql | 3 +- inlong-manager/manager-web/sql/changes-1.5.0.sql | 6 +- .../web/controller/InlongClusterController.java | 9 --- .../web/controller/openapi/AgentController.java | 6 ++ 27 files changed, 195 insertions(+), 241 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java index 2933b907b..eceb0dbf3 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java @@ -110,7 +110,7 @@ public class AgentConstants { public static final String AGENT_LOCAL_UUID = "agent.local.uuid"; public static final String AGENT_LOCAL_UUID_OPEN = "agent.local.uuid.open"; public static final Boolean DEFAULT_AGENT_LOCAL_UUID_OPEN = false; - public static final String AGENT_NODE_TAG = "agent.node.tag"; + public static final String AGENT_NODE_GROUP = "agent.node.group"; public static final String PROMETHEUS_EXPORTER_PORT = "agent.prometheus.exporter.port"; public static final int DEFAULT_PROMETHEUS_EXPORTER_PORT = 8080; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java index 067c92361..9451f8b8d 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java @@ -51,7 +51,7 @@ import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_C import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT; -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_TAG; +import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP; import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH; @@ -211,7 +211,7 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea final String clusterName = conf.get(AGENT_CLUSTER_NAME); final String clusterTag = conf.get(AGENT_CLUSTER_TAG); final String inCharges = conf.get(AGENT_CLUSTER_IN_CHARGES); - final String nodeTag = conf.get(AGENT_NODE_TAG); + final String nodeGroup = conf.get(AGENT_NODE_GROUP); HeartbeatMsg heartbeatMsg = new HeartbeatMsg(); heartbeatMsg.setIp(agentIp); @@ -227,8 +227,8 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea if (StringUtils.isNotBlank(inCharges)) { heartbeatMsg.setInCharges(inCharges); } - if (StringUtils.isNotBlank(nodeTag)) { - heartbeatMsg.setNodeTag(nodeTag); + if (StringUtils.isNotBlank(nodeGroup)) { + heartbeatMsg.setNodeGroup(nodeGroup); } Map<String, JobWrapper> jobWrapperMap = jobmanager.getJobs(); diff --git a/inlong-agent/conf/agent.properties b/inlong-agent/conf/agent.properties index ee0942a22..e8dfd5911 100755 --- a/inlong-agent/conf/agent.properties +++ b/inlong-agent/conf/agent.properties @@ -42,7 +42,7 @@ thread.pool.await.time=30 agent.local.ip=127.0.0.1 agent.local.uuid= agent.local.uuid.open=false -agent.node.tag=default_tag +agent.node.group=default_group agent.enable.oom.exit=false agent.custom.fixed.ip= # max capacity of memory channel diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java index 31d1eee65..3a54b714f 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java @@ -76,9 +76,9 @@ public class HeartbeatMsg { private String clusterTag; /** - * Tag of node, separated by commas(,) + * Group of node for filtering stream source collect task, separated by commas(,) */ - private String nodeTag; + private String nodeGroup; /** * Ext tag of cluster, key=value pairs seperated by & diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java index a1be17a92..d65660f3e 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java @@ -20,7 +20,6 @@ package org.apache.inlong.manager.client.api; import org.apache.inlong.manager.client.api.impl.InlongClientImpl; import org.apache.inlong.manager.pojo.cluster.BindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; @@ -261,12 +260,4 @@ public interface InlongClient { * @return whether succeed */ Boolean deleteNode(Integer id); - - /** - * Bind or unbind cluster tag node for cluster node. - * - * @param request cluster info to be modified - * @return whether succeed - */ - Boolean bindNodeTag(ClusterNodeBindTagRequest request); } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java index 69a538eea..c8f988e44 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java @@ -39,7 +39,6 @@ import org.apache.inlong.manager.common.util.HttpUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.pojo.cluster.BindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; @@ -288,11 +287,6 @@ public class InlongClientImpl implements InlongClient { return clusterClient.deleteNode(id); } - @Override - public Boolean bindNodeTag(ClusterNodeBindTagRequest request) { - return clusterClient.bindNodeTag(request); - } - private SimpleGroupStatus recheckGroupStatus(SimpleGroupStatus groupStatus, List<StreamSource> sources) { Map<SimpleSourceStatus, List<StreamSource>> statusListMap = Maps.newHashMap(); sources.forEach(source -> { diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java index ecf271102..7140bf595 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java @@ -23,7 +23,6 @@ import org.apache.inlong.manager.client.api.util.ClientUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.pojo.cluster.BindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; @@ -297,16 +296,4 @@ public class InlongClusterClient { ClientUtils.assertRespSuccess(response); return response.getData(); } - - /** - * Bind or unbind cluster tag node for cluster node. - * - * @param request cluster info to be modified - * @return whether succeed - */ - public Boolean bindNodeTag(ClusterNodeBindTagRequest request) { - Response<Boolean> response = ClientUtils.executeHttpCall(inlongClusterApi.bindNodeTag(request)); - ClientUtils.assertRespSuccess(response); - return response.getData(); - } } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java index 79b98bf9b..776aacec0 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java @@ -19,7 +19,6 @@ package org.apache.inlong.manager.client.api.service; import org.apache.inlong.manager.pojo.cluster.BindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; @@ -99,7 +98,4 @@ public interface InlongClusterApi { @DELETE("cluster/node/delete/{id}") Call<Response<Boolean>> deleteNode(@Path("id") Integer id); - - @POST("cluster/node/bindTag") - Call<Response<Boolean>> bindNodeTag(@Body ClusterNodeBindTagRequest request); } diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java similarity index 50% copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java index a084f72fb..85da5a876 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java @@ -15,37 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.manager.dao.entity; - -import lombok.Data; - -import java.io.Serializable; -import java.util.Date; +package org.apache.inlong.manager.common.consts; /** - * Inlong cluster node entity, including parent id, type, ip, etc. + * Constant class for agent ext params */ -@Data -public class InlongClusterNodeEntity implements Serializable { - - private static final long serialVersionUID = 1L; - private Integer id; - private Integer parentId; - private String type; - private String ip; - private Integer port; - private String protocolType; - private Integer nodeLoad; - private String nodeTags; - private String extParams; - private String description; - - private Integer status; - private Integer isDeleted; - private String creator; - private String modifier; - private Date createTime; - private Date modifyTime; - private Integer version; +public class AgentConstants { + public static final String AGENT_GROUP_KEY = "agentGroup"; } diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java index a084f72fb..b57aa6d26 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java @@ -36,7 +36,6 @@ public class InlongClusterNodeEntity implements Serializable { private Integer port; private String protocolType; private Integer nodeLoad; - private String nodeTags; private String extParams; private String description; diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java index c4de96c0f..561270f63 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java @@ -42,7 +42,7 @@ public class StreamSourceEntity implements Serializable { private String dataNodeName; private String inlongClusterName; - private String inlongClusterNodeTag; + private String inlongClusterNodeGroup; private String serializationType; private String snapshot; private Date reportTime; diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml index 491dba3cd..6465d3c91 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml @@ -28,7 +28,6 @@ <result column="port" jdbcType="INTEGER" property="port"/> <result column="protocol_type" jdbcType="VARCHAR" property="protocolType"/> <result column="node_load" jdbcType="INTEGER" property="nodeLoad"/> - <result column="node_tags" jdbcType="VARCHAR" property="nodeTags"/> <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/> <result column="description" jdbcType="VARCHAR" property="description"/> <result column="status" jdbcType="INTEGER" property="status"/> @@ -40,7 +39,7 @@ <result column="version" jdbcType="INTEGER" property="version"/> </resultMap> <sql id="Base_Column_List"> - id, parent_id, type, ip, port, protocol_type, node_load, node_tags, ext_params, description, + id, parent_id, type, ip, port, protocol_type, node_load, ext_params, description, status, is_deleted, creator, modifier, create_time, modify_time, version </sql> @@ -48,12 +47,12 @@ parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity"> insert into inlong_cluster_node (id, parent_id, type, ip, port, protocol_type, - node_load, node_tags, ext_params, + node_load, ext_params, description, status, creator, modifier) values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR}, #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR}, - #{nodeLoad,jdbcType=INTEGER}, #{nodeTags,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR}, + #{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR}, #{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}) </insert> @@ -62,14 +61,13 @@ parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity"> insert into inlong_cluster_node (id, parent_id, type, ip, port, protocol_type, - node_load, node_tags, ext_params, status, + node_load, ext_params, status, creator, modifier) values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR}, #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR}, - #{nodeLoad,jdbcType=INTEGER}, #{nodeTags,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR}, + #{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}) ON DUPLICATE KEY UPDATE node_load = VALUES(node_load), - node_tags = VALUES(node_tags), ext_params = VALUES(ext_params), status = VALUES(status), modifier = VALUES(modifier) @@ -143,7 +141,6 @@ port = #{port,jdbcType=INTEGER}, protocol_type = #{protocolType,jdbcType=VARCHAR}, node_load = #{nodeLoad,jdbcType=INTEGER}, - node_tags = #{nodeTags,jdbcType=VARCHAR}, ext_params = #{extParams,jdbcType=LONGVARCHAR}, description = #{description,jdbcType=VARCHAR}, status = #{status,jdbcType=INTEGER}, diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml index 7ba947b1f..2858e986a 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml @@ -31,7 +31,7 @@ <result column="uuid" jdbcType="VARCHAR" property="uuid"/> <result column="data_node_name" jdbcType="VARCHAR" property="dataNodeName"/> <result column="inlong_cluster_name" jdbcType="VARCHAR" property="inlongClusterName"/> - <result column="inlong_cluster_node_tag" jdbcType="VARCHAR" property="inlongClusterNodeTag"/> + <result column="inlong_cluster_node_group" jdbcType="VARCHAR" property="inlongClusterNodeGroup"/> <result column="serialization_type" jdbcType="VARCHAR" property="serializationType"/> <result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/> <result column="report_time" jdbcType="TIMESTAMP" property="reportTime"/> @@ -47,7 +47,7 @@ </resultMap> <sql id="Base_Column_List"> id, inlong_group_id, inlong_stream_id, source_type, source_name, template_id, agent_ip, uuid, - data_node_name, inlong_cluster_name, inlong_cluster_node_tag, serialization_type, snapshot, report_time, + data_node_name, inlong_cluster_name, inlong_cluster_node_group, serialization_type, snapshot, report_time, ext_params, version, status, previous_status, is_deleted, creator, modifier, create_time, modify_time </sql> @@ -55,14 +55,14 @@ parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> insert into stream_source (inlong_group_id, inlong_stream_id, source_type, source_name, template_id, agent_ip, - uuid, data_node_name, inlong_cluster_name, inlong_cluster_node_tag, + uuid, data_node_name, inlong_cluster_name, inlong_cluster_node_group, serialization_type, snapshot, report_time, ext_params, status, previous_status, creator, modifier) values (#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER}, #{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR}, - #{inlongClusterName,jdbcType=VARCHAR}, #{inlongClusterNodeTag,jdbcType=VARCHAR}, + #{inlongClusterName,jdbcType=VARCHAR}, #{inlongClusterNodeGroup,jdbcType=VARCHAR}, #{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR}, #{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}) @@ -337,8 +337,8 @@ <if test="inlongClusterName != null"> inlong_cluster_name = #{inlongClusterName,jdbcType=INTEGER}, </if> - <if test="inlongClusterNodeTag != null"> - inlong_cluster_node_tag = #{inlongClusterNodeTag,jdbcType=INTEGER}, + <if test="inlongClusterNodeGroup != null"> + inlong_cluster_node_group = #{inlongClusterNodeGroup,jdbcType=INTEGER}, </if> <if test="serializationType != null"> serialization_type = #{serializationType,jdbcType=VARCHAR}, @@ -382,7 +382,7 @@ uuid = #{uuid,jdbcType=VARCHAR}, data_node_name = #{dataNodeName,jdbcType=VARCHAR}, inlong_cluster_name = #{inlongClusterName,jdbcType=VARCHAR}, - inlong_cluster_node_tag = #{inlongClusterNodeTag,jdbcType=VARCHAR}, + inlong_cluster_node_group = #{inlongClusterNodeGroup,jdbcType=VARCHAR}, serialization_type = #{serializationType,jdbcType=VARCHAR}, snapshot = #{snapshot,jdbcType=LONGVARCHAR}, report_time = #{reportTime,jdbcType=TIMESTAMP}, diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java similarity index 72% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java index ad6fe0673..493c03ab4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster; +package org.apache.inlong.manager.pojo.cluster.agent; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -25,24 +25,21 @@ import javax.validation.constraints.NotBlank; import java.util.List; /** - * Inlong cluster node bind or unbind tag request + * Inlong cluster node bind or unbind group.Group is used to distinguish which stream source tasks are collected */ @Data -@ApiModel("Cluster node bind and unbind tag request") -public class ClusterNodeBindTagRequest { +@ApiModel("Cluster node bind and unbind stream source label request, stream source label is a filter to judge " + + "whether to accept the stream source task") +public class AgentClusterNodeBindGroupRequest { - @NotBlank(message = "Cluster nodeTag cannot be blank") - @ApiModelProperty(value = "Cluster node tag") - private String clusterNodeTag; + @NotBlank(message = "Cluster agent group cannot be blank") + @ApiModelProperty(value = "Cluster agent group") + private String agentGroup; @NotBlank(message = "clusterName cannot be blank") @ApiModelProperty(value = "Cluster name") private String clusterName; - @NotBlank(message = "type cannot be blank") - @ApiModelProperty(value = "Cluster type, including AGENT, DATAPROXY, etc.") - private String type; - @ApiModelProperty(value = "Cluster node ip list which needs to bind tag") private List<String> bindClusterNodes; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java index 099ab45e7..f0adcdd5e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java @@ -81,10 +81,10 @@ public class SourceRequest { @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'") private String inlongClusterName; - @ApiModelProperty("Inlong cluster node tag") + @ApiModelProperty("Inlong cluster node label for filtering stream source collect task") @Length(min = 1, max = 128, message = "length must be between 1 and 128") @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'") - private String inlongClusterNodeTag; + private String inlongClusterNodeGroup; @ApiModelProperty("Data node name") @Length(max = 128, message = "length must be less than or equal to 128") diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java index 8ecccbc8b..fac486088 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java @@ -21,7 +21,6 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig; import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse; import org.apache.inlong.manager.pojo.cluster.BindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; @@ -386,15 +385,6 @@ public interface InlongClusterService { */ Boolean deleteNode(Integer id, String operator); - /** - * Bind or unbind cluster tag node for cluster node. - * - * @param request cluster info to be modified - * @param operator current operator - * @return whether succeed - */ - Boolean bindNodeTag(ClusterNodeBindTagRequest request, String operator); - /** * Delete cluster node. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index a76ec6dcd..598dd10a0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -54,7 +54,6 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; import org.apache.inlong.manager.dao.mapper.UserEntityMapper; import org.apache.inlong.manager.pojo.cluster.BindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; @@ -1390,60 +1389,6 @@ public class InlongClusterServiceImpl implements InlongClusterService { return true; } - @Override - public Boolean bindNodeTag(ClusterNodeBindTagRequest request, String operator) { - HashSet<String> bindSet = Sets.newHashSet(); - HashSet<String> unbindSet = Sets.newHashSet(); - if (request.getBindClusterNodes() != null) { - bindSet.addAll(request.getBindClusterNodes()); - } - if (request.getUnbindClusterNodes() != null) { - unbindSet.addAll(request.getUnbindClusterNodes()); - } - Preconditions.checkTrue(Sets.union(bindSet, unbindSet).size() == bindSet.size() + unbindSet.size(), - "can not add and del node tag in the sameTime"); - InlongClusterEntity cluster = clusterMapper.selectByNameAndType(request.getClusterName(), request.getType()); - String message = "Current user does not have permission to bind cluster node tag"; - checkUser(cluster, operator, message); - - if (CollectionUtils.isNotEmpty(bindSet)) { - bindSet.stream().flatMap(clusterNode -> { - ClusterPageRequest pageRequest = new ClusterPageRequest(); - pageRequest.setParentId(cluster.getId()); - pageRequest.setType(request.getType()); - pageRequest.setKeyword(clusterNode); - return clusterNodeMapper.selectByCondition(pageRequest).stream(); - }).filter(entity -> entity != null) - .forEach(entity -> { - String nodeTags = entity.getNodeTags(); - Set<String> tagSet = nodeTags == null ? Sets.newHashSet() - : Sets.newHashSet(entity.getNodeTags().split(InlongConstants.COMMA)); - tagSet.add(request.getClusterNodeTag()); - entity.setNodeTags(String.join(InlongConstants.COMMA, tagSet)); - clusterNodeMapper.updateById(entity); - }); - } - - if (CollectionUtils.isNotEmpty(unbindSet)) { - unbindSet.stream().flatMap(clusterNode -> { - ClusterPageRequest pageRequest = new ClusterPageRequest(); - pageRequest.setParentId(cluster.getId()); - pageRequest.setType(request.getType()); - pageRequest.setKeyword(clusterNode); - return clusterNodeMapper.selectByCondition(pageRequest).stream(); - }).filter(entity -> entity != null) - .forEach(entity -> { - String nodeTags = entity.getNodeTags(); - Set<String> tagSet = nodeTags == null ? Sets.newHashSet() - : Sets.newHashSet(entity.getNodeTags().split(InlongConstants.COMMA)); - tagSet.remove(request.getClusterNodeTag()); - entity.setNodeTags(String.join(InlongConstants.COMMA, tagSet)); - clusterNodeMapper.updateById(entity); - }); - } - return true; - } - @Override public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolType) { LOGGER.debug("begin to get data proxy nodes for groupId={}, protocol={}", groupId, protocolType); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java index e7db76937..099ebca72 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java @@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.core; import org.apache.inlong.common.pojo.agent.TaskRequest; import org.apache.inlong.common.pojo.agent.TaskResult; import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest; +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest; /** * The service interface for agent @@ -49,4 +50,11 @@ public interface AgentService { */ TaskResult getTaskResult(TaskRequest request); + /** + * Divide the agent into different groups, which collect different stream source tasks. + * + * @param request Request of the bind group. + * @return Whether succeed. + */ + Boolean bindGroup(AgentClusterNodeBindGroupRequest request); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index f6e1776ce..0efd42c32 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.service.core.impl; import com.google.common.collect.Lists; +import com.google.gson.Gson; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; @@ -33,6 +34,7 @@ import org.apache.inlong.common.pojo.agent.TaskResult; import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest; import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo; import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo; +import org.apache.inlong.manager.common.consts.AgentConstants; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.common.constant.MQType; import org.apache.inlong.manager.common.consts.SourceType; @@ -54,6 +56,7 @@ import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest; import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO; import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; @@ -75,6 +78,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -92,6 +96,7 @@ public class AgentServiceImpl implements AgentService { private static final int ISSUED_STATUS = 3; private static final int MODULUS_100 = 100; private static final int TASK_FETCH_SIZE = 2; + private static final Gson GSON = new Gson(); @Autowired private StreamSourceEntityMapper sourceMapper; @@ -192,6 +197,66 @@ public class AgentServiceImpl implements AgentService { return TaskResult.builder().dataConfigs(tasks).cmdConfigs(cmdConfigs).build(); } + @Override + @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) + public Boolean bindGroup(AgentClusterNodeBindGroupRequest request) { + HashSet<String> bindSet = Sets.newHashSet(); + HashSet<String> unbindSet = Sets.newHashSet(); + if (request.getBindClusterNodes() != null) { + bindSet.addAll(request.getBindClusterNodes()); + } + if (request.getUnbindClusterNodes() != null) { + unbindSet.addAll(request.getUnbindClusterNodes()); + } + Preconditions.checkTrue(Sets.union(bindSet, unbindSet).size() == bindSet.size() + unbindSet.size(), + "can not add and del node tag in the sameTime"); + InlongClusterEntity cluster = clusterMapper.selectByNameAndType(request.getClusterName(), ClusterType.AGENT); + String message = "Current user does not have permission to bind cluster node tag"; + + if (CollectionUtils.isNotEmpty(bindSet)) { + bindSet.stream().flatMap(clusterNode -> { + ClusterPageRequest pageRequest = new ClusterPageRequest(); + pageRequest.setParentId(cluster.getId()); + pageRequest.setType(ClusterType.AGENT); + pageRequest.setKeyword(clusterNode); + return clusterNodeMapper.selectByCondition(pageRequest).stream(); + }).filter(entity -> entity != null) + .forEach(entity -> { + Map<String, String> extParams = entity.getExtParams() == null ? new HashMap<>() + : GSON.fromJson(entity.getExtParams(), Map.class); + Set<String> groupSet = !extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>() + : Sets.newHashSet( + extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA)); + groupSet.add(request.getAgentGroup()); + extParams.put(AgentConstants.AGENT_GROUP_KEY, String.join(InlongConstants.COMMA, groupSet)); + entity.setExtParams(GSON.toJson(extParams)); + clusterNodeMapper.insertOnDuplicateKeyUpdate(entity); + }); + } + + if (CollectionUtils.isNotEmpty(unbindSet)) { + unbindSet.stream().flatMap(clusterNode -> { + ClusterPageRequest pageRequest = new ClusterPageRequest(); + pageRequest.setParentId(cluster.getId()); + pageRequest.setType(ClusterType.AGENT); + pageRequest.setKeyword(clusterNode); + return clusterNodeMapper.selectByCondition(pageRequest).stream(); + }).filter(entity -> entity != null) + .forEach(entity -> { + Map<String, String> extParams = entity.getExtParams() == null ? new HashMap<>() + : GSON.fromJson(entity.getExtParams(), Map.class); + Set<String> groupSet = !extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>() + : Sets.newHashSet( + extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA)); + groupSet.remove(request.getAgentGroup()); + extParams.put(AgentConstants.AGENT_GROUP_KEY, String.join(InlongConstants.COMMA, groupSet)); + entity.setExtParams(GSON.toJson(extParams)); + clusterNodeMapper.insertOnDuplicateKeyUpdate(entity); + }); + } + return true; + } + /** * Query the tasks that source is waited to be operated.(only clusterName and ip matched it can be operated) * @@ -251,7 +316,7 @@ public class AgentServiceImpl implements AgentService { private void preProcessFileTask(TaskRequest taskRequest) { preProcessTemplateFileTask(taskRequest); - preProcessTagFileTasks(taskRequest); + preProcessLabelFileTasks(taskRequest); } /** @@ -306,7 +371,7 @@ public class AgentServiceImpl implements AgentService { * * @param taskRequest */ - private void preProcessTagFileTasks(TaskRequest taskRequest) { + private void preProcessLabelFileTasks(TaskRequest taskRequest) { List<Integer> needProcessedStatusList = Arrays.asList( SourceStatus.SOURCE_NORMAL.getCode(), SourceStatus.SOURCE_FAILED.getCode(), @@ -329,7 +394,7 @@ public class AgentServiceImpl implements AgentService { Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet( SourceStatus.SOURCE_FROZEN, SourceStatus.TO_BE_ISSUED_FROZEN); - if (!matchTag(sourceEntity, clusterNodeEntity) + if (!matchLabel(sourceEntity, clusterNodeEntity) && !exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))) { LOGGER.info("Transform task({}) from {} to {} because tag mismatch " + "for agent({}) in cluster({})", sourceEntity.getAgentIp(), @@ -348,7 +413,7 @@ public class AgentServiceImpl implements AgentService { SourceStatus.TO_BE_ISSUED_ACTIVE); Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet( StreamStatus.SUSPENDED, StreamStatus.SUSPENDED); - if (matchTag(sourceEntity, clusterNodeEntity) + if (matchLabel(sourceEntity, clusterNodeEntity) && !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus())) && !exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) { LOGGER.info("Transform task({}) from {} to {} because tag rematch " @@ -514,20 +579,21 @@ public class AgentServiceImpl implements AgentService { }).collect(Collectors.toList()); } - private boolean matchTag(StreamSourceEntity sourceEntity, InlongClusterNodeEntity clusterNodeEntity) { + private boolean matchLabel(StreamSourceEntity sourceEntity, InlongClusterNodeEntity clusterNodeEntity) { Preconditions.checkNotNull(sourceEntity, "cluster must be valid"); - if (sourceEntity.getInlongClusterNodeTag() == null) { + if (sourceEntity.getInlongClusterNodeGroup() == null) { return true; } - if (clusterNodeEntity == null || clusterNodeEntity.getNodeTags() == null) { + + if (clusterNodeEntity == null || clusterNodeEntity.getExtParams() == null) { return false; } - Set<String> nodeTags = Stream.of( - clusterNodeEntity.getNodeTags().split(InlongConstants.COMMA)).collect(Collectors.toSet()); - Set<String> sourceTags = Stream.of( - sourceEntity.getInlongClusterNodeTag().split(InlongConstants.COMMA)).collect(Collectors.toSet()); - return sourceTags.stream().anyMatch(sourceTag -> nodeTags.contains(sourceTag)); + Map<String, String> extParams = GSON.fromJson(clusterNodeEntity.getExtParams(), Map.class); + Set<String> clusterNodeLabels = !extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>() + : Sets.newHashSet(extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA)); + Set<String> sourceLabels = Stream.of( + sourceEntity.getInlongClusterNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet()); + return sourceLabels.stream().anyMatch(sourceLabel -> clusterNodeLabels.contains(sourceLabel)); } - } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java index 7ccb7db5d..2fb8d5608 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java @@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.gson.Gson; import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -30,6 +31,7 @@ import org.apache.inlong.common.enums.NodeSrvStatus; import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager; import org.apache.inlong.common.heartbeat.ComponentHeartbeat; import org.apache.inlong.common.heartbeat.HeartbeatMsg; +import org.apache.inlong.manager.common.consts.AgentConstants; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ClusterStatus; import org.apache.inlong.manager.common.enums.NodeStatus; @@ -47,14 +49,21 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @Slf4j @Component public class HeartbeatManager implements AbstractHeartbeatManager { private static final String AUTO_REGISTERED = "auto registered"; + private static final Gson GSON = new Gson(); @Getter private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache; @@ -211,21 +220,30 @@ public class HeartbeatManager implements AbstractHeartbeatManager { clusterNode.setPort(Integer.valueOf(heartbeat.getPort())); clusterNode.setProtocolType(heartbeat.getProtocolType()); clusterNode.setNodeLoad(heartbeat.getLoad()); - clusterNode.setNodeTags(heartbeat.getNodeTag()); clusterNode.setStatus(ClusterStatus.NORMAL.getStatus()); clusterNode.setCreator(creator); clusterNode.setModifier(creator); clusterNode.setDescription(AUTO_REGISTERED); + insertOrUpdateLabel(clusterNode, heartbeat); return clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode); } private int updateClusterNode(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) { clusterNode.setStatus(ClusterStatus.NORMAL.getStatus()); clusterNode.setNodeLoad(heartbeat.getLoad()); - clusterNode.setNodeTags(heartbeat.getNodeTag()); + insertOrUpdateLabel(clusterNode, heartbeat); return clusterNodeMapper.updateById(clusterNode); } + private void insertOrUpdateLabel(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) { + Set<String> groupSet = heartbeat.getNodeGroup() == null ? new HashSet<>() + : Arrays.stream(heartbeat.getNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet()); + Map<String, String> extParams = clusterNode.getExtParams() == null ? new HashMap<>() + : GSON.fromJson(clusterNode.getExtParams(), Map.class); + extParams.put(AgentConstants.AGENT_GROUP_KEY, String.join(InlongConstants.COMMA, groupSet)); + clusterNode.setExtParams(GSON.toJson(extParams)); + } + private int deleteClusterNode(InlongClusterNodeEntity clusterNode) { return clusterNodeMapper.deleteById(clusterNode.getId()); } @@ -281,6 +299,6 @@ public class HeartbeatManager implements AbstractHeartbeatManager { if (oldHB == null) { return true; } - return oldHB.getNodeTag() != newHB.getNodeTag() || oldHB.getLoad() != newHB.getLoad(); + return oldHB.getNodeGroup() != newHB.getNodeGroup() || oldHB.getLoad() != newHB.getLoad(); } } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java index f509f934b..2f85d1c9a 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java @@ -78,7 +78,7 @@ class AgentServiceTest extends ServiceBaseTest { private InlongStreamEntityMapper streamMapper; private List<Pair<String, String>> groupStreamCache; - private List<String> tagCache; + private List<String> groupCache; /** * Save template source @@ -110,7 +110,7 @@ class AgentServiceTest extends ServiceBaseTest { /** * mock {@link StreamSourceService#save} */ - public Pair<String, String> saveSource(String tag) { + public Pair<String, String> saveSource(String group) { String groupId = UUID.randomUUID().toString(); String streamId = UUID.randomUUID().toString(); groupStreamCache.add(new ImmutablePair<>(groupId, streamId)); @@ -121,9 +121,9 @@ class AgentServiceTest extends ServiceBaseTest { sourceInfo.setInlongStreamId(streamId); sourceInfo.setSourceType(SourceType.FILE); sourceInfo.setInlongClusterName(MockAgent.CLUSTER_NAME); - sourceInfo.setInlongClusterNodeTag(tag); + sourceInfo.setInlongClusterNodeGroup(group); sourceInfo.setSourceName( - String.format("Source task for cluster(%s) and tag(%s)", MockAgent.CLUSTER_NAME, tag)); + String.format("Source task for cluster(%s) and group(%s)", MockAgent.CLUSTER_NAME, group)); sourceService.save(sourceInfo, GLOBAL_OPERATOR); sourceService.updateStatus( groupId, @@ -170,7 +170,7 @@ class AgentServiceTest extends ServiceBaseTest { @BeforeEach public void setupEach() { groupStreamCache = new ArrayList<>(); - tagCache = new ArrayList<>(); + groupCache = new ArrayList<>(); } @AfterEach @@ -184,27 +184,27 @@ class AgentServiceTest extends ServiceBaseTest { groupStreamCache.stream().map(Pair::getValue).collect(Collectors.toList())); } groupStreamCache.clear(); - tagCache.stream().forEach(tag -> bindTag(false, tag));; + groupCache.stream().forEach(group -> bindGroup(false, group));; } - private void bindTag(boolean bind, String tag) { + private void bindGroup(boolean bind, String group) { if (bind) { - tagCache.add(tag); + groupCache.add(group); } - agent.bindTag(bind, tag); + agent.bindGroup(bind, group); } /** - * Test bind tag for node. + * Test bind group for node. */ @Test - public void testTagMatch() { - saveSource("tag1,tag3"); - saveSource("tag2,tag3"); - saveSource("tag2,tag3"); - saveSource("tag4"); - bindTag(true, "tag1"); - bindTag(true, "tag2"); + public void testGroupMatch() { + saveSource("group1,group3"); + saveSource("group2,group3"); + saveSource("group2,group3"); + saveSource("group4"); + bindGroup(true, "group1"); + bindGroup(true, "group2"); TaskResult taskResult = agent.pullTask(); Assertions.assertTrue(taskResult.getCmdConfigs().isEmpty()); @@ -220,12 +220,12 @@ class AgentServiceTest extends ServiceBaseTest { } /** - * Test node tag mismatch source task and next time rematch source task. + * Test node group mismatch source task and next time rematch source task. */ @Test - public void testTagMismatchAndRematch() { - final Pair<String, String> groupStream = saveSource("tag1,tag3"); - bindTag(true, "tag1"); + public void testGroupMismatchAndRematch() { + final Pair<String, String> groupStream = saveSource("group1,group3"); + bindGroup(true, "group1"); agent.pullTask(); agent.pullTask(); // report last success status @@ -235,8 +235,8 @@ class AgentServiceTest extends ServiceBaseTest { .findAny() .get() .getId(); - // unbind tag and mismatch - bindTag(false, "tag1"); + // unbind group and mismatch + bindGroup(false, "group1"); TaskResult t1 = agent.pullTask(); Assertions.assertEquals(1, t1.getDataConfigs().size()); Assertions.assertEquals(1, t1.getDataConfigs().stream() @@ -246,8 +246,8 @@ class AgentServiceTest extends ServiceBaseTest { DataConfig d1 = t1.getDataConfigs().get(0); Assertions.assertEquals(sourceId, d1.getTaskId()); - // bind tag and rematch - bindTag(true, "tag1"); + // bind group and rematch + bindGroup(true, "group1"); TaskResult t2 = agent.pullTask(); Assertions.assertEquals(1, t2.getDataConfigs().size()); Assertions.assertEquals(1, t2.getDataConfigs().stream() @@ -263,14 +263,14 @@ class AgentServiceTest extends ServiceBaseTest { */ @Test public void testSuspendFailWhenNotAck() { - Pair<String, String> groupStream = saveSource("tag1,tag3"); - bindTag(true, "tag1"); + Pair<String, String> groupStream = saveSource("group1,group3"); + bindGroup(true, "group1"); agent.pullTask(); agent.pullTask(); // report last success status // mismatch - bindTag(false, "tag1"); + bindGroup(false, "group1"); agent.pullTask(); // suspend @@ -282,21 +282,21 @@ class AgentServiceTest extends ServiceBaseTest { } /** - * Test node tag rematch source task but group suspend + * Test node group rematch source task but group suspend */ @Test public void testRematchedWhenSuspend() { - final Pair<String, String> groupStream = saveSource("tag1,tag3"); - bindTag(true, "tag1"); + final Pair<String, String> groupStream = saveSource("group1,group3"); + bindGroup(true, "group1"); agent.pullTask(); agent.pullTask(); // report last success status // mismatch and rematch - bindTag(false, "tag1"); + bindGroup(false, "group1"); agent.pullTask(); agent.pullTask(); // report last to make it from 304 -> 104 - bindTag(true, "tag1"); + bindGroup(true, "group1"); // suspend suspendSource(groupStream.getLeft(), groupStream.getRight()); @@ -305,12 +305,12 @@ class AgentServiceTest extends ServiceBaseTest { } /** - * Test node tag mismatch source task but group restart + * Test node group mismatch source task but group restart */ @Test public void testMismatchedWhenRestart() { - final Pair<String, String> groupStream = saveSource("tag1,tag3"); - bindTag(true, "tag1"); + final Pair<String, String> groupStream = saveSource("group1,group3"); + bindGroup(true, "group1"); agent.pullTask(); agent.pullTask(); // report last success status @@ -318,7 +318,7 @@ class AgentServiceTest extends ServiceBaseTest { // suspend and restart suspendSource(groupStream.getLeft(), groupStream.getRight()); restartSource(groupStream.getLeft(), groupStream.getRight()); - bindTag(false, "tag1"); + bindGroup(false, "group1"); TaskResult taskResult = agent.pullTask(); Assertions.assertEquals(1, taskResult.getDataConfigs().size()); Assertions.assertEquals(1, taskResult.getDataConfigs().stream() diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java index 2acb651f5..9e0061d18 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java @@ -44,7 +44,7 @@ public class MockAgent { // 2. Regularly report the previously executed tasks to the manager (may be successful or fail) public static final String LOCAL_IP = "127.0.0.1"; public static final String LOCAL_PORT = "8008"; - public static final String LOCAL_TAG = "default_tag"; + public static final String LOCAL_GROUP = "default_group"; public static final String CLUSTER_TAG = "default_cluster_tag"; public static final String CLUSTER_NAME = "1c59ef9e8e1e43cfb25ee8b744c9c81b_2790"; @@ -52,7 +52,7 @@ public class MockAgent { private HeartbeatService heartbeatService; private Queue<CommandEntity> commands = new LinkedList<>(); - private Set<String> tags = Sets.newHashSet(LOCAL_TAG); + private Set<String> groups = Sets.newHashSet(LOCAL_GROUP); private int jobLimit; public MockAgent(AgentService agentService, HeartbeatService heartbeatService, int jobLimit) { @@ -83,17 +83,17 @@ public class MockAgent { heartbeat.setComponentType(ComponentTypeEnum.Agent.getType()); heartbeat.setClusterName(CLUSTER_NAME); heartbeat.setClusterTag(CLUSTER_TAG); - heartbeat.setNodeTag(tags.stream().collect(Collectors.joining(InlongConstants.COMMA))); + heartbeat.setNodeGroup(groups.stream().collect(Collectors.joining(InlongConstants.COMMA))); heartbeat.setInCharges(GLOBAL_OPERATOR); heartbeat.setReportTime(System.currentTimeMillis()); heartbeatService.reportHeartbeat(heartbeat); } - public void bindTag(boolean bind, String tag) { + public void bindGroup(boolean bind, String group) { if (bind) { - tags.add(tag); + groups.add(group); } else { - tags.remove(tag); + groups.remove(group); } sendHeartbeat(); } diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index 6af822e01..fc4938469 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -133,7 +133,6 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node` `port` int(6) NULL COMMENT 'Cluster port', `protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP', `node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node', - `node_tags` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip', `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string', `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node', `status` int(4) DEFAULT '0' COMMENT 'Cluster status', @@ -330,7 +329,7 @@ CREATE TABLE IF NOT EXISTS `stream_source` `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table', `inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task', - `inlong_cluster_node_tag` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag', + `inlong_cluster_node_group` varchar(512) DEFAULT NULL COMMENT 'Cluster node group', `serialization_type` varchar(20) DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc', `snapshot` mediumtext DEFAULT NULL COMMENT 'Snapshot of this source task', `report_time` timestamp NULL COMMENT 'Snapshot time', diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index f8bcb47fb..3036a7525 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -143,7 +143,6 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node` `port` int(6) NULL COMMENT 'Cluster port', `protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP', `node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node', - `node_tags` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip', `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string', `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node', `status` int(4) DEFAULT '0' COMMENT 'Cluster status', @@ -347,7 +346,7 @@ CREATE TABLE IF NOT EXISTS `stream_source` `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table', `inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task', - `inlong_cluster_node_tag` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag', + `inlong_cluster_node_group` varchar(512) DEFAULT NULL COMMENT 'Cluster node group', `serialization_type` varchar(20) DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc', `snapshot` mediumtext DEFAULT NULL COMMENT 'Snapshot of this source task', `report_time` timestamp NULL COMMENT 'Snapshot time', diff --git a/inlong-manager/manager-web/sql/changes-1.5.0.sql b/inlong-manager/manager-web/sql/changes-1.5.0.sql index 4b655bc2c..88ed142bd 100644 --- a/inlong-manager/manager-web/sql/changes-1.5.0.sql +++ b/inlong-manager/manager-web/sql/changes-1.5.0.sql @@ -32,9 +32,5 @@ ALTER TABLE `inlong_cluster_node` ADD COLUMN `node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node'; -ALTER TABLE `inlong_cluster_node` - ADD COLUMN `node_tags` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip'; - - ALTER TABLE `stream_source` - ADD COLUMN `inlong_cluster_node_tag` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag'; \ No newline at end of file + ADD COLUMN `inlong_cluster_node_group` varchar(512) DEFAULT NULL COMMENT 'Cluster node group'; diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java index 3ea4634d3..df49e13ea 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java @@ -25,7 +25,6 @@ import org.apache.inlong.manager.common.enums.OperationType; import org.apache.inlong.manager.common.validation.UpdateValidation; import org.apache.inlong.manager.pojo.cluster.BindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; @@ -228,14 +227,6 @@ public class InlongClusterController { return Response.success(clusterService.deleteNode(id, LoginUserUtils.getLoginUser().getName())); } - @RequestMapping(value = "/cluster/node/bindTag") - @OperationLog(operation = OperationType.UPDATE) - @ApiOperation(value = "Bind or unbind cluster node tag") - public Response<Boolean> bindNodeTag(@Validated @RequestBody ClusterNodeBindTagRequest request) { - String username = LoginUserUtils.getLoginUser().getName(); - return Response.success(clusterService.bindNodeTag(request, username)); - } - @PostMapping("/cluster/testConnection") @ApiOperation(value = "Test connection for inlong cluster") public Response<Boolean> testConnection(@RequestBody ClusterRequest request) { diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java index 88343f940..5e2270637 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java @@ -22,6 +22,7 @@ import io.swagger.annotations.ApiOperation; import org.apache.inlong.common.pojo.agent.TaskRequest; import org.apache.inlong.common.pojo.agent.TaskResult; import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest; +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.service.cluster.InlongClusterService; import org.apache.inlong.manager.service.core.AgentService; @@ -68,4 +69,9 @@ public class AgentController { return Response.success(agentService.getTaskResult(request)); } + @PostMapping("/agent/bindGroup") + @ApiOperation(value = "Divide the agent into different groups, which collect different stream source tasks.") + public Response<Boolean> bindGroup(@RequestBody AgentClusterNodeBindGroupRequest request) { + return Response.success(agentService.bindGroup(request)); + } }