This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 7db9cb06ad3 branch-4.1: [Feature](iceberg) Support Iceberg JDBC
Catalog #59502 (#61360)
7db9cb06ad3 is described below
commit 7db9cb06ad3461d6837df816f458bd0dc458b77e
Author: Chenjunwei <[email protected]>
AuthorDate: Tue Mar 17 00:34:21 2026 +0800
branch-4.1: [Feature](iceberg) Support Iceberg JDBC Catalog #59502 (#61360)
Cherry-pick #59502 to branch-4.1
---
.../datasource/iceberg/IcebergExternalCatalog.java | 1 +
.../iceberg/IcebergExternalCatalogFactory.java | 2 +
.../iceberg/IcebergJdbcExternalCatalog.java | 31 ++
.../datasource/iceberg/source/IcebergScanNode.java | 1 +
.../metastore/IcebergJdbcMetaStoreProperties.java | 310 ++++++++++++++++++++
.../metastore/IcebergPropertiesFactory.java | 1 +
.../org/apache/doris/persist/gson/GsonUtils.java | 2 +
.../IcebergJdbcMetaStorePropertiesTest.java | 83 ++++++
.../iceberg/test_iceberg_jdbc_catalog.out | 42 +++
.../iceberg/test_iceberg_jdbc_catalog.groovy | 316 +++++++++++++++++++++
10 files changed, 789 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index ee8ad8b4fc0..66aabd58a33 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -50,6 +50,7 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
public static final String ICEBERG_HADOOP = "hadoop";
public static final String ICEBERG_GLUE = "glue";
public static final String ICEBERG_DLF = "dlf";
+ public static final String ICEBERG_JDBC = "jdbc";
public static final String ICEBERG_S3_TABLES = "s3tables";
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
public static final String ICEBERG_TABLE_CACHE_ENABLE =
"meta.cache.iceberg.table.enable";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
index 748c0805393..824d20e7000 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
@@ -39,6 +39,8 @@ public class IcebergExternalCatalogFactory {
return new IcebergGlueExternalCatalog(catalogId, name,
resource, props, comment);
case IcebergExternalCatalog.ICEBERG_DLF:
return new IcebergDLFExternalCatalog(catalogId, name,
resource, props, comment);
+ case IcebergExternalCatalog.ICEBERG_JDBC:
+ return new IcebergJdbcExternalCatalog(catalogId, name,
resource, props, comment);
case IcebergExternalCatalog.ICEBERG_HADOOP:
return new IcebergHadoopExternalCatalog(catalogId, name,
resource, props, comment);
case IcebergExternalCatalog.ICEBERG_S3_TABLES:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java
new file mode 100644
index 00000000000..aeb2fd9deec
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.CatalogProperty;
+
+import java.util.Map;
+
+public class IcebergJdbcExternalCatalog extends IcebergExternalCatalog {
+
+ public IcebergJdbcExternalCatalog(long catalogId, String name, String
resource, Map<String, String> props,
+ String comment) {
+ super(catalogId, name, comment);
+ catalogProperty = new CatalogProperty(resource, props);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index bcb89d3f221..624de1fde9d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -179,6 +179,7 @@ public class IcebergScanNode extends FileQueryScanNode {
case IcebergExternalCatalog.ICEBERG_DLF:
case IcebergExternalCatalog.ICEBERG_GLUE:
case IcebergExternalCatalog.ICEBERG_HADOOP:
+ case IcebergExternalCatalog.ICEBERG_JDBC:
case IcebergExternalCatalog.ICEBERG_S3_TABLES:
source = new IcebergApiSource((IcebergExternalTable)
table, desc, columnNameToRange);
break;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java
new file mode 100644
index 00000000000..5c81532edd4
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java
@@ -0,0 +1,310 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.catalog.JdbcResource;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties {
+ private static final Logger LOG =
LogManager.getLogger(IcebergJdbcMetaStoreProperties.class);
+
+ private static final String JDBC_PREFIX = "jdbc.";
+ private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new
ConcurrentHashMap<>();
+
+ private Map<String, String> icebergJdbcCatalogProperties;
+
+ @ConnectorProperty(
+ names = {"uri", "iceberg.jdbc.uri"},
+ required = true,
+ description = "JDBC connection URI for the Iceberg JDBC catalog."
+ )
+ private String uri = "";
+
+ @ConnectorProperty(
+ names = {"iceberg.jdbc.user"},
+ required = false,
+ description = "Username for the Iceberg JDBC catalog."
+ )
+ private String jdbcUser;
+
+ @ConnectorProperty(
+ names = {"iceberg.jdbc.password"},
+ required = false,
+ sensitive = true,
+ description = "Password for the Iceberg JDBC catalog."
+ )
+ private String jdbcPassword;
+
+ @ConnectorProperty(
+ names = {"iceberg.jdbc.init-catalog-tables"},
+ required = false,
+ description = "Whether to create catalog tables if they do not
exist."
+ )
+ private String jdbcInitCatalogTables;
+
+ @ConnectorProperty(
+ names = {"iceberg.jdbc.schema-version"},
+ required = false,
+ description = "Iceberg JDBC catalog schema version (V0/V1)."
+ )
+ private String jdbcSchemaVersion;
+
+ @ConnectorProperty(
+ names = {"iceberg.jdbc.strict-mode"},
+ required = false,
+ description = "Whether to enforce strict JDBC catalog schema
checks."
+ )
+ private String jdbcStrictMode;
+
+ @ConnectorProperty(
+ names = {"iceberg.jdbc.driver_url"},
+ required = false,
+ description = "JDBC driver JAR file path or URL. "
+ + "Can be a local file name (will look in
$DORIS_HOME/plugins/jdbc_drivers/) "
+ + "or a full URL (http://, https://, file://)."
+ )
+ private String driverUrl;
+
+ @ConnectorProperty(
+ names = {"iceberg.jdbc.driver_class"},
+ required = false,
+ description = "JDBC driver class name. If not specified, will be
auto-detected from the JDBC URI."
+ )
+ private String driverClass;
+
+ public IcebergJdbcMetaStoreProperties(Map<String, String> props) {
+ super(props);
+ }
+
+ @Override
+ public String getIcebergCatalogType() {
+ return IcebergExternalCatalog.ICEBERG_JDBC;
+ }
+
+ @Override
+ public void initNormalizeAndCheckProps() {
+ super.initNormalizeAndCheckProps();
+ initIcebergJdbcCatalogProperties();
+ }
+
+ @Override
+ protected void checkRequiredProperties() {
+ super.checkRequiredProperties();
+ if (StringUtils.isBlank(warehouse)) {
+ throw new IllegalArgumentException("Property warehouse is
required.");
+ }
+ }
+
+ @Override
+ public Catalog initCatalog(String catalogName, Map<String, String>
catalogProps,
+ List<StorageProperties> storagePropertiesList) {
+ Map<String, String> fileIOProperties = Maps.newHashMap();
+ Configuration conf = new Configuration();
+ toFileIOProperties(storagePropertiesList, fileIOProperties, conf);
+
+ Map<String, String> options =
Maps.newHashMap(getIcebergJdbcCatalogProperties());
+ options.putAll(fileIOProperties);
+
+ // Support dynamic JDBC driver loading
+ // We need to register the driver with DriverManager because Iceberg
uses DriverManager.getConnection()
+ // which doesn't respect Thread.contextClassLoader
+ if (StringUtils.isNotBlank(driverUrl)) {
+ registerJdbcDriver(driverUrl, driverClass);
+ LOG.info("Using dynamic JDBC driver from: {}", driverUrl);
+ }
+ return CatalogUtil.buildIcebergCatalog(catalogName, options, conf);
+ }
+
+ /**
+ * Register JDBC driver with DriverManager.
+ * This is necessary because DriverManager.getConnection() doesn't use
Thread.contextClassLoader,
+ * it uses the caller's ClassLoader. By registering the driver,
DriverManager can find it.
+ *
+ * @param driverUrl Path or URL to the JDBC driver JAR
+ * @param driverClassName Driver class name to register
+ */
+ private void registerJdbcDriver(String driverUrl, String driverClassName) {
+ try {
+ String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl);
+ URL url = new URL(fullDriverUrl);
+
+ ClassLoader classLoader =
DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u -> {
+ ClassLoader parent = getClass().getClassLoader();
+ return URLClassLoader.newInstance(new URL[]{u}, parent);
+ });
+
+ if (StringUtils.isBlank(driverClassName)) {
+ throw new IllegalArgumentException("driver_class is required
when driver_url is specified");
+ }
+
+ // Load the driver class and register it with DriverManager
+ Class<?> driverClass = Class.forName(driverClassName, true,
classLoader);
+ java.sql.Driver driver = (java.sql.Driver)
driverClass.getDeclaredConstructor().newInstance();
+
+ // Wrap with a shim driver because DriverManager refuses to use a
driver not loaded by system classloader
+ java.sql.DriverManager.registerDriver(new DriverShim(driver));
+ LOG.info("Successfully registered JDBC driver: {} from {}",
driverClassName, fullDriverUrl);
+
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Invalid driver URL: " +
driverUrl, e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Failed to load JDBC driver
class: " + driverClassName, e);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to register JDBC driver: " +
driverClassName, e);
+ }
+ }
+
+ /**
+ * A shim driver that wraps the actual driver loaded from a custom
ClassLoader.
+ * This is needed because DriverManager refuses to use a driver that
wasn't loaded by the system classloader.
+ */
+ private static class DriverShim implements java.sql.Driver {
+ private final java.sql.Driver delegate;
+
+ DriverShim(java.sql.Driver delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public java.sql.Connection connect(String url, java.util.Properties
info) throws java.sql.SQLException {
+ return delegate.connect(url, info);
+ }
+
+ @Override
+ public boolean acceptsURL(String url) throws java.sql.SQLException {
+ return delegate.acceptsURL(url);
+ }
+
+ @Override
+ public java.sql.DriverPropertyInfo[] getPropertyInfo(String url,
java.util.Properties info)
+ throws java.sql.SQLException {
+ return delegate.getPropertyInfo(url, info);
+ }
+
+ @Override
+ public int getMajorVersion() {
+ return delegate.getMajorVersion();
+ }
+
+ @Override
+ public int getMinorVersion() {
+ return delegate.getMinorVersion();
+ }
+
+ @Override
+ public boolean jdbcCompliant() {
+ return delegate.jdbcCompliant();
+ }
+
+ @Override
+ public java.util.logging.Logger getParentLogger() throws
java.sql.SQLFeatureNotSupportedException {
+ return delegate.getParentLogger();
+ }
+ }
+
+ public Map<String, String> getIcebergJdbcCatalogProperties() {
+ return Collections.unmodifiableMap(icebergJdbcCatalogProperties);
+ }
+
+ private void initIcebergJdbcCatalogProperties() {
+ icebergJdbcCatalogProperties = new HashMap<>();
+ icebergJdbcCatalogProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE,
CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC);
+ icebergJdbcCatalogProperties.put(CatalogProperties.URI, uri);
+ if (StringUtils.isNotBlank(warehouse)) {
+
icebergJdbcCatalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION,
warehouse);
+ }
+ addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.user", jdbcUser);
+ addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.password",
jdbcPassword);
+ addIfNotBlank(icebergJdbcCatalogProperties,
"jdbc.init-catalog-tables", jdbcInitCatalogTables);
+ addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.schema-version",
jdbcSchemaVersion);
+ addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.strict-mode",
jdbcStrictMode);
+
+ if (origProps != null) {
+ for (Map.Entry<String, String> entry : origProps.entrySet()) {
+ String key = entry.getKey();
+ if (key != null && key.startsWith(JDBC_PREFIX)
+ && !icebergJdbcCatalogProperties.containsKey(key)) {
+ icebergJdbcCatalogProperties.put(key, entry.getValue());
+ }
+ }
+ }
+ }
+
+ private static void addIfNotBlank(Map<String, String> props, String key,
String value) {
+ if (StringUtils.isNotBlank(value)) {
+ props.put(key, value);
+ }
+ }
+
+ private static void toFileIOProperties(List<StorageProperties>
storagePropertiesList,
+ Map<String, String> fileIOProperties, Configuration conf) {
+ for (StorageProperties storageProperties : storagePropertiesList) {
+ if (storageProperties instanceof AbstractS3CompatibleProperties) {
+ toS3FileIOProperties((AbstractS3CompatibleProperties)
storageProperties, fileIOProperties);
+ }
+ if (storageProperties.getHadoopStorageConfig() != null) {
+ conf.addResource(storageProperties.getHadoopStorageConfig());
+ }
+ }
+ }
+
+ private static void toS3FileIOProperties(AbstractS3CompatibleProperties
s3Properties,
+ Map<String, String> options) {
+ if (StringUtils.isNotBlank(s3Properties.getEndpoint())) {
+ options.put(S3FileIOProperties.ENDPOINT,
s3Properties.getEndpoint());
+ }
+ if (StringUtils.isNotBlank(s3Properties.getUsePathStyle())) {
+ options.put(S3FileIOProperties.PATH_STYLE_ACCESS,
s3Properties.getUsePathStyle());
+ }
+ if (StringUtils.isNotBlank(s3Properties.getRegion())) {
+ options.put(AwsClientProperties.CLIENT_REGION,
s3Properties.getRegion());
+ }
+ if (StringUtils.isNotBlank(s3Properties.getAccessKey())) {
+ options.put(S3FileIOProperties.ACCESS_KEY_ID,
s3Properties.getAccessKey());
+ }
+ if (StringUtils.isNotBlank(s3Properties.getSecretKey())) {
+ options.put(S3FileIOProperties.SECRET_ACCESS_KEY,
s3Properties.getSecretKey());
+ }
+ if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
+ options.put(S3FileIOProperties.SESSION_TOKEN,
s3Properties.getSessionToken());
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java
index 64fd28216cf..333c6c44806 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java
@@ -43,6 +43,7 @@ public class IcebergPropertiesFactory extends
AbstractMetastorePropertiesFactory
register("hadoop", IcebergFileSystemMetaStoreProperties::new);
register("s3tables", IcebergS3TablesMetaStoreProperties::new);
register("dlf", IcebergAliyunDLFMetaStoreProperties::new);
+ register("jdbc", IcebergJdbcMetaStoreProperties::new);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 6c495bc78c1..db06dada60d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -145,6 +145,7 @@ import
org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergHadoopExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergJdbcExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergS3TablesExternalCatalog;
import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
@@ -419,6 +420,7 @@ public class GsonUtils {
.registerSubtype(IcebergRestExternalCatalog.class,
IcebergRestExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergDLFExternalCatalog.class,
IcebergDLFExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergHadoopExternalCatalog.class,
IcebergHadoopExternalCatalog.class.getSimpleName())
+ .registerSubtype(IcebergJdbcExternalCatalog.class,
IcebergJdbcExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergS3TablesExternalCatalog.class,
IcebergS3TablesExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonExternalCatalog.class,
PaimonExternalCatalog.class.getSimpleName())
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java
new file mode 100644
index 00000000000..b35782b2033
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IcebergJdbcMetaStorePropertiesTest {
+
+ @Test
+ public void testBasicJdbcProperties() {
+ Map<String, String> props = new HashMap<>();
+ props.put("uri", "jdbc:mysql://localhost:3306/iceberg");
+ props.put("warehouse", "s3://warehouse/path");
+ props.put("jdbc.user", "iceberg");
+ props.put("jdbc.password", "secret");
+
+ IcebergJdbcMetaStoreProperties jdbcProps = new
IcebergJdbcMetaStoreProperties(props);
+ jdbcProps.initNormalizeAndCheckProps();
+
+ Map<String, String> catalogProps =
jdbcProps.getIcebergJdbcCatalogProperties();
+ Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC,
+ catalogProps.get(CatalogUtil.ICEBERG_CATALOG_TYPE));
+ Assertions.assertEquals("jdbc:mysql://localhost:3306/iceberg",
catalogProps.get(CatalogProperties.URI));
+ Assertions.assertEquals("s3://warehouse/path",
catalogProps.get(CatalogProperties.WAREHOUSE_LOCATION));
+ Assertions.assertEquals("iceberg", catalogProps.get("jdbc.user"));
+ Assertions.assertEquals("secret", catalogProps.get("jdbc.password"));
+ }
+
+ @Test
+ public void testJdbcPrefixPassthrough() {
+ Map<String, String> props = new HashMap<>();
+ props.put("uri", "jdbc:mysql://localhost:3306/iceberg");
+ props.put("warehouse", "s3://warehouse/path");
+ props.put("jdbc.useSSL", "true");
+ props.put("jdbc.verifyServerCertificate", "true");
+
+ IcebergJdbcMetaStoreProperties jdbcProps = new
IcebergJdbcMetaStoreProperties(props);
+ jdbcProps.initNormalizeAndCheckProps();
+
+ Map<String, String> catalogProps =
jdbcProps.getIcebergJdbcCatalogProperties();
+ Assertions.assertEquals("true", catalogProps.get("jdbc.useSSL"));
+ Assertions.assertEquals("true",
catalogProps.get("jdbc.verifyServerCertificate"));
+ }
+
+ @Test
+ public void testMissingWarehouse() {
+ Map<String, String> props = new HashMap<>();
+ props.put("uri", "jdbc:mysql://localhost:3306/iceberg");
+
+ IcebergJdbcMetaStoreProperties jdbcProps = new
IcebergJdbcMetaStoreProperties(props);
+ Assertions.assertThrows(IllegalArgumentException.class,
jdbcProps::initNormalizeAndCheckProps);
+ }
+
+ @Test
+ public void testMissingUri() {
+ Map<String, String> props = new HashMap<>();
+ props.put("warehouse", "s3://warehouse/path");
+
+ IcebergJdbcMetaStoreProperties jdbcProps = new
IcebergJdbcMetaStoreProperties(props);
+ Assertions.assertThrows(IllegalArgumentException.class,
jdbcProps::initNormalizeAndCheckProps);
+ }
+}
diff --git
a/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out
b/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out
new file mode 100644
index 00000000000..9e7a05f3757
--- /dev/null
+++
b/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out
@@ -0,0 +1,42 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !datatypes_select --
+false 2 200000000000 2.5 3.5 234.56 world 2025-01-02
2025-01-02T11:00
+true 1 100000000000 1.5 2.5 123.45 hello 2025-01-01
2025-01-01T10:00
+true 3 300000000000 3.5 4.5 345.67 test 2025-01-03
2025-01-03T12:00
+
+-- !datatypes_count --
+3
+
+-- !datatypes_filter --
+1 hello
+3 test
+
+-- !partition_select --
+1 Item1 A 2025-01-01
+2 Item2 A 2025-01-01
+3 Item3 B 2025-01-02
+4 Item4 B 2025-01-02
+5 Item5 A 2025-01-03
+
+-- !partition_filter --
+1 Item1 A 2025-01-01
+2 Item2 A 2025-01-01
+5 Item5 A 2025-01-03
+
+-- !sys_snapshots --
+1
+
+-- !sys_history --
+1
+
+-- !after_overwrite --
+1 Item1 A 2025-01-01
+2 Item2 A 2025-01-01
+3 Item3 B 2025-01-02
+4 Item4 B 2025-01-02
+5 Item5 A 2025-01-03
+
+-- !mysql_select --
+1 Alice 2025-01-01T10:00
+2 Bob 2025-01-02T11:00
+
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy
new file mode 100644
index 00000000000..412d305da1c
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_jdbc_catalog",
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("Iceberg test is not enabled, skip this test")
+ return;
+ }
+
+ String enabledJdbc = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabledJdbc == null || !enabledJdbc.equalsIgnoreCase("true")) {
+ logger.info("Iceberg JDBC catalog test requires enableJdbcTest, skip
this test")
+ return;
+ }
+
+ // Get test environment configuration
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String jdbc_port = context.config.otherConfigs.get("pg_14_port")
+
+ // JDBC Catalog specific test - uses PostgreSQL as the metadata store
+ // If PostgreSQL port is not configured, this test will be skipped
+ if (jdbc_port == null || jdbc_port.isEmpty()) {
+ logger.info("Iceberg JDBC catalog PostgreSQL port not configured
(pg_14_port), skip this test")
+ return;
+ }
+
+ if (minio_port == null || minio_port.isEmpty() || externalEnvIp == null) {
+ logger.info("Iceberg test environment not fully configured, skip this
test")
+ return;
+ }
+
+ String catalog_name = "test_iceberg_jdbc_catalog"
+ String db_name = "jdbc_test_db"
+ String driver_name = "postgresql-42.5.0.jar"
+ String driver_download_url =
"${getS3Url()}/regression/jdbc_driver/${driver_name}"
+ String jdbc_drivers_dir = getFeConfig("jdbc_drivers_dir")
+ String local_driver_dir = "${context.config.dataPath}/jdbc_driver"
+ String local_driver_path = "${local_driver_dir}/${driver_name}"
+ String pg_db = "postgres"
+ String mysql_db = "iceberg_db"
+
+ // MySQL driver config
+ String mysql_driver_name = "mysql-connector-java-5.1.49-v2.jar"
+ String mysql_driver_download_url =
"${getS3Url()}/regression/jdbc_driver/mysql-connector-java-5.1.49.jar"
+ String local_mysql_driver_path = "${local_driver_dir}/${mysql_driver_name}"
+
+ def executeCommand = { String cmd, Boolean mustSuc ->
+ try {
+ logger.info("execute ${cmd}")
+ def proc = new ProcessBuilder("/bin/bash", "-c",
cmd).redirectErrorStream(true).start()
+ int exitcode = proc.waitFor()
+ if (exitcode != 0) {
+ logger.info("exit code: ${exitcode}, output\n: ${proc.text}")
+ if (mustSuc == true) {
+ assertTrue(false, "Execute failed: ${cmd}")
+ }
+ }
+ } catch (IOException e) {
+ assertTrue(false, "Execute timeout: ${cmd}")
+ }
+ }
+
+ // Ensure the PostgreSQL JDBC driver is available on all FE/BE nodes.
+ def host_ips = new ArrayList()
+ String[][] backends = sql """ show backends """
+ for (def b in backends) {
+ host_ips.add(b[1])
+ }
+ String[][] frontends = sql """ show frontends """
+ for (def f in frontends) {
+ host_ips.add(f[1])
+ }
+ host_ips = host_ips.unique()
+
+ executeCommand("mkdir -p ${local_driver_dir}", false)
+ if (!new File(local_driver_path).exists()) {
+ executeCommand("/usr/bin/curl --max-time 600 ${driver_download_url}
--output ${local_driver_path}", true)
+ }
+ if (!new File(local_mysql_driver_path).exists()) {
+ executeCommand("/usr/bin/curl --max-time 600
${mysql_driver_download_url} --output ${local_mysql_driver_path}", true)
+ }
+ for (def ip in host_ips) {
+ executeCommand("ssh -o StrictHostKeyChecking=no root@${ip} \"mkdir -p
${jdbc_drivers_dir}\"", false)
+ scpFiles("root", ip, local_driver_path, jdbc_drivers_dir, false)
+ scpFiles("root", ip, local_mysql_driver_path, jdbc_drivers_dir, false)
+ }
+
+ try {
+ // Clean up existing catalog
+ sql """DROP CATALOG IF EXISTS ${catalog_name}"""
+
+ // Create Iceberg JDBC Catalog with PostgreSQL backend and MinIO
storage
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type' = 'iceberg',
+ 'iceberg.catalog.type' = 'jdbc',
+ 'uri' =
'jdbc:postgresql://${externalEnvIp}:${jdbc_port}/${pg_db}',
+ 'warehouse' = 's3://warehouse/jdbc_wh/',
+ 'iceberg.jdbc.driver_url' = '${driver_name}',
+ 'iceberg.jdbc.driver_class' = 'org.postgresql.Driver',
+ 'iceberg.jdbc.user' = 'postgres',
+ 'iceberg.jdbc.password' = '123456',
+ 'iceberg.jdbc.init-catalog-tables' = 'true',
+ 'iceberg.jdbc.schema-version' = 'V1',
+ 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
+ 's3.access_key' = 'admin',
+ 's3.secret_key' = 'password',
+ 's3.region' = 'us-east-1'
+ )
+ """
+
+ // Switch to the catalog
+ sql """SWITCH ${catalog_name}"""
+
+ // Test: Show catalogs
+ def catalogs = sql """SHOW CATALOGS"""
+ assertTrue(catalogs.toString().contains(catalog_name))
+
+ // Test: Create database
+ sql """DROP DATABASE IF EXISTS ${db_name} FORCE"""
+ sql """CREATE DATABASE ${db_name}"""
+
+ def databases = sql """SHOW DATABASES"""
+ assertTrue(databases.toString().contains(db_name))
+
+ sql """USE ${db_name}"""
+
+ // Test: Create non-partitioned table with various data types
+ sql """DROP TABLE IF EXISTS test_datatypes"""
+ sql """
+ CREATE TABLE test_datatypes (
+ c_boolean BOOLEAN,
+ c_int INT,
+ c_bigint BIGINT,
+ c_float FLOAT,
+ c_double DOUBLE,
+ c_decimal DECIMAL(10, 2),
+ c_string STRING,
+ c_date DATE,
+ c_datetime DATETIME
+ ) PROPERTIES (
+ 'write-format' = 'parquet',
+ 'compression-codec' = 'zstd'
+ )
+ """
+
+ def tables = sql """SHOW TABLES"""
+ assertTrue(tables.toString().contains("test_datatypes"))
+
+ // Test: Insert data with various types
+ sql """
+ INSERT INTO test_datatypes VALUES
+ (true, 1, 100000000000, 1.5, 2.5, 123.45, 'hello', '2025-01-01',
'2025-01-01 10:00:00'),
+ (false, 2, 200000000000, 2.5, 3.5, 234.56, 'world', '2025-01-02',
'2025-01-02 11:00:00'),
+ (true, 3, 300000000000, 3.5, 4.5, 345.67, 'test', '2025-01-03',
'2025-01-03 12:00:00')
+ """
+
+ // Test: Query data with different data types
+ order_qt_datatypes_select """SELECT * FROM test_datatypes ORDER BY
c_int"""
+ order_qt_datatypes_count """SELECT count(*) FROM test_datatypes"""
+ order_qt_datatypes_filter """SELECT c_int, c_string FROM
test_datatypes WHERE c_boolean = true ORDER BY c_int"""
+
+ // Test: Create partitioned table
+ sql """DROP TABLE IF EXISTS test_partitioned"""
+ sql """
+ CREATE TABLE test_partitioned (
+ id INT,
+ name STRING,
+ category STRING,
+ event_date DATE
+ )
+ PARTITION BY LIST (category) ()
+ PROPERTIES (
+ 'write-format' = 'parquet'
+ )
+ """
+
+ // Test: Insert into partitioned table
+ sql """
+ INSERT INTO test_partitioned VALUES
+ (1, 'Item1', 'A', '2025-01-01'),
+ (2, 'Item2', 'A', '2025-01-01'),
+ (3, 'Item3', 'B', '2025-01-02'),
+ (4, 'Item4', 'B', '2025-01-02'),
+ (5, 'Item5', 'A', '2025-01-03')
+ """
+
+ order_qt_partition_select """SELECT * FROM test_partitioned ORDER BY
id"""
+ order_qt_partition_filter """SELECT * FROM test_partitioned WHERE
category = 'A' ORDER BY id"""
+
+ // Test: System tables
+ order_qt_sys_snapshots """SELECT count(*) FROM
test_datatypes\$snapshots"""
+ order_qt_sys_history """SELECT count(*) FROM test_datatypes\$history"""
+
+ // Test: DESCRIBE TABLE
+ def desc = sql """DESCRIBE test_datatypes"""
+ assertTrue(desc.toString().contains("c_int"))
+ assertTrue(desc.toString().contains("c_string"))
+
+ // Test: INSERT OVERWRITE
+ sql """
+ INSERT OVERWRITE TABLE test_partitioned
+ SELECT * FROM test_partitioned WHERE category = 'A'
+ """
+ order_qt_after_overwrite """SELECT * FROM test_partitioned ORDER BY
id"""
+
+ // Test: Drop table
+ sql """DROP TABLE IF EXISTS test_datatypes"""
+ sql """DROP TABLE IF EXISTS test_partitioned"""
+
+ // Test: Drop database
+ sql """DROP DATABASE IF EXISTS ${db_name} FORCE"""
+
+ logger.info("Iceberg JDBC Catalog test completed successfully")
+
+ // MySQL Catalog Test
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+ if (mysql_port != null) {
+ // Clean up MySQL database to remove old metadata
+ // This prevents issues where the database contains metadata
pointing to invalid S3 locations
+ String cleanupCmd = "mysql -h ${externalEnvIp} -P ${mysql_port} -u
root -p123456 -e 'DROP DATABASE IF EXISTS iceberg_db; CREATE DATABASE
iceberg_db;'"
+ executeCommand(cleanupCmd, false)
+
+ String mysql_catalog_name = "iceberg_jdbc_mysql"
+ try {
+ sql """DROP CATALOG IF EXISTS ${mysql_catalog_name}"""
+ sql """
+ CREATE CATALOG ${mysql_catalog_name} PROPERTIES (
+ 'type' = 'iceberg',
+ 'iceberg.catalog.type' = 'jdbc',
+ 'uri' =
'jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysql_db}',
+ 'warehouse' = 's3://warehouse/jdbc_wh_mysql/',
+ 'iceberg.jdbc.driver_url' =
'file://${jdbc_drivers_dir}/${mysql_driver_name}',
+ 'iceberg.jdbc.driver_class' = 'com.mysql.jdbc.Driver',
+ 'iceberg.jdbc.user' = 'root',
+ 'iceberg.jdbc.password' = '123456',
+ 'iceberg.jdbc.init-catalog-tables' = 'true',
+ 'iceberg.jdbc.schema-version' = 'V1',
+ 's3.endpoint' =
'http://${externalEnvIp}:${minio_port}',
+ 's3.access_key' = 'admin',
+ 's3.secret_key' = 'password',
+ 's3.region' = 'us-east-1'
+ )
+ """
+
+ sql """SWITCH ${mysql_catalog_name}"""
+
+ String mysql_db_name = "mysql_test_db"
+ sql """DROP DATABASE IF EXISTS ${mysql_db_name} FORCE"""
+ sql """CREATE DATABASE ${mysql_db_name}"""
+ sql """USE ${mysql_db_name}"""
+
+ sql """DROP TABLE IF EXISTS test_mysql_catalog"""
+ sql """
+ CREATE TABLE test_mysql_catalog (
+ id INT,
+ name STRING,
+ ts DATETIME
+ ) PROPERTIES (
+ 'write-format' = 'parquet'
+ )
+ """
+
+ sql """
+ INSERT INTO test_mysql_catalog VALUES
+ (1, 'Alice', '2025-01-01 10:00:00'),
+ (2, 'Bob', '2025-01-02 11:00:00')
+ """
+
+ order_qt_mysql_select """SELECT * FROM test_mysql_catalog
ORDER BY id"""
+
+ sql """DROP TABLE IF EXISTS test_mysql_catalog"""
+ sql """DROP DATABASE IF EXISTS ${mysql_db_name} FORCE"""
+
+ logger.info("Iceberg JDBC Catalog (MySQL) test completed
successfully")
+ } catch (Exception e) {
+ logger.warn("MySQL Catalog test failed: ${e.message}")
+ // Don't fail the whole suite if MySQL is optional or
misconfigured
+ // But user asked for it, so maybe we should let it fail or
log error
+ throw e
+ } finally {
+ try {
+ sql """SWITCH internal"""
+ sql """DROP CATALOG IF EXISTS ${mysql_catalog_name}"""
+ } catch (Exception e) {
+ logger.warn("Failed to cleanup MySQL catalog:
${e.message}")
+ }
+ }
+ }
+
+ } finally {
+ // Cleanup
+ try {
+ sql """SWITCH internal"""
+ sql """DROP CATALOG IF EXISTS ${catalog_name}"""
+ } catch (Exception e) {
+ logger.warn("Failed to cleanup catalog: ${e.message}")
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]