This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit c7af0fcc0790d10ba0d4180c342aae8715a13f32 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Wed Jan 4 18:48:49 2023 +0800 [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142) --- .../service/cluster/KafkaClusterOperator.java | 25 +++++++++++++++++++ .../service/cluster/TubeClusterOperator.java | 29 ++++++++++++++++++++-- .../node/ck/ClickHouseDataNodeOperator.java | 22 ++++++++++++++++ .../node/iceberg/IcebergDataNodeOperator.java | 23 +++++++++++++++++ .../service/node/mysql/MySQLDataNodeOperator.java | 23 +++++++++++++++++ .../node/starrocks/StarRocksDataNodeOperator.java | 24 ++++++++++++++++++ 6 files changed, 144 insertions(+), 2 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java index 8c4eb37b7..dceac1864 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java @@ -23,17 +23,24 @@ import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterDTO; import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo; import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterRequest; +import 
org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Properties; + /** * Kafka cluster operator. */ @@ -86,4 +93,22 @@ public class KafkaClusterOperator extends AbstractClusterOperator { } }
+    /**
+     * Tests whether the Kafka cluster behind the request URL can be reached.
+     *
+     * <p>An admin client is created for the bootstrap servers and the topic names are
+     * listed; the listing is configured with a 30000 ms timeout.
+     *
+     * @param request cluster request whose URL is used as the Kafka bootstrap servers
+     * @return {@code true} when the topic names were fetched without an exception
+     * @throws BusinessException if creating the client or listing the topics throws
+     */
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        String bootstrapServers = request.getUrl();
+        Preconditions.checkNotNull(bootstrapServers, "connection url cannot be empty");
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // the client is used below, so it is not named "ignored"
+        try (Admin admin = Admin.create(props)) {
+            ListTopicsResult topics = admin.listTopics(new ListTopicsOptions().timeoutMs(30000));
+            // block until the broker answers (or the listing fails)
+            topics.names().get();
+            // log the servers that were tested, not the ListTopicsResult object
+            LOGGER.info("kafka connection not null - connection success for bootstrapServers={}",
+                    bootstrapServers);
+            return true;
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // keep the interrupt visible to whoever owns this thread
+                Thread.currentThread().interrupt();
+            }
+            String errMsg = String.format("kafka connection failed for bootstrapServers=%s", bootstrapServers);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java index cb24bd741..b3cf872fa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java @@ -19,22 +19,27 @@ package org.apache.inlong.manager.service.cluster; import com.fasterxml.jackson.databind.ObjectMapper; import
org.apache.commons.lang3.StringUtils; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.HttpUtils; +import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterDTO; import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo; import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterRequest; -import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.service.group.InlongGroupOperator4NoneMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.concurrent.TimeUnit; + /** * TubeMQ cluster operator. 
*/ @@ -86,4 +91,24 @@ public class TubeClusterOperator extends AbstractClusterOperator { return tubeClusterInfo; }
+    /**
+     * Tests whether the TubeMQ master named by the request URL accepts connections.
+     *
+     * <p>The host is the text between the last {@code InlongConstants.SLASH} and the last
+     * {@code InlongConstants.COLON} of the URL; the port is the text after that colon.
+     *
+     * @param request cluster request whose URL is the TubeMQ master URL
+     * @return the value returned by the connectivity check for the parsed host and port
+     * @throws BusinessException if the URL cannot be parsed or the connectivity check throws
+     */
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        String masterUrl = request.getUrl();
+        // validate first: the parsing below dereferences the URL
+        Preconditions.checkNotNull(masterUrl, "connection url cannot be empty");
+        try {
+            // parsing sits inside the try so a malformed URL is reported like any other failure
+            int hostBeginIndex = masterUrl.lastIndexOf(InlongConstants.SLASH);
+            int portBeginIndex = masterUrl.lastIndexOf(InlongConstants.COLON);
+            String host = masterUrl.substring(hostBeginIndex + 1, portBeginIndex);
+            int port = Integer.parseInt(masterUrl.substring(portBeginIndex + 1));
+            boolean result = HttpUtils.checkConnectivity(host, port, 10, TimeUnit.SECONDS);
+            if (result) {
+                LOGGER.info("tube connection not null - connection success for masterUrl={}", masterUrl);
+            } else {
+                // do not claim success when the check itself reported failure
+                LOGGER.warn("tube connection failed for masterUrl={}", masterUrl);
+            }
+            return result;
+        } catch (Exception e) {
+            String errMsg = String.format("tube connection failed for masterUrl=%s", masterUrl);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java index 08e15de38..7f386416b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; import
org.apache.inlong.manager.pojo.node.DataNodeRequest; @@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeDTO; import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo; import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeRequest; import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; +import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseJdbcUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.sql.Connection; + @Service public class ClickHouseDataNodeOperator extends AbstractDataNodeOperator { @@ -82,4 +86,22 @@ public class ClickHouseDataNodeOperator extends AbstractDataNodeOperator { } }
+    /**
+     * Tests whether a ClickHouse connection can be obtained with the request's URL,
+     * username and token.
+     *
+     * <p>The connection is closed again as soon as it has been obtained.
+     *
+     * @param request data node request supplying the URL, the username and the token
+     *        (the token is passed on as the password)
+     * @return {@code true} when the connection was obtained without an exception
+     * @throws BusinessException if obtaining or closing the connection throws
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String url = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(url, "connection url cannot be empty");
+        try (Connection ignored = ClickHouseJdbcUtils.getConnection(url, username, password)) {
+            // the password is deliberately kept out of the log
+            LOGGER.info("clickhouse connection not null - connection success for url={}, username={}", url,
+                    username);
+            return true;
+        } catch (Exception e) {
+            // this message is both logged and carried by the thrown exception: no password in it
+            String errMsg = String.format("clickhouse connection failed for url=%s, username=%s", url, username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java index 991764610..74506feaa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java +++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java @@ -19,10 +19,12 @@ package org.apache.inlong.manager.service.node.iceberg; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.hive.HiveCatalog; import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodeRequest; @@ -30,6 +32,7 @@ import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeDTO; import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo; import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeRequest; import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; +import org.apache.inlong.manager.service.resource.sink.iceberg.IcebergCatalogUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -83,4 +86,24 @@ public class IcebergDataNodeOperator extends AbstractDataNodeOperator { } }
+    /**
+     * Tests whether the Iceberg catalog described by the request can be reached.
+     *
+     * <p>A catalog is obtained for the metastore URI and warehouse, and its namespaces
+     * are listed.
+     *
+     * @param request data node request; it is cast to {@code IcebergDataNodeRequest} to read
+     *        the URL (used as the metastore URI) and the warehouse
+     * @return {@code true} when the namespaces were listed without an exception
+     * @throws BusinessException if obtaining the catalog or listing the namespaces throws
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        IcebergDataNodeRequest icebergDataNodeRequest = (IcebergDataNodeRequest) request;
+        String metastoreUri = icebergDataNodeRequest.getUrl();
+        String warehouse = icebergDataNodeRequest.getWarehouse();
+        Preconditions.checkNotNull(metastoreUri, "connection url cannot be empty");
+        try {
+            HiveCatalog catalog = IcebergCatalogUtils.getCatalog(metastoreUri, warehouse);
+            // the result is discarded; only whether the call succeeds matters here
+            catalog.listNamespaces();
+            LOGGER.info("iceberg connection not null - connection success for metastoreUri={}, warehouse={}",
+                    metastoreUri, warehouse);
+            return true;
+        } catch (Exception e) {
+            // key spelled "warehouse" so it matches the success log above
+            String errMsg = String.format("iceberg connection failed for metastoreUri=%s, warehouse=%s",
+                    metastoreUri, warehouse);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java index aacd09dd9..9848b23c3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodeRequest; @@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO; import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo; import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeRequest; import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; +import org.apache.inlong.manager.service.resource.sink.mysql.MySQLJdbcUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.sql.Connection; + /** * MySQL data node operator */ @@ -86,4 +90,23 @@ public class MySQLDataNodeOperator extends
AbstractDataNodeOperator { throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage()); } }
+
+    /**
+     * Tests whether a MySQL connection can be obtained with the request's URL,
+     * username and token.
+     *
+     * <p>The connection is closed again as soon as it has been obtained.
+     *
+     * @param request data node request supplying the JDBC URL, the username and the token
+     *        (the token is passed on as the password)
+     * @return {@code true} when the connection was obtained without an exception
+     * @throws BusinessException if obtaining or closing the connection throws
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String jdbcUrl = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(jdbcUrl, "connection jdbcUrl cannot be empty");
+        try (Connection ignored = MySQLJdbcUtils.getConnection(jdbcUrl, username, password)) {
+            // the password is deliberately kept out of the log
+            LOGGER.info("mysql connection not null - connection success for jdbcUrl={}, username={}",
+                    jdbcUrl, username);
+            return true;
+        } catch (Exception e) {
+            // this message is both logged and carried by the thrown exception: no password in it
+            String errMsg = String.format("mysql connection failed for jdbcUrl=%s, username=%s", jdbcUrl,
+                    username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java index fb82d894c..e4249ac08 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodeRequest; @@ -30,11 +31,14 @@ import
org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeDTO; import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo; import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeRequest; import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; +import org.apache.inlong.manager.service.resource.sink.starrocks.StarRocksJdbcUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.sql.Connection; + @Service public class StarRocksDataNodeOperator extends AbstractDataNodeOperator { @@ -83,4 +87,24 @@ public class StarRocksDataNodeOperator extends AbstractDataNodeOperator { throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage()); } }
+
+    /**
+     * Tests whether a StarRocks connection can be obtained with the request's URL,
+     * username and token.
+     *
+     * <p>The connection is closed again as soon as it has been obtained.
+     *
+     * @param request data node request supplying the JDBC URL, the username and the token
+     *        (the token is passed on as the password)
+     * @return {@code true} when the connection was obtained without an exception
+     * @throws BusinessException if obtaining or closing the connection throws
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String jdbcUrl = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(jdbcUrl, "connection jdbcUrl cannot be empty");
+        try (Connection ignored = StarRocksJdbcUtils.getConnection(jdbcUrl, username, password)) {
+            // the password is deliberately kept out of the log
+            LOGGER.info("starRocks connection not null - connection success for jdbcUrl={}, username={}",
+                    jdbcUrl, username);
+            return true;
+        } catch (Exception e) {
+            // this message is both logged and carried by the thrown exception: no password in it
+            String errMsg = String.format("starRocks connection failed for jdbcUrl=%s, username=%s",
+                    jdbcUrl, username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }