This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 10e512ff0 [INLONG-7199][Manager] Support save extension params for inlong cluster node (#7200)
10e512ff0 is described below

commit 10e512ff0b87a1f7aa49c187bc3f8a656476efad
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Jan 10 20:43:41 2023 +0800

    [INLONG-7199][Manager] Support save extension params for inlong cluster node (#7200)
---
 .../manager/common/consts/AgentConstants.java      | 26 -------
 .../dao/mapper/InlongClusterNodeEntityMapper.java  |  2 +
 .../mappers/InlongClusterNodeEntityMapper.xml      | 41 ++++++++++
 .../manager/pojo/cluster/ClusterNodeRequest.java   |  3 +-
 .../pojo/cluster/agent/AgentClusterNodeDTO.java    | 64 +++++++++++++++
 .../cluster/agent/AgentClusterNodeRequest.java     | 46 +++++++++++
 .../cluster/agent/AgentClusterNodeResponse.java    | 46 +++++++++++
 .../service/cluster/InlongClusterServiceImpl.java  | 45 ++++-------
 .../cluster/node/AbstractClusterNodeOperator.java  | 81 +++++++++++++++++++
 .../cluster/node/AgentClusterNodeOperator.java     | 90 ++++++++++++++++++++++
 .../cluster/node/DefaultClusterNodeOperator.java   | 60 +++++++++++++++
 .../cluster/node/InlongClusterNodeOperator.java    | 62 +++++++++++++++
 .../node/InlongClusterNodeOperatorFactory.java     | 52 +++++++++++++
 .../service/core/impl/AgentServiceImpl.java        | 70 +++++++++--------
 .../service/heartbeat/HeartbeatManager.java        | 23 +++---
 15 files changed, 613 insertions(+), 98 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
deleted file mode 100644
index 85da5a876..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.consts;
-
-/**
- * Constant class for agent ext params
- */
-public class AgentConstants {
-
-    public static final String AGENT_GROUP_KEY = "agentGroup";
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index dabc837dd..83d2108a3 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -43,6 +43,8 @@ public interface InlongClusterNodeEntityMapper {
 
     int updateById(InlongClusterNodeEntity record);
 
+    int updateByIdSelective(InlongClusterNodeEntity record);
+
     int deleteById(Integer id);
 
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 6465d3c91..b65afebc5 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -150,6 +150,47 @@
         where id = #{id,jdbcType=INTEGER}
           and version = #{version,jdbcType=INTEGER}
     </update>
+    <update id="updateByIdSelective" 
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
+        update inlong_cluster_node
+        <set>
+            <if test="parentId != null">
+                parent_id = #{parentId,jdbcType=INTEGER},
+            </if>
+            <if test="type != null">
+                type = #{type,jdbcType=VARCHAR},
+            </if>
+            <if test="ip != null">
+                ip = #{ip,jdbcType=VARCHAR},
+            </if>
+            <if test="port != null">
+                port = #{port,jdbcType=INTEGER},
+            </if>
+            <if test="protocolType != null">
+                protocol_type = #{protocolType,jdbcType=VARCHAR},
+            </if>
+            <if test="nodeLoad != null">
+                node_load = #{nodeLoad,jdbcType=INTEGER},
+            </if>
+            <if test="extParams != null">
+                ext_params = #{extParams,jdbcType=LONGVARCHAR},
+            </if>
+            <if test="description != null">
+                description = #{description,jdbcType=VARCHAR},
+            </if>
+            <if test="status != null">
+                status = #{status,jdbcType=INTEGER},
+            </if>
+            <if test="isDeleted != null">
+                is_deleted = #{isDeleted,jdbcType=INTEGER},
+            </if>
+            <if test="modifier != null">
+                modifier = #{modifier,jdbcType=VARCHAR},
+            </if>
+            version = #{version,jdbcType=INTEGER} + 1
+        </set>
+        where id = #{id,jdbcType=INTEGER}
+        and version = #{version,jdbcType=INTEGER}
+    </update>
 
     <delete id="deleteById" parameterType="java.lang.Integer">
         delete
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
index 5e789100b..80fe962f8 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.cluster;
 
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
@@ -31,6 +32,7 @@ import javax.validation.constraints.NotNull;
  */
 @Data
 @ApiModel("Cluster node request")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type", 
defaultImpl = ClusterNodeRequest.class)
 public class ClusterNodeRequest {
 
     @NotNull(groups = UpdateValidation.class)
@@ -53,7 +55,6 @@ public class ClusterNodeRequest {
 
     @NotNull(message = "port cannot be null")
     @ApiModelProperty(value = "Cluster port")
-    @Length(max = 6, message = "length must be less than or equal to 6")
     private Integer port;
 
     @NotBlank(message = "protocolType cannot be blank")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeDTO.java
new file mode 100644
index 000000000..1d6e4a6a4
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeDTO.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.agent;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Agent cluster node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Agent cluster node info")
+public class AgentClusterNodeDTO {
+
+    @ApiModelProperty(value = "Agent group name")
+    private String agentGroup;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static AgentClusterNodeDTO getFromRequest(AgentClusterNodeRequest 
request) {
+        return CommonBeanUtils.copyProperties(request, 
AgentClusterNodeDTO::new, true);
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static AgentClusterNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, AgentClusterNodeDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+                    ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+        }
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeRequest.java
new file mode 100644
index 000000000..1cc3b4242
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.agent;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+
+/**
+ * Inlong cluster node request for Agent
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.AGENT)
+@ApiModel("Inlong cluster node request for Agent")
+public class AgentClusterNodeRequest extends ClusterNodeRequest {
+
+    @ApiModelProperty(value = "Agent group name")
+    private String agentGroup;
+
+    public AgentClusterNodeRequest() {
+        this.setType(ClusterType.AGENT);
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeResponse.java
new file mode 100644
index 000000000..af43ae221
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeResponse.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.agent;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
+
+/**
+ * Agent cluster response
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.AGENT)
+@ApiModel("Inlong cluster node response for Agent")
+public class AgentClusterNodeResponse extends ClusterNodeResponse {
+
+    @ApiModelProperty(value = "Agent group name")
+    private String agentGroup;
+
+    public AgentClusterNodeResponse() {
+        this.setType(ClusterType.AGENT);
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 6ddd96575..9ece6b2b0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -69,6 +69,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.user.UserInfo;
+import 
org.apache.inlong.manager.service.cluster.node.InlongClusterNodeOperator;
+import 
org.apache.inlong.manager.service.cluster.node.InlongClusterNodeOperatorFactory;
 import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,6 +108,8 @@ public class InlongClusterServiceImpl implements InlongClusterService {
     @Autowired
     private InlongClusterOperatorFactory clusterOperatorFactory;
     @Autowired
+    private InlongClusterNodeOperatorFactory clusterNodeOperatorFactory;
+    @Autowired
     private InlongClusterTagEntityMapper clusterTagMapper;
     @Autowired
     private InlongClusterEntityMapper clusterMapper;
@@ -985,14 +989,8 @@ public class InlongClusterServiceImpl implements InlongClusterService {
             LOGGER.error(errMsg);
             throw new BusinessException(errMsg);
         }
-
-        InlongClusterNodeEntity entity = 
CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
-        entity.setCreator(operator);
-        entity.setModifier(operator);
-        clusterNodeMapper.insert(entity);
-
-        LOGGER.info("success to add inlong cluster node={}", request);
-        return entity.getId();
+        InlongClusterNodeOperator instance = 
clusterNodeOperatorFactory.getInstance(request.getType());
+        return instance.saveOpt(request, operator);
     }
 
     @Override
@@ -1027,11 +1025,9 @@ public class InlongClusterServiceImpl implements InlongClusterService {
                             request.getType(), request.getIp(), 
request.getPort()));
         }
         // add record
-        InlongClusterNodeEntity clusterNode = 
CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
-        clusterNode.setCreator(opInfo.getName());
-        clusterNode.setModifier(opInfo.getName());
-        clusterNodeMapper.insert(clusterNode);
-        return entity.getId();
+        InlongClusterNodeOperator instance = 
clusterNodeOperatorFactory.getInstance(request.getType());
+        instance.saveOpt(request, opInfo.getName());
+        return instance.saveOpt(request, opInfo.getName());
     }
 
     @Override
@@ -1045,9 +1041,8 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         InlongClusterEntity cluster = 
clusterMapper.selectById(entity.getParentId());
         String message = "Current user does not have permission to get cluster 
node";
         checkUser(cluster, currentUser, message);
-        ClusterNodeResponse clusterNodeResponse = 
CommonBeanUtils.copyProperties(entity, ClusterNodeResponse::new);
-        LOGGER.debug("success to get inlong cluster node by id={}", id);
-        return clusterNodeResponse;
+        InlongClusterNodeOperator instance = 
clusterNodeOperatorFactory.getInstance(entity.getType());
+        return instance.getFromEntity(entity);
     }
 
     @Override
@@ -1267,14 +1262,8 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         String message = "Current user does not have permission to update 
cluster node";
         checkUser(cluster, operator, message);
 
-        CommonBeanUtils.copyProperties(request, entity, true);
-        entity.setParentId(request.getParentId());
-        entity.setModifier(operator);
-        if (InlongConstants.AFFECTED_ONE_ROW != 
clusterNodeMapper.updateById(entity)) {
-            LOGGER.warn(errMsg);
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
-        LOGGER.info("success to update inlong cluster node={}", request);
+        InlongClusterNodeOperator instance = 
clusterNodeOperatorFactory.getInstance(request.getType());
+        instance.updateOpt(request, operator);
         return true;
     }
 
@@ -1322,12 +1311,8 @@ public class InlongClusterServiceImpl implements InlongClusterService {
                     "inlong cluster node already exist for " + request);
         }
         // update record
-        CommonBeanUtils.copyProperties(request, entity, true);
-        entity.setParentId(request.getParentId());
-        entity.setModifier(opInfo.getName());
-        if (InlongConstants.AFFECTED_ONE_ROW != 
clusterNodeMapper.updateById(entity)) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
+        InlongClusterNodeOperator instance = 
clusterNodeOperatorFactory.getInstance(request.getType());
+        instance.updateOpt(request, opInfo.getName());
         return true;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
new file mode 100644
index 000000000..9f971bbd5
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster.node;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * Default operator of inlong cluster node.
+ */
+public abstract class AbstractClusterNodeOperator implements 
InlongClusterNodeOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractClusterNodeOperator.class);
+
+    @Autowired
+    protected InlongClusterNodeEntityMapper clusterNodeMapper;
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public Integer saveOpt(ClusterNodeRequest request, String operator) {
+        InlongClusterNodeEntity entity = 
CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
+        // set the ext params
+        this.setTargetEntity(request, entity);
+
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        clusterNodeMapper.insert(entity);
+
+        return entity.getId();
+    }
+
+    /**
+     * Set the parameters of the target entity.
+     *
+     * @param request inlong cluster request
+     * @param targetEntity entity which will set the new parameters
+     */
+    protected abstract void setTargetEntity(ClusterNodeRequest request, 
InlongClusterNodeEntity targetEntity);
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ)
+    public void updateOpt(ClusterNodeRequest request, String operator) {
+        InlongClusterNodeEntity entity = 
CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
+        // set the ext params
+        this.setTargetEntity(request, entity);
+        entity.setModifier(operator);
+        if (InlongConstants.AFFECTED_ONE_ROW != 
clusterNodeMapper.updateByIdSelective(entity)) {
+            String errMsg = String.format(
+                    "cluster node has already updated with ip=%s, port=%s, 
protocolType=%s, type=%s, curVersion=%s",
+                    entity.getIp(), entity.getPort(), 
entity.getProtocolType(), entity.getType(), entity.getVersion());
+            LOGGER.warn(errMsg);
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, errMsg);
+        }
+        LOGGER.info("success to update inlong cluster node={}", request);
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java
new file mode 100644
index 000000000..38b0ce42e
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster.node;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Agent cluster node operator.
+ */
+@Slf4j
+@Service
+public class AgentClusterNodeOperator extends AbstractClusterNodeOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentClusterNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String clusterNodeType) {
+        return getClusterNodeType().equals(clusterNodeType);
+    }
+
+    @Override
+    public String getClusterNodeType() {
+        return ClusterType.AGENT;
+    }
+
+    @Override
+    public ClusterNodeResponse getFromEntity(InlongClusterNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+        }
+
+        AgentClusterNodeResponse agentClusterNodeResponse = new 
AgentClusterNodeResponse();
+        CommonBeanUtils.copyProperties(entity, agentClusterNodeResponse);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            AgentClusterNodeDTO dto = 
AgentClusterNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, agentClusterNodeResponse);
+        }
+
+        LOGGER.debug("success to get agent cluster node info from entity");
+        return agentClusterNodeResponse;
+    }
+
+    @Override
+    protected void setTargetEntity(ClusterNodeRequest request, 
InlongClusterNodeEntity targetEntity) {
+        AgentClusterNodeRequest agentNodeRequest = (AgentClusterNodeRequest) 
request;
+        CommonBeanUtils.copyProperties(agentNodeRequest, targetEntity, true);
+        try {
+            AgentClusterNodeDTO dto = 
AgentClusterNodeDTO.getFromRequest(agentNodeRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            LOGGER.debug("success to set entity for agent cluster node");
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+                    ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/DefaultClusterNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/DefaultClusterNodeOperator.java
new file mode 100644
index 000000000..a684f4b86
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/DefaultClusterNodeOperator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster.node;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Default cluster node operator.
+ */
+@Slf4j
+@Service
+public class DefaultClusterNodeOperator extends AbstractClusterNodeOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultClusterNodeOperator.class);
+    public static final String DEFAULT = "DEFAULT";
+
+    @Override
+    public Boolean accept(String clusterNodeType) {
+        return getClusterNodeType().equals(clusterNodeType);
+    }
+
+    @Override
+    public String getClusterNodeType() {
+        return DEFAULT;
+    }
+
+    @Override
+    public ClusterNodeResponse getFromEntity(InlongClusterNodeEntity entity) {
+        ClusterNodeResponse clusterNodeResponse = 
CommonBeanUtils.copyProperties(entity, ClusterNodeResponse::new);
+        LOGGER.debug("success to get inlong cluster node by id={}", 
entity.getId());
+        return clusterNodeResponse;
+    }
+
+    @Override
+    protected void setTargetEntity(ClusterNodeRequest request, 
InlongClusterNodeEntity targetEntity) {
+        LOGGER.info("do nothing for default cluster node in set target 
entity");
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperator.java
new file mode 100644
index 000000000..075f8fcb5
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster.node;
+
+import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
+
+public interface InlongClusterNodeOperator {
+
+    /**
+     * Determines whether the current instance matches the specified type.
+     * <p>
+     * The operator factory in this package calls this method on each registered operator
+     * and uses the first one that accepts the requested type.
+     * NOTE(review): implementations name this parameter clusterNodeType; the value appears
+     * to be a cluster node type rather than a cluster type - confirm and align the naming.
+     *
+     * @param clusterType type to test against this operator
+     * @return whether this operator handles the given type
+     */
+    Boolean accept(String clusterType);
+
+    /**
+     * Get the cluster node type.
+     *
+     * @return cluster node type string
+     */
+    String getClusterNodeType();
+
+    /**
+     * Save the inlong cluster node info.
+     *
+     * @param request request of the cluster node
+     * @param operator name of the operator
+     * @return cluster node id after saving
+     */
+    Integer saveOpt(ClusterNodeRequest request, String operator);
+
+    /**
+     * Get the cluster node info from the given entity.
+     *
+     * @param entity get field value from the entity
+     * @return cluster node info after encapsulating
+     */
+    ClusterNodeResponse getFromEntity(InlongClusterNodeEntity entity);
+
+    /**
+     * Update the inlong cluster node info.
+     *
+     * @param request request of update
+     * @param operator name of operator
+     */
+    void updateOpt(ClusterNodeRequest request, String operator);
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperatorFactory.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperatorFactory.java
new file mode 100644
index 000000000..d7b6873e2
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperatorFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster.node;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Factory that resolves the {@link InlongClusterNodeOperator} for a cluster node type.
+ */
+@Service
+public class InlongClusterNodeOperatorFactory {
+
+    public static final String DEFAULT = "DEFAULT";
+
+    @Autowired
+    private List<InlongClusterNodeOperator> clusterNodeOperatorList;
+
+    /**
+     * Get a cluster node operator instance via the given type.
+     * <p>
+     * The first registered operator accepting the type wins; when none accepts it, the first
+     * operator accepting {@link #DEFAULT} is used instead.
+     *
+     * @param type cluster node type to resolve
+     * @return the matching operator, or the default one as a fallback
+     * @throws BusinessException if neither a matching nor a default operator is registered
+     */
+    public InlongClusterNodeOperator getInstance(String type) {
+        InlongClusterNodeOperator matched = findFirstAccepting(type);
+        if (matched != null) {
+            return matched;
+        }
+
+        InlongClusterNodeOperator fallback = findFirstAccepting(DEFAULT);
+        if (fallback != null) {
+            return fallback;
+        }
+
+        throw new BusinessException(ErrorCodeEnum.CLUSTER_TYPE_NOT_SUPPORTED,
+                String.format(ErrorCodeEnum.CLUSTER_TYPE_NOT_SUPPORTED.getMessage(), type));
+    }
+
+    /**
+     * Return the first registered operator that accepts the given type, or null if there is none.
+     */
+    private InlongClusterNodeOperator findFirstAccepting(String nodeType) {
+        for (InlongClusterNodeOperator candidate : clusterNodeOperatorList) {
+            if (candidate.accept(nodeType)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 13918600c..035bcc993 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import org.apache.commons.collections.CollectionUtils;
@@ -35,7 +36,6 @@ import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
 import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
-import org.apache.inlong.manager.common.consts.AgentConstants;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
@@ -56,8 +56,9 @@ import 
org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
-import 
org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
+import 
org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
@@ -79,7 +80,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -221,16 +221,19 @@ public class AgentServiceImpl implements AgentService {
                 pageRequest.setType(ClusterType.AGENT);
                 pageRequest.setKeyword(clusterNode);
                 return 
clusterNodeMapper.selectByCondition(pageRequest).stream();
-            }).filter(entity -> entity != null)
+            }).filter(Objects::nonNull)
                     .forEach(entity -> {
-                        Map<String, String> extParams = entity.getExtParams() 
== null ? new HashMap<>()
-                                : GSON.fromJson(entity.getExtParams(), 
Map.class);
-                        Set<String> groupSet = 
!extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>()
-                                : Sets.newHashSet(
-                                        
extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA));
+                        Set<String> groupSet = new HashSet<>();
+                        AgentClusterNodeDTO agentClusterNodeDTO = new 
AgentClusterNodeDTO();
+                        if (StringUtils.isNotBlank(entity.getExtParams())) {
+                            agentClusterNodeDTO = 
AgentClusterNodeDTO.getFromJson(entity.getExtParams());
+                            String agentGroup = 
agentClusterNodeDTO.getAgentGroup();
+                            groupSet = StringUtils.isBlank(agentGroup) ? 
groupSet
+                                    : 
Sets.newHashSet(agentGroup.split(InlongConstants.COMMA));
+                        }
                         groupSet.add(request.getAgentGroup());
-                        extParams.put(AgentConstants.AGENT_GROUP_KEY, 
String.join(InlongConstants.COMMA, groupSet));
-                        entity.setExtParams(GSON.toJson(extParams));
+                        
agentClusterNodeDTO.setAgentGroup(Joiner.on(",").join(groupSet));
+                        entity.setExtParams(GSON.toJson(agentClusterNodeDTO));
                         clusterNodeMapper.insertOnDuplicateKeyUpdate(entity);
                     });
         }
@@ -242,16 +245,19 @@ public class AgentServiceImpl implements AgentService {
                 pageRequest.setType(ClusterType.AGENT);
                 pageRequest.setKeyword(clusterNode);
                 return 
clusterNodeMapper.selectByCondition(pageRequest).stream();
-            }).filter(entity -> entity != null)
+            }).filter(Objects::nonNull)
                     .forEach(entity -> {
-                        Map<String, String> extParams = entity.getExtParams() 
== null ? new HashMap<>()
-                                : GSON.fromJson(entity.getExtParams(), 
Map.class);
-                        Set<String> groupSet = 
!extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>()
-                                : Sets.newHashSet(
-                                        
extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA));
+                        Set<String> groupSet = new HashSet<>();
+                        AgentClusterNodeDTO agentClusterNodeDTO = new 
AgentClusterNodeDTO();
+                        if (StringUtils.isNotBlank(entity.getExtParams())) {
+                            agentClusterNodeDTO = 
AgentClusterNodeDTO.getFromJson(entity.getExtParams());
+                            String agentGroup = 
agentClusterNodeDTO.getAgentGroup();
+                            groupSet = StringUtils.isBlank(agentGroup) ? 
groupSet
+                                    : 
Sets.newHashSet(agentGroup.split(InlongConstants.COMMA));
+                        }
                         groupSet.remove(request.getAgentGroup());
-                        extParams.put(AgentConstants.AGENT_GROUP_KEY, 
String.join(InlongConstants.COMMA, groupSet));
-                        entity.setExtParams(GSON.toJson(extParams));
+                        
agentClusterNodeDTO.setAgentGroup(Joiner.on(",").join(groupSet));
+                        entity.setExtParams(GSON.toJson(agentClusterNodeDTO));
                         clusterNodeMapper.insertOnDuplicateKeyUpdate(entity);
                     });
         }
@@ -361,9 +367,9 @@ public class AgentServiceImpl implements AgentService {
 
     /**
      * Find file collecting task match those condition:
-     *  1.agent ip match
-     *  2.cluster name match
-     *  Send the corresponding task action request according to the matching 
state of the tag and the current state
+     * 1.agent ip match
+     * 2.cluster name match
+     * Send the corresponding task action request according to the matching 
state of the tag and the current state
      */
     private void preProcessLabelFileTasks(TaskRequest taskRequest) {
         List<Integer> needProcessedStatusList = Arrays.asList(
@@ -387,7 +393,7 @@ public class AgentServiceImpl implements AgentService {
             Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet(
                     SourceStatus.SOURCE_FROZEN,
                     SourceStatus.TO_BE_ISSUED_FROZEN);
-            if (!matchLabel(sourceEntity, clusterNodeEntity)
+            if (!matchGroup(sourceEntity, clusterNodeEntity)
                     && 
!exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus())))
 {
                 LOGGER.info("Transform task({}) from {} to {} because tag 
mismatch "
                         + "for agent({}) in cluster({})", 
sourceEntity.getAgentIp(),
@@ -406,7 +412,7 @@ public class AgentServiceImpl implements AgentService {
                     SourceStatus.TO_BE_ISSUED_ACTIVE);
             Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(
                     StreamStatus.SUSPENDED, StreamStatus.SUSPENDED);
-            if (matchLabel(sourceEntity, clusterNodeEntity)
+            if (matchGroup(sourceEntity, clusterNodeEntity)
                     && 
!exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))
                     && 
!exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus())))
 {
                 LOGGER.info("Transform task({}) from {} to {} because tag 
rematch "
@@ -575,7 +581,7 @@ public class AgentServiceImpl implements AgentService {
         }).collect(Collectors.toList());
     }
 
-    private boolean matchLabel(StreamSourceEntity sourceEntity, 
InlongClusterNodeEntity clusterNodeEntity) {
+    private boolean matchGroup(StreamSourceEntity sourceEntity, 
InlongClusterNodeEntity clusterNodeEntity) {
         Preconditions.checkNotNull(sourceEntity, "cluster must be valid");
         if (sourceEntity.getInlongClusterNodeGroup() == null) {
             return true;
@@ -585,12 +591,16 @@ public class AgentServiceImpl implements AgentService {
             return false;
         }
 
-        Map<String, String> extParams = 
GSON.fromJson(clusterNodeEntity.getExtParams(), Map.class);
-        Set<String> clusterNodeLabels = 
!extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>()
-                : 
Sets.newHashSet(extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA));
-        Set<String> sourceLabels = Stream.of(
+        Set<String> clusterNodeGroups = new HashSet<>();
+        if (StringUtils.isNotBlank(clusterNodeEntity.getExtParams())) {
+            AgentClusterNodeDTO agentClusterNodeDTO = 
AgentClusterNodeDTO.getFromJson(clusterNodeEntity.getExtParams());
+            String agentGroup = agentClusterNodeDTO.getAgentGroup();
+            clusterNodeGroups = StringUtils.isBlank(agentGroup) ? new 
HashSet<>()
+                    : Sets.newHashSet(agentGroup.split(InlongConstants.COMMA));
+        }
+        Set<String> sourceGroups = Stream.of(
                 
sourceEntity.getInlongClusterNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet());
-        return sourceLabels.stream().anyMatch(clusterNodeLabels::contains);
+        return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 2fb8d5608..4232e4392 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.RemovalCause;
 import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.base.Joiner;
 import com.google.gson.Gson;
 import lombok.Getter;
 import lombok.SneakyThrows;
@@ -31,7 +32,6 @@ import org.apache.inlong.common.enums.NodeSrvStatus;
 import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
-import org.apache.inlong.manager.common.consts.AgentConstants;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterStatus;
 import org.apache.inlong.manager.common.enums.NodeStatus;
@@ -43,6 +43,7 @@ import 
org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
 import org.apache.inlong.manager.service.cluster.InlongClusterOperator;
 import org.apache.inlong.manager.service.cluster.InlongClusterOperatorFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -50,9 +51,7 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -224,24 +223,26 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
         clusterNode.setCreator(creator);
         clusterNode.setModifier(creator);
         clusterNode.setDescription(AUTO_REGISTERED);
-        insertOrUpdateLabel(clusterNode, heartbeat);
+        insertOrUpdateNodeGroup(clusterNode, heartbeat);
         return clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
     }
 
     private int updateClusterNode(InlongClusterNodeEntity clusterNode, 
HeartbeatMsg heartbeat) {
         clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
         clusterNode.setNodeLoad(heartbeat.getLoad());
-        insertOrUpdateLabel(clusterNode, heartbeat);
+        insertOrUpdateNodeGroup(clusterNode, heartbeat); // rewrites extParams from the heartbeat before persisting
         return clusterNodeMapper.updateById(clusterNode);
     }
 
-    private void insertOrUpdateLabel(InlongClusterNodeEntity clusterNode, 
HeartbeatMsg heartbeat) {
-        Set<String> groupSet = heartbeat.getNodeGroup() == null ? new 
HashSet<>()
+    /**
+     * Write the node groups reported by the heartbeat into the ext params of the cluster node.
+     * <p>
+     * Existing ext params are parsed first so that their other fields are kept; the agent group
+     * is then overwritten with the comma-joined groups from the heartbeat (empty if none reported).
+     *
+     * @param clusterNode cluster node entity whose ext params are rewritten in place
+     * @param heartbeat heartbeat message carrying the comma-separated node group
+     */
+    private void insertOrUpdateNodeGroup(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) {
+        Set<String> groupSet = StringUtils.isBlank(heartbeat.getNodeGroup()) ? new HashSet<>()
                 : 
Arrays.stream(heartbeat.getNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet());
-        Map<String, String> extParams = clusterNode.getExtParams() == null ? 
new HashMap<>()
-                : GSON.fromJson(clusterNode.getExtParams(), Map.class);
-        extParams.put(AgentConstants.AGENT_GROUP_KEY, 
String.join(InlongConstants.COMMA, groupSet));
-        clusterNode.setExtParams(GSON.toJson(extParams));
+        AgentClusterNodeDTO agentClusterNodeDTO = new AgentClusterNodeDTO();
+        if (StringUtils.isNotBlank(clusterNode.getExtParams())) {
+            agentClusterNodeDTO = AgentClusterNodeDTO.getFromJson(clusterNode.getExtParams());
+        }
+        // Set the group outside the branch above: a node without ext params (e.g. a newly
+        // registered one) must still get its group stored, as the previous map-based code did.
+        agentClusterNodeDTO.setAgentGroup(Joiner.on(InlongConstants.COMMA).join(groupSet));
+        clusterNode.setExtParams(GSON.toJson(agentClusterNodeDTO));
     }
 
     private int deleteClusterNode(InlongClusterNodeEntity clusterNode) {

Reply via email to