This is an automated email from the ASF dual-hosted git repository. pacinogong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 546006564 [INLONG-7135][Manager] Support the connection test of data nodes and clusters (#7136) 546006564 is described below commit 5460065649420aebfb8086dc30a1f715541cb632 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Tue Jan 3 20:20:36 2023 +0800 [INLONG-7135][Manager] Support the connection test of data nodes and clusters (#7136) --- .../service/cluster/AbstractClusterOperator.java | 8 ++++- .../service/cluster/InlongClusterOperator.java | 10 ++++++- .../service/cluster/InlongClusterService.java | 8 +++++ .../service/cluster/InlongClusterServiceImpl.java | 14 ++++++++- .../service/cluster/PulsarClusterOperator.java | 22 ++++++++++++-- .../service/node/AbstractDataNodeOperator.java | 6 ++++ .../manager/service/node/DataNodeOperator.java | 8 +++++ .../manager/service/node/DataNodeServiceImpl.java | 28 ++---------------- .../node/es/ElasticsearchDataNodeOperator.java | 34 ++++++++++++++++++++++ .../service/node/hive/HiveDataNodeOperator.java | 22 ++++++++++++++ .../service/resource/sink/es/ElasticsearchApi.java | 2 +- .../manager/web/controller/DataNodeController.java | 2 +- .../web/controller/InlongClusterController.java | 6 ++++ 13 files changed, 138 insertions(+), 32 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java index ace15bf67..d788c505e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java @@ -20,10 +20,10 @@ package org.apache.inlong.manager.service.cluster; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper; +import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -77,4 +77,10 @@ public abstract class AbstractClusterOperator implements InlongClusterOperator { } } + @Override + public Boolean testConnection(ClusterRequest request) { + throw new BusinessException( + String.format(ErrorCodeEnum.CLUSTER_TYPE_NOT_SUPPORTED.getMessage(), request.getType())); + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java index dadd500ea..912279ea1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java @@ -17,9 +17,9 @@ package org.apache.inlong.manager.service.cluster; +import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.ClusterRequest; -import org.apache.inlong.manager.dao.entity.InlongClusterEntity; /** * Interface of the inlong cluster operator. @@ -63,4 +63,12 @@ public interface InlongClusterOperator { */ void updateOpt(ClusterRequest request, String operator); + /** + * Test connection + * + * @param request request of the cluster + * @return Whether the connection is successful + */ + Boolean testConnection(ClusterRequest request); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java index 289a77c56..8ecccbc8b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java @@ -429,4 +429,12 @@ public interface InlongClusterService { */ String getAllConfig(String clusterName, String md5); + /** + * Test whether the connection can be successfully established. + * + * @param request connection request + * @return true or false + */ + Boolean testConnection(ClusterRequest request); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index a32e1d6f5..a76ec6dcd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -25,6 +25,7 @@ import com.google.gson.Gson; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.common.constant.Constants; +import org.apache.inlong.common.constant.MQType; import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster; import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig; import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse; @@ -33,7 +34,6 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse; import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo; import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo; import org.apache.inlong.manager.common.consts.InlongConstants; -import org.apache.inlong.common.constant.MQType; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; @@ -1649,6 +1649,18 @@ public class InlongClusterServiceImpl implements InlongClusterService { return configJson; } + @Override + public Boolean testConnection(ClusterRequest request) { + LOGGER.info("begin test connection for: {}", request); + String type = request.getType(); + + // according to the data node type, test connection + InlongClusterOperator clusterOperator = clusterOperatorFactory.getInstance(request.getType()); + Boolean result = clusterOperator.testConnection(request); + LOGGER.info("connection [{}] for: {}", result ? "success" : "failed", request); + return result; + } + /** * Remove cluster tag from the given cluster entity. */ diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java index 3f7770d40..a4c98bb11 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java @@ -22,13 +22,15 @@ import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest; -import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.dao.entity.InlongClusterEntity; +import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -86,4 +88,20 @@ public class PulsarClusterOperator extends AbstractClusterOperator { } } + @Override + public Boolean testConnection(ClusterRequest request) { + PulsarClusterRequest pulsarRequest = (PulsarClusterRequest) request; + PulsarClusterInfo pulsarInfo = new PulsarClusterInfo(); + CommonBeanUtils.copyProperties(pulsarRequest, pulsarInfo); + try (PulsarAdmin ignored = PulsarUtils.getPulsarAdmin(pulsarInfo)) { + LOGGER.info("pulsar connection not null - connection success for adminUrl={}", pulsarInfo.getAdminUrl()); + return true; + } catch (Exception e) { + String errMsg = String.format("pulsar connection failed for adminUrl=%s, password=%s", + pulsarInfo.getAdminUrl(), pulsarInfo.getToken()); + LOGGER.error(errMsg, e); + throw new BusinessException(errMsg); + } + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java index 51eeb91f6..91fae43a8 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java @@ -85,4 +85,10 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator { public Map<String, String> parse2SinkParams(DataNodeInfo info) { return JsonUtils.parseObject(info.getExtParams(), HashMap.class); } + + @Override + public Boolean testConnection(DataNodeRequest request) { + throw new BusinessException( + String.format(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED.getMessage(), request.getType())); + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java index aeae290b7..14c8d35a0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java @@ -71,4 +71,12 @@ public interface DataNodeOperator { * @return Sink params */ Map<String, String> parse2SinkParams(DataNodeInfo info); + + /** + * Test connection + * @param request request of the data node + * @return Whether the connection is successful + */ + Boolean testConnection(DataNodeRequest request); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java index 13b613c77..60935ed54 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java @@ -19,13 +19,10 @@ package org.apache.inlong.manager.service.node; import com.github.pagehelper.Page; import com.github.pagehelper.PageHelper; - -import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.UserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper; import org.apache.inlong.manager.pojo.common.PageResult; @@ -34,14 +31,12 @@ import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodePageRequest; import org.apache.inlong.manager.pojo.node.DataNodeRequest; import org.apache.inlong.manager.pojo.user.UserInfo; -import org.apache.inlong.manager.service.resource.sink.hive.HiveJdbcUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.sql.Connection; import java.util.List; import java.util.stream.Collectors; @@ -341,28 +336,11 @@ public class DataNodeServiceImpl implements DataNodeService { LOGGER.info("begin test connection for: {}", request); String type = request.getType(); - Boolean result = false; - if (DataNodeType.HIVE.equals(type)) { - result = testHiveConnection(request); - } - + // according to the data node type, test connection + DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType()); + Boolean result = dataNodeOperator.testConnection(request); LOGGER.info("connection [{}] for: {}", result ? "success" : "failed", request); return result; } - /** - * Test connection for Hive - */ - private Boolean testHiveConnection(DataNodeRequest request) { - String url = request.getUrl(); - Preconditions.checkNotNull(url, "connection url cannot be empty"); - try (Connection ignored = HiveJdbcUtils.getConnection(url, request.getUsername(), request.getToken())) { - LOGGER.info("hive connection not null - connection success"); - return true; - } catch (Exception e) { - LOGGER.error("hive connection failed: {}", e.getMessage()); - return false; - } - } - } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java index 699b83a01..e07338f09 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodeRequest; @@ -30,6 +31,9 @@ import org.apache.inlong.manager.pojo.node.es.ElasticsearchDataNodeDTO; import org.apache.inlong.manager.pojo.node.es.ElasticsearchDataNodeInfo; import org.apache.inlong.manager.pojo.node.es.ElasticsearchDataNodeRequest; import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; +import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi; +import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchConfig; +import org.elasticsearch.client.RequestOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -82,4 +86,34 @@ public class ElasticsearchDataNodeOperator extends AbstractDataNodeOperator { LOGGER.debug("success to get elasticsearch data node from entity"); return info; } + + @Override + public Boolean testConnection(DataNodeRequest request) { + String url = request.getUrl(); + String username = request.getUsername(); + String password = request.getToken(); + Preconditions.checkNotNull(url, "connection url cannot be empty"); + ElasticsearchApi client = new ElasticsearchApi(); + ElasticsearchConfig config = new ElasticsearchConfig(); + if (StringUtils.isNotEmpty(request.getUsername())) { + config.setAuthEnable(true); + config.setUsername(username); + config.setPassword(password); + } + config.setHosts(url); + client.setEsConfig(config); + boolean result; + try { + result = client.getEsClient().ping(RequestOptions.DEFAULT); + LOGGER.info("elasticsearch connection is {} for url={}, username={}, password={}", result, url, username, + password); + return result; + } catch (Exception e) { + String errMsg = String.format("elasticsearch connection failed for url=%s, username=%s, password=%s", url, + username, password); + LOGGER.error(errMsg, e); + throw new BusinessException(errMsg); + } + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java index 7cb25bd5f..6adbe0b1a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodeRequest; @@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeDTO; import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo; import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest; import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; +import org.apache.inlong.manager.service.resource.sink.hive.HiveJdbcUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.sql.Connection; + @Service public class HiveDataNodeOperator extends AbstractDataNodeOperator { @@ -84,4 +88,22 @@ public class HiveDataNodeOperator extends AbstractDataNodeOperator { } } + @Override + public Boolean testConnection(DataNodeRequest request) { + String url = request.getUrl(); + String username = request.getUsername(); + String password = request.getToken(); + Preconditions.checkNotNull(url, "connection url cannot be empty"); + try (Connection ignored = HiveJdbcUtils.getConnection(url, username, password)) { + LOGGER.info("hive connection not null - connection success for url={}, username={}, password={}", url, + username, password); + return true; + } catch (Exception e) { + String errMsg = String.format("hive connection failed for url=%s, username=%s, password=%s", url, + username, password); + LOGGER.error(errMsg, e); + throw new BusinessException(errMsg); + } + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java index 41309e6de..43b187ec2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java @@ -227,7 +227,7 @@ public class ElasticsearchApi { * * @return RestHighLevelClient */ - private RestHighLevelClient getEsClient() { + public RestHighLevelClient getEsClient() { return esConfig.highLevelClient(); } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java index 641abc9b9..33ddba8c8 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java @@ -118,7 +118,7 @@ public class DataNodeController { @PostMapping("/node/testConnection") @ApiOperation(value = "Test connection for data node") - public Response<Boolean> testConnection(@Validated @RequestBody DataNodeRequest request) { + public Response<Boolean> testConnection(@RequestBody DataNodeRequest request) { return Response.success(dataNodeService.testConnection(request)); } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java index af822e703..3ea4634d3 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java @@ -235,4 +235,10 @@ public class InlongClusterController { String username = LoginUserUtils.getLoginUser().getName(); return Response.success(clusterService.bindNodeTag(request, username)); } + + @PostMapping("/cluster/testConnection") + @ApiOperation(value = "Test connection for inlong cluster") + public Response<Boolean> testConnection(@RequestBody ClusterRequest request) { + return Response.success(clusterService.testConnection(request)); + } }