This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a5aafa7301 [Fix][Connector-JDBC] Fix JDBC driver selection for data 
source connections (#8986)
a5aafa7301 is described below

commit a5aafa7301964715f2efcd0d544ca397a256232c
Author: chenhongyu <2795267...@qq.com>
AuthorDate: Mon Mar 24 10:56:30 2025 +0800

    [Fix][Connector-JDBC] Fix JDBC driver selection for data source connections 
(#8986)
---
 docs/en/connector-v2/changelog/connector-jdbc.md   |   1 +
 .../jdbc/catalog/AbstractJdbcCatalog.java          |  37 ++-
 .../seatunnel/jdbc/catalog/JdbcCatalogOptions.java |   2 +
 .../seatunnel/jdbc/catalog/dm/DamengCatalog.java   |   5 +-
 .../jdbc/catalog/dm/DamengCatalogFactory.java      |   3 +-
 .../jdbc/catalog/highgo/HighGoCatalog.java         |   5 +-
 .../jdbc/catalog/highgo/HighGoCatalogFactory.java  |   3 +-
 .../seatunnel/jdbc/catalog/iris/IrisCatalog.java   |   8 +-
 .../jdbc/catalog/iris/IrisCatalogFactory.java      |   3 +-
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java |   8 +-
 .../jdbc/catalog/mysql/MySqlCatalogFactory.java    |   3 +-
 .../catalog/oceanbase/OceanBaseCatalogFactory.java |   6 +-
 .../catalog/oceanbase/OceanBaseMySqlCatalog.java   |   8 +-
 .../catalog/oceanbase/OceanBaseOracleCatalog.java  |   5 +-
 .../jdbc/catalog/opengauss/OpenGaussCatalog.java   |   5 +-
 .../catalog/opengauss/OpenGaussCatalogFactory.java |   3 +-
 .../jdbc/catalog/oracle/OracleCatalog.java         |  11 +-
 .../jdbc/catalog/oracle/OracleCatalogFactory.java  |   3 +-
 .../jdbc/catalog/psql/PostgresCatalog.java         |   5 +-
 .../jdbc/catalog/psql/PostgresCatalogFactory.java  |   3 +-
 .../jdbc/catalog/redshift/RedshiftCatalog.java     |   5 +-
 .../catalog/redshift/RedshiftCatalogFactory.java   |   3 +-
 .../jdbc/catalog/saphana/SapHanaCatalog.java       |   5 +-
 .../catalog/saphana/SapHanaCatalogFactory.java     |   3 +-
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   |   5 +-
 .../catalog/sqlserver/SqlServerCatalogFactory.java |   3 +-
 .../seatunnel/jdbc/catalog/tidb/TiDBCatalog.java   |   8 +-
 .../jdbc/catalog/tidb/TiDBCatalogFactory.java      |   3 +-
 .../seatunnel/jdbc/catalog/xugu/XuguCatalog.java   |   5 +-
 .../jdbc/catalog/xugu/XuguCatalogFactory.java      |   3 +-
 .../seatunnel/jdbc/utils/JdbcCatalogUtils.java     |   1 +
 .../seatunnel/jdbc/catalog/dm/DamengJdbcTest.java  |   8 +-
 .../jdbc/catalog/driver/DriverSelectionTest.java   | 110 +++++++
 .../jdbc/catalog/driver/ExpectedDriver.java        | 343 +++++++++++++++++++++
 .../seatunnel/jdbc/catalog/driver/OtherDriver.java |  73 +++++
 .../jdbc/catalog/mysql/MySqlCatalogTest.java       |   7 +-
 .../jdbc/catalog/oracle/OracleCatalogTest.java     |   1 +
 .../jdbc/catalog/psql/PostgresCatalogTest.java     |   4 +-
 .../catalog/sqlserver/SqlServerCatalogTest.java    |   7 +-
 .../cdc/oracle/OracleCDCWithSchemaChangeIT.java    |   4 +-
 .../connectors/seatunnel/jdbc/JdbcMysqlIT.java     |   3 +-
 .../connectors/seatunnel/jdbc/JdbcOracleIT.java    |   3 +-
 .../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java       |   3 +-
 .../seatunnel/jdbc/JdbcOceanBaseOracleIT.java      |   3 +-
 .../connectors/seatunnel/jdbc/JdbcPostgresIT.java  |   9 +-
 .../connectors/seatunnel/jdbc/JdbcSqlServerIT.java |   3 +-
 .../seatunnel/jdbc/JdbcMySqlCreateTableIT.java     |   6 +-
 .../seatunnel/jdbc/JdbcSqlServerCreateTableIT.java |   6 +-
 .../seatunnel/jdbc/JdbcOracleLowercaseTableIT.java |   6 +-
 .../connectors/seatunnel/jdbc/JdbcHighGoIT.java    |   3 +-
 .../connectors/seatunnel/jdbc/JdbcIrisIT.java      |   3 +-
 .../seatunnel/jdbc/JdbcMySqlSaveModeCatalogIT.java |   3 +-
 .../seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java |   3 +-
 .../seatunnel/jdbc/JdbcMysqlSplitIT.java           |   2 +-
 .../connectors/seatunnel/jdbc/JdbcOpenGaussIT.java |   3 +-
 .../connectors/seatunnel/jdbc/JdbcXuguIT.java      |   3 +-
 .../paimon/PaimonSinkWithSchemaEvolutionIT.java    |   3 +-
 57 files changed, 715 insertions(+), 78 deletions(-)

diff --git a/docs/en/connector-v2/changelog/connector-jdbc.md 
b/docs/en/connector-v2/changelog/connector-jdbc.md
index feec543b0c..2e5f50ac72 100644
--- a/docs/en/connector-v2/changelog/connector-jdbc.md
+++ b/docs/en/connector-v2/changelog/connector-jdbc.md
@@ -2,6 +2,7 @@
 
 | Change | Commit | Version |
 | --- | --- | --- |
+|[Fix][Connector-JDBC] Fix JDBC driver selection for data source 
connections(#8986) | https://github.com/apache/seatunnel/pull/8986 | dev |
 |[Fix][Connector-V2] Fix parse SqlServer JDBC Url error 
(#8784)|https://github.com/apache/seatunnel/commit/373d2162d3| dev |
 |[Improve][Jdbc] Support upsert for opengauss 
(#8627)|https://github.com/apache/seatunnel/commit/56110bf392| dev |
 |[Improve][Jdbc] Remove useless utils. 
(#8793)|https://github.com/apache/seatunnel/commit/36a7533e85| dev |
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index bd041cc3ca..a1d82cbe2f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -49,12 +49,14 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
+import java.sql.Driver;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -82,12 +84,15 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
 
     protected final Map<String, Connection> connectionMap;
 
+    private final String driverClass;
+
     public AbstractJdbcCatalog(
             String catalogName,
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
-            String defaultSchema) {
+            String defaultSchema,
+            String driverClass) {
 
         checkArgument(StringUtils.isNotBlank(username));
         checkArgument(StringUtils.isNotBlank(urlInfo.getUrlWithoutDatabase()));
@@ -100,6 +105,7 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         this.suffix = urlInfo.getSuffix();
         this.defaultSchema = Optional.ofNullable(defaultSchema);
         this.connectionMap = new ConcurrentHashMap<>();
+        this.driverClass = driverClass;
     }
 
     @Override
@@ -116,6 +122,35 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         if (connectionMap.containsKey(url)) {
             return connectionMap.get(url);
         }
+        if (driverClass != null) {
+            log.info("try to find driver {}", driverClass);
+            java.util.Properties info = new java.util.Properties();
+            if (username != null) {
+                info.put("user", username);
+            }
+            if (pwd != null) {
+                info.put("password", pwd);
+            }
+            Enumeration<Driver> drivers = DriverManager.getDrivers();
+            try {
+                // Driver Manager may load the wrong driver, prioritize 
finding the driver by class
+                // name
+                while (drivers.hasMoreElements()) {
+                    Driver driver = drivers.nextElement();
+                    if (StringUtils.equals(driver.getClass().getName(), 
driverClass)) {
+                        try {
+                            Connection connection = driver.connect(url, info);
+                            connectionMap.put(url, connection);
+                            return connection;
+                        } catch (Exception e) {
+                            log.info("try connector failed", e);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                log.info("find driver error, back to 
DriverManager.getConnection", e);
+            }
+        }
         try {
             Connection connection = DriverManager.getConnection(url, username, 
pwd);
             connectionMap.put(url, connection);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
index c412ca9218..53e40c1e9b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
@@ -84,4 +84,6 @@ public interface JdbcCatalogOptions {
                     .booleanType()
                     .defaultValue(true)
                     .withDescription("Create index or not when auto create 
table");
+
+    Option<String> DRIVER = 
Options.key("driver").stringType().noDefaultValue();
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
index ee23c1806f..0878dd007f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
@@ -64,8 +64,9 @@ public class DamengCatalog extends AbstractJdbcCatalog {
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
-            String defaultSchema) {
-        super(catalogName, username, pwd, urlInfo, defaultSchema);
+            String defaultSchema,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalogFactory.java
index 5dac3764d2..46f47b40cc 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalogFactory.java
@@ -52,7 +52,8 @@ public class DamengCatalogFactory implements CatalogFactory {
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
                 urlInfo,
-                options.get(JdbcCatalogOptions.SCHEMA));
+                options.get(JdbcCatalogOptions.SCHEMA),
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalog.java
index b53f9d60a2..bced60c7dd 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalog.java
@@ -26,7 +26,8 @@ public class HighGoCatalog extends PostgresCatalog {
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
-            String defaultSchema) {
-        super(catalogName, username, pwd, urlInfo, defaultSchema);
+            String defaultSchema,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalogFactory.java
index 2ffa6a21b1..a506ce1fff 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalogFactory.java
@@ -42,7 +42,8 @@ public class HighGoCatalogFactory implements CatalogFactory {
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
                 urlInfo,
-                options.get(JdbcCatalogOptions.SCHEMA));
+                options.get(JdbcCatalogOptions.SCHEMA),
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     public String factoryIdentifier() {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java
index e91006fdca..6cd6610af2 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java
@@ -61,8 +61,12 @@ public class IrisCatalog extends AbstractJdbcCatalog {
             "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables 
WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE != 
'SYSTEM VIEW'";
 
     public IrisCatalog(
-            String catalogName, String username, String password, 
JdbcUrlUtil.UrlInfo urlInfo) {
-        super(catalogName, username, password, urlInfo, null);
+            String catalogName,
+            String username,
+            String password,
+            JdbcUrlUtil.UrlInfo urlInfo,
+            String driverClass) {
+        super(catalogName, username, password, urlInfo, null, driverClass);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java
index 3304575d2a..c8c1747896 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java
@@ -51,7 +51,8 @@ public class IrisCatalogFactory implements CatalogFactory {
                 catalogName,
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
-                urlInfo);
+                urlInfo,
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 323556e137..5ae0eab200 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -62,8 +62,12 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
     private MySqlTypeConverter typeConverter;
 
     public MySqlCatalog(
-            String catalogName, String username, String pwd, 
JdbcUrlUtil.UrlInfo urlInfo) {
-        super(catalogName, username, pwd, urlInfo, null);
+            String catalogName,
+            String username,
+            String pwd,
+            JdbcUrlUtil.UrlInfo urlInfo,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, null, driverClass);
         this.version = resolveVersion();
         this.typeConverter = new MySqlTypeConverter(version);
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java
index 5330fe0176..8665c6c07c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java
@@ -51,7 +51,8 @@ public class MySqlCatalogFactory implements CatalogFactory {
                 catalogName,
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
-                urlInfo);
+                urlInfo,
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
index 01d035e167..c13779b9e1 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
@@ -71,13 +71,15 @@ public class OceanBaseCatalogFactory implements 
CatalogFactory {
                     options.get(JdbcCatalogOptions.USERNAME),
                     options.get(JdbcCatalogOptions.PASSWORD),
                     urlInfo,
-                    options.get(JdbcCatalogOptions.SCHEMA));
+                    options.get(JdbcCatalogOptions.SCHEMA),
+                    options.get(JdbcCatalogOptions.DRIVER));
         }
         return new OceanBaseMySqlCatalog(
                 catalogName,
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
-                urlInfo);
+                urlInfo,
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
index 4d4527653f..86410b573f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
@@ -66,8 +66,12 @@ public class OceanBaseMySqlCatalog extends 
AbstractJdbcCatalog {
     private OceanBaseMySqlTypeConverter typeConverter;
 
     public OceanBaseMySqlCatalog(
-            String catalogName, String username, String pwd, 
JdbcUrlUtil.UrlInfo urlInfo) {
-        super(catalogName, username, pwd, urlInfo, null);
+            String catalogName,
+            String username,
+            String pwd,
+            JdbcUrlUtil.UrlInfo urlInfo,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, null, driverClass);
         this.typeConverter = new OceanBaseMySqlTypeConverter();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
index 328b3f38b3..4557d7667a 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
@@ -38,8 +38,9 @@ public class OceanBaseOracleCatalog extends OracleCatalog {
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
-            String defaultSchema) {
-        super(catalogName, username, pwd, urlInfo, defaultSchema);
+            String defaultSchema,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java
index 805701cc45..3030dd7b22 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java
@@ -34,8 +34,9 @@ public class OpenGaussCatalog extends PostgresCatalog {
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
-            String defaultSchema) {
-        super(catalogName, username, pwd, urlInfo, defaultSchema);
+            String defaultSchema,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass);
     }
 
     @VisibleForTesting
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java
index bff96ff6d3..6d9dd525da 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java
@@ -52,7 +52,8 @@ public class OpenGaussCatalogFactory implements 
CatalogFactory {
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
                 urlInfo,
-                options.get(JdbcCatalogOptions.SCHEMA));
+                options.get(JdbcCatalogOptions.SCHEMA),
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index 3a90c61c6d..d7d86c46b1 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -81,14 +81,16 @@ public class OracleCatalog extends AbstractJdbcCatalog {
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
-            String defaultSchema) {
+            String defaultSchema,
+            String driverClass) {
         this(
                 catalogName,
                 username,
                 pwd,
                 urlInfo,
                 defaultSchema,
-                JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue());
+                JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue(),
+                driverClass);
     }
 
     public OracleCatalog(
@@ -97,8 +99,9 @@ public class OracleCatalog extends AbstractJdbcCatalog {
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
             String defaultSchema,
-            boolean decimalTypeNarrowing) {
-        super(catalogName, username, pwd, urlInfo, defaultSchema);
+            boolean decimalTypeNarrowing,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass);
         this.decimalTypeNarrowing = decimalTypeNarrowing;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
index 2b51d97621..209d307c40 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
@@ -54,7 +54,8 @@ public class OracleCatalogFactory implements CatalogFactory {
                 options.get(JdbcCatalogOptions.PASSWORD),
                 urlInfo,
                 options.get(JdbcCatalogOptions.SCHEMA),
-                options.get(JdbcOptions.DECIMAL_TYPE_NARROWING));
+                options.get(JdbcOptions.DECIMAL_TYPE_NARROWING),
+                options.get(JdbcOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
index 4851f3461a..ccb895edc4 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
@@ -86,8 +86,9 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
-            String defaultSchema) {
-        super(catalogName, username, pwd, urlInfo, defaultSchema);
+            String defaultSchema,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogFactory.java
index dc0d416e6f..8fed8675cb 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogFactory.java
@@ -52,7 +52,8 @@ public class PostgresCatalogFactory implements CatalogFactory 
{
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
                 urlInfo,
-                options.get(JdbcCatalogOptions.SCHEMA));
+                options.get(JdbcCatalogOptions.SCHEMA),
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
index 76ea97d166..3d130aec46 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
@@ -45,8 +45,9 @@ public class RedshiftCatalog extends AbstractJdbcCatalog {
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
-            String schema) {
-        super(catalogName, username, pwd, urlInfo, schema);
+            String schema,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, schema, driverClass);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogFactory.java
index 31409b3b21..44985ef7f4 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogFactory.java
@@ -59,7 +59,8 @@ public class RedshiftCatalogFactory implements CatalogFactory 
{
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
                 urlInfo,
-                options.get(JdbcCatalogOptions.SCHEMA));
+                options.get(JdbcCatalogOptions.SCHEMA),
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
index f63c6d7221..bc1157d3cd 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
@@ -85,8 +85,9 @@ public class SapHanaCatalog extends AbstractJdbcCatalog {
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
-            String defaultSchema) {
-        super(catalogName, username, pwd, urlInfo, defaultSchema);
+            String defaultSchema,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalogFactory.java
index 1f196f3039..4991bc759e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalogFactory.java
@@ -52,7 +52,8 @@ public class SapHanaCatalogFactory implements CatalogFactory {
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
                 urlInfo,
-                options.get(JdbcCatalogOptions.SCHEMA));
+                options.get(JdbcCatalogOptions.SCHEMA),
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 7c759e2eda..d87823c535 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -64,8 +64,9 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
-            String defaultSchema) {
-        super(catalogName, username, pwd, urlInfo, defaultSchema);
+            String defaultSchema,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
index fe15b7dde8..153fea6039 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
@@ -45,7 +45,8 @@ public class SqlServerCatalogFactory implements 
CatalogFactory {
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
                 urlInfo,
-                options.get(JdbcCatalogOptions.SCHEMA));
+                options.get(JdbcCatalogOptions.SCHEMA),
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java
index 869a06586e..64fee6c081 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java
@@ -23,7 +23,11 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog
 public class TiDBCatalog extends MySqlCatalog {
 
     public TiDBCatalog(
-            String catalogName, String username, String pwd, 
JdbcUrlUtil.UrlInfo urlInfo) {
-        super(catalogName, username, pwd, urlInfo);
+            String catalogName,
+            String username,
+            String pwd,
+            JdbcUrlUtil.UrlInfo urlInfo,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, driverClass);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java
index a661f9626d..01d07d6502 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java
@@ -51,7 +51,8 @@ public class TiDBCatalogFactory implements CatalogFactory {
                 catalogName,
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
-                urlInfo);
+                urlInfo,
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
index 2881280d32..fab09c1852 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
@@ -115,8 +115,9 @@ public class XuguCatalog extends AbstractJdbcCatalog {
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
-            String defaultSchema) {
-        super(catalogName, username, pwd, urlInfo, defaultSchema);
+            String defaultSchema,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java
index ac0f3e24ae..066ff57dd3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java
@@ -53,7 +53,8 @@ public class XuguCatalogFactory implements CatalogFactory {
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
                 urlInfo,
-                options.get(JdbcCatalogOptions.SCHEMA));
+                options.get(JdbcCatalogOptions.SCHEMA),
+                options.get(JdbcCatalogOptions.DRIVER));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index 73773b559b..0d53e3dec0 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -403,6 +403,7 @@ public class JdbcCatalogUtils {
                 .ifPresent(val -> 
catalogConfig.put(JdbcCatalogOptions.COMPATIBLE_MODE.key(), val));
         catalogConfig.put(
                 JdbcOptions.DECIMAL_TYPE_NARROWING.key(), 
config.isDecimalTypeNarrowing());
+        catalogConfig.put(JdbcCatalogOptions.DRIVER.key(), 
config.getDriverName());
         return ReadonlyConfig.fromMap(catalogConfig);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java
index b0f6d42235..d64d0710d6 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java
@@ -52,7 +52,13 @@ public class DamengJdbcTest {
     @BeforeAll
     static void before() {
         DAMENG_CATALOG =
-                new DamengCatalog("DAMENG_CATALOG", "DM_USER01", "Te$Dt_1234", 
DM_URL_INFO, null);
+                new DamengCatalog(
+                        "DAMENG_CATALOG",
+                        "DM_USER01",
+                        "Te$Dt_1234",
+                        DM_URL_INFO,
+                        null,
+                        "dm.jdbc.driver.DmDriver");
         DAMENG_CATALOG.open();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/DriverSelectionTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/DriverSelectionTest.java
new file mode 100644
index 0000000000..90ab8ed4eb
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/DriverSelectionTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.driver;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+public class DriverSelectionTest {
+
+    /**
+     * Checks that a catalog built with an explicit driver class name obtains its connection from
+     * that driver, even though another driver accepting the same URL sits earlier in
+     * DriverManager's registration list.
+     *
+     * <p>Verification uses JUnit assertions rather than the {@code assert} keyword so the checks
+     * still run when the JVM is started without {@code -ea}.
+     *
+     * @throws Exception if a mock driver class cannot be loaded, the reflective call is not
+     *     permitted, or closing the connection fails
+     */
+    @Test
+    void assertDriver() throws Exception {
+        String url = "jdbc:mock://127.0.0.1:3306/test?useSSL=false";
+        String otherDriverName = OtherDriver.class.getName();
+        String expectedDriverName = ExpectedDriver.class.getName();
+        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(url);
+        MySqlCatalog mySqlCatalog =
+                new MySqlCatalog("mock", "root", "123456", urlInfo, expectedDriverName);
+
+        // Initializing the classes runs their static blocks, which register both drivers.
+        Class.forName(otherDriverName);
+        Class.forName(expectedDriverName);
+
+        List<String> driverNames = new ArrayList<>();
+        Enumeration<Driver> drivers = DriverManager.getDrivers();
+        while (drivers.hasMoreElements()) {
+            driverNames.add(drivers.nextElement().getClass().getName());
+        }
+        int expectedDriverIndex = driverNames.indexOf(expectedDriverName);
+        int otherDriverIndex = driverNames.indexOf(otherDriverName);
+        Assertions.assertTrue(
+                expectedDriverIndex != -1, "ExpectedDriver not registered in DriverManager");
+        Assertions.assertTrue(
+                otherDriverIndex != -1, "OtherDriver not registered in DriverManager");
+        // Precondition of the scenario: the wanted driver must NOT be the first candidate,
+        // otherwise plain registration order would make the test pass by accident.
+        Assertions.assertTrue(
+                expectedDriverIndex > otherDriverIndex,
+                "ExpectedDriver should be registered after OtherDriver, but found ExpectedDriver at index "
+                        + expectedDriverIndex
+                        + " and OtherDriver at index "
+                        + otherDriverIndex);
+
+        // getConnection is not public on the catalog, so it is located and called reflectively.
+        Method getConnectionMethod = findGetConnectionMethod(mySqlCatalog.getClass());
+        Assertions.assertNotNull(getConnectionMethod, "Could not find getConnection method");
+        getConnectionMethod.setAccessible(true);
+
+        Connection connection;
+        try {
+            connection = (Connection) getConnectionMethod.invoke(mySqlCatalog, url);
+        } catch (InvocationTargetException e) {
+            throw new IllegalStateException("getConnection failed for url " + url, e);
+        }
+        Assertions.assertNotNull(connection, "getConnection returned null for url " + url);
+        try {
+            String connectionClassName = connection.getClass().getName();
+            Assertions.assertTrue(
+                    connectionClassName.startsWith(expectedDriverName),
+                    "Connection should be created by "
+                            + expectedDriverName
+                            + " but was created by a class named "
+                            + connectionClassName);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Looks up {@code getConnection(String)} on the given class or the nearest superclass that
+     * declares it.
+     *
+     * @param clazz class to start the search from; may be {@code null}
+     * @return the declared method, or {@code null} when no class in the hierarchy declares it
+     */
+    private Method findGetConnectionMethod(Class<?> clazz) {
+        for (Class<?> current = clazz; current != null; current = current.getSuperclass()) {
+            try {
+                return current.getDeclaredMethod("getConnection", String.class);
+            } catch (NoSuchMethodException ignored) {
+                // Not declared at this level; continue with the superclass.
+            }
+        }
+        return null;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/ExpectedDriver.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/ExpectedDriver.java
new file mode 100644
index 0000000000..78e46a8ba4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/ExpectedDriver.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.driver;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.logging.Logger;
+
+/**
+ * Mock JDBC driver used by the driver-selection test. It registers itself with {@link
+ * DriverManager} when the class is initialized and hands out no-op connections for URLs starting
+ * with {@code jdbc:mock}.
+ */
+public class ExpectedDriver implements Driver {
+
+    /** URL prefix this mock driver claims to handle. */
+    private static final String URL_PREFIX = "jdbc:mock";
+
+    static {
+        try {
+            DriverManager.registerDriver(new ExpectedDriver());
+        } catch (SQLException e) {
+            throw new RuntimeException("register expected driver error", e);
+        }
+    }
+
+    /**
+     * Creates a no-op connection for URLs this driver accepts.
+     *
+     * <p>DriverManager offers every URL to every registered driver and takes the first non-null
+     * answer, so a URL this driver does not accept must yield {@code null}; otherwise this mock
+     * could answer connection requests meant for other drivers in the same JVM.
+     *
+     * @param url JDBC URL; may be {@code null}
+     * @param info connection properties (ignored)
+     * @return a mock connection, or {@code null} when the URL is not accepted
+     * @throws SQLException declared by the interface; propagated from {@link #acceptsURL}
+     */
+    @Override
+    public Connection connect(String url, Properties info) throws SQLException {
+        if (!acceptsURL(url)) {
+            return null;
+        }
+        return new MockConnection();
+    }
+
+    /**
+     * @param url JDBC URL; may be {@code null}
+     * @return {@code true} when the URL is non-null and starts with {@code jdbc:mock}
+     */
+    @Override
+    public boolean acceptsURL(String url) throws SQLException {
+        return url != null && url.startsWith(URL_PREFIX);
+    }
+
+    @Override
+    public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
+        return new DriverPropertyInfo[0];
+    }
+
+    @Override
+    public int getMajorVersion() {
+        return 0;
+    }
+
+    @Override
+    public int getMinorVersion() {
+        return 0;
+    }
+
+    @Override
+    public boolean jdbcCompliant() {
+        return false;
+    }
+
+    @Override
+    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        return null;
+    }
+
+    /**
+     * Connection stub whose methods do nothing and return {@code null}, {@code false} or {@code 0}.
+     * It is nested in {@link ExpectedDriver} so its class name is prefixed by the driver's name,
+     * which is what the driver-selection test inspects.
+     */
+    private static final class MockConnection implements Connection {
+
+        @Override
+        public Statement createStatement() {
+            return null;
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql) {
+            return null;
+        }
+
+        @Override
+        public CallableStatement prepareCall(String sql) {
+            return null;
+        }
+
+        @Override
+        public String nativeSQL(String sql) {
+            return null;
+        }
+
+        @Override
+        public void setAutoCommit(boolean autoCommit) {}
+
+        @Override
+        public boolean getAutoCommit() {
+            return false;
+        }
+
+        @Override
+        public void commit() {}
+
+        @Override
+        public void rollback() {}
+
+        @Override
+        public void close() {}
+
+        @Override
+        public boolean isClosed() {
+            return false;
+        }
+
+        @Override
+        public DatabaseMetaData getMetaData() {
+            return null;
+        }
+
+        @Override
+        public void setReadOnly(boolean readOnly) {}
+
+        @Override
+        public boolean isReadOnly() {
+            return false;
+        }
+
+        @Override
+        public void setCatalog(String catalog) {}
+
+        @Override
+        public String getCatalog() {
+            return null;
+        }
+
+        @Override
+        public void setTransactionIsolation(int level) {}
+
+        @Override
+        public int getTransactionIsolation() {
+            return 0;
+        }
+
+        @Override
+        public SQLWarning getWarnings() {
+            return null;
+        }
+
+        @Override
+        public void clearWarnings() {}
+
+        @Override
+        public Statement createStatement(int resultSetType, int resultSetConcurrency) {
+            return null;
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(
+                String sql, int resultSetType, int resultSetConcurrency) {
+            return null;
+        }
+
+        @Override
+        public CallableStatement prepareCall(
+                String sql, int resultSetType, int resultSetConcurrency) {
+            return null;
+        }
+
+        @Override
+        public Map<String, Class<?>> getTypeMap() {
+            return null;
+        }
+
+        @Override
+        public void setTypeMap(Map<String, Class<?>> map) {}
+
+        @Override
+        public void setHoldability(int holdability) {}
+
+        @Override
+        public int getHoldability() {
+            return 0;
+        }
+
+        @Override
+        public Savepoint setSavepoint() {
+            return null;
+        }
+
+        @Override
+        public Savepoint setSavepoint(String name) {
+            return null;
+        }
+
+        @Override
+        public void rollback(Savepoint savepoint) {}
+
+        @Override
+        public void releaseSavepoint(Savepoint savepoint) {}
+
+        @Override
+        public Statement createStatement(
+                int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
+            return null;
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(
+                String sql,
+                int resultSetType,
+                int resultSetConcurrency,
+                int resultSetHoldability) {
+            return null;
+        }
+
+        @Override
+        public CallableStatement prepareCall(
+                String sql,
+                int resultSetType,
+                int resultSetConcurrency,
+                int resultSetHoldability) {
+            return null;
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) {
+            return null;
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int[] columnIndexes) {
+            return null;
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, String[] columnNames) {
+            return null;
+        }
+
+        @Override
+        public Clob createClob() {
+            return null;
+        }
+
+        @Override
+        public Blob createBlob() {
+            return null;
+        }
+
+        @Override
+        public NClob createNClob() {
+            return null;
+        }
+
+        @Override
+        public SQLXML createSQLXML() {
+            return null;
+        }
+
+        @Override
+        public boolean isValid(int timeout) {
+            return false;
+        }
+
+        @Override
+        public void setClientInfo(String name, String value) throws SQLClientInfoException {}
+
+        @Override
+        public void setClientInfo(Properties properties) throws SQLClientInfoException {}
+
+        @Override
+        public String getClientInfo(String name) {
+            return null;
+        }
+
+        @Override
+        public Properties getClientInfo() {
+            return null;
+        }
+
+        @Override
+        public Array createArrayOf(String typeName, Object[] elements) {
+            return null;
+        }
+
+        @Override
+        public Struct createStruct(String typeName, Object[] attributes) {
+            return null;
+        }
+
+        @Override
+        public void setSchema(String schema) {}
+
+        @Override
+        public String getSchema() {
+            return null;
+        }
+
+        @Override
+        public void abort(Executor executor) {}
+
+        @Override
+        public void setNetworkTimeout(Executor executor, int milliseconds) {}
+
+        @Override
+        public int getNetworkTimeout() {
+            return 0;
+        }
+
+        @Override
+        public <T> T unwrap(Class<T> iface) {
+            return null;
+        }
+
+        @Override
+        public boolean isWrapperFor(Class<?> iface) {
+            return false;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/OtherDriver.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/OtherDriver.java
new file mode 100644
index 0000000000..a3e1abd72f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/driver/OtherDriver.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.driver;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+/**
+ * Decoy JDBC driver for the driver-selection test: it accepts {@code jdbc:mock} URLs and
+ * registers itself with {@link DriverManager} on class initialization, but never produces a
+ * connection.
+ */
+public class OtherDriver implements Driver {
+
+    /** URL prefix this decoy driver claims to handle. */
+    private static final String MOCK_URL_PREFIX = "jdbc:mock";
+
+    static {
+        registerSelf();
+    }
+
+    /** Registers one instance of this driver with DriverManager. */
+    private static void registerSelf() {
+        try {
+            DriverManager.registerDriver(new OtherDriver());
+        } catch (SQLException cause) {
+            throw new RuntimeException("register other driver error", cause);
+        }
+    }
+
+    /**
+     * Always declines to connect.
+     *
+     * @return {@code null} for every URL
+     */
+    @Override
+    public Connection connect(String url, Properties info) throws SQLException {
+        return null;
+    }
+
+    /**
+     * @param url JDBC URL; may be {@code null}
+     * @return {@code true} when the URL is non-null and starts with {@code jdbc:mock}
+     */
+    @Override
+    public boolean acceptsURL(String url) throws SQLException {
+        if (url == null) {
+            return false;
+        }
+        return url.startsWith(MOCK_URL_PREFIX);
+    }
+
+    @Override
+    public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
+        return new DriverPropertyInfo[] {};
+    }
+
+    @Override
+    public int getMajorVersion() {
+        return 0;
+    }
+
+    @Override
+    public int getMinorVersion() {
+        return 0;
+    }
+
+    @Override
+    public boolean jdbcCompliant() {
+        return false;
+    }
+
+    @Override
+    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        return null;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
index bc89d4c8c3..f9d18bfa4d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
@@ -75,9 +75,10 @@ class MySqlCatalogTest {
         tablePathMySql = TablePath.of(databaseName, "mysql_to_mysql");
         tablePathPG = TablePath.of(databaseName, "pg_to_mysql");
         tablePathOracle = TablePath.of(databaseName, "oracle_to_mysql");
-        sqlServerCatalog = new SqlServerCatalog("sqlserver", "sa", "root@123", 
sqlParse, null);
-        mySqlCatalog = new MySqlCatalog("mysql", "root", "123456", 
MysqlUrlInfo);
-        postgresCatalog = new PostgresCatalog("postgres", "postgres", 
"postgres", pg, null);
+        sqlServerCatalog =
+                new SqlServerCatalog("sqlserver", "sa", "root@123", sqlParse, 
null, null);
+        mySqlCatalog = new MySqlCatalog("mysql", "root", "123456", 
MysqlUrlInfo, null);
+        postgresCatalog = new PostgresCatalog("postgres", "postgres", 
"postgres", pg, null, null);
         mySqlCatalog.open();
         sqlServerCatalog.open();
         postgresCatalog.open();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
index 9f4d8e8619..6a3e9939a5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
@@ -40,6 +40,7 @@ class OracleCatalogTest {
                         "test",
                         "oracle",
                         OracleURLParser.parse("jdbc:oracle:thin:@127.0.0.1:1521:xe"),
+                        null,
                         null);
 
         catalog.open();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
index 05a013ef69..d0800c27fa 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
@@ -43,6 +43,7 @@ class PostgresCatalogTest {
                         "pg",
                         "pg#2024",
                         JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/postgres"),
+                        null,
                         null);
 
         catalog.open();
@@ -55,7 +56,8 @@ class PostgresCatalogTest {
                         "mysql",
                         "root",
                         "root@123",
-                        JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33062/mingdongtest"));
+                        JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33062/mingdongtest"),
+                        null);
 
         mySqlCatalog.open();
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
index a18cc4abd9..02536510d3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
@@ -66,9 +66,10 @@ class SqlServerCatalogTest {
         tablePathMySql = TablePath.of(databaseName, schemaName, "mysql_to_sqlserver");
         tablePathPG = TablePath.of(databaseName, schemaName, "pg_to_sqlserver");
         tablePathOracle = TablePath.of(databaseName, schemaName, "oracle_to_sqlserver");
-        sqlServerCatalog = new SqlServerCatalog("sqlserver", "sa", "root@123", sqlParse, null);
-        mySqlCatalog = new MySqlCatalog("mysql", "root", "root@123", MysqlUrlInfo);
-        postgresCatalog = new PostgresCatalog("postgres", "postgres", "postgres", pg, null);
+        sqlServerCatalog =
+                new SqlServerCatalog("sqlserver", "sa", "root@123", sqlParse, null, null);
+        mySqlCatalog = new MySqlCatalog("mysql", "root", "root@123", MysqlUrlInfo, null);
+        postgresCatalog = new PostgresCatalog("postgres", "postgres", "postgres", pg, null, null);
         mySqlCatalog.open();
         sqlServerCatalog.open();
         postgresCatalog.open();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java
index a6a1ed920d..2466aa023f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java
@@ -415,13 +415,15 @@ public class OracleCDCWithSchemaChangeIT extends AbstractOracleCDCIT implements
                                 "mysql",
                                 MYSQL_CONNECTOR_NAME,
                                 MYSQL_CONNECTOR_PASSWORD,
-                                JdbcUrlUtil.getUrlInfo(MYSQL_CONTAINER.getJdbcUrl()));
+                                JdbcUrlUtil.getUrlInfo(MYSQL_CONTAINER.getJdbcUrl()),
+                                null);
                 OracleCatalog oracleCatalog =
                         new OracleCatalog(
                                 "oracle",
                                 CONNECTOR_USER,
                                 CONNECTOR_PWD,
                                 OracleURLParser.parse(ORACLE_CONTAINER.getJdbcUrl()),
+                                null,
                                 null)) {
             mySqlCatalog.open();
             oracleCatalog.open();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index feac8d11ca..1cbc4e8491 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -446,7 +446,8 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
                         jdbcCase.getUserName(),
                         jdbcCase.getPassword(),
                         JdbcUrlUtil.getUrlInfo(
-                                jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())));
+                                jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())),
+                        null);
         catalog.open();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index 7b21e27364..9d1cd79376 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -308,7 +308,8 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                         jdbcCase.getUserName(),
                         jdbcCase.getPassword(),
                         OracleURLParser.parse(jdbcUrl),
-                        SCHEMA);
+                        SCHEMA,
+                        null);
         catalog.open();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
index ca0391b942..82b78151fa 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
@@ -315,7 +315,8 @@ public class JdbcOceanBaseMysqlIT extends JdbcOceanBaseITBase {
                         USERNAME,
                         PASSWORD,
                         JdbcUrlUtil.getUrlInfo(
-                                jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())));
+                                jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())),
+                        null);
         catalog.open();
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
index ebe8416143..aface49198 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
@@ -213,7 +213,8 @@ public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase {
                         USERNAME,
                         PASSWORD,
                         JdbcUrlUtil.getUrlInfo(jdbcCase.getJdbcUrl().replace(HOST, HOSTNAME)),
-                        SCHEMA);
+                        SCHEMA,
+                        null);
         catalog.open();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 342e0317e1..916f3be8ef 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -278,7 +278,8 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource {
                         POSTGRESQL_CONTAINER.getUsername(),
                         POSTGRESQL_CONTAINER.getPassword(),
                         JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()),
-                        schema);
+                        schema,
+                        null);
         postgresCatalog.open();
 
         CatalogTable catalogTable = postgresCatalog.getTable(sourceTablePath);
@@ -361,7 +362,8 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource {
                         POSTGRESQL_CONTAINER.getUsername(),
                         POSTGRESQL_CONTAINER.getPassword(),
                         JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()),
-                        schema);
+                        schema,
+                        null);
         catalog.open();
 
         TablePath tablePath = new TablePath(databaseName, schema, tableName);
@@ -544,7 +546,8 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource {
                         POSTGRESQL_CONTAINER.getUsername(),
                         POSTGRESQL_CONTAINER.getPassword(),
                         JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()),
-                        schema);
+                        schema,
+                        null);
         postgresCatalog.open();
         CatalogTable catalogTable = postgresCatalog.getTable(tablePathPG);
         // sink tableExists ?
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index a7055df8e9..ac66d9fb54 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -340,7 +340,8 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
                         jdbcCase.getPassword(),
                         SqlServerURLParser.parse(
                                 jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())),
-                        SQLSERVER_SCHEMA);
+                        SQLSERVER_SCHEMA,
+                        null);
         catalog.open();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
index a1db4fadd4..b4ac622a69 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
@@ -270,10 +270,10 @@ public class JdbcMySqlCreateTableIT extends TestSuiteBase implements TestResourc
         TablePath tablePathPG = TablePath.of("pg", "public", "mysql_auto_create_pg");
 
         SqlServerCatalog sqlServerCatalog =
-                new SqlServerCatalog("sqlserver", "sa", PASSWORD, sqlParse, "dbo");
-        MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", "root", PASSWORD, MysqlUrlInfo);
+                new SqlServerCatalog("sqlserver", "sa", PASSWORD, sqlParse, "dbo", null);
+        MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", "root", PASSWORD, MysqlUrlInfo, null);
         PostgresCatalog postgresCatalog =
-                new PostgresCatalog("postgres", "testUser", PASSWORD, pg, "public");
+                new PostgresCatalog("postgres", "testUser", PASSWORD, pg, "public", null);
 
         mySqlCatalog.open();
         sqlServerCatalog.open();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
index e818d7c8fe..9eb962d44f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
@@ -282,10 +282,10 @@ public class JdbcSqlServerCreateTableIT extends TestSuiteBase implements TestRes
         TablePath tablePathPG = TablePath.of("pg", "public", "sqlserver_auto_create_pg");
 
         SqlServerCatalog sqlServerCatalog =
-                new SqlServerCatalog("sqlserver", "sa", password, sqlParse, "dbo");
-        MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", "root", PASSWORD, MysqlUrlInfo);
+                new SqlServerCatalog("sqlserver", "sa", password, sqlParse, "dbo", null);
+        MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", "root", PASSWORD, MysqlUrlInfo, null);
         PostgresCatalog postgresCatalog =
-                new PostgresCatalog("postgres", "testUser", PASSWORD, pg, "public");
+                new PostgresCatalog("postgres", "testUser", PASSWORD, pg, "public", null);
 
         mySqlCatalog.open();
         sqlServerCatalog.open();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
index 2f3a2119c4..506d8ece42 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
@@ -234,7 +234,8 @@ public class JdbcOracleLowercaseTableIT extends AbstractJdbcIT {
                         jdbcCase.getUserName(),
                         jdbcCase.getPassword(),
                         OracleURLParser.parse(jdbcUrl),
-                        SCHEMA);
+                        SCHEMA,
+                        null);
         catalog.open();
     }
 
@@ -250,7 +251,8 @@ public class JdbcOracleLowercaseTableIT extends AbstractJdbcIT {
                         jdbcCase.getPassword(),
                         OracleURLParser.parse(
                                 jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())),
-                        SCHEMA);
+                        SCHEMA,
+                        null);
         oracleCatalog.open();
         catalog.executeSql(
                 tablePathOracle,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHighGoIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHighGoIT.java
index d3d05fd016..3decf63fde 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHighGoIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHighGoIT.java
@@ -303,7 +303,8 @@ public class JdbcHighGoIT extends AbstractJdbcIT {
                         jdbcCase.getUserName(),
                         jdbcCase.getPassword(),
                         JdbcUrlUtil.getUrlInfo(jdbcUrl),
-                        SCHEMA);
+                        SCHEMA,
+                        null);
         catalog.open();
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
index 609710b2ca..43663b5ca4 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
@@ -585,7 +585,8 @@ public class JdbcIrisIT extends AbstractJdbcIT {
                         "iris",
                         jdbcCase.getUserName(),
                         jdbcCase.getPassword(),
-                        JdbcUrlUtil.getUrlInfo(jdbcUrl));
+                        JdbcUrlUtil.getUrlInfo(jdbcUrl),
+                        "com.intersystems.jdbc.IRISDriver");
         // set connection
         ((IrisCatalog) catalog).setConnection(jdbcUrl, connection);
         catalog.open();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlSaveModeCatalogIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlSaveModeCatalogIT.java
index 41a357f47a..b1361b58e5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlSaveModeCatalogIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlSaveModeCatalogIT.java
@@ -156,7 +156,8 @@ public class JdbcMySqlSaveModeCatalogIT extends TestSuiteBase implements TestRes
     public void testCatalog() {
         TablePath tablePathMySql = TablePath.of("auto", "mysql_auto_create");
         TablePath tablePathMySqlSink = TablePath.of("auto", "mysql_auto_create_sink");
-        MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", "root", MYSQL_PASSWORD, MysqlUrlInfo);
+        MySqlCatalog mySqlCatalog =
+                new MySqlCatalog("mysql", "root", MYSQL_PASSWORD, MysqlUrlInfo, null);
         mySqlCatalog.open();
         CatalogTable catalogTable = mySqlCatalog.getTable(tablePathMySql);
         // source comment
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
index 1b6faa0a75..db8f6b758e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
@@ -323,7 +323,8 @@ public class JdbcMysqlSaveModeHandlerIT extends AbstractJdbcIT {
                         jdbcCase.getUserName(),
                         jdbcCase.getPassword(),
                         JdbcUrlUtil.getUrlInfo(
-                                jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())));
+                                jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())),
+                        null);
         catalog.open();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
index 35d29c37fb..6dcc6e51ba 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
@@ -362,7 +362,7 @@ public class JdbcMysqlSplitIT extends TestSuiteBase implements TestResource {
 
         TablePath tablePathMySql = TablePath.of(MYSQL_DATABASE, MYSQL_TABLE);
         MySqlCatalog mySqlCatalog =
-                new MySqlCatalog("mysql", MYSQL_USERNAME, MYSQL_PASSWORD, mysqlUrlInfo);
+                new MySqlCatalog("mysql", MYSQL_USERNAME, MYSQL_PASSWORD, mysqlUrlInfo, null);
         mySqlCatalog.open();
         Assertions.assertTrue(mySqlCatalog.tableExists(tablePathMySql));
         CatalogTable table = mySqlCatalog.getTable(tablePathMySql);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java
index 2c237284f0..7fff1a2db1 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java
@@ -325,7 +325,8 @@ public class JdbcOpenGaussIT extends AbstractJdbcIT {
                         jdbcCase.getUserName(),
                         jdbcCase.getPassword(),
                         JdbcUrlUtil.getUrlInfo(jdbcUrl),
-                        SCHEMA);
+                        SCHEMA,
+                        null);
         // set connection
         ((OpenGaussCatalog) catalog).setConnection(jdbcUrl, connection);
         catalog.open();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
index 42f8affe63..ab3d0450bd 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
@@ -244,7 +244,8 @@ public class JdbcXuguIT extends AbstractJdbcIT {
                         jdbcCase.getUserName(),
                         jdbcCase.getPassword(),
                         JdbcUrlUtil.getUrlInfo(jdbcUrl),
-                        XUGU_SCHEMA);
+                        XUGU_SCHEMA,
+                        null);
         catalog.open();
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
index 0b9f699329..d82ef45278 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
@@ -368,7 +368,8 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements
                         "mysql",
                         MYSQL_USER_NAME,
                         MYSQL_USER_PASSWORD,
-                        JdbcUrlUtil.getUrlInfo(MYSQL_CONTAINER.getJdbcUrl()))) {
+                        JdbcUrlUtil.getUrlInfo(MYSQL_CONTAINER.getJdbcUrl()),
+                        null)) {
             mySqlCatalog.open();
             CatalogTable mySqlCatalogTable =
                     mySqlCatalog.getTable(TablePath.of(MYSQL_DATABASE, SOURCE_TABLE));


Reply via email to