This is an automated email from the ASF dual-hosted git repository. jakevin pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9984792341350337ddd5490046c99a23dd083d95 Author: zy-kkk <zhongy...@gmail.com> AuthorDate: Wed Mar 13 12:43:00 2024 +0800 (jdbc catalog) Add a property to test the connection when creating a Jdbc catalog (#32125) --- be/src/service/internal_service.cpp | 60 +++++++++++++ be/src/service/internal_service.h | 5 ++ be/src/vec/exec/vjdbc_connector.cpp | 27 +++++- be/src/vec/exec/vjdbc_connector.h | 5 ++ docs/en/docs/lakehouse/multi-catalog/jdbc.md | 2 + docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md | 3 +- .../org/apache/doris/jdbc/BaseJdbcExecutor.java | 20 +++++ .../org/apache/doris/jdbc/DefaultJdbcExecutor.java | 20 +++++ .../org/apache/doris/catalog/JdbcResource.java | 9 +- .../doris/datasource/jdbc/JdbcExternalCatalog.java | 99 +++++++++++++++++++++ .../doris/datasource/jdbc/client/JdbcClient.java | 18 ++++ .../datasource/jdbc/client/JdbcOracleClient.java | 5 ++ .../datasource/jdbc/client/JdbcSapHanaClient.java | 5 ++ .../org/apache/doris/rpc/BackendServiceClient.java | 5 ++ .../org/apache/doris/rpc/BackendServiceProxy.java | 12 +++ gensrc/proto/internal_service.proto | 11 +++ .../jdbc/test_clickhouse_jdbc_catalog.out | Bin 4991 -> 5426 bytes .../jdbc/test_clickhouse_jdbc_catalog.groovy | 30 ++++++- 18 files changed, 331 insertions(+), 5 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 7a37d613263..65cc431b4d5 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -39,6 +39,7 @@ #include <stddef.h> #include <stdint.h> #include <sys/stat.h> +#include <vec/exec/vjdbc_connector.h> #include <algorithm> #include <exception> @@ -786,6 +787,65 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co } } +void PInternalService::test_jdbc_connection(google::protobuf::RpcController* controller, + const PJdbcTestConnectionRequest* request, + PJdbcTestConnectionResult* result, + google::protobuf::Closure* done) { + bool ret = _heavy_work_pool.try_offer([request, result, done]() { + VLOG_RPC << "test jdbc connection"; + brpc::ClosureGuard closure_guard(done); + TTableDescriptor table_desc; + vectorized::JdbcConnectorParam jdbc_param; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)request->jdbc_table().data(); + uint32_t len = request->jdbc_table().size(); + st = deserialize_thrift_msg(buf, &len, false, &table_desc); + if (!st.ok()) { + LOG(WARNING) << "test jdbc connection failed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + } + TJdbcTable jdbc_table = (table_desc.jdbcTable); + jdbc_param.catalog_id = jdbc_table.catalog_id; + jdbc_param.driver_class = jdbc_table.jdbc_driver_class; + jdbc_param.driver_path = jdbc_table.jdbc_driver_url; + jdbc_param.driver_checksum = jdbc_table.jdbc_driver_checksum; + jdbc_param.jdbc_url = jdbc_table.jdbc_url; + jdbc_param.user = jdbc_table.jdbc_user; + jdbc_param.passwd = jdbc_table.jdbc_password; + jdbc_param.query_string = request->query_str(); + jdbc_param.table_type = static_cast<TOdbcTableType::type>(request->jdbc_table_type()); + jdbc_param.use_transaction = false; + jdbc_param.connection_pool_min_size = jdbc_table.connection_pool_min_size; + jdbc_param.connection_pool_max_size = jdbc_table.connection_pool_max_size; + jdbc_param.connection_pool_max_life_time = jdbc_table.connection_pool_max_life_time; + jdbc_param.connection_pool_max_wait_time = jdbc_table.connection_pool_max_wait_time; + jdbc_param.connection_pool_keep_alive = jdbc_table.connection_pool_keep_alive; + + std::unique_ptr<vectorized::JdbcConnector> jdbc_connector; + jdbc_connector.reset(new (std::nothrow) vectorized::JdbcConnector(jdbc_param)); + + st = jdbc_connector->test_connection(); + st.to_protobuf(result->mutable_status()); + + Status clean_st = jdbc_connector->clean_datasource(); + if (!clean_st.ok()) { + LOG(WARNING) << "Failed to clean JDBC datasource: " << clean_st.msg(); + } + Status close_st = jdbc_connector->close(); + if (!close_st.ok()) { + LOG(WARNING) << "Failed to close JDBC connector: " << close_st.msg(); + } + }); + + if (!ret) { + offer_failed(result, done, _heavy_work_pool); + return; + } +} + void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller, const PFetchColIdsRequest* request, PFetchColIdsResponse* response, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 261a3d161dc..4fffdc33875 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -214,6 +214,11 @@ public: PGetWalQueueSizeResponse* response, google::protobuf::Closure* done) override; + void test_jdbc_connection(google::protobuf::RpcController* controller, + const PJdbcTestConnectionRequest* request, + PJdbcTestConnectionResult* result, + google::protobuf::Closure* done) override; + private: void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 318651ea885..6a5aa0fdc38 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -164,7 +164,11 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { ctor_params.__set_jdbc_password(_conn_param.passwd); ctor_params.__set_jdbc_driver_class(_conn_param.driver_class); ctor_params.__set_driver_path(local_location); - ctor_params.__set_batch_size(read ? state->batch_size() : 0); + if (state == nullptr) { + ctor_params.__set_batch_size(read ? 1 : 0); + } else { + ctor_params.__set_batch_size(read ? state->batch_size() : 0); + } ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE); ctor_params.__set_table_type(_conn_param.table_type); ctor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size); @@ -195,6 +199,23 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { return Status::OK(); } +Status JdbcConnector::test_connection() { + RETURN_IF_ERROR(open(nullptr, true)); + + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_test_connection_id); + return JniUtil::GetJniExceptionMsg(env); +} + +Status JdbcConnector::clean_datasource() { + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_clean_datasource_id); + return JniUtil::GetJniExceptionMsg(env); +} + Status JdbcConnector::query() { if (!_is_open) { return Status::InternalError("Query before open of JdbcConnector."); @@ -380,6 +401,10 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames", JDBC_EXECUTOR_GET_TYPES_SIGNATURE, _executor_get_types_id)); + RETURN_IF_ERROR( + register_id(_executor_clazz, "testConnection", "()V", _executor_test_connection_id)); + RETURN_IF_ERROR( + register_id(_executor_clazz, "cleanDataSource", "()V", _executor_clean_datasource_id)); return Status::OK(); } diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index ed2afdecdfd..e42097b3abd 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -113,6 +113,9 @@ public: Status close(Status s = Status::OK()) override; + Status test_connection(); + Status clean_datasource(); + protected: JdbcConnectorParam _conn_param; @@ -155,6 +158,8 @@ private: jmethodID _executor_begin_trans_id; jmethodID _executor_finish_trans_id; jmethodID _executor_abort_trans_id; + jmethodID _executor_test_connection_id; + jmethodID _executor_clean_datasource_id; std::map<int, int> _map_column_idx_to_cast_idx_hll; std::vector<DataTypePtr> _input_hll_string_types; diff --git a/docs/en/docs/lakehouse/multi-catalog/jdbc.md b/docs/en/docs/lakehouse/multi-catalog/jdbc.md index 6b3682c61fc..c53dbec7ace 100644 --- a/docs/en/docs/lakehouse/multi-catalog/jdbc.md +++ b/docs/en/docs/lakehouse/multi-catalog/jdbc.md @@ -56,6 +56,8 @@ PROPERTIES ("key" = "value", ...) | `meta_names_mapping` | No | | When the JDBC external data source has the same name but different case, e.g. DORIS and doris, Doris reports an error when querying the catalog due to ambiguity. In this case, the `meta_names_mapping` parameter needs to be specified to resolve the conflict. | | `include_database_list` | No | | When `only_specified_database = true`,only synchronize the specified databases. Separate with `,`. Database name is case sensitive. | | `exclude_database_list` | No | | When `only_specified_database = true`,do not synchronize the specified databases. Separate with `,`. Database name is case sensitive. | +| `test_connection` | No | "true" | Whether to test the connection when creating the catalog. If set to `true`, the connection will be tested when creating the catalog and will refuse to create the catalog if the connection fails. If set to `false`, the connection will not be tested. | + ### Driver path diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md index dde24e1eb53..88cafda0d42 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md @@ -44,7 +44,7 @@ PROPERTIES ("key"="value", ...) ## 参数说明 -| 参数 | 必须 | 默认值 | 说明 | +| 参数 | 必须 | 默认值 | 说明 | |---------------------------|-----|---------|-----------------------------------------------------------------------| | `user` | 是 | | 对应数据库的用户名 | | `password` | 是 | | 对应数据库的密码 | @@ -56,6 +56,7 @@ PROPERTIES ("key"="value", ...) | `only_specified_database` | 否 | "false" | 指定是否只同步指定的 database | | `include_database_list` | 否 | "" | 当only_specified_database=true时,指定同步多个database,以','分隔。db名称是大小写敏感的。 | | `exclude_database_list` | 否 | "" | 当only_specified_database=true时,指定不需要同步的多个database,以','分割。db名称是大小写敏感的。 | +| `test_connection` | 否 | "true" | 是否在创建 Catalog 时测试连接。如果设置为 `true`,则会在创建 Catalog 时测试连接,如果连接失败,则会拒绝创建 Catalog。如果设置为 `false`,则不会测试连接。 | ### 驱动包路径 diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java index b24002b18a4..a1330b8df17 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java @@ -150,6 +150,26 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor { return false; } + public void cleanDataSource() { + if (druidDataSource != null) { + druidDataSource.close(); + JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey()); + druidDataSource = null; + } + } + + public void testConnection() throws UdfRuntimeException { + try { + resultSet = ((PreparedStatement) stmt).executeQuery(); + if (!resultSet.next()) { + throw new UdfRuntimeException( + "Failed to test connection in BE: query executed but returned no results."); + } + } catch (SQLException e) { + throw new UdfRuntimeException("Failed to test connection in BE: ", e); + } + } + public int read() throws UdfRuntimeException { try { resultSet = ((PreparedStatement) stmt).executeQuery(); diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java index 94d4304db38..aaa13a0f2d1 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java @@ -184,6 +184,26 @@ public class DefaultJdbcExecutor { return false; } + public void cleanDataSource() { + if (druidDataSource != null) { + druidDataSource.close(); + JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey()); + druidDataSource = null; + } + } + + public void testConnection() throws UdfRuntimeException { + try { + resultSet = ((PreparedStatement) stmt).executeQuery(); + if (!resultSet.next()) { + throw new UdfRuntimeException( + "Failed to test connection in BE: query executed but returned no results."); + } + } catch (SQLException e) { + throw new UdfRuntimeException("Failed to test connection in BE: ", e); + } + } + public int read() throws UdfRuntimeException { try { resultSet = ((PreparedStatement) stmt).executeQuery(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index ab6dd7d41f3..65bf7d308ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -105,6 +105,8 @@ public class JdbcResource extends Resource { public static final String CONNECTION_POOL_KEEP_ALIVE = "connection_pool_keep_alive"; public static final String CHECK_SUM = "checksum"; public static final String CREATE_TIME = "create_time"; + public static final String TEST_CONNECTION = "test_connection"; + private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add( JDBC_URL, USER, @@ -122,7 +124,8 @@ public class JdbcResource extends Resource { CONNECTION_POOL_MAX_SIZE, CONNECTION_POOL_MAX_LIFE_TIME, CONNECTION_POOL_MAX_WAIT_TIME, - CONNECTION_POOL_KEEP_ALIVE + CONNECTION_POOL_KEEP_ALIVE, + TEST_CONNECTION ).build(); private static final ImmutableList<String> OPTIONAL_PROPERTIES = new ImmutableList.Builder<String>().add( ONLY_SPECIFIED_DATABASE, @@ -134,7 +137,8 @@ public class JdbcResource extends Resource { CONNECTION_POOL_MAX_SIZE, CONNECTION_POOL_MAX_LIFE_TIME, CONNECTION_POOL_MAX_WAIT_TIME, - CONNECTION_POOL_KEEP_ALIVE + CONNECTION_POOL_KEEP_ALIVE, + TEST_CONNECTION ).build(); // The default value of optional properties @@ -152,6 +156,7 @@ public class JdbcResource extends Resource { OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME, "1800000"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME, "5000"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE, "false"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(TEST_CONNECTION, "true"); } // timeout for both connection and read. 10 seconds is long enough. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index 93ebb1b2134..0edabdaca08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -17,7 +17,10 @@ package org.apache.doris.datasource.jdbc; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.JdbcResource; +import org.apache.doris.catalog.JdbcTable; +import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.DdlException; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; @@ -25,18 +28,32 @@ import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; +import org.apache.doris.datasource.jdbc.client.JdbcClientException; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest; +import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; @Getter public class JdbcExternalCatalog extends ExternalCatalog { @@ -57,6 +74,7 @@ public class JdbcExternalCatalog extends ExternalCatalog { throws DdlException { super(catalogId, name, InitCatalogLog.Type.JDBC, comment); this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props, isReplay)); + testJdbcConnection(isReplay); } @Override @@ -74,6 +92,9 @@ public class JdbcExternalCatalog extends ExternalCatalog { JdbcResource.checkBooleanProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, getOnlySpecifiedDatabase()); JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_META_NAMES, getLowerCaseMetaNames()); + JdbcResource.checkBooleanProperty(JdbcResource.CONNECTION_POOL_KEEP_ALIVE, + String.valueOf(isConnectionPoolKeepAlive())); + JdbcResource.checkBooleanProperty(JdbcResource.TEST_CONNECTION, String.valueOf(isTestConnection())); JdbcResource.checkDatabaseListProperties(getOnlySpecifiedDatabase(), getIncludeDatabaseMap(), getExcludeDatabaseMap()); JdbcResource.checkConnectionPoolProperties(getConnectionPoolMinSize(), getConnectionPoolMaxSize(), @@ -190,6 +211,10 @@ public class JdbcExternalCatalog extends ExternalCatalog { .getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE))); } + public boolean isTestConnection() { + return Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.TEST_CONNECTION, "false")); + } + @Override protected void initLocalObjectsImpl() { JdbcClientConfig jdbcClientConfig = new JdbcClientConfig() @@ -268,4 +293,78 @@ public class JdbcExternalCatalog extends ExternalCatalog { makeSureInitialized(); jdbcClient.executeStmt(stmt); } + + private void testJdbcConnection(boolean isReplay) throws DdlException { + if (!isReplay) { + if (isTestConnection()) { + try { + initLocalObjectsImpl(); + testFeToJdbcConnection(); + testBeToJdbcConnection(); + } finally { + jdbcClient.closeClient(); + jdbcClient = null; + } + } + } + } + + private void testFeToJdbcConnection() throws DdlException { + try { + jdbcClient.testConnection(); + } catch (JdbcClientException e) { + String errorMessage = "Test FE Connection to JDBC Failed: " + e.getMessage(); + LOG.error(errorMessage, e); + throw new DdlException(errorMessage, e); + } + } + + private void testBeToJdbcConnection() throws DdlException { + Backend aliveBe = null; + for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + if (be.isAlive()) { + aliveBe = be; + } + } + if (aliveBe == null) { + throw new DdlException("Test BE Connection to JDBC Failed: No Alive backends"); + } + TNetworkAddress address = new TNetworkAddress(aliveBe.getHost(), aliveBe.getBrpcPort()); + try { + JdbcTable jdbcTable = getTestConnectionJdbcTable(); + PJdbcTestConnectionRequest request = InternalService.PJdbcTestConnectionRequest.newBuilder() + .setJdbcTable(ByteString.copyFrom(new TSerializer().serialize(jdbcTable.toThrift()))) + .setJdbcTableType(jdbcTable.getJdbcTableType().getValue()) + .setQueryStr(jdbcClient.getTestQuery()).build(); + InternalService.PJdbcTestConnectionResult result = null; + Future<PJdbcTestConnectionResult> future = BackendServiceProxy.getInstance() + .testJdbcConnection(address, request); + result = future.get(); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new DdlException("Test BE Connection to JDBC Failed: " + result.getStatus().getErrorMsgs(0)); + } + } catch (TException | RpcException | ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private JdbcTable getTestConnectionJdbcTable() throws DdlException { + JdbcTable jdbcTable = new JdbcTable(0, "test_jdbc_connection", Lists.newArrayList(), + TableType.JDBC_EXTERNAL_TABLE); + jdbcTable.setCatalogId(this.getId()); + jdbcTable.setJdbcTypeName(this.getDatabaseTypeName()); + jdbcTable.setJdbcUrl(this.getJdbcUrl()); + jdbcTable.setJdbcUser(this.getJdbcUser()); + jdbcTable.setJdbcPasswd(this.getJdbcPasswd()); + jdbcTable.setDriverClass(this.getDriverClass()); + jdbcTable.setDriverUrl(this.getDriverUrl()); + jdbcTable.setCheckSum(JdbcResource.computeObjectChecksum(this.getDriverUrl())); + jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize()); + jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize()); + jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime()); + jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime()); + jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive()); + return jdbcTable; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index 98ed34204ef..a211c0d0e37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -453,4 +453,22 @@ public abstract class JdbcClient { } return ScalarType.createStringType(); } + + public void testConnection() { + String testQuery = getTestQuery(); + try (Connection conn = getConnection(); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(testQuery)) { + if (!rs.next()) { + throw new JdbcClientException( + "Failed to test connection in FE: query executed but returned no results."); + } + } catch (SQLException e) { + throw new JdbcClientException("Failed to test connection in FE: " + e.getMessage(), e); + } + } + + public String getTestQuery() { + return "select 1"; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java index 4d536a4ef3f..4afec31c9d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java @@ -45,6 +45,11 @@ public class JdbcOracleClient extends JdbcClient { return conn.getCatalog(); } + @Override + public String getTestQuery() { + return "SELECT 1 FROM dual"; + } + @Override public List<String> getDatabaseNameList() { Connection conn = getConnection(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java index 4e29bba7d74..1d5f590ed47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java @@ -36,6 +36,11 @@ public class JdbcSapHanaClient extends JdbcClient { return new String[] {"TABLE", "VIEW", "OLAP VIEW", "JOIN VIEW", "HIERARCHY VIEW", "CALC VIEW"}; } + @Override + public String getTestQuery() { + return "SELECT 1 FROM DUMMY"; + } + @Override protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { String hanaType = fieldSchema.getDataTypeName(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index f8363755731..50c24a42330 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -110,6 +110,11 @@ public class BackendServiceClient { return stub.fetchTableSchema(request); } + public Future<InternalService.PJdbcTestConnectionResult> testJdbcConnection( + InternalService.PJdbcTestConnectionRequest request) { + return stub.testJdbcConnection(request); + } + public Future<InternalService.PCacheResponse> updateCache(InternalService.PUpdateCacheRequest request) { return stub.updateCache(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index df9a90433db..d78e055a1ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -320,6 +320,18 @@ public class BackendServiceProxy { } } + public Future<InternalService.PJdbcTestConnectionResult> testJdbcConnection( + TNetworkAddress address, InternalService.PJdbcTestConnectionRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.testJdbcConnection(request); + } catch (Throwable e) { + LOG.warn("test jdbc connection catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + public Future<InternalService.PReportStreamLoadStatusResponse> reportStreamLoadStatus( TNetworkAddress address, InternalService.PReportStreamLoadStatusRequest request) throws RpcException { try { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 8b9cce2acb9..8b4795c4b7e 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -642,6 +642,16 @@ message PFetchTableSchemaResult { repeated PTypeDesc column_types = 4; } +message PJdbcTestConnectionRequest { + optional bytes jdbc_table = 1; + optional int32 jdbc_table_type = 2; + optional string query_str = 3; +} + +message PJdbcTestConnectionResult { + optional PStatus status = 1; +} + message PRowLocation { optional int64 tablet_id = 1; optional string rowset_id = 2; @@ -879,5 +889,6 @@ service PBackendService { rpc get_wal_queue_size(PGetWalQueueSizeRequest) returns(PGetWalQueueSizeResponse); rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns (PFetchArrowFlightSchemaResult); rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns (PFetchRemoteSchemaResponse); + rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns (PJdbcTestConnectionResult); }; diff --git a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out index 4eba3e43641..73dc89da69f 100644 Binary files a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out and b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out differ diff --git a/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy b/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy index 4b8aee3abe7..bc67d89ff4f 100644 --- a/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy +++ b/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy @@ -101,6 +101,32 @@ suite("test_clickhouse_jdbc_catalog", "p0,external,clickhouse,external_docker,ex order_qt_dt_with_tz """ select * from dt_with_tz order by id; """ + sql """create catalog if not exists clickhouse_catalog_test_conn_correct properties( + "type"="jdbc", + "user"="default", + "password"="123456", + "jdbc_url" = "jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test", + "driver_url" = "${driver_url}", + "driver_class" = "com.clickhouse.jdbc.ClickHouseDriver", + "test_connection" = "true" + ); + """ + order_qt_test_conn_correct """ select * from clickhouse_catalog_test_conn_correct.doris_test.type; """ + + test { + sql """create catalog if not exists clickhouse_catalog_test_conn_mistake properties( + "type"="jdbc", + "user"="default", + "password"="1234567", + "jdbc_url" = "jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test", + "driver_url" = "${driver_url}", + "driver_class" = "com.clickhouse.jdbc.ClickHouseDriver", + "test_connection" = "true" + ); + """ + exception "Test FE Connection to JDBC Failed: Can not connect to jdbc due to error: Code: 516. DB::Exception: default: Authentication failed: password is incorrect, or there is no user with such name." + } + }finally { res_dbs_log = sql "show databases;" for(int i = 0;i < res_dbs_log.size();i++) { @@ -108,7 +134,9 @@ suite("test_clickhouse_jdbc_catalog", "p0,external,clickhouse,external_docker,ex log.info( "database = ${res_dbs_log[i][0]} => tables = "+tbs.toString()) } } - + sql """ drop catalog if exists ${catalog_name} """ + sql """ drop catalog if exists clickhouse_catalog_test_conn_correct """ + sql """ drop catalog if exists clickhouse_catalog_test_conn_mistake """ } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org