This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new a5aafa7301 [Fix][Connector-JDBC] Fix JDBC driver selection for data source connections (#8986) a5aafa7301 is described below commit a5aafa7301964715f2efcd0d544ca397a256232c Author: chenhongyu <2795267...@qq.com> AuthorDate: Mon Mar 24 10:56:30 2025 +0800 [Fix][Connector-JDBC] Fix JDBC driver selection for data source connections (#8986) --- docs/en/connector-v2/changelog/connector-jdbc.md | 1 + .../jdbc/catalog/AbstractJdbcCatalog.java | 37 ++- .../seatunnel/jdbc/catalog/JdbcCatalogOptions.java | 2 + .../seatunnel/jdbc/catalog/dm/DamengCatalog.java | 5 +- .../jdbc/catalog/dm/DamengCatalogFactory.java | 3 +- .../jdbc/catalog/highgo/HighGoCatalog.java | 5 +- .../jdbc/catalog/highgo/HighGoCatalogFactory.java | 3 +- .../seatunnel/jdbc/catalog/iris/IrisCatalog.java | 8 +- .../jdbc/catalog/iris/IrisCatalogFactory.java | 3 +- .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 8 +- .../jdbc/catalog/mysql/MySqlCatalogFactory.java | 3 +- .../catalog/oceanbase/OceanBaseCatalogFactory.java | 6 +- .../catalog/oceanbase/OceanBaseMySqlCatalog.java | 8 +- .../catalog/oceanbase/OceanBaseOracleCatalog.java | 5 +- .../jdbc/catalog/opengauss/OpenGaussCatalog.java | 5 +- .../catalog/opengauss/OpenGaussCatalogFactory.java | 3 +- .../jdbc/catalog/oracle/OracleCatalog.java | 11 +- .../jdbc/catalog/oracle/OracleCatalogFactory.java | 3 +- .../jdbc/catalog/psql/PostgresCatalog.java | 5 +- .../jdbc/catalog/psql/PostgresCatalogFactory.java | 3 +- .../jdbc/catalog/redshift/RedshiftCatalog.java | 5 +- .../catalog/redshift/RedshiftCatalogFactory.java | 3 +- .../jdbc/catalog/saphana/SapHanaCatalog.java | 5 +- .../catalog/saphana/SapHanaCatalogFactory.java | 3 +- .../jdbc/catalog/sqlserver/SqlServerCatalog.java | 5 +- .../catalog/sqlserver/SqlServerCatalogFactory.java | 3 +- .../seatunnel/jdbc/catalog/tidb/TiDBCatalog.java | 8 +- .../jdbc/catalog/tidb/TiDBCatalogFactory.java | 3 +- .../seatunnel/jdbc/catalog/xugu/XuguCatalog.java | 5 +- .../jdbc/catalog/xugu/XuguCatalogFactory.java | 3 +- .../seatunnel/jdbc/utils/JdbcCatalogUtils.java | 1 + .../seatunnel/jdbc/catalog/dm/DamengJdbcTest.java | 8 +- .../jdbc/catalog/driver/DriverSelectionTest.java | 110 +++++++ .../jdbc/catalog/driver/ExpectedDriver.java | 343 +++++++++++++++++++++ .../seatunnel/jdbc/catalog/driver/OtherDriver.java | 73 +++++ .../jdbc/catalog/mysql/MySqlCatalogTest.java | 7 +- .../jdbc/catalog/oracle/OracleCatalogTest.java | 1 + .../jdbc/catalog/psql/PostgresCatalogTest.java | 4 +- .../catalog/sqlserver/SqlServerCatalogTest.java | 7 +- .../cdc/oracle/OracleCDCWithSchemaChangeIT.java | 4 +- .../connectors/seatunnel/jdbc/JdbcMysqlIT.java | 3 +- .../connectors/seatunnel/jdbc/JdbcOracleIT.java | 3 +- .../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java | 3 +- .../seatunnel/jdbc/JdbcOceanBaseOracleIT.java | 3 +- .../connectors/seatunnel/jdbc/JdbcPostgresIT.java | 9 +- .../connectors/seatunnel/jdbc/JdbcSqlServerIT.java | 3 +- .../seatunnel/jdbc/JdbcMySqlCreateTableIT.java | 6 +- .../seatunnel/jdbc/JdbcSqlServerCreateTableIT.java | 6 +- .../seatunnel/jdbc/JdbcOracleLowercaseTableIT.java | 6 +- .../connectors/seatunnel/jdbc/JdbcHighGoIT.java | 3 +- .../connectors/seatunnel/jdbc/JdbcIrisIT.java | 3 +- .../seatunnel/jdbc/JdbcMySqlSaveModeCatalogIT.java | 3 +- .../seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java | 3 +- .../seatunnel/jdbc/JdbcMysqlSplitIT.java | 2 +- .../connectors/seatunnel/jdbc/JdbcOpenGaussIT.java | 3 +- .../connectors/seatunnel/jdbc/JdbcXuguIT.java | 3 +- .../paimon/PaimonSinkWithSchemaEvolutionIT.java | 3 +- 57 files changed, 715 insertions(+), 78 deletions(-) diff --git a/docs/en/connector-v2/changelog/connector-jdbc.md b/docs/en/connector-v2/changelog/connector-jdbc.md index feec543b0c..2e5f50ac72 100644 --- a/docs/en/connector-v2/changelog/connector-jdbc.md +++ b/docs/en/connector-v2/changelog/connector-jdbc.md @@ -2,6 +2,7 @@ | Change | Commit | Version | | --- | --- | --- | +|[Fix][Connector-JDBC] Fix JDBC driver selection for data source connections(#8986) | https://github.com/apache/seatunnel/pull/8986 | dev | |[Fix][Connector-V2] Fix parse SqlServer JDBC Url error (#8784)|https://github.com/apache/seatunnel/commit/373d2162d3| dev | |[Improve][Jdbc] Support upsert for opengauss (#8627)|https://github.com/apache/seatunnel/commit/56110bf392| dev | |[Improve][Jdbc] Remove useless utils. (#8793)|https://github.com/apache/seatunnel/commit/36a7533e85| dev | diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java index bd041cc3ca..a1d82cbe2f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java @@ -49,12 +49,14 @@ import lombok.extern.slf4j.Slf4j; import java.sql.Connection; import java.sql.DatabaseMetaData; +import java.sql.Driver; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; +import java.util.Enumeration; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -82,12 +84,15 @@ public abstract class AbstractJdbcCatalog implements Catalog { protected final Map<String, Connection> connectionMap; + private final String driverClass; + public AbstractJdbcCatalog( String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo, - String defaultSchema) { + String defaultSchema, + String driverClass) { checkArgument(StringUtils.isNotBlank(username)); checkArgument(StringUtils.isNotBlank(urlInfo.getUrlWithoutDatabase())); @@ -100,6 +105,7 @@ public abstract class AbstractJdbcCatalog implements Catalog { this.suffix = urlInfo.getSuffix(); this.defaultSchema = Optional.ofNullable(defaultSchema); this.connectionMap = new ConcurrentHashMap<>(); + this.driverClass = driverClass; } @Override @@ -116,6 +122,35 @@ public abstract class AbstractJdbcCatalog implements Catalog { if (connectionMap.containsKey(url)) { return connectionMap.get(url); } + if (driverClass != null) { + log.info("try to find driver {}", driverClass); + java.util.Properties info = new java.util.Properties(); + if (username != null) { + info.put("user", username); + } + if (pwd != null) { + info.put("password", pwd); + } + Enumeration<Driver> drivers = DriverManager.getDrivers(); + try { + // Driver Manager may load the wrong driver, prioritize finding the driver by class + // name + while (drivers.hasMoreElements()) { + Driver driver = drivers.nextElement(); + if (StringUtils.equals(driver.getClass().getName(), driverClass)) { + try { + Connection connection = driver.connect(url, info); + connectionMap.put(url, connection); + return connection; + } catch (Exception e) { + log.info("try connector failed", e); + } + } + } + } catch (Exception e) { + log.info("find driver error, back to DriverManager.getConnection", e); + } + } try { Connection connection = DriverManager.getConnection(url, username, pwd); connectionMap.put(url, connection); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java index c412ca9218..53e40c1e9b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java @@ -84,4 +84,6 @@ public interface JdbcCatalogOptions { .booleanType() .defaultValue(true) .withDescription("Create index or not when auto create table"); + + Option<String> DRIVER = Options.key("driver").stringType().noDefaultValue(); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java index ee23c1806f..0878dd007f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java @@ -64,8 +64,9 @@ public class DamengCatalog extends AbstractJdbcCatalog { String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo, - String defaultSchema) { - super(catalogName, username, pwd, urlInfo, defaultSchema); + String defaultSchema, + String driverClass) { + super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalogFactory.java index 5dac3764d2..46f47b40cc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalogFactory.java @@ -52,7 +52,8 @@ public class DamengCatalogFactory implements CatalogFactory { options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), urlInfo, - options.get(JdbcCatalogOptions.SCHEMA)); + options.get(JdbcCatalogOptions.SCHEMA), + options.get(JdbcCatalogOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalog.java index b53f9d60a2..bced60c7dd 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalog.java @@ -26,7 +26,8 @@ public class HighGoCatalog extends PostgresCatalog { String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo, - String defaultSchema) { - super(catalogName, username, pwd, urlInfo, defaultSchema); + String defaultSchema, + String driverClass) { + super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalogFactory.java index 2ffa6a21b1..a506ce1fff 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalogFactory.java @@ -42,7 +42,8 @@ public class HighGoCatalogFactory implements CatalogFactory { options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), urlInfo, - options.get(JdbcCatalogOptions.SCHEMA)); + options.get(JdbcCatalogOptions.SCHEMA), + options.get(JdbcCatalogOptions.DRIVER)); } public String factoryIdentifier() { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java index e91006fdca..6cd6610af2 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java @@ -61,8 +61,12 @@ public class IrisCatalog extends AbstractJdbcCatalog { "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE != 'SYSTEM VIEW'"; public IrisCatalog( - String catalogName, String username, String password, JdbcUrlUtil.UrlInfo urlInfo) { - super(catalogName, username, password, urlInfo, null); + String catalogName, + String username, + String password, + JdbcUrlUtil.UrlInfo urlInfo, + String driverClass) { + super(catalogName, username, password, urlInfo, null, driverClass); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java index 3304575d2a..c8c1747896 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java @@ -51,7 +51,8 @@ public class IrisCatalogFactory implements CatalogFactory { catalogName, options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), - urlInfo); + urlInfo, + options.get(JdbcCatalogOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java index 323556e137..5ae0eab200 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java @@ -62,8 +62,12 @@ public class MySqlCatalog extends AbstractJdbcCatalog { private MySqlTypeConverter typeConverter; public MySqlCatalog( - String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) { - super(catalogName, username, pwd, urlInfo, null); + String catalogName, + String username, + String pwd, + JdbcUrlUtil.UrlInfo urlInfo, + String driverClass) { + super(catalogName, username, pwd, urlInfo, null, driverClass); this.version = resolveVersion(); this.typeConverter = new MySqlTypeConverter(version); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java index 5330fe0176..8665c6c07c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java @@ -51,7 +51,8 @@ public class MySqlCatalogFactory implements CatalogFactory { catalogName, options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), - urlInfo); + urlInfo, + options.get(JdbcCatalogOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java index 01d035e167..c13779b9e1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java @@ -71,13 +71,15 @@ public class OceanBaseCatalogFactory implements CatalogFactory { options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), urlInfo, - options.get(JdbcCatalogOptions.SCHEMA)); + options.get(JdbcCatalogOptions.SCHEMA), + options.get(JdbcCatalogOptions.DRIVER)); } return new OceanBaseMySqlCatalog( catalogName, options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), - urlInfo); + urlInfo, + options.get(JdbcCatalogOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java index 4d4527653f..86410b573f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java @@ -66,8 +66,12 @@ public class OceanBaseMySqlCatalog extends AbstractJdbcCatalog { private OceanBaseMySqlTypeConverter typeConverter; public OceanBaseMySqlCatalog( - String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) { - super(catalogName, username, pwd, urlInfo, null); + String catalogName, + String username, + String pwd, + JdbcUrlUtil.UrlInfo urlInfo, + String driverClass) { + super(catalogName, username, pwd, urlInfo, null, driverClass); this.typeConverter = new OceanBaseMySqlTypeConverter(); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java index 328b3f38b3..4557d7667a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java @@ -38,8 +38,9 @@ public class OceanBaseOracleCatalog extends OracleCatalog { String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo, - String defaultSchema) { - super(catalogName, username, pwd, urlInfo, defaultSchema); + String defaultSchema, + String driverClass) { + super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java index 805701cc45..3030dd7b22 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java @@ -34,8 +34,9 @@ public class OpenGaussCatalog extends PostgresCatalog { String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo, - String defaultSchema) { - super(catalogName, username, pwd, urlInfo, defaultSchema); + String defaultSchema, + String driverClass) { + super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass); } @VisibleForTesting diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java index bff96ff6d3..6d9dd525da 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java @@ -52,7 +52,8 @@ public class OpenGaussCatalogFactory implements CatalogFactory { options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), urlInfo, - options.get(JdbcCatalogOptions.SCHEMA)); + options.get(JdbcCatalogOptions.SCHEMA), + options.get(JdbcCatalogOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java index 3a90c61c6d..d7d86c46b1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java @@ -81,14 +81,16 @@ public class OracleCatalog extends AbstractJdbcCatalog { String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo, - String defaultSchema) { + String defaultSchema, + String driverClass) { this( catalogName, username, pwd, urlInfo, defaultSchema, - JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue()); + JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue(), + driverClass); } public OracleCatalog( @@ -97,8 +99,9 @@ public class OracleCatalog extends AbstractJdbcCatalog { String pwd, JdbcUrlUtil.UrlInfo urlInfo, String defaultSchema, - boolean decimalTypeNarrowing) { - super(catalogName, username, pwd, urlInfo, defaultSchema); + boolean decimalTypeNarrowing, + String driverClass) { + super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass); this.decimalTypeNarrowing = decimalTypeNarrowing; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java index 2b51d97621..209d307c40 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java @@ -54,7 +54,8 @@ public class OracleCatalogFactory implements CatalogFactory { options.get(JdbcCatalogOptions.PASSWORD), urlInfo, options.get(JdbcCatalogOptions.SCHEMA), - options.get(JdbcOptions.DECIMAL_TYPE_NARROWING)); + options.get(JdbcOptions.DECIMAL_TYPE_NARROWING), + options.get(JdbcOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java index 4851f3461a..ccb895edc4 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java @@ -86,8 +86,9 @@ public class PostgresCatalog extends AbstractJdbcCatalog { String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo, - String defaultSchema) { - super(catalogName, username, pwd, urlInfo, defaultSchema); + String defaultSchema, + String driverClass) { + super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogFactory.java index dc0d416e6f..8fed8675cb 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogFactory.java @@ -52,7 +52,8 @@ public class PostgresCatalogFactory implements CatalogFactory { options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), urlInfo, - options.get(JdbcCatalogOptions.SCHEMA)); + options.get(JdbcCatalogOptions.SCHEMA), + options.get(JdbcCatalogOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java index 76ea97d166..3d130aec46 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java @@ -45,8 +45,9 @@ public class RedshiftCatalog extends AbstractJdbcCatalog { String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo, - String schema) { - super(catalogName, username, pwd, urlInfo, schema); + String schema, + String driverClass) { + super(catalogName, username, pwd, urlInfo, schema, driverClass); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogFactory.java index 31409b3b21..44985ef7f4 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogFactory.java @@ -59,7 +59,8 @@ public class RedshiftCatalogFactory implements CatalogFactory { options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), urlInfo, - options.get(JdbcCatalogOptions.SCHEMA)); + options.get(JdbcCatalogOptions.SCHEMA), + options.get(JdbcCatalogOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java index f63c6d7221..bc1157d3cd 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java @@ -85,8 +85,9 @@ public class SapHanaCatalog extends AbstractJdbcCatalog { String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo, - String defaultSchema) { - super(catalogName, username, pwd, urlInfo, defaultSchema); + String defaultSchema, + String driverClass) { + super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalogFactory.java index 1f196f3039..4991bc759e 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalogFactory.java @@ -52,7 +52,8 @@ public class SapHanaCatalogFactory implements CatalogFactory { options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), urlInfo, - options.get(JdbcCatalogOptions.SCHEMA)); + options.get(JdbcCatalogOptions.SCHEMA), + options.get(JdbcCatalogOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java index 7c759e2eda..d87823c535 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java @@ -64,8 +64,9 @@ public class SqlServerCatalog extends AbstractJdbcCatalog { String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo, - String defaultSchema) { - super(catalogName, username, pwd, urlInfo, defaultSchema); + String defaultSchema, + String driverClass) { + super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java index fe15b7dde8..153fea6039 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java @@ -45,7 +45,8 @@ public class SqlServerCatalogFactory implements CatalogFactory { options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), urlInfo, - options.get(JdbcCatalogOptions.SCHEMA)); + options.get(JdbcCatalogOptions.SCHEMA), + options.get(JdbcCatalogOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java index 869a06586e..64fee6c081 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java @@ -23,7 +23,11 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog public class TiDBCatalog extends MySqlCatalog { public TiDBCatalog( - String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) { - super(catalogName, username, pwd, urlInfo); + String catalogName, + String username, + String pwd, + JdbcUrlUtil.UrlInfo urlInfo, + String driverClass) { + super(catalogName, username, pwd, urlInfo, driverClass); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java index a661f9626d..01d07d6502 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java @@ -51,7 +51,8 @@ public class TiDBCatalogFactory implements CatalogFactory { catalogName, options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), - urlInfo); + urlInfo, + options.get(JdbcCatalogOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java index 2881280d32..fab09c1852 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java @@ -115,8 +115,9 @@ public class XuguCatalog extends AbstractJdbcCatalog { String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo, - String defaultSchema) { - super(catalogName, username, pwd, urlInfo, defaultSchema); + String defaultSchema, + String driverClass) { + super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java index ac0f3e24ae..066ff57dd3 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java @@ -53,7 +53,8 @@ public class XuguCatalogFactory implements CatalogFactory { options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), urlInfo, - options.get(JdbcCatalogOptions.SCHEMA)); + options.get(JdbcCatalogOptions.SCHEMA), + options.get(JdbcCatalogOptions.DRIVER)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java index 73773b559b..0d53e3dec0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java @@ -403,6 +403,7 @@ public class JdbcCatalogUtils { .ifPresent(val -> catalogConfig.put(JdbcCatalogOptions.COMPATIBLE_MODE.key(), val)); catalogConfig.put( JdbcOptions.DECIMAL_TYPE_NARROWING.key(), config.isDecimalTypeNarrowing()); + catalogConfig.put(JdbcCatalogOptions.DRIVER.key(), config.getDriverName()); return ReadonlyConfig.fromMap(catalogConfig); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java index b0f6d42235..d64d0710d6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java @@ -52,7 +52,13 @@ public class DamengJdbcTest { @BeforeAll static void before() { DAMENG_CATALOG = - new DamengCatalog("DAMENG_CATALOG", "DM_USER01", "Te$Dt_1234", DM_URL_INFO, null); + new DamengCatalog( + "DAMENG_CATALOG", + "DM_USER01", + "Te$Dt_1234", + DM_URL_INFO, + null, + "dm.jdbc.driver.DmDriver"); DAMENG_CATALOG.open(); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/DriverSelectionTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/DriverSelectionTest.java new file mode 100644 index 0000000000..90ab8ed4eb --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/DriverSelectionTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.driver; + +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog; + +import org.junit.jupiter.api.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; + +public class DriverSelectionTest { + + @Test + void assertDriver() { + String url = "jdbc:mock://127.0.0.1:3306/test?useSSL=false"; + String driverName = OtherDriver.class.getName(); + String expectedDriverName = ExpectedDriver.class.getName(); + JdbcUrlUtil.UrlInfo MysqlUrlInfo = JdbcUrlUtil.getUrlInfo(url); + MySqlCatalog mySqlCatalog = + new MySqlCatalog("mock", "root", "123456", MysqlUrlInfo, expectedDriverName); + try { + Class.forName(driverName); + Class.forName(expectedDriverName); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + List<String> driverNames = new ArrayList<>(); + Enumeration<Driver> drivers = DriverManager.getDrivers(); + while (drivers.hasMoreElements()) { + driverNames.add(drivers.nextElement().getClass().getName()); + } + int expectedDriverIndex = driverNames.indexOf(expectedDriverName); + int otherDriverIndex = driverNames.indexOf(driverName); + assert expectedDriverIndex != -1 : "ExpectedDriver not registered in DriverManager"; + assert otherDriverIndex != -1 : "OtherDriver not registered in DriverManager"; + System.out.println( + "expectedDriverIndex is " + + expectedDriverIndex + + " otherDriverIndex is " + + otherDriverIndex); + assert expectedDriverIndex > otherDriverIndex + : "ExpectedDriver should be registered after OtherDriver, but found ExpectedDriver at index " + + expectedDriverIndex + + " and OtherDriver at index " + + otherDriverIndex; + /* + * This test verifies that even when the driver is registered later in the DriverManager's list, + * the system can still load the correct jar/driver based on the specified driverName parameter. + * This ensures that our connection mechanism correctly prioritizes explicitly specified drivers + * over the default driver discovery order in DriverManager. + */ + Method getConnectionMethod = findGetConnectionMethod(mySqlCatalog.getClass()); + if (getConnectionMethod != null) { + getConnectionMethod.setAccessible(true); + Connection connection; + try { + connection = (Connection) getConnectionMethod.invoke(mySqlCatalog, url); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + System.out.println( + "Connection class: " + + connection + .getClass() + .getName() + .startsWith(ExpectedDriver.class.getName())); + assert connection.getClass().getName().startsWith(ExpectedDriver.class.getName()) + : "Connection should be created by " + + expectedDriverName + + " but was created by a class named " + + connection.getClass().getName(); + } else { + assert false : "Could not find getConnection method"; + } + } + + private Method findGetConnectionMethod(Class<?> clazz) { + if (clazz == null) { + return null; + } + try { + return clazz.getDeclaredMethod("getConnection", String.class); + } catch (NoSuchMethodException e) { + return findGetConnectionMethod(clazz.getSuperclass()); + } + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/ExpectedDriver.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/ExpectedDriver.java new file mode 100644 index 0000000000..78e46a8ba4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/ExpectedDriver.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.driver; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; +import java.util.logging.Logger; + +public class ExpectedDriver implements Driver { + + static { + try { + DriverManager.registerDriver(new ExpectedDriver()); + } catch (SQLException e) { + throw new RuntimeException("register expected driver error", e); + } + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + return new Connection() { + @Override + public Statement createStatement() throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return null; + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return null; + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException {} + + @Override + public boolean getAutoCommit() throws SQLException { + return false; + } + + @Override + public void commit() throws SQLException {} + + @Override + public void rollback() throws SQLException {} + + @Override + public void close() throws SQLException {} + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return null; + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException {} + + @Override + public boolean isReadOnly() throws SQLException { + return false; + } + + @Override + public void setCatalog(String catalog) throws SQLException {} + + @Override + public String getCatalog() throws SQLException { + return null; + } + + @Override + public void setTransactionIsolation(int level) throws SQLException {} + + @Override + public int getTransactionIsolation() throws SQLException { + return 0; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException {} + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) + throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement( + String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall( + String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public Map<String, Class<?>> getTypeMap() throws SQLException { + return null; + } + + @Override + public void setTypeMap(Map<String, Class<?>> map) throws SQLException {} + + @Override + public void setHoldability(int holdability) throws SQLException {} + + @Override + public int getHoldability() throws SQLException { + return 0; + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return null; + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return null; + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException {} + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException {} + + @Override + public Statement createStatement( + int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement( + String sql, + int resultSetType, + int resultSetConcurrency, + int resultSetHoldability) + throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall( + String sql, + int resultSetType, + int resultSetConcurrency, + int resultSetHoldability) + throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) + throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) + throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) + throws SQLException { + return null; + } + + @Override + public Clob createClob() throws SQLException { + return null; + } + + @Override + public Blob createBlob() throws SQLException { + return null; + } + + @Override + public NClob createNClob() throws SQLException { + return null; + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return null; + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return false; + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException {} + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException {} + + @Override + public String getClientInfo(String name) throws SQLException { + return null; + } + + @Override + public Properties getClientInfo() throws SQLException { + return null; + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return null; + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return null; + } + + @Override + public void setSchema(String schema) throws SQLException {} + + @Override + public String getSchema() throws SQLException { + return null; + } + + @Override + public void abort(Executor executor) throws SQLException {} + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) + throws SQLException {} + + @Override + public int getNetworkTimeout() throws SQLException { + return 0; + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return false; + } + }; + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return url != null && url.startsWith("jdbc:mock"); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + return new DriverPropertyInfo[0]; + } + + @Override + public int getMajorVersion() { + return 0; + } + + @Override + public int getMinorVersion() { + return 0; + } + + @Override + public boolean jdbcCompliant() { + return false; + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/OtherDriver.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/OtherDriver.java new file mode 100644 index 0000000000..a3e1abd72f --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/OtherDriver.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.driver; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.Properties; +import java.util.logging.Logger; + +public class OtherDriver implements Driver { + + static { + try { + DriverManager.registerDriver(new OtherDriver()); + } catch (SQLException e) { + throw new RuntimeException("register other driver error", e); + } + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + return null; + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return url != null && url.startsWith("jdbc:mock"); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + return new DriverPropertyInfo[0]; + } + + @Override + public int getMajorVersion() { + return 0; + } + + @Override + public int getMinorVersion() { + return 0; + } + + @Override + public boolean jdbcCompliant() { + return false; + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java index bc89d4c8c3..f9d18bfa4d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java @@ -75,9 +75,10 @@ class MySqlCatalogTest { tablePathMySql = TablePath.of(databaseName, "mysql_to_mysql"); tablePathPG = TablePath.of(databaseName, "pg_to_mysql"); tablePathOracle = TablePath.of(databaseName, "oracle_to_mysql"); - sqlServerCatalog = new SqlServerCatalog("sqlserver", "sa", "root@123", sqlParse, null); - mySqlCatalog = new MySqlCatalog("mysql", "root", "123456", MysqlUrlInfo); - postgresCatalog = new PostgresCatalog("postgres", "postgres", "postgres", pg, null); + sqlServerCatalog = + new SqlServerCatalog("sqlserver", "sa", "root@123", sqlParse, null, null); + mySqlCatalog = new MySqlCatalog("mysql", "root", "123456", MysqlUrlInfo, null); + postgresCatalog = new PostgresCatalog("postgres", "postgres", "postgres", pg, null, null); mySqlCatalog.open(); sqlServerCatalog.open(); postgresCatalog.open(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java index 9f4d8e8619..6a3e9939a5 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java @@ -40,6 +40,7 @@ class OracleCatalogTest { "test", "oracle", OracleURLParser.parse("jdbc:oracle:thin:@127.0.0.1:1521:xe"), + null, null); catalog.open(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java index 05a013ef69..d0800c27fa 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java @@ -43,6 +43,7 @@ class PostgresCatalogTest { "pg", "pg#2024", JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/postgres"), + null, null); catalog.open(); @@ -55,7 +56,8 @@ class PostgresCatalogTest { "mysql", "root", "root@123", - JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33062/mingdongtest")); + JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33062/mingdongtest"), + null); mySqlCatalog.open(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java index a18cc4abd9..02536510d3 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java @@ -66,9 +66,10 @@ class SqlServerCatalogTest { tablePathMySql = TablePath.of(databaseName, schemaName, "mysql_to_sqlserver"); tablePathPG = TablePath.of(databaseName, schemaName, "pg_to_sqlserver"); tablePathOracle = TablePath.of(databaseName, schemaName, "oracle_to_sqlserver"); - sqlServerCatalog = new SqlServerCatalog("sqlserver", "sa", "root@123", sqlParse, null); - mySqlCatalog = new MySqlCatalog("mysql", "root", "root@123", MysqlUrlInfo); - postgresCatalog = new PostgresCatalog("postgres", "postgres", "postgres", pg, null); + sqlServerCatalog = + new SqlServerCatalog("sqlserver", "sa", "root@123", sqlParse, null, null); + mySqlCatalog = new MySqlCatalog("mysql", "root", "root@123", MysqlUrlInfo, null); + postgresCatalog = new PostgresCatalog("postgres", "postgres", "postgres", pg, null, null); mySqlCatalog.open(); sqlServerCatalog.open(); postgresCatalog.open(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java index a6a1ed920d..2466aa023f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java @@ -415,13 +415,15 @@ public class OracleCDCWithSchemaChangeIT extends AbstractOracleCDCIT implements "mysql", MYSQL_CONNECTOR_NAME, MYSQL_CONNECTOR_PASSWORD, - JdbcUrlUtil.getUrlInfo(MYSQL_CONTAINER.getJdbcUrl())); + JdbcUrlUtil.getUrlInfo(MYSQL_CONTAINER.getJdbcUrl()), + null); OracleCatalog oracleCatalog = new OracleCatalog( "oracle", CONNECTOR_USER, CONNECTOR_PWD, OracleURLParser.parse(ORACLE_CONTAINER.getJdbcUrl()), + null, null)) { mySqlCatalog.open(); oracleCatalog.open(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java index feac8d11ca..1cbc4e8491 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java @@ -446,7 +446,8 @@ public class JdbcMysqlIT extends AbstractJdbcIT { jdbcCase.getUserName(), jdbcCase.getPassword(), JdbcUrlUtil.getUrlInfo( - jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost()))); + jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())), + null); catalog.open(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java index 7b21e27364..9d1cd79376 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java @@ -308,7 +308,8 @@ public class JdbcOracleIT extends AbstractJdbcIT { jdbcCase.getUserName(), jdbcCase.getPassword(), OracleURLParser.parse(jdbcUrl), - SCHEMA); + SCHEMA, + null); catalog.open(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java index ca0391b942..82b78151fa 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java @@ -315,7 +315,8 @@ public class JdbcOceanBaseMysqlIT extends JdbcOceanBaseITBase { USERNAME, PASSWORD, JdbcUrlUtil.getUrlInfo( - jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost()))); + jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())), + null); catalog.open(); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java index ebe8416143..aface49198 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java @@ -213,7 +213,8 @@ public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase { USERNAME, PASSWORD, JdbcUrlUtil.getUrlInfo(jdbcCase.getJdbcUrl().replace(HOST, HOSTNAME)), - SCHEMA); + SCHEMA, + null); catalog.open(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java index 342e0317e1..916f3be8ef 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java @@ -278,7 +278,8 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource { POSTGRESQL_CONTAINER.getUsername(), POSTGRESQL_CONTAINER.getPassword(), JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()), - schema); + schema, + null); postgresCatalog.open(); CatalogTable catalogTable = postgresCatalog.getTable(sourceTablePath); @@ -361,7 +362,8 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource { POSTGRESQL_CONTAINER.getUsername(), POSTGRESQL_CONTAINER.getPassword(), JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()), - schema); + schema, + null); catalog.open(); TablePath tablePath = new TablePath(databaseName, schema, tableName); @@ -544,7 +546,8 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource { POSTGRESQL_CONTAINER.getUsername(), POSTGRESQL_CONTAINER.getPassword(), JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()), - schema); + schema, + null); postgresCatalog.open(); CatalogTable catalogTable = postgresCatalog.getTable(tablePathPG); // sink tableExists ? diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java index a7055df8e9..ac66d9fb54 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java @@ -340,7 +340,8 @@ public class JdbcSqlServerIT extends AbstractJdbcIT { jdbcCase.getPassword(), SqlServerURLParser.parse( jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())), - SQLSERVER_SCHEMA); + SQLSERVER_SCHEMA, + null); catalog.open(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java index a1db4fadd4..b4ac622a69 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java @@ -270,10 +270,10 @@ public class JdbcMySqlCreateTableIT extends TestSuiteBase implements TestResourc TablePath tablePathPG = TablePath.of("pg", "public", "mysql_auto_create_pg"); SqlServerCatalog sqlServerCatalog = - new SqlServerCatalog("sqlserver", "sa", PASSWORD, sqlParse, "dbo"); - MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", "root", PASSWORD, MysqlUrlInfo); + new SqlServerCatalog("sqlserver", "sa", PASSWORD, sqlParse, "dbo", null); + MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", "root", PASSWORD, MysqlUrlInfo, null); PostgresCatalog postgresCatalog = - new PostgresCatalog("postgres", "testUser", PASSWORD, pg, "public"); + new PostgresCatalog("postgres", "testUser", PASSWORD, pg, "public", null); mySqlCatalog.open(); sqlServerCatalog.open(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java index e818d7c8fe..9eb962d44f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java @@ -282,10 +282,10 @@ public class JdbcSqlServerCreateTableIT extends TestSuiteBase implements TestRes TablePath tablePathPG = TablePath.of("pg", "public", "sqlserver_auto_create_pg"); SqlServerCatalog sqlServerCatalog = - new SqlServerCatalog("sqlserver", "sa", password, sqlParse, "dbo"); - MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", "root", PASSWORD, MysqlUrlInfo); + new SqlServerCatalog("sqlserver", "sa", password, sqlParse, "dbo", null); + MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", "root", PASSWORD, MysqlUrlInfo, null); PostgresCatalog postgresCatalog = - new PostgresCatalog("postgres", "testUser", PASSWORD, pg, "public"); + new PostgresCatalog("postgres", "testUser", PASSWORD, pg, "public", null); mySqlCatalog.open(); sqlServerCatalog.open(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java index 2f3a2119c4..506d8ece42 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java @@ -234,7 +234,8 @@ public class JdbcOracleLowercaseTableIT extends AbstractJdbcIT { jdbcCase.getUserName(), jdbcCase.getPassword(), OracleURLParser.parse(jdbcUrl), - SCHEMA); + SCHEMA, + null); catalog.open(); } @@ -250,7 +251,8 @@ public class JdbcOracleLowercaseTableIT extends AbstractJdbcIT { jdbcCase.getPassword(), OracleURLParser.parse( jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())), - SCHEMA); + SCHEMA, + null); oracleCatalog.open(); catalog.executeSql( tablePathOracle, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHighGoIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHighGoIT.java index d3d05fd016..3decf63fde 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHighGoIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHighGoIT.java @@ -303,7 +303,8 @@ public class JdbcHighGoIT extends AbstractJdbcIT { jdbcCase.getUserName(), jdbcCase.getPassword(), JdbcUrlUtil.getUrlInfo(jdbcUrl), - SCHEMA); + SCHEMA, + null); catalog.open(); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java index 609710b2ca..43663b5ca4 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java @@ -585,7 +585,8 @@ public class JdbcIrisIT extends AbstractJdbcIT { "iris", jdbcCase.getUserName(), jdbcCase.getPassword(), - JdbcUrlUtil.getUrlInfo(jdbcUrl)); + JdbcUrlUtil.getUrlInfo(jdbcUrl), + "com.intersystems.jdbc.IRISDriver"); // set connection ((IrisCatalog) catalog).setConnection(jdbcUrl, connection); catalog.open(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlSaveModeCatalogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlSaveModeCatalogIT.java index 41a357f47a..b1361b58e5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlSaveModeCatalogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlSaveModeCatalogIT.java @@ -156,7 +156,8 @@ public class JdbcMySqlSaveModeCatalogIT extends TestSuiteBase implements TestRes public void testCatalog() { TablePath tablePathMySql = TablePath.of("auto", "mysql_auto_create"); TablePath tablePathMySqlSink = TablePath.of("auto", "mysql_auto_create_sink"); - MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", "root", MYSQL_PASSWORD, MysqlUrlInfo); + MySqlCatalog mySqlCatalog = + new MySqlCatalog("mysql", "root", MYSQL_PASSWORD, MysqlUrlInfo, null); mySqlCatalog.open(); CatalogTable catalogTable = mySqlCatalog.getTable(tablePathMySql); // source comment diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java index 1b6faa0a75..db8f6b758e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java @@ -323,7 +323,8 @@ public class JdbcMysqlSaveModeHandlerIT extends AbstractJdbcIT { jdbcCase.getUserName(), jdbcCase.getPassword(), JdbcUrlUtil.getUrlInfo( - jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost()))); + jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())), + null); catalog.open(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java index 35d29c37fb..6dcc6e51ba 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java @@ -362,7 +362,7 @@ public class JdbcMysqlSplitIT extends TestSuiteBase implements TestResource { TablePath tablePathMySql = TablePath.of(MYSQL_DATABASE, MYSQL_TABLE); MySqlCatalog mySqlCatalog = - new MySqlCatalog("mysql", MYSQL_USERNAME, MYSQL_PASSWORD, mysqlUrlInfo); + new MySqlCatalog("mysql", MYSQL_USERNAME, MYSQL_PASSWORD, mysqlUrlInfo, null); mySqlCatalog.open(); Assertions.assertTrue(mySqlCatalog.tableExists(tablePathMySql)); CatalogTable table = mySqlCatalog.getTable(tablePathMySql); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java index 2c237284f0..7fff1a2db1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java @@ -325,7 +325,8 @@ public class JdbcOpenGaussIT extends AbstractJdbcIT { jdbcCase.getUserName(), jdbcCase.getPassword(), JdbcUrlUtil.getUrlInfo(jdbcUrl), - SCHEMA); + SCHEMA, + null); // set connection ((OpenGaussCatalog) catalog).setConnection(jdbcUrl, connection); catalog.open(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java index 42f8affe63..ab3d0450bd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java @@ -244,7 +244,8 @@ public class JdbcXuguIT extends AbstractJdbcIT { jdbcCase.getUserName(), jdbcCase.getPassword(), JdbcUrlUtil.getUrlInfo(jdbcUrl), - XUGU_SCHEMA); + XUGU_SCHEMA, + null); catalog.open(); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java index 0b9f699329..d82ef45278 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java @@ -368,7 +368,8 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements "mysql", MYSQL_USER_NAME, MYSQL_USER_PASSWORD, - JdbcUrlUtil.getUrlInfo(MYSQL_CONTAINER.getJdbcUrl()))) { + JdbcUrlUtil.getUrlInfo(MYSQL_CONTAINER.getJdbcUrl()), + null)) { mySqlCatalog.open(); CatalogTable mySqlCatalogTable = mySqlCatalog.getTable(TablePath.of(MYSQL_DATABASE, SOURCE_TABLE));