This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new aa91f5984 [INLONG-5978][Manager] Add protocol type in cluster nodes (#6055) aa91f5984 is described below commit aa91f5984cf0cd230cba9098834b028a554fc65c Author: xuesongxs <54351417+xueson...@users.noreply.github.com> AuthorDate: Wed Oct 12 15:43:49 2022 +0800 [INLONG-5978][Manager] Add protocol type in cluster nodes (#6055) * Add protocol type constant class * Fix unit test error * Update NodeEditModal.tsx Co-authored-by: healchow <healc...@gmail.com> Co-authored-by: Daniel <lee...@apache.org> --- .../common/heartbeat/ComponentHeartbeat.java | 5 +- .../inlong/common/heartbeat/HeartbeatMsg.java | 7 ++- .../common/pojo/dataproxy/DataProxyNodeInfo.java | 4 ++ inlong-dashboard/src/locales/cn.json | 2 + inlong-dashboard/src/locales/en.json | 2 + .../src/pages/Clusters/NodeEditModal.tsx | 19 ++++++++ inlong-dashboard/src/pages/Clusters/NodeManage.tsx | 4 ++ .../inlong/manager/common/consts/ProtocolType.java | 26 +++------- .../dao/entity/InlongClusterNodeEntity.java | 1 + .../dao/mapper/InlongClusterNodeEntityMapper.java | 3 +- .../mappers/InlongClusterNodeEntityMapper.xml | 57 ++++++++++++---------- .../manager/pojo/cluster/ClusterNodeRequest.java | 4 ++ .../manager/pojo/cluster/ClusterNodeResponse.java | 3 ++ .../service/cluster/InlongClusterService.java | 5 +- .../service/cluster/InlongClusterServiceImpl.java | 30 +++++------- .../service/core/heartbeat/HeartbeatManager.java | 2 + .../service/cluster/InlongClusterServiceTest.java | 46 +++++++++++------ .../core/heartbeat/HeartbeatManagerTest.java | 3 ++ .../service/core/impl/HeartbeatServiceTest.java | 2 + .../main/resources/h2/apache_inlong_manager.sql | 31 ++++++------ .../manager-web/sql/apache_inlong_manager.sql | 31 ++++++------ .../controller/openapi/DataProxyController.java | 6 ++- 22 files changed, 181 insertions(+), 112 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java index 260e4618c..8d6beda08 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java @@ -35,18 +35,21 @@ public class ComponentHeartbeat { private int port; + private String protocolType; + private String inCharges; public ComponentHeartbeat() { } public ComponentHeartbeat(String clusterTag, String clusterName, String componentType, String ip, int port, - String inCharges) { + String inCharges, String protocolType) { this.clusterTag = clusterTag; this.clusterName = clusterName; this.componentType = componentType; this.ip = ip; this.port = port; + this.protocolType = protocolType; this.inCharges = inCharges; } } diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java index 5277eaf22..42038d993 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java @@ -43,6 +43,11 @@ public class HeartbeatMsg { */ private int port; + /** + * ProtocolType of component + */ + private String protocolType; + /** * Type of component */ @@ -79,6 +84,6 @@ public class HeartbeatMsg { private List<StreamHeartbeat> streamHeartbeats; public ComponentHeartbeat componentHeartbeat() { - return new ComponentHeartbeat(clusterTag, clusterName, componentType, ip, port, inCharges); + return new ComponentHeartbeat(clusterTag, clusterName, componentType, ip, port, inCharges, protocolType); } } diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java index 6425348a8..9aca22240 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java @@ -40,4 +40,8 @@ public class DataProxyNodeInfo { */ private Integer port; + /** + * Node protocol type + */ + private String protocolType; } diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json index 16e5161f7..feeb8b4d6 100644 --- a/inlong-dashboard/src/locales/cn.json +++ b/inlong-dashboard/src/locales/cn.json @@ -432,10 +432,12 @@ "pages.Clusters.Description": "集群描述", "pages.Clusters.Node.Name": "节点", "pages.Clusters.Node.Port": "端口", + "pages.Clusters.Node.ProtocolType": "协议类型", "pages.Clusters.Node.LastModifier": "最后操作", "pages.Clusters.Node.Create": "新建节点", "pages.Clusters.Node.IpRule": "请输入正确的IP地址", "pages.Clusters.Node.PortRule": "请输入正确的端口", + "pages.Clusters.Node.ProtocolTypeRule": "请输入正确的协议类型", "pages.Clusters.Pulsar.Tenant": "默认租户", "pages.Clusters.Pulsar.ServiceUrlHelper": "用于生产和消费数据", "pages.Clusters.Pulsar.AdminUrlHelper": "用于管理(如:创建、修改)租户、命名空间、Topic 和订阅组", diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json index 691e8f98d..e9da7052a 100644 --- a/inlong-dashboard/src/locales/en.json +++ b/inlong-dashboard/src/locales/en.json @@ -433,10 +433,12 @@ "pages.Clusters.Description": "Description", "pages.Clusters.Node.Name": "Node", "pages.Clusters.Node.Port": "Port", + "pages.Clusters.Node.ProtocolType": "Protocol Type", "pages.Clusters.Node.LastModifier": "LastModifier", "pages.Clusters.Node.Create": "Create", "pages.Clusters.Node.IpRule": "Please enter the IP address correctly", "pages.Clusters.Node.PortRule": "Please enter the port address correctly", + "pages.Clusters.Node.ProtocolTypeRule": "Please enter the protocol type correctly", "pages.Clusters.Pulsar.Tenant": "Default Tenant", "pages.Clusters.Pulsar.ServiceUrlHelper": "For producing and consuming data", "pages.Clusters.Pulsar.AdminUrlHelper": "Used to manage (e.g. create, modify) tenants, namespaces, topics and subscription groups", diff --git a/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx b/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx index 167330348..a085ec0c0 100644 --- a/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx +++ b/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx @@ -106,6 +106,25 @@ const NodeEditModal: React.FC<NodeEditModalProps> = ({ id, type, clusterId, ...m max: 65535, }, }, + { + type: 'select', + label: i18n.t('pages.Clusters.Node.ProtocolType'), + name: 'protocolType', + initialValue: 'HTTP', + rules: [{ required: true }], + props: { + options: [ + { + label: 'HTTP', + value: 'HTTP', + }, + { + label: 'TCP', + value: 'TCP', + }, + ], + }, + }, { type: 'textarea', label: i18n.t('pages.Clusters.Description'), diff --git a/inlong-dashboard/src/pages/Clusters/NodeManage.tsx b/inlong-dashboard/src/pages/Clusters/NodeManage.tsx index 393cf41a1..9368f9128 100644 --- a/inlong-dashboard/src/pages/Clusters/NodeManage.tsx +++ b/inlong-dashboard/src/pages/Clusters/NodeManage.tsx @@ -125,6 +125,10 @@ const Comp: React.FC = () => { title: i18n.t('pages.Clusters.Node.Port'), dataIndex: 'port', }, + { + title: i18n.t('pages.Clusters.Node.ProtocolType'), + dataIndex: 'protocolType', + }, { title: i18n.t('pages.Clusters.Node.LastModifier'), dataIndex: 'modifier', diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/ProtocolType.java similarity index 72% copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/ProtocolType.java index 6425348a8..8a761fc8b 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/ProtocolType.java @@ -15,29 +15,17 @@ * limitations under the License. */ -package org.apache.inlong.common.pojo.dataproxy; - -import lombok.Data; +package org.apache.inlong.manager.common.consts; /** - * Data proxy node info. + * Constants of protocol type. */ -@Data -public class DataProxyNodeInfo { - - /** - * DataProxy node id - */ - private Integer id; +public class ProtocolType { - /** - * Node IP - */ - private String ip; + public static final String TCP = "TCP"; + public static final String UDP = "UDP"; - /** - * Node port - */ - private Integer port; + public static final String HTTP = "HTTP"; + public static final String HTTPS = "HTTPS"; } diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java index 56fdef9e9..a201cbc60 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java @@ -34,6 +34,7 @@ public class InlongClusterNodeEntity implements Serializable { private String type; private String ip; private Integer port; + private String protocolType; private String extParams; private String description; diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java index 29ea012d5..dabc837dd 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java @@ -38,7 +38,8 @@ public interface InlongClusterNodeEntityMapper { List<InlongClusterNodeEntity> selectByCondition(ClusterPageRequest request); - List<InlongClusterNodeEntity> selectByParentId(@Param("parentId") Integer parentId); + List<InlongClusterNodeEntity> selectByParentId(@Param("parentId") Integer parentId, + @Param("protocolType") String protocolType); int updateById(InlongClusterNodeEntity record); diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml index c184eab02..294e08845 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml @@ -26,6 +26,7 @@ <result column="type" jdbcType="VARCHAR" property="type"/> <result column="ip" jdbcType="VARCHAR" property="ip"/> <result column="port" jdbcType="INTEGER" property="port"/> + <result column="protocol_type" jdbcType="VARCHAR" property="protocolType"/> <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/> <result column="description" jdbcType="VARCHAR" property="description"/> <result column="status" jdbcType="INTEGER" property="status"/> @@ -37,30 +38,32 @@ <result column="version" jdbcType="INTEGER" property="version"/> </resultMap> <sql id="Base_Column_List"> - id, parent_id, type, ip, port, ext_params, description, status, is_deleted, + id, parent_id, type, ip, port, protocol_type, ext_params, description, status, is_deleted, creator, modifier, create_time, modify_time, version </sql> <insert id="insert" useGeneratedKeys="true" keyProperty="id" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity"> insert into inlong_cluster_node (id, parent_id, type, - ip, port, ext_params, - description, status, + ip, port, protocol_type, + ext_params, description, status, creator, modifier) values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR}, - #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR}, - #{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, + #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR}, + #{extParams,jdbcType=LONGVARCHAR}, #{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}) </insert> <insert id="insertOnDuplicateKeyUpdate" useGeneratedKeys="true" keyProperty="id" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity"> insert into inlong_cluster_node (id, parent_id, type, - ip, port, ext_params, - status, creator, modifier) + ip, port, protocol_type, + ext_params, status, creator, + modifier) values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR}, - #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR}, - #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}) + #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR}, + #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, + #{modifier,jdbcType=VARCHAR}) ON DUPLICATE KEY UPDATE ext_params = VALUES(ext_params), status = VALUES(status), modifier = VALUES(modifier) @@ -83,6 +86,7 @@ and type = #{type, jdbcType=VARCHAR} and ip = #{ip, jdbcType=VARCHAR} and port = #{port, jdbcType=INTEGER} + and protocol_type = #{protocolType, jdbcType=VARCHAR} </select> <select id="selectByCondition" parameterType="org.apache.inlong.manager.pojo.cluster.ClusterPageRequest" @@ -95,12 +99,9 @@ <if test="type != null and type != ''"> and type = #{type, jdbcType=VARCHAR} </if> - <if test="parentId != null"> + <if test="parentId != null and parentId != ''"> and parent_id = #{parentId, jdbcType=INTEGER} </if> - <if test="status != null"> - and status = #{status, jdbcType=INTEGER} - </if> <if test="keyword != null and keyword != ''"> and ( ip like CONCAT('%', #{keyword}, '%') @@ -114,22 +115,28 @@ select <include refid="Base_Column_List"/> from inlong_cluster_node - where is_deleted = 0 - and parent_id = #{parentId, jdbcType=INTEGER} + <where> + is_deleted = 0 + and parent_id = #{parentId, jdbcType=INTEGER} + <if test="protocolType != null and protocolType != ''"> + and protocol_type = #{protocolType, jdbcType=VARCHAR} + </if> + </where> </select> <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity"> update inlong_cluster_node - set parent_id = #{parentId,jdbcType=INTEGER}, - type = #{type,jdbcType=VARCHAR}, - ip = #{ip,jdbcType=VARCHAR}, - port = #{port,jdbcType=INTEGER}, - ext_params = #{extParams,jdbcType=LONGVARCHAR}, - description = #{description,jdbcType=VARCHAR}, - status = #{status,jdbcType=INTEGER}, - is_deleted = #{isDeleted,jdbcType=INTEGER}, - modifier = #{modifier,jdbcType=VARCHAR}, - version = #{version,jdbcType=INTEGER} + 1 + set parent_id = #{parentId,jdbcType=INTEGER}, + type = #{type,jdbcType=VARCHAR}, + ip = #{ip,jdbcType=VARCHAR}, + port = #{port,jdbcType=INTEGER}, + protocol_type = #{protocolType,jdbcType=VARCHAR}, + ext_params = #{extParams,jdbcType=LONGVARCHAR}, + description = #{description,jdbcType=VARCHAR}, + status = #{status,jdbcType=INTEGER}, + is_deleted = #{isDeleted,jdbcType=INTEGER}, + modifier = #{modifier,jdbcType=VARCHAR}, + version = #{version,jdbcType=INTEGER} + 1 where id = #{id,jdbcType=INTEGER} and version = #{version,jdbcType=INTEGER} </update> diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java index 31cc74458..384f8056d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java @@ -52,6 +52,10 @@ public class ClusterNodeRequest { @ApiModelProperty(value = "Cluster port") private Integer port; + @NotBlank(message = "protocolType cannot be blank") + @ApiModelProperty(value = "Cluster protocol type") + private String protocolType; + @ApiModelProperty(value = "Extended params") private String extParams; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java index ee45d9642..cdd426178 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java @@ -52,6 +52,9 @@ public class ClusterNodeResponse { @ApiModelProperty(value = "Cluster port") private Integer port; + @ApiModelProperty(value = "Cluster protocol type") + private String protocolType; + @ApiModelProperty(value = "Extended params") private String extParams; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java index 244ee8372..11ca3120f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java @@ -199,12 +199,13 @@ public interface InlongClusterService { Boolean deleteNode(Integer id, String operator); /** - * Query data proxy nodes by the given inlong group id. + * Query data proxy nodes by the given inlong group id and protocol type * * @param inlongGroupId inlong group id + * @param protocolType protocol type * @return data proxy node response */ - DataProxyNodeResponse getDataProxyNodes(String inlongGroupId); + DataProxyNodeResponse getDataProxyNodes(String inlongGroupId, String protocolType); /** * Get the configuration of DataProxy through the cluster name to which DataProxy belongs. diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index e175f8c00..90ba7c1fa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -36,7 +36,6 @@ import org.apache.inlong.manager.common.consts.MQType; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; -import org.apache.inlong.manager.common.enums.NodeStatus; import org.apache.inlong.manager.common.enums.UserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; @@ -440,7 +439,7 @@ public class InlongClusterServiceImpl implements InlongClusterService { Preconditions.checkTrue(isInCharge || userEntity.getAccountType().equals(UserTypeEnum.ADMIN.getCode()), "Current user does not have permission to delete cluster info"); - List<InlongClusterNodeEntity> nodeEntities = clusterNodeMapper.selectByParentId(id); + List<InlongClusterNodeEntity> nodeEntities = clusterNodeMapper.selectByParentId(id, null); if (CollectionUtils.isNotEmpty(nodeEntities)) { String errMsg = String.format("there are undeleted nodes under the cluster [%s], " + "please delete the node first", entity.getName()); @@ -533,7 +532,7 @@ public class InlongClusterServiceImpl implements InlongClusterService { request.getType()); List<InlongClusterNodeEntity> allNodeList = new ArrayList<>(); for (InlongClusterEntity cluster : clusterList) { - List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(cluster.getId()); + List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(cluster.getId(), null); allNodeList.addAll(nodeList); } return CommonBeanUtils.copyListProperties(allNodeList, ClusterNodeResponse::new); @@ -568,8 +567,7 @@ public class InlongClusterServiceImpl implements InlongClusterService { // check cluster node if exist InlongClusterNodeEntity exist = clusterNodeMapper.selectByUniqueKey(request); if (exist != null && !Objects.equals(id, exist.getId())) { - String errMsg = String.format("inlong cluster node already exist for type=%s ip=%s port=%s", - request.getType(), request.getIp(), request.getPort()); + String errMsg = "inlong cluster node already exist for " + request; LOGGER.error(errMsg); throw new BusinessException(errMsg); } @@ -579,8 +577,7 @@ public class InlongClusterServiceImpl implements InlongClusterService { LOGGER.error("cluster node not found by id={}", id); throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND); } - String errMsg = String.format("cluster node has already updated with parentId=%s, type=%s, ip=%s, port=%s", - request.getParentId(), request.getType(), request.getIp(), request.getPort()); + String errMsg = "cluster node has already updated for " + request; if (!Objects.equals(entity.getVersion(), request.getVersion())) { LOGGER.warn(errMsg); throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); @@ -617,8 +614,8 @@ public class InlongClusterServiceImpl implements InlongClusterService { entity.setIsDeleted(entity.getId()); entity.setModifier(operator); if (InlongConstants.AFFECTED_ONE_ROW != clusterNodeMapper.updateById(entity)) { - LOGGER.error("cluster node has already updated with parentId={}, type={}, ip={}, port={}", - entity.getParentId(), entity.getType(), entity.getIp(), entity.getPort()); + LOGGER.error("cluster node has already updated with parentId={}, type={}, ip={}, port={}, protocolType={}", + entity.getParentId(), entity.getType(), entity.getIp(), entity.getPort(), entity.getProtocolType()); throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); } LOGGER.info("success to delete inlong cluster node by id={}", id); @@ -626,8 +623,8 @@ public class InlongClusterServiceImpl implements InlongClusterService { } @Override - public DataProxyNodeResponse getDataProxyNodes(String inlongGroupId) { - LOGGER.debug("begin to get data proxy nodes for inlongGroupId={}", inlongGroupId); + public DataProxyNodeResponse getDataProxyNodes(String inlongGroupId, String protocolType) { + LOGGER.debug("begin to get data proxy nodes for groupId={}, protocol={}", inlongGroupId, protocolType); InlongGroupEntity groupEntity = groupMapper.selectByGroupId(inlongGroupId); if (groupEntity == null) { String msg = "inlong group not exists for inlongGroupId=" + inlongGroupId; @@ -650,19 +647,17 @@ public class InlongClusterServiceImpl implements InlongClusterService { throw new BusinessException(msg); } + // if more than one data proxy cluster, currently takes first // TODO consider the data proxy load and re-balance List<DataProxyNodeInfo> nodeInfos = new ArrayList<>(); for (InlongClusterEntity entity : clusterList) { - ClusterPageRequest request = ClusterPageRequest.builder() - .parentId(entity.getId()) - .status(NodeStatus.NORMAL.getStatus()) - .build(); - List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByCondition(request); + List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(entity.getId(), protocolType); for (InlongClusterNodeEntity nodeEntity : nodeList) { DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo(); nodeInfo.setId(nodeEntity.getId()); nodeInfo.setIp(nodeEntity.getIp()); nodeInfo.setPort(nodeEntity.getPort()); + nodeInfo.setProtocolType(nodeEntity.getProtocolType()); nodeInfos.add(nodeInfo); } } @@ -672,7 +667,8 @@ public class InlongClusterServiceImpl implements InlongClusterService { response.setNodeList(nodeInfos); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("success to get data proxy nodes for inlongGroupId={}, result={}", inlongGroupId, response); + LOGGER.debug("success to get data proxy nodes for groupId={}, protocol={} result={}", + inlongGroupId, protocolType, response); } return response; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java index 6027fa908..6ae1f8060 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java @@ -131,6 +131,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager { nodeRequest.setType(heartbeat.getComponentType()); nodeRequest.setIp(heartbeat.getIp()); nodeRequest.setPort(heartbeat.getPort()); + nodeRequest.setProtocolType(heartbeat.getProtocolType()); return clusterNodeMapper.selectByUniqueKey(nodeRequest); } @@ -140,6 +141,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager { clusterNode.setType(heartbeat.getComponentType()); clusterNode.setIp(heartbeat.getIp()); clusterNode.setPort(heartbeat.getPort()); + clusterNode.setProtocolType(heartbeat.getProtocolType()); clusterNode.setStatus(ClusterStatus.NORMAL.getStatus()); clusterNode.setCreator(creator); clusterNode.setModifier(creator); diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java index 23b2326ce..baba6d294 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java @@ -22,6 +22,7 @@ import org.apache.inlong.common.heartbeat.HeartbeatMsg; import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo; import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse; import org.apache.inlong.manager.common.consts.MQType; +import org.apache.inlong.manager.common.consts.ProtocolType; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; @@ -36,6 +37,8 @@ import org.apache.inlong.manager.service.ServiceBaseTest; import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.util.Comparator; @@ -46,6 +49,7 @@ import java.util.List; */ public class InlongClusterServiceTest extends ServiceBaseTest { + private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceTest.class); @Autowired private InlongClusterService clusterService; @Autowired @@ -112,12 +116,13 @@ public class InlongClusterServiceTest extends ServiceBaseTest { /** * Save cluster node info. */ - public Integer saveClusterNode(Integer parentId, String type, String ip, Integer port) { + public Integer saveClusterNode(Integer parentId, String type, String ip, Integer port, String protocolType) { ClusterNodeRequest request = new ClusterNodeRequest(); request.setParentId(parentId); request.setType(type); request.setIp(ip); request.setPort(port); + request.setProtocolType(protocolType); return clusterService.saveNode(request, GLOBAL_OPERATOR); } @@ -149,6 +154,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest { HeartbeatMsg heartbeatMsg = new HeartbeatMsg(); heartbeatMsg.setIp(ip); heartbeatMsg.setPort(port); + heartbeatMsg.setProtocolType(ProtocolType.HTTP); heartbeatMsg.setComponentType(type); heartbeatMsg.setReportTime(System.currentTimeMillis()); heartbeatMsg.setClusterName(clusterName); @@ -193,7 +199,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest { Integer parentId = id; String ip = "127.0.0.1"; Integer port = 8080; - Integer nodeId = this.saveClusterNode(parentId, ClusterType.PULSAR, ip, port); + Integer nodeId = this.saveClusterNode(parentId, ClusterType.PULSAR, ip, port, ProtocolType.HTTP); Assertions.assertNotNull(nodeId); // list cluster node @@ -217,7 +223,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest { } @Test - public void testGetDataProxyIp() throws InterruptedException { + public void testGetDataProxyIp() { String clusterTag = "default_cluster"; String clusterName = "test_data_proxy"; String extTag = "ext_1"; @@ -229,11 +235,11 @@ public class InlongClusterServiceTest extends ServiceBaseTest { // save cluster node String ip = "127.0.0.1"; Integer port1 = 46800; - Integer nodeId1 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port1); + Integer nodeId1 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port1, ProtocolType.TCP); Assertions.assertNotNull(nodeId1); Integer port2 = 46801; - Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2); + Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2, ProtocolType.TCP); Assertions.assertNotNull(nodeId2); // report heartbeat @@ -250,15 +256,27 @@ public class InlongClusterServiceTest extends ServiceBaseTest { groupService.update(updateGroupInfo.genRequest(), GLOBAL_OPERATOR); // get the data proxy nodes, the first port should is p1, second port is p2 - DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId); - List<DataProxyNodeInfo> ipList = nodeResponse.getNodeList(); - ipList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId)); - Assertions.assertEquals(ipList.size(), 2); - Assertions.assertEquals(port1, ipList.get(0).getPort()); - Assertions.assertEquals(port2, ipList.get(1).getPort()); - - this.deleteClusterNode(nodeId1); - this.deleteClusterNode(nodeId2); + DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.TCP); + List<DataProxyNodeInfo> nodeInfoList = nodeResponse.getNodeList(); + nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId)); + Assertions.assertEquals(nodeInfoList.size(), 2); + Assertions.assertEquals(port1, nodeInfoList.get(0).getPort()); + Assertions.assertEquals(port2, nodeInfoList.get(1).getPort()); + + nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.HTTP); + nodeInfoList = nodeResponse.getNodeList(); + nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId)); + Assertions.assertEquals(nodeInfoList.size(), 2); + Assertions.assertEquals(port1, nodeInfoList.get(0).getPort()); + Assertions.assertEquals(port2, nodeInfoList.get(1).getPort()); + + // delete all cluster nodes + // TODO should query by cluster parent id + nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, null); + for (DataProxyNodeInfo nodeInfo : nodeResponse.getNodeList()) { + this.deleteClusterNode(nodeInfo.getId()); + } + this.deleteCluster(id); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java index 84fef8e41..f6082528c 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java @@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.core.heartbeat; import lombok.extern.slf4j.Slf4j; import org.apache.inlong.common.enums.ComponentTypeEnum; import org.apache.inlong.common.heartbeat.HeartbeatMsg; +import org.apache.inlong.manager.common.consts.ProtocolType; import org.apache.inlong.manager.common.enums.NodeStatus; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.dao.entity.InlongClusterEntity; @@ -63,6 +64,7 @@ public class HeartbeatManagerTest extends ServiceBaseTest { nodeRequest.setType(msg.getComponentType()); nodeRequest.setIp(msg.getIp()); nodeRequest.setPort(msg.getPort()); + nodeRequest.setProtocolType(ProtocolType.HTTP); InlongClusterNodeEntity clusterNode = clusterNodeMapper.selectByUniqueKey(nodeRequest); Assertions.assertNotNull(clusterNode); Assertions.assertEquals((int) clusterNode.getStatus(), NodeStatus.NORMAL.getStatus()); @@ -83,6 +85,7 @@ public class HeartbeatManagerTest extends ServiceBaseTest { HeartbeatMsg heartbeatMsg = new HeartbeatMsg(); heartbeatMsg.setIp("127.0.0.1"); heartbeatMsg.setPort(8008); + heartbeatMsg.setProtocolType(ProtocolType.HTTP); heartbeatMsg.setComponentType(ComponentTypeEnum.Agent.getName()); heartbeatMsg.setReportTime(System.currentTimeMillis()); return heartbeatMsg; diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java index f7791c43c..76b331b8c 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.Maps; import org.apache.inlong.common.enums.ComponentTypeEnum; import org.apache.inlong.common.heartbeat.GroupHeartbeat; import org.apache.inlong.common.heartbeat.StreamHeartbeat; +import org.apache.inlong.manager.common.consts.ProtocolType; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.pojo.heartbeat.HeartbeatQueryRequest; import org.apache.inlong.manager.pojo.heartbeat.HeartbeatReportRequest; @@ -53,6 +54,7 @@ public class HeartbeatServiceTest extends ServiceBaseTest { request.setComponentType(ComponentTypeEnum.Agent.getName()); request.setIp("127.0.0.1"); request.setReportTime(Instant.now().toEpochMilli()); + request.setProtocolType(ProtocolType.HTTP); List<GroupHeartbeat> groupHeartbeats = new ArrayList<>(); GroupHeartbeat groupHeartbeat = new GroupHeartbeat(); diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index 760d93316..eaba39763 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -122,22 +122,23 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster` -- ---------------------------- CREATE TABLE IF NOT EXISTS `inlong_cluster_node` ( - `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', - `parent_id` int(11) NOT NULL COMMENT 'Id of the parent cluster', - `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc', - `ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081', - `port` int(6) NULL COMMENT 'Cluster port', - `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string', - `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node', - `status` int(4) DEFAULT '0' COMMENT 'Cluster status', - `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted', - `creator` varchar(64) NOT NULL COMMENT 'Creator name', - `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name', - `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', - `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', - `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification', + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `parent_id` int(11) NOT NULL COMMENT 'Id of the parent cluster', + `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc', + `ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081', + `port` int(6) NULL COMMENT 'Cluster port', + `protocol_type` varchar(20) NOT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP', + `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string', + `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node', + `status` int(4) DEFAULT '0' COMMENT 'Cluster status', + `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted', + `creator` varchar(64) NOT NULL COMMENT 'Creator name', + `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', + `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification', PRIMARY KEY (`id`), - UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`) + UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `protocol_type`, `is_deleted`) ); -- ---------------------------- diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index 825e6841e..699c3a1be 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -132,22 +132,23 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster` -- ---------------------------- CREATE TABLE IF NOT EXISTS `inlong_cluster_node` ( - `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', - `parent_id` int(11) NOT NULL COMMENT 'Id of the parent cluster', - `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc', - `ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081', - `port` int(6) NULL COMMENT 'Cluster port', - `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string', - `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node', - `status` int(4) DEFAULT '0' COMMENT 'Cluster status', - `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted', - `creator` varchar(64) NOT NULL COMMENT 'Creator name', - `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name', - `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', - `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', - `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification', + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `parent_id` int(11) NOT NULL COMMENT 'Id of the parent cluster', + `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc', + `ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081', + `port` int(6) NULL COMMENT 'Cluster port', + `protocol_type` varchar(20) NOT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP', + `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string', + `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node', + `status` int(4) DEFAULT '0' COMMENT 'Cluster status', + `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted', + `creator` varchar(64) NOT NULL COMMENT 'Creator name', + `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', + `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification', PRIMARY KEY (`id`), - UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`) + UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `protocol_type`, `is_deleted`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster node table'; diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java index 8569b3e5d..25ed2969d 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java @@ -50,10 +50,12 @@ public class DataProxyController { @Autowired private DataProxyConfigRepository dataProxyConfigRepository; + // TODO protocol type must be provided by the DataProxy @PostMapping(value = "/dataproxy/getIpList/{inlongGroupId}") @ApiOperation(value = "Get data proxy IP list by InlongGroupId") - public Response<DataProxyNodeResponse> getIpList(@PathVariable String inlongGroupId) { - return Response.success(clusterService.getDataProxyNodes(inlongGroupId)); + public Response<DataProxyNodeResponse> getIpList(@PathVariable String inlongGroupId, + @RequestParam(required = false) String protocolType) { + return Response.success(clusterService.getDataProxyNodes(inlongGroupId, protocolType)); } @PostMapping("/dataproxy/getConfig")