This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 10e512ff0 [INLONG-7199][Manager] Support save extension params for inlong cluster node (#7200) 10e512ff0 is described below commit 10e512ff0b87a1f7aa49c187bc3f8a656476efad Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Tue Jan 10 20:43:41 2023 +0800 [INLONG-7199][Manager] Support save extension params for inlong cluster node (#7200) --- .../manager/common/consts/AgentConstants.java | 26 ------- .../dao/mapper/InlongClusterNodeEntityMapper.java | 2 + .../mappers/InlongClusterNodeEntityMapper.xml | 41 ++++++++++ .../manager/pojo/cluster/ClusterNodeRequest.java | 3 +- .../pojo/cluster/agent/AgentClusterNodeDTO.java | 64 +++++++++++++++ .../cluster/agent/AgentClusterNodeRequest.java | 46 +++++++++++ .../cluster/agent/AgentClusterNodeResponse.java | 46 +++++++++++ .../service/cluster/InlongClusterServiceImpl.java | 45 ++++------- .../cluster/node/AbstractClusterNodeOperator.java | 81 +++++++++++++++++++ .../cluster/node/AgentClusterNodeOperator.java | 90 ++++++++++++++++++++++ .../cluster/node/DefaultClusterNodeOperator.java | 60 +++++++++++++++ .../cluster/node/InlongClusterNodeOperator.java | 62 +++++++++++++++ .../node/InlongClusterNodeOperatorFactory.java | 52 +++++++++++++ .../service/core/impl/AgentServiceImpl.java | 70 +++++++++-------- .../service/heartbeat/HeartbeatManager.java | 23 +++--- 15 files changed, 613 insertions(+), 98 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java deleted file mode 100644 index 85da5a876..000000000 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.common.consts; - -/** - * Constant class for agent ext params - */ -public class AgentConstants { - - public static final String AGENT_GROUP_KEY = "agentGroup"; -} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java index dabc837dd..83d2108a3 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java @@ -43,6 +43,8 @@ public interface InlongClusterNodeEntityMapper { int updateById(InlongClusterNodeEntity record); + int updateByIdSelective(InlongClusterNodeEntity record); + int deleteById(Integer id); } \ No newline at end of file diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml index 6465d3c91..b65afebc5 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml +++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml @@ -150,6 +150,47 @@ where id = #{id,jdbcType=INTEGER} and version = #{version,jdbcType=INTEGER} </update> + <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity"> + update inlong_cluster_node + <set> + <if test="parentId != null"> + parent_id = #{parentId,jdbcType=INTEGER}, + </if> + <if test="type != null"> + type = #{type,jdbcType=VARCHAR}, + </if> + <if test="ip != null"> + ip = #{ip,jdbcType=VARCHAR}, + </if> + <if test="port != null"> + port = #{port,jdbcType=INTEGER}, + </if> + <if test="protocolType != null"> + protocol_type = #{protocolType,jdbcType=VARCHAR}, + </if> + <if test="nodeLoad != null"> + node_load = #{nodeLoad,jdbcType=INTEGER}, + </if> + <if test="extParams != null"> + ext_params = #{extParams,jdbcType=LONGVARCHAR}, + </if> + <if test="description != null"> + description = #{description,jdbcType=VARCHAR}, + </if> + <if test="status != null"> + status = #{status,jdbcType=INTEGER}, + </if> + <if test="isDeleted != null"> + is_deleted = #{isDeleted,jdbcType=INTEGER}, + </if> + <if test="modifier != null"> + modifier = #{modifier,jdbcType=VARCHAR}, + </if> + version = #{version,jdbcType=INTEGER} + 1 + </set> + where id = #{id,jdbcType=INTEGER} + and version = #{version,jdbcType=INTEGER} + </update> <delete id="deleteById" parameterType="java.lang.Integer"> delete diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java index 5e789100b..80fe962f8 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.pojo.cluster; 
+import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; @@ -31,6 +32,7 @@ import javax.validation.constraints.NotNull; */ @Data @ApiModel("Cluster node request") +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type", defaultImpl = ClusterNodeRequest.class) public class ClusterNodeRequest { @NotNull(groups = UpdateValidation.class) @@ -53,7 +55,6 @@ public class ClusterNodeRequest { @NotNull(message = "port cannot be null") @ApiModelProperty(value = "Cluster port") - @Length(max = 6, message = "length must be less than or equal to 6") private Integer port; @NotBlank(message = "protocolType cannot be blank") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeDTO.java new file mode 100644 index 000000000..1d6e4a6a4 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeDTO.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.cluster.agent; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import javax.validation.constraints.NotNull; + +/** + * Agent cluster node info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Agent cluster node info") +public class AgentClusterNodeDTO { + + @ApiModelProperty(value = "Agent group name") + private String agentGroup; + + /** + * Get the dto instance from the request + */ + public static AgentClusterNodeDTO getFromRequest(AgentClusterNodeRequest request) { + return CommonBeanUtils.copyProperties(request, AgentClusterNodeDTO::new, true); + } + + /** + * Get the dto instance from the JSON string. 
+ */ + public static AgentClusterNodeDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, AgentClusterNodeDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, + ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeRequest.java new file mode 100644 index 000000000..1cc3b4242 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeRequest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.cluster.agent; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; + +/** + * Inlong cluster node request for Agent + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.AGENT) +@ApiModel("Inlong cluster node request for Agent") +public class AgentClusterNodeRequest extends ClusterNodeRequest { + + @ApiModelProperty(value = "Agent group name") + private String agentGroup; + + public AgentClusterNodeRequest() { + this.setType(ClusterType.AGENT); + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeResponse.java new file mode 100644 index 000000000..af43ae221 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeResponse.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.agent; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; + +/** + * Agent cluster response + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.AGENT) +@ApiModel("Inlong cluster node response for Agent") +public class AgentClusterNodeResponse extends ClusterNodeResponse { + + @ApiModelProperty(value = "Agent group name") + private String agentGroup; + + public AgentClusterNodeResponse() { + this.setType(ClusterType.AGENT); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index 6ddd96575..9ece6b2b0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -69,6 +69,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO; import 
org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo; import org.apache.inlong.manager.pojo.user.UserInfo; +import org.apache.inlong.manager.service.cluster.node.InlongClusterNodeOperator; +import org.apache.inlong.manager.service.cluster.node.InlongClusterNodeOperatorFactory; import org.apache.inlong.manager.service.repository.DataProxyConfigRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,6 +108,8 @@ public class InlongClusterServiceImpl implements InlongClusterService { @Autowired private InlongClusterOperatorFactory clusterOperatorFactory; @Autowired + private InlongClusterNodeOperatorFactory clusterNodeOperatorFactory; + @Autowired private InlongClusterTagEntityMapper clusterTagMapper; @Autowired private InlongClusterEntityMapper clusterMapper; @@ -985,14 +989,8 @@ public class InlongClusterServiceImpl implements InlongClusterService { LOGGER.error(errMsg); throw new BusinessException(errMsg); } - - InlongClusterNodeEntity entity = CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new); - entity.setCreator(operator); - entity.setModifier(operator); - clusterNodeMapper.insert(entity); - - LOGGER.info("success to add inlong cluster node={}", request); - return entity.getId(); + InlongClusterNodeOperator instance = clusterNodeOperatorFactory.getInstance(request.getType()); + return instance.saveOpt(request, operator); } @Override @@ -1027,11 +1025,9 @@ public class InlongClusterServiceImpl implements InlongClusterService { request.getType(), request.getIp(), request.getPort())); } // add record - InlongClusterNodeEntity clusterNode = CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new); - clusterNode.setCreator(opInfo.getName()); - clusterNode.setModifier(opInfo.getName()); - clusterNodeMapper.insert(clusterNode); - return entity.getId(); + InlongClusterNodeOperator instance = clusterNodeOperatorFactory.getInstance(request.getType()); + instance.saveOpt(request, opInfo.getName()); + return 
instance.saveOpt(request, opInfo.getName()); } @Override @@ -1045,9 +1041,8 @@ public class InlongClusterServiceImpl implements InlongClusterService { InlongClusterEntity cluster = clusterMapper.selectById(entity.getParentId()); String message = "Current user does not have permission to get cluster node"; checkUser(cluster, currentUser, message); - ClusterNodeResponse clusterNodeResponse = CommonBeanUtils.copyProperties(entity, ClusterNodeResponse::new); - LOGGER.debug("success to get inlong cluster node by id={}", id); - return clusterNodeResponse; + InlongClusterNodeOperator instance = clusterNodeOperatorFactory.getInstance(entity.getType()); + return instance.getFromEntity(entity); } @Override @@ -1267,14 +1262,8 @@ public class InlongClusterServiceImpl implements InlongClusterService { String message = "Current user does not have permission to update cluster node"; checkUser(cluster, operator, message); - CommonBeanUtils.copyProperties(request, entity, true); - entity.setParentId(request.getParentId()); - entity.setModifier(operator); - if (InlongConstants.AFFECTED_ONE_ROW != clusterNodeMapper.updateById(entity)) { - LOGGER.warn(errMsg); - throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); - } - LOGGER.info("success to update inlong cluster node={}", request); + InlongClusterNodeOperator instance = clusterNodeOperatorFactory.getInstance(request.getType()); + instance.updateOpt(request, operator); return true; } @@ -1322,12 +1311,8 @@ public class InlongClusterServiceImpl implements InlongClusterService { "inlong cluster node already exist for " + request); } // update record - CommonBeanUtils.copyProperties(request, entity, true); - entity.setParentId(request.getParentId()); - entity.setModifier(opInfo.getName()); - if (InlongConstants.AFFECTED_ONE_ROW != clusterNodeMapper.updateById(entity)) { - throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); - } + InlongClusterNodeOperator instance = 
clusterNodeOperatorFactory.getInstance(request.getType()); + instance.updateOpt(request, opInfo.getName()); return true; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java new file mode 100644 index 000000000..9f971bbd5 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.service.cluster.node; + +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity; +import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper; +import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.annotation.Transactional; + +/** + * Default operator of inlong cluster node. + */ +public abstract class AbstractClusterNodeOperator implements InlongClusterNodeOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractClusterNodeOperator.class); + + @Autowired + protected InlongClusterNodeEntityMapper clusterNodeMapper; + + @Override + @Transactional(rollbackFor = Throwable.class) + public Integer saveOpt(ClusterNodeRequest request, String operator) { + InlongClusterNodeEntity entity = CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new); + // set the ext params + this.setTargetEntity(request, entity); + + entity.setCreator(operator); + entity.setModifier(operator); + clusterNodeMapper.insert(entity); + + return entity.getId(); + } + + /** + * Set the parameters of the target entity. 
+ * + * @param request inlong cluster request + * @param targetEntity entity which will set the new parameters + */ + protected abstract void setTargetEntity(ClusterNodeRequest request, InlongClusterNodeEntity targetEntity); + + @Override + @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) + public void updateOpt(ClusterNodeRequest request, String operator) { + InlongClusterNodeEntity entity = CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new); + // set the ext params + this.setTargetEntity(request, entity); + entity.setModifier(operator); + if (InlongConstants.AFFECTED_ONE_ROW != clusterNodeMapper.updateByIdSelective(entity)) { + String errMsg = String.format( + "cluster node has already updated with ip=%s, port=%s, protocolType=%s, type=%s, curVersion=%s", + entity.getIp(), entity.getPort(), entity.getProtocolType(), entity.getType(), entity.getVersion()); + LOGGER.warn(errMsg); + throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, errMsg); + } + LOGGER.info("success to update inlong cluster node={}", request); + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java new file mode 100644 index 000000000..38b0ce42e --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.cluster.node; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity; +import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; +import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO; +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest; +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Agent cluster node operator. 
+ */ +@Slf4j +@Service +public class AgentClusterNodeOperator extends AbstractClusterNodeOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(AgentClusterNodeOperator.class); + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String clusterNodeType) { + return getClusterNodeType().equals(clusterNodeType); + } + + @Override + public String getClusterNodeType() { + return ClusterType.AGENT; + } + + @Override + public ClusterNodeResponse getFromEntity(InlongClusterNodeEntity entity) { + if (entity == null) { + throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND); + } + + AgentClusterNodeResponse agentClusterNodeResponse = new AgentClusterNodeResponse(); + CommonBeanUtils.copyProperties(entity, agentClusterNodeResponse); + if (StringUtils.isNotBlank(entity.getExtParams())) { + AgentClusterNodeDTO dto = AgentClusterNodeDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, agentClusterNodeResponse); + } + + LOGGER.debug("success to get agent cluster node info from entity"); + return agentClusterNodeResponse; + } + + @Override + protected void setTargetEntity(ClusterNodeRequest request, InlongClusterNodeEntity targetEntity) { + AgentClusterNodeRequest agentNodeRequest = (AgentClusterNodeRequest) request; + CommonBeanUtils.copyProperties(agentNodeRequest, targetEntity, true); + try { + AgentClusterNodeDTO dto = AgentClusterNodeDTO.getFromRequest(agentNodeRequest); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + LOGGER.debug("success to set entity for agent cluster node"); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, + ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/DefaultClusterNodeOperator.java 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/DefaultClusterNodeOperator.java new file mode 100644 index 000000000..a684f4b86 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/DefaultClusterNodeOperator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.cluster.node; + +import lombok.extern.slf4j.Slf4j; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity; +import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; +import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +/** + * Default cluster node operator. 
+ */ +@Slf4j +@Service +public class DefaultClusterNodeOperator extends AbstractClusterNodeOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClusterNodeOperator.class); + public static final String DEFAULT = "DEFAULT"; + + @Override + public Boolean accept(String clusterNodeType) { + return getClusterNodeType().equals(clusterNodeType); + } + + @Override + public String getClusterNodeType() { + return DEFAULT; + } + + @Override + public ClusterNodeResponse getFromEntity(InlongClusterNodeEntity entity) { + ClusterNodeResponse clusterNodeResponse = CommonBeanUtils.copyProperties(entity, ClusterNodeResponse::new); + LOGGER.debug("success to get inlong cluster node by id={}", entity.getId()); + return clusterNodeResponse; + } + + @Override + protected void setTargetEntity(ClusterNodeRequest request, InlongClusterNodeEntity targetEntity) { + LOGGER.info("do nothing for default cluster node in set target entity"); + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperator.java new file mode 100644 index 000000000..075f8fcb5 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperator.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.cluster.node; + +import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity; +import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; +import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse; + +public interface InlongClusterNodeOperator { + + /** + * Determines whether the current instance matches the specified type. + */ + Boolean accept(String clusterType); + + /** + * Get the cluster node type. + * + * @return cluster node type string + */ + String getClusterNodeType(); + + /** + * Save the inlong cluster node info. + * + * @param request request of the cluster node + * @param operator name of the operator + * @return cluster node id after saving + */ + Integer saveOpt(ClusterNodeRequest request, String operator); + + /** + * Get the cluster node info from the given entity. + * + * @param entity get field value from the entity + * @return cluster info after encapsulating + */ + ClusterNodeResponse getFromEntity(InlongClusterNodeEntity entity); + + /** + * Update the inlong cluster node info. 
+ * + * @param request request of update + * @param operator name of operator + */ + void updateOpt(ClusterNodeRequest request, String operator); +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperatorFactory.java new file mode 100644 index 000000000..d7b6873e2 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperatorFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.cluster.node; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * Factory for {@link InlongClusterNodeOperator}. 
+ */ +@Service +public class InlongClusterNodeOperatorFactory { + + @Autowired + private List<InlongClusterNodeOperator> clusterNodeOperatorList; + + public static final String DEFAULT = "DEFAULT"; + + /** + * Get the operator accepting the given type, falling back to the one accepting DEFAULT + */ + public InlongClusterNodeOperator getInstance(String type) { + return clusterNodeOperatorList.stream() + .filter(inst -> inst.accept(type)) + .findFirst() + .orElseGet(() -> clusterNodeOperatorList.stream() + .filter(inst -> inst.accept(DEFAULT)) + .findFirst() + .orElseThrow( + () -> new BusinessException(ErrorCodeEnum.CLUSTER_TYPE_NOT_SUPPORTED, + String.format(ErrorCodeEnum.CLUSTER_TYPE_NOT_SUPPORTED.getMessage(), type)))); + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 13918600c..035bcc993 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.core.impl; +import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.gson.Gson; import org.apache.commons.collections.CollectionUtils; @@ -35,7 +36,6 @@ import org.apache.inlong.common.pojo.agent.TaskResult; import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest; import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo; import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo; -import org.apache.inlong.manager.common.consts.AgentConstants; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ClusterType; @@ -56,8 +56,9 @@ import 
org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; -import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest; import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest; +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO; import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; @@ -79,7 +80,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -221,16 +221,19 @@ public class AgentServiceImpl implements AgentService { pageRequest.setType(ClusterType.AGENT); pageRequest.setKeyword(clusterNode); return clusterNodeMapper.selectByCondition(pageRequest).stream(); - }).filter(entity -> entity != null) + }).filter(Objects::nonNull) .forEach(entity -> { - Map<String, String> extParams = entity.getExtParams() == null ? new HashMap<>() - : GSON.fromJson(entity.getExtParams(), Map.class); - Set<String> groupSet = !extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? 
new HashSet<>() - : Sets.newHashSet( - extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA)); + Set<String> groupSet = new HashSet<>(); + AgentClusterNodeDTO agentClusterNodeDTO = new AgentClusterNodeDTO(); + if (StringUtils.isNotBlank(entity.getExtParams())) { + agentClusterNodeDTO = AgentClusterNodeDTO.getFromJson(entity.getExtParams()); + String agentGroup = agentClusterNodeDTO.getAgentGroup(); + groupSet = StringUtils.isBlank(agentGroup) ? groupSet + : Sets.newHashSet(agentGroup.split(InlongConstants.COMMA)); + } groupSet.add(request.getAgentGroup()); - extParams.put(AgentConstants.AGENT_GROUP_KEY, String.join(InlongConstants.COMMA, groupSet)); - entity.setExtParams(GSON.toJson(extParams)); + agentClusterNodeDTO.setAgentGroup(Joiner.on(InlongConstants.COMMA).join(groupSet)); + entity.setExtParams(GSON.toJson(agentClusterNodeDTO)); clusterNodeMapper.insertOnDuplicateKeyUpdate(entity); }); } @@ -242,16 +245,19 @@ public class AgentServiceImpl implements AgentService { pageRequest.setType(ClusterType.AGENT); pageRequest.setKeyword(clusterNode); return clusterNodeMapper.selectByCondition(pageRequest).stream(); - }).filter(entity -> entity != null) + }).filter(Objects::nonNull) .forEach(entity -> { - Map<String, String> extParams = entity.getExtParams() == null ? new HashMap<>() - : GSON.fromJson(entity.getExtParams(), Map.class); - Set<String> groupSet = !extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>() - : Sets.newHashSet( - extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA)); + Set<String> groupSet = new HashSet<>(); + AgentClusterNodeDTO agentClusterNodeDTO = new AgentClusterNodeDTO(); + if (StringUtils.isNotBlank(entity.getExtParams())) { + agentClusterNodeDTO = AgentClusterNodeDTO.getFromJson(entity.getExtParams()); + String agentGroup = agentClusterNodeDTO.getAgentGroup(); + groupSet = StringUtils.isBlank(agentGroup) ? 
groupSet + : Sets.newHashSet(agentGroup.split(InlongConstants.COMMA)); + } groupSet.remove(request.getAgentGroup()); - extParams.put(AgentConstants.AGENT_GROUP_KEY, String.join(InlongConstants.COMMA, groupSet)); - entity.setExtParams(GSON.toJson(extParams)); + agentClusterNodeDTO.setAgentGroup(Joiner.on(InlongConstants.COMMA).join(groupSet)); + entity.setExtParams(GSON.toJson(agentClusterNodeDTO)); clusterNodeMapper.insertOnDuplicateKeyUpdate(entity); }); } @@ -361,9 +367,9 @@ public class AgentServiceImpl implements AgentService { /** * Find file collecting task match those condition: - * 1.agent ip match - * 2.cluster name match - * Send the corresponding task action request according to the matching state of the tag and the current state + * 1.agent ip match + * 2.cluster name match + * Send the corresponding task action request according to the matching state of the tag and the current state */ private void preProcessLabelFileTasks(TaskRequest taskRequest) { List<Integer> needProcessedStatusList = Arrays.asList( @@ -387,7 +393,7 @@ public class AgentServiceImpl implements AgentService { Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet( SourceStatus.SOURCE_FROZEN, SourceStatus.TO_BE_ISSUED_FROZEN); - if (!matchLabel(sourceEntity, clusterNodeEntity) + if (!matchGroup(sourceEntity, clusterNodeEntity) && !exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))) { LOGGER.info("Transform task({}) from {} to {} because tag mismatch " + "for agent({}) in cluster({})", sourceEntity.getAgentIp(), @@ -406,7 +412,7 @@ public class AgentServiceImpl implements AgentService { SourceStatus.TO_BE_ISSUED_ACTIVE); Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet( StreamStatus.SUSPENDED, StreamStatus.SUSPENDED); - if (matchLabel(sourceEntity, clusterNodeEntity) + if (matchGroup(sourceEntity, clusterNodeEntity) && !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus())) && 
!exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) { LOGGER.info("Transform task({}) from {} to {} because tag rematch " @@ -575,7 +581,7 @@ public class AgentServiceImpl implements AgentService { }).collect(Collectors.toList()); } - private boolean matchLabel(StreamSourceEntity sourceEntity, InlongClusterNodeEntity clusterNodeEntity) { + private boolean matchGroup(StreamSourceEntity sourceEntity, InlongClusterNodeEntity clusterNodeEntity) { Preconditions.checkNotNull(sourceEntity, "cluster must be valid"); if (sourceEntity.getInlongClusterNodeGroup() == null) { return true; @@ -585,12 +591,16 @@ public class AgentServiceImpl implements AgentService { return false; } - Map<String, String> extParams = GSON.fromJson(clusterNodeEntity.getExtParams(), Map.class); - Set<String> clusterNodeLabels = !extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>() - : Sets.newHashSet(extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA)); - Set<String> sourceLabels = Stream.of( + Set<String> clusterNodeGroups = new HashSet<>(); + if (StringUtils.isNotBlank(clusterNodeEntity.getExtParams())) { + AgentClusterNodeDTO agentClusterNodeDTO = AgentClusterNodeDTO.getFromJson(clusterNodeEntity.getExtParams()); + String agentGroup = agentClusterNodeDTO.getAgentGroup(); + clusterNodeGroups = StringUtils.isBlank(agentGroup) ? 
new HashSet<>() + : Sets.newHashSet(agentGroup.split(InlongConstants.COMMA)); + } + Set<String> sourceGroups = Stream.of( sourceEntity.getInlongClusterNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet()); - return sourceLabels.stream().anyMatch(clusterNodeLabels::contains); + return sourceGroups.stream().anyMatch(clusterNodeGroups::contains); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java index 2fb8d5608..4232e4392 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java @@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.base.Joiner; import com.google.gson.Gson; import lombok.Getter; import lombok.SneakyThrows; @@ -31,7 +32,6 @@ import org.apache.inlong.common.enums.NodeSrvStatus; import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager; import org.apache.inlong.common.heartbeat.ComponentHeartbeat; import org.apache.inlong.common.heartbeat.HeartbeatMsg; -import org.apache.inlong.manager.common.consts.AgentConstants; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ClusterStatus; import org.apache.inlong.manager.common.enums.NodeStatus; @@ -43,6 +43,7 @@ import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import 
org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO; import org.apache.inlong.manager.service.cluster.InlongClusterOperator; import org.apache.inlong.manager.service.cluster.InlongClusterOperatorFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -50,9 +51,7 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Arrays; -import java.util.HashMap; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -224,24 +223,26 @@ public class HeartbeatManager implements AbstractHeartbeatManager { clusterNode.setCreator(creator); clusterNode.setModifier(creator); clusterNode.setDescription(AUTO_REGISTERED); - insertOrUpdateLabel(clusterNode, heartbeat); + insertOrUpdateNodeGroup(clusterNode, heartbeat); return clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode); } private int updateClusterNode(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) { clusterNode.setStatus(ClusterStatus.NORMAL.getStatus()); clusterNode.setNodeLoad(heartbeat.getLoad()); - insertOrUpdateLabel(clusterNode, heartbeat); + insertOrUpdateNodeGroup(clusterNode, heartbeat); return clusterNodeMapper.updateById(clusterNode); } - private void insertOrUpdateLabel(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) { - Set<String> groupSet = heartbeat.getNodeGroup() == null ? new HashSet<>() + private void insertOrUpdateNodeGroup(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) { + Set<String> groupSet = StringUtils.isBlank(heartbeat.getNodeGroup()) ? new HashSet<>() : Arrays.stream(heartbeat.getNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet()); - Map<String, String> extParams = clusterNode.getExtParams() == null ? 
new HashMap<>() - : GSON.fromJson(clusterNode.getExtParams(), Map.class); - extParams.put(AgentConstants.AGENT_GROUP_KEY, String.join(InlongConstants.COMMA, groupSet)); - clusterNode.setExtParams(GSON.toJson(extParams)); + AgentClusterNodeDTO agentClusterNodeDTO = new AgentClusterNodeDTO(); + if (StringUtils.isNotBlank(clusterNode.getExtParams())) { + agentClusterNodeDTO = AgentClusterNodeDTO.getFromJson(clusterNode.getExtParams()); + } + agentClusterNodeDTO.setAgentGroup(Joiner.on(InlongConstants.COMMA).join(groupSet)); + clusterNode.setExtParams(GSON.toJson(agentClusterNodeDTO)); } private int deleteClusterNode(InlongClusterNodeEntity clusterNode) {