This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dde3104f76 [Improve][Connector-v2][Jdbc] Refactor AbstractJdbcCatalog 
(#5096)
dde3104f76 is described below

commit dde3104f761f9e3f4a298f2ba9a61e2e0dcdbee1
Author: He Wang <[email protected]>
AuthorDate: Wed Sep 6 11:13:10 2023 +0800

    [Improve][Connector-v2][Jdbc] Refactor AbstractJdbcCatalog (#5096)
    
    * refactor jdbc connector AbstractJdbcCatalog
    
    * add testCatalog for pg
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 342 +++++++++++++++++----
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 309 ++++---------------
 .../catalog/mysql/MysqlCreateTableSqlBuilder.java  |   2 +-
 .../jdbc/catalog/oracle/OracleCatalog.java         | 224 +++-----------
 .../jdbc/catalog/psql/PostgresCatalog.java         | 311 +++----------------
 .../psql/PostgresCreateTableSqlBuilder.java        |   2 +-
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   | 295 ++++--------------
 .../jdbc/catalog/oracle/OracleCatalogTest.java     |  19 +-
 .../jdbc/catalog/psql/PostgresCatalogTest.java     |   4 +-
 .../sql/MysqlCreateTableSqlBuilderTest.java        |   2 +-
 .../connectors/seatunnel/jdbc/AbstractJdbcIT.java  |  65 +++-
 .../connectors/seatunnel/jdbc/JdbcCase.java        |   5 +
 .../connectors/seatunnel/jdbc/JdbcMysqlIT.java     |  17 +
 .../connectors/seatunnel/jdbc/JdbcOracleIT.java    |  47 ++-
 .../src/test/resources/sql/oracle_init.sql         |  22 ++
 .../connectors/seatunnel/jdbc/JdbcPostgresIT.java  |  43 +++
 .../connectors/seatunnel/jdbc/JdbcSqlServerIT.java |  34 +-
 17 files changed, 732 insertions(+), 1011 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 22752ba501..ddc327fbc3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -20,9 +20,12 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
@@ -40,14 +43,19 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@@ -56,6 +64,8 @@ import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
 public abstract class AbstractJdbcCatalog implements Catalog {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
 
+    protected static final Set<String> SYS_DATABASES = new HashSet<>();
+
     protected final String catalogName;
     protected final String defaultDatabase;
     protected final String username;
@@ -66,7 +76,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
 
     protected final Optional<String> defaultSchema;
 
-    protected Connection defaultConnection;
+    protected final Map<String, Connection> connectionMap;
 
     public AbstractJdbcCatalog(
             String catalogName,
@@ -88,6 +98,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         this.defaultUrl = urlInfo.getOrigin();
         this.suffix = urlInfo.getSuffix();
         this.defaultSchema = Optional.ofNullable(defaultSchema);
+        this.connectionMap = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -95,51 +106,101 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         return defaultDatabase;
     }
 
-    public String getCatalogName() {
-        return catalogName;
+    protected Connection getConnection(String url) {
+        if (connectionMap.containsKey(url)) {
+            return connectionMap.get(url);
+        }
+        try {
+            Connection connection = DriverManager.getConnection(url, username, 
pwd);
+            connectionMap.put(url, connection);
+            return connection;
+        } catch (SQLException e) {
+            throw new CatalogException(String.format("Failed connecting to %s 
via JDBC.", url), e);
+        }
     }
 
-    public String getUsername() {
-        return username;
+    @Override
+    public void open() throws CatalogException {
+        getConnection(defaultUrl);
+        LOG.info("Catalog {} established connection to {}", catalogName, 
defaultUrl);
     }
 
-    public String getPassword() {
-        return pwd;
+    @Override
+    public void close() throws CatalogException {
+        for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (SQLException e) {
+                throw new CatalogException(
+                        String.format("Failed to close %s via JDBC.", 
entry.getKey()), e);
+            }
+        }
+        connectionMap.clear();
+        LOG.info("Catalog {} closing", catalogName);
     }
 
-    public String getBaseUrl() {
-        return baseUrl;
+    protected String getSelectColumnsSql(TablePath tablePath) {
+        throw new UnsupportedOperationException();
     }
 
-    @Override
-    public void open() throws CatalogException {
-        try {
-            defaultConnection = DriverManager.getConnection(defaultUrl, 
username, pwd);
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed connecting to %s via JDBC.", 
defaultUrl), e);
-        }
+    protected Column buildColumn(ResultSet resultSet) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
 
-        LOG.info("Catalog {} established connection to {}", catalogName, 
defaultUrl);
+    protected TableIdentifier getTableIdentifier(TablePath tablePath) {
+        return TableIdentifier.of(
+                catalogName,
+                tablePath.getDatabaseName(),
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
     }
 
-    @Override
-    public void close() throws CatalogException {
-        if (defaultConnection == null) {
-            return;
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
         }
+
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        Connection conn = getConnection(dbUrl);
         try {
-            defaultConnection.close();
-        } catch (SQLException e) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData, 
tablePath);
+            List<ConstraintKey> constraintKeys = getConstraintKeys(metaData, 
tablePath);
+            try (PreparedStatement ps = 
conn.prepareStatement(getSelectColumnsSql(tablePath));
+                    ResultSet resultSet = ps.executeQuery()) {
+
+                TableSchema.Builder builder = TableSchema.builder();
+                while (resultSet.next()) {
+                    builder.column(buildColumn(resultSet));
+                }
+                // add primary key
+                primaryKey.ifPresent(builder::primaryKey);
+                // add constraint key
+                constraintKeys.forEach(builder::constraintKey);
+                TableIdentifier tableIdentifier = 
getTableIdentifier(tablePath);
+                return CatalogTable.of(
+                        tableIdentifier,
+                        builder.build(),
+                        buildConnectorOptions(tablePath),
+                        Collections.emptyList(),
+                        "",
+                        catalogName);
+            }
+
+        } catch (Exception e) {
             throw new CatalogException(
-                    String.format("Failed to close %s via JDBC.", defaultUrl), 
e);
+                    String.format("Failed getting table %s", 
tablePath.getFullName()), e);
         }
-        LOG.info("Catalog {} closing", catalogName);
     }
 
-    protected Optional<PrimaryKey> getPrimaryKey(
-            DatabaseMetaData metaData, String database, String table) throws 
SQLException {
-        return getPrimaryKey(metaData, database, table, table);
+    protected Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData, 
TablePath tablePath)
+            throws SQLException {
+        return getPrimaryKey(
+                metaData,
+                tablePath.getDatabaseName(),
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
     }
 
     protected Optional<PrimaryKey> getPrimaryKey(
@@ -174,9 +235,13 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         return Optional.of(PrimaryKey.of(pkName, pkFields));
     }
 
-    protected List<ConstraintKey> getConstraintKeys(
-            DatabaseMetaData metaData, String database, String table) throws 
SQLException {
-        return getConstraintKeys(metaData, database, table, table);
+    protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, 
TablePath tablePath)
+            throws SQLException {
+        return getConstraintKeys(
+                metaData,
+                tablePath.getDatabaseName(),
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
     }
 
     protected List<ConstraintKey> getConstraintKeys(
@@ -217,16 +282,24 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         return new ArrayList<>(constraintKeyMap.values());
     }
 
-    protected Optional<String> getColumnDefaultValue(
-            DatabaseMetaData metaData, String database, String schema, String 
table, String column)
-            throws SQLException {
-        try (ResultSet resultSet = metaData.getColumns(database, schema, 
table, column)) {
-            while (resultSet.next()) {
-                String defaultValue = resultSet.getString("COLUMN_DEF");
-                return Optional.ofNullable(defaultValue);
-            }
+    protected String getListDatabaseSql() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try {
+            return queryString(
+                    defaultUrl,
+                    getListDatabaseSql(),
+                    rs -> {
+                        String s = rs.getString(1);
+                        return SYS_DATABASES.contains(s) ? null : s;
+                    });
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", 
this.catalogName), e);
         }
-        return Optional.empty();
     }
 
     @Override
@@ -236,11 +309,44 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         return listDatabases().contains(databaseName);
     }
 
+    protected String getListTableSql(String databaseName) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected String getTableName(ResultSet rs) throws SQLException {
+        String schemaName = rs.getString(1);
+        String tableName = rs.getString(2);
+        if (StringUtils.isNotBlank(schemaName) && 
!SYS_DATABASES.contains(schemaName)) {
+            return schemaName + "." + tableName;
+        }
+        return null;
+    }
+
+    protected String getTableName(TablePath tablePath) {
+        return tablePath.getSchemaAndTableName();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, 
databaseName);
+        }
+
+        String dbUrl = getUrlFromDatabaseName(databaseName);
+        try {
+            return queryString(dbUrl, getListTableSql(databaseName), 
this::getTableName);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", 
catalogName), e);
+        }
+    }
+
     @Override
     public boolean tableExists(TablePath tablePath) throws CatalogException {
         try {
             return databaseExists(tablePath.getDatabaseName())
-                    && 
listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+                    && 
listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
         } catch (DatabaseNotExistException e) {
             return false;
         }
@@ -261,24 +367,61 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
                             defaultSchema.get(),
                             tablePath.getTableName());
         }
-        if (!createTableInternal(tablePath, table) && !ignoreIfExists) {
+
+        if (tableExists(tablePath)) {
+            if (ignoreIfExists) {
+                return;
+            }
             throw new TableAlreadyExistException(catalogName, tablePath);
         }
+
+        createTableInternal(tablePath, table);
+    }
+
+    protected String getCreateTableSql(TablePath tablePath, CatalogTable 
table) {
+        throw new UnsupportedOperationException();
     }
 
-    protected abstract boolean createTableInternal(TablePath tablePath, 
CatalogTable table)
-            throws CatalogException;
+    protected void createTableInternal(TablePath tablePath, CatalogTable table)
+            throws CatalogException {
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        try {
+            executeInternal(dbUrl, getCreateTableSql(tablePath, table));
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed creating table %s", 
tablePath.getFullName()), e);
+        }
+    }
 
     @Override
     public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
         checkNotNull(tablePath, "Table path cannot be null");
-        if (!dropTableInternal(tablePath) && !ignoreIfNotExists) {
+
+        if (!tableExists(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
             throw new TableNotExistException(catalogName, tablePath);
         }
+
+        dropTableInternal(tablePath);
     }
 
-    protected abstract boolean dropTableInternal(TablePath tablePath) throws 
CatalogException;
+    protected String getDropTableSql(TablePath tablePath) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void dropTableInternal(TablePath tablePath) throws 
CatalogException {
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        try {
+            // Will there exist concurrent drop for one table?
+            executeInternal(dbUrl, getDropTableSql(tablePath));
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("Failed dropping table %s", 
tablePath.getFullName()), e);
+        }
+    }
 
     @Override
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
@@ -287,14 +430,42 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         checkNotNull(tablePath.getDatabaseName(), "Database name cannot be 
null");
 
         if (databaseExists(tablePath.getDatabaseName())) {
+            if (ignoreIfExists) {
+                return;
+            }
             throw new DatabaseAlreadyExistException(catalogName, 
tablePath.getDatabaseName());
         }
-        if (!createDatabaseInternal(tablePath.getDatabaseName()) && 
!ignoreIfExists) {
-            throw new DatabaseAlreadyExistException(catalogName, 
tablePath.getDatabaseName());
+
+        createDatabaseInternal(tablePath.getDatabaseName());
+    }
+
+    protected String getCreateDatabaseSql(String databaseName) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void createDatabaseInternal(String databaseName) {
+        try {
+            executeInternal(defaultUrl, getCreateDatabaseSql(databaseName));
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed creating database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
         }
     }
 
-    protected abstract boolean createDatabaseInternal(String databaseName);
+    protected void closeDatabaseConnection(String databaseName) {
+        String dbUrl = getUrlFromDatabaseName(databaseName);
+        try {
+            Connection connection = connectionMap.remove(dbUrl);
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (SQLException e) {
+            throw new CatalogException(String.format("Failed to close %s via 
JDBC.", dbUrl), e);
+        }
+    }
 
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
@@ -302,10 +473,77 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         checkNotNull(tablePath, "Table path cannot be null");
         checkNotNull(tablePath.getDatabaseName(), "Database name cannot be 
null");
 
-        if (!dropDatabaseInternal(tablePath.getDatabaseName()) && 
!ignoreIfNotExists) {
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            if (ignoreIfNotExists) {
+                return;
+            }
             throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
         }
+
+        dropDatabaseInternal(tablePath.getDatabaseName());
+    }
+
+    protected String getDropDatabaseSql(String databaseName) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void dropDatabaseInternal(String databaseName) throws 
CatalogException {
+        try {
+            executeInternal(defaultUrl, getDropDatabaseSql(databaseName));
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed dropping database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    protected String getUrlFromDatabaseName(String databaseName) {
+        String url = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+        return url + databaseName + suffix;
+    }
+
+    protected String getOptionTableName(TablePath tablePath) {
+        return tablePath.getFullName();
+    }
+
+    @SuppressWarnings("MagicNumber")
+    protected Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>(8);
+        options.put("connector", "jdbc");
+        options.put("url", 
getUrlFromDatabaseName(tablePath.getDatabaseName()));
+        options.put("table-name", getOptionTableName(tablePath));
+        options.put("username", username);
+        options.put("password", pwd);
+        return options;
+    }
+
+    @FunctionalInterface
+    public interface ResultSetConsumer<T> {
+        T apply(ResultSet rs) throws SQLException;
+    }
+
+    protected List<String> queryString(String url, String sql, 
ResultSetConsumer<String> consumer)
+            throws SQLException {
+        try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) {
+            List<String> result = new ArrayList<>();
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String value = consumer.apply(rs);
+                if (value != null) {
+                    result.add(value);
+                }
+            }
+            return result;
+        }
     }
 
-    protected abstract boolean dropDatabaseInternal(String databaseName) 
throws CatalogException;
+    // If sql is DDL, the execute() method always returns false, so the return 
value
+    // should not be used to determine whether changes were made in database.
+    protected boolean executeInternal(String url, String sql) throws 
SQLException {
+        try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) {
+            return ps.execute();
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 267a68f0ee..b558926e45 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -19,47 +19,34 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 
 import com.mysql.cj.MysqlType;
-import com.mysql.cj.jdbc.result.ResultSetImpl;
-import com.mysql.cj.util.StringUtils;
 import lombok.extern.slf4j.Slf4j;
 
-import java.sql.Connection;
 import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 @Slf4j
 public class MySqlCatalog extends AbstractJdbcCatalog {
 
-    protected static final Set<String> SYS_DATABASES = new HashSet<>(4);
-    private final String SELECT_COLUMNS =
+    private static final MysqlDataTypeConvertor DATA_TYPE_CONVERTOR = new 
MysqlDataTypeConvertor();
+
+    private static final String SELECT_COLUMNS_SQL_TEMPLATE =
             "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 
'%s' AND TABLE_NAME ='%s'";
 
     static {
@@ -69,137 +56,65 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         SYS_DATABASES.add("sys");
     }
 
-    protected final Map<String, Connection> connectionMap;
-
     public MySqlCatalog(
             String catalogName, String username, String pwd, 
JdbcUrlUtil.UrlInfo urlInfo) {
         super(catalogName, username, pwd, urlInfo, null);
-        this.connectionMap = new ConcurrentHashMap<>();
     }
 
-    public Connection getConnection(String url) {
-        if (connectionMap.containsKey(url)) {
-            return connectionMap.get(url);
-        }
-        try {
-            Connection connection = DriverManager.getConnection(url, username, 
pwd);
-            connectionMap.put(url, connection);
-            return connection;
-        } catch (SQLException e) {
-            throw new CatalogException(String.format("Failed connecting to %s 
via JDBC.", url), e);
-        }
+    @Override
+    protected String getListDatabaseSql() {
+        return "SHOW DATABASES;";
     }
 
     @Override
-    public void close() throws CatalogException {
-        for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
-            try {
-                entry.getValue().close();
-            } catch (SQLException e) {
-                throw new CatalogException(
-                        String.format("Failed to close %s via JDBC.", 
entry.getKey()), e);
-            }
-        }
-        super.close();
+    protected String getListTableSql(String databaseName) {
+        return "SHOW TABLES;";
     }
 
     @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (PreparedStatement ps = defaultConnection.prepareStatement("SHOW 
DATABASES;")) {
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (!SYS_DATABASES.contains(databaseName)) {
-                    databases.add(rs.getString(1));
-                }
-            }
-
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", 
this.catalogName), e);
-        }
+    protected String getTableName(ResultSet rs) throws SQLException {
+        return rs.getString(1);
     }
 
     @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, 
databaseName);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(databaseName);
-        Connection connection = getConnection(dbUrl);
-        try (PreparedStatement ps = connection.prepareStatement("SHOW 
TABLES;")) {
-
-            ResultSet rs = ps.executeQuery();
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                tables.add(rs.getString(1));
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", 
catalogName), e);
-        }
+    protected String getTableName(TablePath tablePath) {
+        return tablePath.getTableName();
     }
 
     @Override
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        Connection conn = getConnection(dbUrl);
-        try {
-            DatabaseMetaData metaData = conn.getMetaData();
+    protected String getSelectColumnsSql(TablePath tablePath) {
+        return String.format(
+                SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
 
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(metaData, tablePath.getDatabaseName(), 
tablePath.getTableName());
-            List<ConstraintKey> constraintKeys =
-                    getConstraintKeys(
-                            metaData, tablePath.getDatabaseName(), 
tablePath.getTableName());
-            String sql =
-                    String.format(
-                            SELECT_COLUMNS, tablePath.getDatabaseName(), 
tablePath.getTableName());
-            try (PreparedStatement ps = conn.prepareStatement(sql);
-                    ResultSet resultSet = ps.executeQuery(); ) {
+    @Override
+    protected TableIdentifier getTableIdentifier(TablePath tablePath) {
+        return TableIdentifier.of(
+                catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
 
-                TableSchema.Builder builder = TableSchema.builder();
-                while (resultSet.next()) {
-                    buildTable(resultSet, builder);
-                }
-                // add primary key
-                primaryKey.ifPresent(builder::primaryKey);
-                // add constraint key
-                constraintKeys.forEach(builder::constraintKey);
-                TableIdentifier tableIdentifier =
-                        TableIdentifier.of(
-                                catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName());
-                return CatalogTable.of(
-                        tableIdentifier,
-                        builder.build(),
-                        buildConnectorOptions(tablePath),
-                        Collections.emptyList(),
-                        "",
-                        "mysql");
-            }
+    @Override
+    protected Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData, 
TablePath tablePath)
+            throws SQLException {
+        return getPrimaryKey(
+                metaData,
+                tablePath.getDatabaseName(),
+                tablePath.getTableName(),
+                tablePath.getTableName());
+    }
 
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", 
tablePath.getFullName()), e);
-        }
+    @Override
+    protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, 
TablePath tablePath)
+            throws SQLException {
+        return getConstraintKeys(
+                metaData,
+                tablePath.getDatabaseName(),
+                tablePath.getTableName(),
+                tablePath.getTableName());
     }
 
-    private void buildTable(ResultSet resultSet, TableSchema.Builder builder) 
throws SQLException {
+    @Override
+    protected Column buildColumn(ResultSet resultSet) throws SQLException {
         String columnName = resultSet.getString("COLUMN_NAME");
         String sourceType = resultSet.getString("COLUMN_TYPE");
         String typeName = resultSet.getString("DATA_TYPE").toUpperCase();
@@ -243,121 +158,39 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
                 break;
         }
 
-        PhysicalColumn physicalColumn =
-                PhysicalColumn.of(
-                        columnName,
-                        type,
-                        0,
-                        isNullable,
-                        defaultValue,
-                        comment,
-                        sourceType,
-                        sourceType.contains("unsigned"),
-                        sourceType.contains("zerofill"),
-                        bitLen,
-                        null,
-                        columnLength);
-        builder.column(physicalColumn);
+        return PhysicalColumn.of(
+                columnName,
+                type,
+                0,
+                isNullable,
+                defaultValue,
+                comment,
+                sourceType,
+                sourceType.contains("unsigned"),
+                sourceType.contains("zerofill"),
+                bitLen,
+                null,
+                columnLength);
     }
 
-    public static Map<String, Object> getColumnsDefaultValue(TablePath 
tablePath, Connection conn) {
-        StringBuilder queryBuf = new StringBuilder("SHOW FULL COLUMNS FROM ");
-        queryBuf.append(StringUtils.quoteIdentifier(tablePath.getTableName(), 
"`", false));
-        queryBuf.append(" FROM ");
-        
queryBuf.append(StringUtils.quoteIdentifier(tablePath.getDatabaseName(), "`", 
false));
-        try (PreparedStatement ps2 = 
conn.prepareStatement(queryBuf.toString())) {
-            ResultSet rs = ps2.executeQuery();
-            Map<String, Object> result = new HashMap<>();
-            while (rs.next()) {
-                String field = rs.getString("Field");
-                Object defaultValue = rs.getObject("Default");
-                result.put(field, defaultValue);
-            }
-            return result;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed getting table(%s) columns default value",
-                            tablePath.getFullName()),
-                    e);
-        }
-    }
-
-    // todo: If the origin source is mysql, we can directly use create table 
like to create the
     @Override
-    protected boolean createTableInternal(TablePath tablePath, CatalogTable 
table)
-            throws CatalogException {
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-
-        String createTableSql =
-                MysqlCreateTableSqlBuilder.builder(tablePath, 
table).build(table.getCatalogName());
-        Connection connection = getConnection(dbUrl);
-        log.info("create table sql: {}", createTableSql);
-        try (PreparedStatement ps = 
connection.prepareStatement(createTableSql)) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed creating table %s", 
tablePath.getFullName()), e);
-        }
+    protected String getCreateTableSql(TablePath tablePath, CatalogTable 
table) {
+        return MysqlCreateTableSqlBuilder.builder(tablePath, 
table).build(table.getCatalogName());
     }
 
     @Override
-    protected boolean dropTableInternal(TablePath tablePath) throws 
CatalogException {
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        Connection connection = getConnection(dbUrl);
-        try (PreparedStatement ps =
-                connection.prepareStatement(
-                        String.format("DROP TABLE IF EXISTS %s;", 
tablePath.getFullName()))) {
-            // Will there exist concurrent drop for one table?
-            return ps.execute();
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed dropping table %s", 
tablePath.getFullName()), e);
-        }
+    protected String getDropTableSql(TablePath tablePath) {
+        return String.format("DROP TABLE %s;", tablePath.getFullName());
     }
 
     @Override
-    protected boolean createDatabaseInternal(String databaseName) throws 
CatalogException {
-        try (PreparedStatement ps =
-                defaultConnection.prepareStatement(
-                        String.format("CREATE DATABASE `%s`;", databaseName))) 
{
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed creating database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
+    protected String getCreateDatabaseSql(String databaseName) {
+        return String.format("CREATE DATABASE `%s`;", databaseName);
     }
 
     @Override
-    protected boolean dropDatabaseInternal(String databaseName) throws 
CatalogException {
-        try (PreparedStatement ps =
-                defaultConnection.prepareStatement(
-                        String.format("DROP DATABASE `%s`;", databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed dropping database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    /**
-     * @see com.mysql.cj.MysqlType
-     * @see ResultSetImpl#getObjectStoredProc(int, int)
-     */
-    @SuppressWarnings("unchecked")
-    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int 
colIndex)
-            throws SQLException {
-        MysqlType mysqlType = 
MysqlType.getByName(metadata.getColumnTypeName(colIndex));
-        Map<String, Object> dataTypeProperties = new HashMap<>();
-        dataTypeProperties.put(MysqlDataTypeConvertor.PRECISION, 
metadata.getPrecision(colIndex));
-        dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, 
metadata.getScale(colIndex));
-        return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType, 
dataTypeProperties);
+    protected String getDropDatabaseSql(String databaseName) {
+        return String.format("DROP DATABASE `%s`;", databaseName);
     }
 
     private SeaTunnelDataType<?> fromJdbcType(String typeName, int precision, 
int scale) {
@@ -365,22 +198,6 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         Map<String, Object> dataTypeProperties = new HashMap<>();
         dataTypeProperties.put(MysqlDataTypeConvertor.PRECISION, precision);
         dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, scale);
-        return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType, 
dataTypeProperties);
-    }
-
-    @SuppressWarnings("MagicNumber")
-    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
-        Map<String, String> options = new HashMap<>(8);
-        options.put("connector", "jdbc");
-        options.put("url", baseUrl + tablePath.getDatabaseName());
-        options.put("table-name", tablePath.getFullName());
-        options.put("username", username);
-        options.put("password", pwd);
-        return options;
-    }
-
-    private String getUrlFromDatabaseName(String databaseName) {
-        String url = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
-        return url + databaseName + suffix;
+        return DATA_TYPE_CONVERTOR.toSeaTunnelType(mysqlType, 
dataTypeProperties);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index 608062fc99..490ecd30ff 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -119,7 +119,7 @@ public class MysqlCreateTableSqlBuilder {
         List<String> sqls = new ArrayList<>();
         sqls.add(
                 String.format(
-                        "CREATE TABLE IF NOT EXISTS %s (\n%s\n)",
+                        "CREATE TABLE %s (\n%s\n)",
                         tableName, buildColumnsIdentifySql(catalogName)));
         if (engine != null) {
             sqls.add("ENGINE = " + engine);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index 261f4f7fb6..b90a86a7ab 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -18,33 +18,22 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 
 import lombok.extern.slf4j.Slf4j;
 
-import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleDataTypeConvertor.ORACLE_BFILE;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleDataTypeConvertor.ORACLE_BLOB;
@@ -61,8 +50,10 @@ import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.Orac
 
 @Slf4j
 public class OracleCatalog extends AbstractJdbcCatalog {
+
     private static final OracleDataTypeConvertor DATA_TYPE_CONVERTOR =
             new OracleDataTypeConvertor();
+
     private static final List<String> EXCLUDED_SCHEMAS =
             Collections.unmodifiableList(
                     Arrays.asList(
@@ -87,7 +78,7 @@ public class OracleCatalog extends AbstractJdbcCatalog {
                             "EXFSYS",
                             "SYSMAN"));
 
-    private static final String SELECT_COLUMNS_SQL =
+    private static final String SELECT_COLUMNS_SQL_TEMPLATE =
             "SELECT\n"
                     + "    cols.COLUMN_NAME,\n"
                     + "    CASE \n"
@@ -127,158 +118,50 @@ public class OracleCatalog extends AbstractJdbcCatalog {
     }
 
     @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (PreparedStatement ps =
-                defaultConnection.prepareStatement("SELECT name FROM 
v$database")) {
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                databases.add(databaseName);
-            }
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", 
this.catalogName), e);
-        }
+    protected String getListDatabaseSql() {
+        return "SELECT name FROM v$database";
     }
 
     @Override
-    protected boolean createTableInternal(TablePath tablePath, CatalogTable 
table)
-            throws CatalogException {
-        String createTableSql = new 
OracleCreateTableSqlBuilder(table).build(tablePath);
-        String[] createTableSqls = createTableSql.split(";");
-        for (String sql : createTableSqls) {
-            log.info("create table sql: {}", sql);
-            try (PreparedStatement ps = 
defaultConnection.prepareStatement(sql)) {
-                ps.execute();
-            } catch (Exception e) {
-                throw new CatalogException(
-                        String.format("Failed creating table %s", 
tablePath.getFullName()), e);
-            }
-        }
-        return true;
+    protected String getCreateTableSql(TablePath tablePath, CatalogTable 
table) {
+        return new OracleCreateTableSqlBuilder(table).build(tablePath);
     }
 
     @Override
-    protected boolean dropTableInternal(TablePath tablePath) throws 
CatalogException {
-        return false;
+    protected String getDropTableSql(TablePath tablePath) {
+        return String.format("DROP TABLE %s", getTableName(tablePath));
     }
 
     @Override
-    protected boolean createDatabaseInternal(String databaseName) {
-        return false;
+    protected String getTableName(TablePath tablePath) {
+        return tablePath.getSchemaAndTableName().toUpperCase();
     }
 
     @Override
-    protected boolean dropDatabaseInternal(String databaseName) throws 
CatalogException {
-        return false;
+    protected String getListTableSql(String databaseName) {
+        return "SELECT OWNER, TABLE_NAME FROM ALL_TABLES"
+                + "  WHERE TABLE_NAME NOT LIKE 'MDRT_%'"
+                + "  AND TABLE_NAME NOT LIKE 'MDRS_%'"
+                + "  AND TABLE_NAME NOT LIKE 'MDXT_%'"
+                + "  AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%' AND IOT_NAME IS 
NULL)";
     }
 
     @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName())
-                            
.contains(tablePath.getSchemaAndTableName().toUpperCase());
-        } catch (DatabaseNotExistException e) {
-            return false;
+    protected String getTableName(ResultSet rs) throws SQLException {
+        if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
+            return null;
         }
+        return rs.getString(1) + "." + rs.getString(2);
     }
 
     @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, 
databaseName);
-        }
-
-        try (PreparedStatement ps =
-                defaultConnection.prepareStatement(
-                        "SELECT OWNER, TABLE_NAME FROM ALL_TABLES\n"
-                                + "WHERE TABLE_NAME NOT LIKE 'MDRT_%'\n"
-                                + "  AND TABLE_NAME NOT LIKE 'MDRS_%'\n"
-                                + "  AND TABLE_NAME NOT LIKE 'MDXT_%'\n"
-                                + "  AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%' 
AND IOT_NAME IS NULL)")) {
-
-            ResultSet rs = ps.executeQuery();
-            List<String> tables = new ArrayList<>();
-            while (rs.next()) {
-                if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
-                    continue;
-                }
-                tables.add(rs.getString(1) + "." + rs.getString(2));
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", 
catalogName), e);
-        }
+    protected String getSelectColumnsSql(TablePath tablePath) {
+        return String.format(
+                SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), 
tablePath.getTableName());
     }
 
     @Override
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
-
-        try {
-            DatabaseMetaData metaData = defaultConnection.getMetaData();
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-            List<ConstraintKey> constraintKeys =
-                    getConstraintKeys(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-
-            String sql =
-                    String.format(
-                            SELECT_COLUMNS_SQL,
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-            try (PreparedStatement ps = 
defaultConnection.prepareStatement(sql);
-                    ResultSet resultSet = ps.executeQuery()) {
-                TableSchema.Builder builder = TableSchema.builder();
-                // add column
-                while (resultSet.next()) {
-                    buildColumn(resultSet, builder);
-                }
-
-                // add primary key
-                primaryKey.ifPresent(builder::primaryKey);
-                // add constraint key
-                constraintKeys.forEach(builder::constraintKey);
-                TableIdentifier tableIdentifier =
-                        TableIdentifier.of(
-                                catalogName,
-                                tablePath.getDatabaseName(),
-                                tablePath.getSchemaName(),
-                                tablePath.getTableName());
-                return CatalogTable.of(
-                        tableIdentifier,
-                        builder.build(),
-                        buildConnectorOptions(tablePath),
-                        Collections.emptyList(),
-                        "");
-            }
-
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", 
tablePath.getFullName()), e);
-        }
-    }
-
-    private void buildColumn(ResultSet resultSet, TableSchema.Builder builder) 
throws SQLException {
+    protected Column buildColumn(ResultSet resultSet) throws SQLException {
         String columnName = resultSet.getString("COLUMN_NAME");
         String typeName = resultSet.getString("TYPE_NAME");
         String fullTypeName = resultSet.getString("FULL_TYPE_NAME");
@@ -314,31 +197,19 @@ public class OracleCatalog extends AbstractJdbcCatalog {
                 break;
         }
 
-        PhysicalColumn physicalColumn =
-                PhysicalColumn.of(
-                        columnName,
-                        type,
-                        0,
-                        isNullable,
-                        defaultValue,
-                        columnComment,
-                        fullTypeName,
-                        false,
-                        false,
-                        bitLen,
-                        null,
-                        columnLength);
-        builder.column(physicalColumn);
-    }
-
-    @SuppressWarnings("unchecked")
-    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int 
colIndex)
-            throws SQLException {
-        String columnType = metadata.getColumnTypeName(colIndex);
-        Map<String, Object> dataTypeProperties = new HashMap<>();
-        dataTypeProperties.put(OracleDataTypeConvertor.PRECISION, 
metadata.getPrecision(colIndex));
-        dataTypeProperties.put(OracleDataTypeConvertor.SCALE, 
metadata.getScale(colIndex));
-        return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnType, 
dataTypeProperties);
+        return PhysicalColumn.of(
+                columnName,
+                type,
+                0,
+                isNullable,
+                defaultValue,
+                columnComment,
+                fullTypeName,
+                false,
+                false,
+                bitLen,
+                null,
+                columnLength);
     }
 
     private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision, 
long scale) {
@@ -348,14 +219,13 @@ public class OracleCatalog extends AbstractJdbcCatalog {
         return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName, 
dataTypeProperties);
     }
 
-    @SuppressWarnings("MagicNumber")
-    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
-        Map<String, String> options = new HashMap<>(8);
-        options.put("connector", "jdbc");
-        options.put("url", baseUrl);
-        options.put("table-name", tablePath.getSchemaAndTableName());
-        options.put("username", username);
-        options.put("password", pwd);
-        return options;
+    @Override
+    protected String getUrlFromDatabaseName(String databaseName) {
+        return defaultUrl;
+    }
+
+    @Override
+    protected String getOptionTableName(TablePath tablePath) {
+        return tablePath.getSchemaAndTableName();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
index e3507666d0..2769d09ebb 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
@@ -18,39 +18,20 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 
-import com.mysql.cj.MysqlType;
-import com.mysql.cj.jdbc.result.ResultSetImpl;
 import lombok.extern.slf4j.Slf4j;
 
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresDataTypeConvertor.PG_BIT;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresDataTypeConvertor.PG_BYTEA;
@@ -65,7 +46,10 @@ import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.Postgr
 @Slf4j
 public class PostgresCatalog extends AbstractJdbcCatalog {
 
-    private static final String SELECT_COLUMNS_SQL =
+    private static final PostgresDataTypeConvertor DATA_TYPE_CONVERTOR =
+            new PostgresDataTypeConvertor();
+
+    private static final String SELECT_COLUMNS_SQL_TEMPLATE =
             "SELECT \n"
                     + "    a.attname AS column_name, \n"
                     + "\t\tt.typname as type_name,\n"
@@ -102,8 +86,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
                     + "ORDER BY \n"
                     + "    a.attnum;";
 
-    protected static final Set<String> SYS_DATABASES = new HashSet<>(9);
-
     static {
         SYS_DATABASES.add("information_schema");
         SYS_DATABASES.add("pg_catalog");
@@ -116,8 +98,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
         SYS_DATABASES.add("template1");
     }
 
-    protected final Map<String, Connection> connectionMap;
-
     public PostgresCatalog(
             String catalogName,
             String username,
@@ -125,154 +105,26 @@ public class PostgresCatalog extends AbstractJdbcCatalog 
{
             JdbcUrlUtil.UrlInfo urlInfo,
             String defaultSchema) {
         super(catalogName, username, pwd, urlInfo, defaultSchema);
-        this.connectionMap = new ConcurrentHashMap<>();
-    }
-
-    public Connection getConnection(String url) {
-        if (connectionMap.containsKey(url)) {
-            return connectionMap.get(url);
-        }
-        try {
-            Connection connection = DriverManager.getConnection(url, username, 
pwd);
-            connectionMap.put(url, connection);
-            return connection;
-        } catch (SQLException e) {
-            throw new CatalogException(String.format("Failed connecting to %s 
via JDBC.", url), e);
-        }
     }
 
     @Override
-    public void close() throws CatalogException {
-        for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
-            try {
-                entry.getValue().close();
-            } catch (SQLException e) {
-                throw new CatalogException(
-                        String.format("Failed to close %s via JDBC.", 
entry.getKey()), e);
-            }
-        }
-        super.close();
+    protected String getListDatabaseSql() {
+        return "select datname from pg_database;";
     }
 
     @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (PreparedStatement ps =
-                defaultConnection.prepareStatement("select datname from 
pg_database;")) {
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (!SYS_DATABASES.contains(databaseName)) {
-                    databases.add(rs.getString(1));
-                }
-            }
-
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", 
this.catalogName), e);
-        }
+    protected String getListTableSql(String databaseName) {
+        return "SELECT table_schema, table_name FROM 
information_schema.tables;";
     }
 
     @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, 
databaseName);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(databaseName);
-        Connection connection = getConnection(dbUrl);
-        try (PreparedStatement ps =
-                connection.prepareStatement(
-                        "SELECT table_schema, table_name FROM 
information_schema.tables;")) {
-
-            ResultSet rs = ps.executeQuery();
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                String schemaName = rs.getString("table_schema");
-                String tableName = rs.getString("table_name");
-                if (org.apache.commons.lang3.StringUtils.isNotBlank(schemaName)
-                        && !SYS_DATABASES.contains(schemaName)) {
-                    tables.add(schemaName + "." + tableName);
-                }
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", 
catalogName), e);
-        }
+    protected String getSelectColumnsSql(TablePath tablePath) {
+        return String.format(
+                SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), 
tablePath.getTableName());
     }
 
     @Override
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        Connection conn = getConnection(dbUrl);
-        try {
-            DatabaseMetaData metaData = conn.getMetaData();
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-            List<ConstraintKey> constraintKeys =
-                    getConstraintKeys(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-
-            String sql =
-                    String.format(
-                            SELECT_COLUMNS_SQL,
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-            try (PreparedStatement ps = conn.prepareStatement(sql);
-                    ResultSet resultSet = ps.executeQuery()) {
-                TableSchema.Builder builder = TableSchema.builder();
-
-                // add column
-                while (resultSet.next()) {
-                    buildColumn(resultSet, builder);
-                }
-
-                // add primary key
-                primaryKey.ifPresent(builder::primaryKey);
-                // add constraint key
-                constraintKeys.forEach(builder::constraintKey);
-                TableIdentifier tableIdentifier =
-                        TableIdentifier.of(
-                                catalogName,
-                                tablePath.getDatabaseName(),
-                                tablePath.getSchemaName(),
-                                tablePath.getTableName());
-                return CatalogTable.of(
-                        tableIdentifier,
-                        builder.build(),
-                        buildConnectorOptions(tablePath),
-                        Collections.emptyList(),
-                        "",
-                        "postgres");
-            }
-
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", 
tablePath.getFullName()), e);
-        }
-    }
-
-    private void buildColumn(ResultSet resultSet, TableSchema.Builder builder) 
throws SQLException {
+    protected Column buildColumn(ResultSet resultSet) throws SQLException {
         String columnName = resultSet.getString("column_name");
         String typeName = resultSet.getString("type_name");
         String fullTypeName = resultSet.getString("full_type_name");
@@ -282,8 +134,9 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
         Object defaultValue = resultSet.getObject("default_value");
         boolean isNullable = resultSet.getString("is_nullable").equals("YES");
 
-        if (defaultValue != null && 
defaultValue.toString().contains("regclass"))
+        if (defaultValue != null && 
defaultValue.toString().contains("regclass")) {
             defaultValue = null;
+        }
 
         SeaTunnelDataType<?> type = fromJdbcType(typeName, columnLength, 
columnScale);
         long bitLen = 0;
@@ -311,131 +164,55 @@ public class PostgresCatalog extends AbstractJdbcCatalog 
{
                 break;
         }
 
-        PhysicalColumn physicalColumn =
-                PhysicalColumn.of(
-                        columnName,
-                        type,
-                        0,
-                        isNullable,
-                        defaultValue,
-                        columnComment,
-                        fullTypeName,
-                        false,
-                        false,
-                        bitLen,
-                        null,
-                        columnLength);
-        builder.column(physicalColumn);
+        return PhysicalColumn.of(
+                columnName,
+                type,
+                0,
+                isNullable,
+                defaultValue,
+                columnComment,
+                fullTypeName,
+                false,
+                false,
+                bitLen,
+                null,
+                columnLength);
     }
 
     @Override
-    protected boolean createTableInternal(TablePath tablePath, CatalogTable 
table)
-            throws CatalogException {
-        String createTableSql = new 
PostgresCreateTableSqlBuilder(table).build(tablePath);
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        Connection conn = getConnection(dbUrl);
-        log.info("create table sql: {}", createTableSql);
-        try (PreparedStatement ps = conn.prepareStatement(createTableSql)) {
-            ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed creating table %s", 
tablePath.getFullName()), e);
-        }
-        return true;
+    protected String getCreateTableSql(TablePath tablePath, CatalogTable 
table) {
+        return new PostgresCreateTableSqlBuilder(table).build(tablePath);
     }
 
     @Override
-    protected boolean dropTableInternal(TablePath tablePath) throws 
CatalogException {
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-
-        String schemaName = tablePath.getSchemaName();
-        String tableName = tablePath.getTableName();
-
-        String sql = "DROP TABLE IF EXISTS \"" + schemaName + "\".\"" + 
tableName + "\"";
-        Connection connection = getConnection(dbUrl);
-        try (PreparedStatement ps = connection.prepareStatement(sql)) {
-            // Will there exist concurrent drop for one table?
-            return ps.execute();
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed dropping table %s", 
tablePath.getFullName()), e);
-        }
+    protected String getDropTableSql(TablePath tablePath) {
+        return "DROP TABLE \""
+                + tablePath.getSchemaName()
+                + "\".\""
+                + tablePath.getTableName()
+                + "\"";
     }
 
     @Override
-    protected boolean createDatabaseInternal(String databaseName) throws 
CatalogException {
-        String sql = "CREATE DATABASE \"" + databaseName + "\"";
-        try (PreparedStatement ps = defaultConnection.prepareStatement(sql)) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed creating database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
+    protected String getCreateDatabaseSql(String databaseName) {
+        return "CREATE DATABASE \"" + databaseName + "\"";
     }
 
     @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName())
-                            .contains(tablePath.getSchemaAndTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
+    protected String getDropDatabaseSql(String databaseName) {
+        return "DROP DATABASE \"" + databaseName + "\"";
     }
 
     @Override
-    protected boolean dropDatabaseInternal(String databaseName) throws 
CatalogException {
-        String sql = "DROP DATABASE IF EXISTS \"" + databaseName + "\"";
-        try (PreparedStatement ps = defaultConnection.prepareStatement(sql)) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed dropping database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    /**
-     * @see MysqlType
-     * @see ResultSetImpl#getObjectStoredProc(int, int)
-     */
-    @SuppressWarnings("unchecked")
-    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int 
colIndex)
-            throws SQLException {
-        String columnTypeName = metadata.getColumnTypeName(colIndex);
-        Map<String, Object> dataTypeProperties = new HashMap<>();
-        dataTypeProperties.put(
-                PostgresDataTypeConvertor.PRECISION, 
metadata.getPrecision(colIndex));
-        dataTypeProperties.put(PostgresDataTypeConvertor.SCALE, 
metadata.getScale(colIndex));
-        return new PostgresDataTypeConvertor().toSeaTunnelType(columnTypeName, 
dataTypeProperties);
+    protected void dropDatabaseInternal(String databaseName) throws 
CatalogException {
+        closeDatabaseConnection(databaseName);
+        super.dropDatabaseInternal(databaseName);
     }
 
     private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision, 
long scale) {
         Map<String, Object> dataTypeProperties = new HashMap<>();
         dataTypeProperties.put(PostgresDataTypeConvertor.PRECISION, precision);
         dataTypeProperties.put(PostgresDataTypeConvertor.SCALE, scale);
-        return new PostgresDataTypeConvertor().toSeaTunnelType(typeName, 
dataTypeProperties);
-    }
-
-    @SuppressWarnings("MagicNumber")
-    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
-        Map<String, String> options = new HashMap<>(8);
-        options.put("connector", "jdbc");
-        options.put("url", baseUrl + tablePath.getDatabaseName());
-        options.put("table-name", tablePath.getFullName());
-        options.put("username", username);
-        options.put("password", pwd);
-        return options;
-    }
-
-    private String getUrlFromDatabaseName(String databaseName) {
-        String url = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
-        return url + databaseName + suffix;
+        return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName, 
dataTypeProperties);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
index 85f4468bef..d423f18301 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
@@ -48,7 +48,7 @@ public class PostgresCreateTableSqlBuilder {
     public String build(TablePath tablePath) {
         StringBuilder createTableSql = new StringBuilder();
         createTableSql
-                .append("CREATE TABLE IF NOT EXISTS ")
+                .append("CREATE TABLE ")
                 .append(tablePath.getSchemaAndTableName())
                 .append(" (\n");
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index ea04c60bff..7d18ed2d90 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -19,15 +19,10 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -37,33 +32,35 @@ import org.apache.commons.lang3.tuple.Pair;
 
 import lombok.extern.slf4j.Slf4j;
 
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 
 @Slf4j
 public class SqlServerCatalog extends AbstractJdbcCatalog {
 
-    private static final Set<String> SYS_DATABASES = new HashSet<>(4);
-
-    static {
-        SYS_DATABASES.add("master");
-        SYS_DATABASES.add("tempdb");
-        SYS_DATABASES.add("model");
-        SYS_DATABASES.add("msdb");
-    }
+    private static final SqlServerDataTypeConvertor DATA_TYPE_CONVERTOR =
+            new SqlServerDataTypeConvertor();
+
+    private static final String SELECT_COLUMNS_SQL_TEMPLATE =
+            "SELECT tbl.name AS table_name,\n"
+                    + "       col.name AS column_name,\n"
+                    + "       ext.value AS comment,\n"
+                    + "       col.column_id AS column_id,\n"
+                    + "       types.name AS type,\n"
+                    + "       col.max_length AS max_length,\n"
+                    + "       col.precision AS precision,\n"
+                    + "       col.scale AS scale,\n"
+                    + "       col.is_nullable AS is_nullable,\n"
+                    + "       def.definition AS default_value\n"
+                    + "FROM sys.tables tbl\n"
+                    + "    INNER JOIN sys.columns col ON tbl.object_id = 
col.object_id\n"
+                    + "    LEFT JOIN sys.types types ON col.user_type_id = 
types.user_type_id\n"
+                    + "    LEFT JOIN sys.extended_properties ext ON 
ext.major_id = col.object_id AND ext.minor_id = col.column_id\n"
+                    + "    LEFT JOIN sys.default_constraints def ON 
col.default_object_id = def.object_id AND ext.minor_id = col.column_id AND 
ext.name = 'MS_Description'\n"
+                    + "WHERE schema_name(tbl.schema_id) = '%s' %s\n"
+                    + "ORDER BY tbl.name, col.column_id";
 
     public SqlServerCatalog(
             String catalogName,
@@ -75,133 +72,29 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
     }
 
     @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd);
-                PreparedStatement ps = conn.prepareStatement("SELECT NAME FROM 
sys.databases")) {
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (!SYS_DATABASES.contains(databaseName)) {
-                    databases.add(databaseName);
-                }
-            }
-
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", 
this.catalogName), e);
-        }
-    }
-
-    @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, 
databaseName);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(databaseName);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, 
pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
-                                        + databaseName
-                                        + ".INFORMATION_SCHEMA.TABLES WHERE 
TABLE_TYPE = 'BASE TABLE'")) {
-
-            ResultSet rs = ps.executeQuery();
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                tables.add(rs.getString(1) + "." + rs.getString(2));
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", 
catalogName), e);
-        }
+    protected String getListDatabaseSql() {
+        return "SELECT NAME FROM sys.databases";
     }
 
     @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName())
-                            .contains(tablePath.getSchemaAndTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
+    protected String getListTableSql(String databaseName) {
+        return "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
+                + databaseName
+                + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'";
     }
 
     @Override
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
+    protected String getSelectColumnsSql(TablePath tablePath) {
         String tableSql =
                 StringUtils.isNotEmpty(tablePath.getTableName())
                         ? "AND tbl.name = '" + tablePath.getTableName() + "'"
                         : "";
 
-        String columnSql =
-                String.format(
-                        "    SELECT tbl.name AS table_name, \n           
col.name AS column_name, \n           ext.value AS comment, \n           
col.column_id AS column_id, \n           types.name AS type, \n           
col.max_length AS max_length, \n           col.precision AS precision, \n       
    col.scale AS scale, \n           col.is_nullable AS is_nullable, \n 
def.definition AS default_value\n     FROM sys.tables tbl \nINNER JOIN 
sys.columns col \n        ON tbl.object_id = col. [...]
-                        tablePath.getSchemaName(), tableSql);
-
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, 
pwd)) {
-            DatabaseMetaData metaData = conn.getMetaData();
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-            List<ConstraintKey> constraintKeys =
-                    getConstraintKeys(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-
-            try (PreparedStatement ps = conn.prepareStatement(columnSql);
-                    ResultSet resultSet = ps.executeQuery(); ) {
-                TableSchema.Builder builder = TableSchema.builder();
-                while (resultSet.next()) {
-                    buildTable(resultSet, builder);
-                }
-
-                // add primary key
-                primaryKey.ifPresent(builder::primaryKey);
-                // add constraint key
-                constraintKeys.forEach(builder::constraintKey);
-                TableIdentifier tableIdentifier =
-                        TableIdentifier.of(
-                                catalogName,
-                                tablePath.getDatabaseName(),
-                                tablePath.getSchemaName(),
-                                tablePath.getTableName());
-                return CatalogTable.of(
-                        tableIdentifier,
-                        builder.build(),
-                        buildConnectorOptions(tablePath),
-                        Collections.emptyList(),
-                        "",
-                        "sqlserver");
-            }
-
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", 
tablePath.getFullName()), e);
-        }
+        return String.format(SELECT_COLUMNS_SQL_TEMPLATE, 
tablePath.getSchemaName(), tableSql);
     }
 
-    private void buildTable(ResultSet resultSet, TableSchema.Builder builder) 
throws SQLException {
+    @Override
+    protected Column buildColumn(ResultSet resultSet) throws SQLException {
         String columnName = resultSet.getString("column_name");
         String sourceType = resultSet.getString("type");
         //        String typeName = 
resultSet.getString("DATA_TYPE").toUpperCase();
@@ -266,21 +159,19 @@ public class SqlServerCatalog extends AbstractJdbcCatalog 
{
             default:
                 break;
         }
-        PhysicalColumn physicalColumn =
-                PhysicalColumn.of(
-                        columnName,
-                        type,
-                        0,
-                        isNullable,
-                        defaultValue,
-                        comment,
-                        sourceType,
-                        false,
-                        false,
-                        bitLen,
-                        null,
-                        columnLength);
-        builder.column(physicalColumn);
+        return PhysicalColumn.of(
+                columnName,
+                type,
+                0,
+                isNullable,
+                defaultValue,
+                comment,
+                sourceType,
+                false,
+                false,
+                bitLen,
+                null,
+                columnLength);
     }
 
     private SeaTunnelDataType<?> fromJdbcType(String typeName, int precision, 
int scale) {
@@ -288,103 +179,37 @@ public class SqlServerCatalog extends 
AbstractJdbcCatalog {
         Map<String, Object> dataTypeProperties = new HashMap<>();
         dataTypeProperties.put(SqlServerDataTypeConvertor.PRECISION, 
precision);
         dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, scale);
-        return new 
SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), 
dataTypeProperties);
+        return DATA_TYPE_CONVERTOR.toSeaTunnelType(pair.getLeft(), 
dataTypeProperties);
     }
 
     @Override
-    protected boolean createTableInternal(TablePath tablePath, CatalogTable 
table)
-            throws CatalogException {
-
-        String createTableSql =
-                SqlServerCreateTableSqlBuilder.builder(tablePath, 
table).build(tablePath, table);
-        log.info("create table sql: {}", createTableSql);
-        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd);
-                PreparedStatement ps = conn.prepareStatement(createTableSql)) {
-            System.out.println(createTableSql);
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed creating table %s", 
tablePath.getFullName()), e);
-        }
+    protected String getCreateTableSql(TablePath tablePath, CatalogTable 
table) {
+        return SqlServerCreateTableSqlBuilder.builder(tablePath, 
table).build(tablePath, table);
     }
 
     @Override
-    protected boolean dropTableInternal(TablePath tablePath) throws 
CatalogException {
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, 
pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format(
-                                        "DROP TABLE IF EXISTS %s", 
tablePath.getFullName()))) {
-            // Will there exist concurrent drop for one table?
-            return ps.execute();
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed dropping table %s", 
tablePath.getFullName()), e);
-        }
+    protected String getDropTableSql(TablePath tablePath) {
+        return String.format("DROP TABLE %s", tablePath.getFullName());
     }
 
     @Override
-    protected boolean createDatabaseInternal(String databaseName) throws 
CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format("CREATE DATABASE `%s`", 
databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed creating database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
+    protected String getCreateDatabaseSql(String databaseName) {
+        return String.format("CREATE DATABASE %s", databaseName);
     }
 
     @Override
-    protected boolean dropDatabaseInternal(String databaseName) throws 
CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format("DROP DATABASE IF EXISTS `%s`;", 
databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed dropping database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
+    protected String getDropDatabaseSql(String databaseName) {
+        return String.format("DROP DATABASE %s;", databaseName);
     }
 
-    @SuppressWarnings("unchecked")
-    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int 
colIndex)
-            throws SQLException {
-        Pair<SqlServerType, Map<String, Object>> pair =
-                SqlServerType.parse(metadata.getColumnTypeName(colIndex));
-        Map<String, Object> dataTypeProperties = new HashMap<>();
-        dataTypeProperties.put(
-                SqlServerDataTypeConvertor.PRECISION, 
metadata.getPrecision(colIndex));
-        dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, 
metadata.getScale(colIndex));
-        return new 
SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), 
dataTypeProperties);
-    }
-
-    @SuppressWarnings("MagicNumber")
-    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
-        Map<String, String> options = new HashMap<>(8);
-        options.put("connector", "jdbc");
-        options.put("url", 
getUrlFromDatabaseName(tablePath.getDatabaseName()));
-        options.put("table-name", tablePath.getFullName());
-        options.put("username", username);
-        options.put("password", pwd);
-        return options;
+    @Override
+    protected void dropDatabaseInternal(String databaseName) throws 
CatalogException {
+        closeDatabaseConnection(databaseName);
+        super.dropDatabaseInternal(databaseName);
     }
 
-    private String getUrlFromDatabaseName(String databaseName) {
+    @Override
+    protected String getUrlFromDatabaseName(String databaseName) {
         return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
     }
-
-    private String getCreateTableSql(TablePath tablePath, CatalogTable table) {
-
-        return "";
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
index 6b8c49bc0a..1c5fb5a2b2 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
@@ -19,8 +19,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.common.utils.JdbcUrlUtil;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
 
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -41,25 +39,10 @@ class OracleCatalogTest {
 
         catalog.open();
 
-        MySqlCatalog mySqlCatalog =
-                new MySqlCatalog(
-                        "mysql",
-                        "root",
-                        "root@123",
-                        
JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33062/mingdongtest"));
-
-        mySqlCatalog.open();
-
-        CatalogTable table1 =
-                mySqlCatalog.getTable(TablePath.of("mingdongtest", 
"all_types_table_02"));
-
         List<String> strings = catalog.listDatabases();
-        System.out.println(strings);
-
-        List<String> strings1 = catalog.listTables("XE");
 
         CatalogTable table = catalog.getTable(TablePath.of("XE", "TEST", 
"PG_TYPES_TABLE_CP1"));
 
-        catalog.createTableInternal(new TablePath("XE", "TEST", "TEST003"), 
table);
+        catalog.createTable(new TablePath("XE", "TEST", "TEST003"), table, 
false);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
index badab864fc..6ef4d9e654 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
@@ -53,7 +53,7 @@ class PostgresCatalogTest {
                 catalog.getTable(TablePath.of("st_test", "public", 
"all_types_table_02"));
         System.out.println("find table: " + table);
 
-        catalog.createTableInternal(
-                new TablePath("liulitest", "public", "all_types_table_02"), 
table);
+        catalog.createTable(
+                new TablePath("liulitest", "public", "all_types_table_02"), 
table, false);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
index 3de5c65bf8..b75ac68223 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
@@ -93,7 +93,7 @@ public class MysqlCreateTableSqlBuilderTest {
                 MysqlCreateTableSqlBuilder.builder(tablePath, 
catalogTable).build("mysql");
         // create table sql is change; The old unit tests are no longer 
applicable
         String expect =
-                "CREATE TABLE IF NOT EXISTS test_table (\n"
+                "CREATE TABLE test_table (\n"
                         + "\tid null NOT NULL COMMENT 'id', \n"
                         + "\tname null NOT NULL COMMENT 'name', \n"
                         + "\tage null NULL COMMENT 'age', \n"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 6528be0e1f..a38fb2217f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
@@ -31,6 +34,7 @@ import org.apache.commons.lang3.tuple.Pair;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
@@ -76,6 +80,7 @@ public abstract class AbstractJdbcIT extends TestSuiteBase 
implements TestResour
     protected GenericContainer<?> dbServer;
     protected JdbcCase jdbcCase;
     protected Connection connection;
+    protected Catalog catalog;
 
     abstract JdbcCase getJdbcCase();
 
@@ -141,12 +146,16 @@ public abstract class AbstractJdbcIT extends 
TestSuiteBase implements TestResour
                     String.format(
                             createTemplate,
                             buildTableInfoWithSchema(
-                                    jdbcCase.getDatabase(), 
jdbcCase.getSourceTable()));
+                                    jdbcCase.getDatabase(),
+                                    jdbcCase.getSchema(),
+                                    jdbcCase.getSourceTable()));
             String createSink =
                     String.format(
                             createTemplate,
                             buildTableInfoWithSchema(
-                                    jdbcCase.getDatabase(), 
jdbcCase.getSinkTable()));
+                                    jdbcCase.getDatabase(),
+                                    jdbcCase.getSchema(),
+                                    jdbcCase.getSinkTable()));
 
             statement.execute(createSource);
             statement.execute(createSink);
@@ -173,6 +182,14 @@ public abstract class AbstractJdbcIT extends TestSuiteBase 
implements TestResour
                 + ")";
     }
 
+    protected void clearTable(String database, String schema, String table) {
+        clearTable(database, table);
+    }
+
+    protected String buildTableInfoWithSchema(String database, String schema, 
String table) {
+        return buildTableInfoWithSchema(database, table);
+    }
+
     public void clearTable(String schema, String table) {
         try (Statement statement = connection.createStatement()) {
             statement.execute("TRUNCATE TABLE " + 
buildTableInfoWithSchema(schema, table));
@@ -215,6 +232,7 @@ public abstract class AbstractJdbcIT extends TestSuiteBase 
implements TestResour
         createSchemaIfNeeded();
         createNeededTables();
         insertTestData();
+        initCatalog();
     }
 
     @Override
@@ -226,6 +244,10 @@ public abstract class AbstractJdbcIT extends TestSuiteBase 
implements TestResour
         if (connection != null) {
             connection.close();
         }
+
+        if (catalog != null) {
+            catalog.close();
+        }
     }
 
     @TestTemplate
@@ -238,6 +260,43 @@ public abstract class AbstractJdbcIT extends TestSuiteBase 
implements TestResour
         }
 
         compareResult();
-        clearTable(jdbcCase.getDatabase(), jdbcCase.getSinkTable());
+        clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), 
jdbcCase.getSinkTable());
+    }
+
+    protected void initCatalog() {}
+
+    @Test
+    public void testCatalog() {
+        if (catalog == null) {
+            return;
+        }
+
+        TablePath sourceTablePath =
+                new TablePath(
+                        jdbcCase.getDatabase(), jdbcCase.getSchema(), 
jdbcCase.getSourceTable());
+        TablePath targetTablePath =
+                new TablePath(
+                        jdbcCase.getCatalogDatabase(),
+                        jdbcCase.getCatalogSchema(),
+                        jdbcCase.getCatalogTable());
+        boolean createdDb = false;
+
+        if (!catalog.databaseExists(targetTablePath.getDatabaseName())) {
+            catalog.createDatabase(targetTablePath, false);
+            
Assertions.assertTrue(catalog.databaseExists(targetTablePath.getDatabaseName()));
+            createdDb = true;
+        }
+
+        CatalogTable catalogTable = catalog.getTable(sourceTablePath);
+        catalog.createTable(targetTablePath, catalogTable, false);
+        Assertions.assertTrue(catalog.tableExists(targetTablePath));
+
+        catalog.dropTable(targetTablePath, false);
+        Assertions.assertFalse(catalog.tableExists(targetTablePath));
+
+        if (createdDb) {
+            catalog.dropDatabase(targetTablePath, false);
+            
Assertions.assertFalse(catalog.databaseExists(targetTablePath.getDatabaseName()));
+        }
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
index 805fcbd16b..5f17eacc51 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
@@ -41,6 +41,7 @@ public class JdbcCase {
     private int port;
     private int localPort;
     private String database;
+    private String schema;
     private String sourceTable;
     private String sinkTable;
     private String jdbcTemplate;
@@ -50,4 +51,8 @@ public class JdbcCase {
     private List<String> configFile;
     private Pair<String[], List<SeaTunnelRow>> testData;
     private Map<String, String> containerEnv;
+
+    private String catalogDatabase;
+    private String catalogSchema;
+    private String catalogTable;
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index f4b1338b15..b10aa0c222 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -19,6 +19,8 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
 
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -48,6 +50,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
     private static final String MYSQL_DATABASE = "seatunnel";
     private static final String MYSQL_SOURCE = "source";
     private static final String MYSQL_SINK = "sink";
+    private static final String CATALOG_DATABASE = "catalog_database";
 
     private static final String MYSQL_USERNAME = "root";
     private static final String MYSQL_PASSWORD = "Abc!@#135_seatunnel";
@@ -138,6 +141,8 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
                 .configFile(CONFIG_FILE)
                 .insertSql(insertSql)
                 .testData(testDataSet)
+                .catalogDatabase(CATALOG_DATABASE)
+                .catalogTable(MYSQL_SINK)
                 .build();
     }
 
@@ -282,4 +287,16 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
 
         return container;
     }
+
+    @Override
+    protected void initCatalog() {
+        catalog =
+                new MySqlCatalog(
+                        "mysql",
+                        jdbcCase.getUserName(),
+                        jdbcCase.getPassword(),
+                        JdbcUrlUtil.getUrlInfo(
+                                jdbcCase.getJdbcUrl().replace(HOST, 
dbServer.getHost())));
+        catalog.open();
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index d0f8ce3b68..75bdffbd6c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -19,6 +19,8 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLParser;
 
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -27,6 +29,7 @@ import org.testcontainers.containers.OracleContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
 
 import com.google.common.collect.Lists;
 
@@ -47,11 +50,13 @@ public class JdbcOracleIT extends AbstractJdbcIT {
     private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";
     private static final int ORACLE_PORT = 1521;
     private static final String ORACLE_URL = "jdbc:oracle:thin:@" + HOST + 
":%s/%s";
-    private static final String USERNAME = "testUser";
+    private static final String USERNAME = "TESTUSER";
     private static final String PASSWORD = "testPassword";
-    private static final String DATABASE = "TESTUSER";
+    private static final String DATABASE = "XE";
+    private static final String SCHEMA = USERNAME;
     private static final String SOURCE_TABLE = "E2E_TABLE_SOURCE";
     private static final String SINK_TABLE = "E2E_TABLE_SINK";
+    private static final String CATALOG_TABLE = "E2E_TABLE_CATALOG";
     private static final List<String> CONFIG_FILE =
             Lists.newArrayList("/jdbc_oracle_source_to_sink.conf");
 
@@ -78,11 +83,11 @@ public class JdbcOracleIT extends AbstractJdbcIT {
         containerEnv.put("ORACLE_PASSWORD", PASSWORD);
         containerEnv.put("APP_USER", USERNAME);
         containerEnv.put("APP_USER_PASSWORD", PASSWORD);
-        String jdbcUrl = String.format(ORACLE_URL, ORACLE_PORT, DATABASE);
+        String jdbcUrl = String.format(ORACLE_URL, ORACLE_PORT, SCHEMA);
         Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
         String[] fieldNames = testDataSet.getKey();
 
-        String insertSql = insertTable(DATABASE, SOURCE_TABLE, fieldNames);
+        String insertSql = insertTable(SCHEMA, SOURCE_TABLE, fieldNames);
 
         return JdbcCase.builder()
                 .dockerImage(ORACLE_IMAGE)
@@ -97,8 +102,12 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                 .userName(USERNAME)
                 .password(PASSWORD)
                 .database(DATABASE)
+                .schema(SCHEMA)
                 .sourceTable(SOURCE_TABLE)
                 .sinkTable(SINK_TABLE)
+                .catalogDatabase(DATABASE)
+                .catalogSchema(SCHEMA)
+                .catalogTable(CATALOG_TABLE)
                 .createSql(CREATE_SQL)
                 .configFile(CONFIG_FILE)
                 .insertSql(insertSql)
@@ -162,9 +171,10 @@ public class JdbcOracleIT extends AbstractJdbcIT {
 
         GenericContainer<?> container =
                 new OracleContainer(imageName)
-                        .withDatabaseName(DATABASE)
-                        .withUsername(USERNAME)
-                        .withPassword(PASSWORD)
+                        .withDatabaseName(SCHEMA)
+                        .withCopyFileToContainer(
+                                
MountableFile.forClasspathResource("sql/oracle_init.sql"),
+                                "/container-entrypoint-startdb.d/init.sql")
                         .withNetwork(NETWORK)
                         .withNetworkAliases(ORACLE_NETWORK_ALIASES)
                         .withExposedPorts(ORACLE_PORT)
@@ -181,4 +191,27 @@ public class JdbcOracleIT extends AbstractJdbcIT {
     public String quoteIdentifier(String field) {
         return "\"" + field + "\"";
     }
+
+    @Override
+    protected void clearTable(String database, String schema, String table) {
+        clearTable(schema, table);
+    }
+
+    @Override
+    protected String buildTableInfoWithSchema(String database, String schema, 
String table) {
+        return buildTableInfoWithSchema(schema, table);
+    }
+
+    @Override
+    protected void initCatalog() {
+        String jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST, 
dbServer.getHost());
+        catalog =
+                new OracleCatalog(
+                        "oracle",
+                        jdbcCase.getUserName(),
+                        jdbcCase.getPassword(),
+                        OracleURLParser.parse(jdbcUrl),
+                        SCHEMA);
+        catalog.open();
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/sql/oracle_init.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/sql/oracle_init.sql
new file mode 100644
index 0000000000..ba77de271e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/sql/oracle_init.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+ALTER SESSION SET CONTAINER = TESTUSER;
+
+CREATE USER TESTUSER IDENTIFIED BY testPassword;
+
+GRANT DBA TO TESTUSER;
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index f66ef615d7..a5796c1aaa 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -26,6 +31,7 @@ import 
org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.PostgreSQLContainer;
@@ -252,6 +258,43 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
         }
     }
 
+    @Test
+    public void testCatalog() {
+        String schema = "public";
+        String databaseName = POSTGRESQL_CONTAINER.getDatabaseName();
+        String tableName = "pg_e2e_sink_table";
+        String catalogDatabaseName = "pg_e2e_catalog_database";
+        String catalogTableName = "pg_e2e_catalog_table";
+
+        Catalog catalog =
+                new PostgresCatalog(
+                        "postgres",
+                        POSTGRESQL_CONTAINER.getUsername(),
+                        POSTGRESQL_CONTAINER.getPassword(),
+                        
JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()),
+                        schema);
+        catalog.open();
+
+        TablePath tablePath = new TablePath(databaseName, schema, tableName);
+        TablePath catalogTablePath = new TablePath(catalogDatabaseName, 
schema, catalogTableName);
+
+        
Assertions.assertFalse(catalog.databaseExists(catalogTablePath.getDatabaseName()));
+        catalog.createDatabase(catalogTablePath, false);
+        
Assertions.assertTrue(catalog.databaseExists(catalogTablePath.getDatabaseName()));
+
+        CatalogTable catalogTable = catalog.getTable(tablePath);
+        catalog.createTable(catalogTablePath, catalogTable, false);
+        Assertions.assertTrue(catalog.tableExists(catalogTablePath));
+
+        catalog.dropTable(catalogTablePath, false);
+        Assertions.assertFalse(catalog.tableExists(catalogTablePath));
+
+        catalog.dropDatabase(catalogTablePath, false);
+        
Assertions.assertFalse(catalog.databaseExists(catalogTablePath.getDatabaseName()));
+
+        catalog.close();
+    }
+
     private void initializeJdbcTable() {
         try (Connection connection = getJdbcConnection()) {
             Statement statement = connection.createStatement();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index f615b6656e..0a170ff4be 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -44,9 +46,16 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
     private static final String SQLSERVER_CONTAINER_HOST = "sqlserver";
     private static final String SQLSERVER_SOURCE = "source";
     private static final String SQLSERVER_SINK = "sink";
+    private static final String SQLSERVER_DATABASE = "master";
+    private static final String SQLSERVER_SCHEMA = "dbo";
+    private static final String SQLSERVER_CATALOG_DATABASE = "catalog_test";
+
     private static final int SQLSERVER_CONTAINER_PORT = 1433;
     private static final String SQLSERVER_URL =
-            "jdbc:sqlserver://" + AbstractJdbcIT.HOST + ":%s;encrypt=false;";
+            "jdbc:sqlserver://"
+                    + AbstractJdbcIT.HOST
+                    + ":%s;encrypt=false;databaseName="
+                    + SQLSERVER_DATABASE;
     private static final String DRIVER_CLASS = 
"com.microsoft.sqlserver.jdbc.SQLServerDriver";
     private static final List<String> CONFIG_FILE =
             Lists.newArrayList("/jdbc_sqlserver_source_to_sink.conf");
@@ -81,8 +90,13 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
                 .jdbcUrl(jdbcUrl)
                 .userName(username)
                 .password(password)
+                .database(SQLSERVER_DATABASE)
+                .schema(SQLSERVER_SCHEMA)
                 .sourceTable(SQLSERVER_SOURCE)
                 .sinkTable(SQLSERVER_SINK)
+                .catalogDatabase(SQLSERVER_CATALOG_DATABASE)
+                .catalogSchema(SQLSERVER_SCHEMA)
+                .catalogTable(SQLSERVER_SINK)
                 .createSql(CREATE_SQL)
                 .configFile(CONFIG_FILE)
                 .insertSql(insertSql)
@@ -158,4 +172,22 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
     public void clearTable(String schema, String table) {
         // do nothing.
     }
+
+    @Override
+    protected String buildTableInfoWithSchema(String database, String schema, 
String table) {
+        return buildTableInfoWithSchema(schema, table);
+    }
+
+    @Override
+    protected void initCatalog() {
+        catalog =
+                new SqlServerCatalog(
+                        "sqlserver",
+                        jdbcCase.getUserName(),
+                        jdbcCase.getPassword(),
+                        SqlServerURLParser.parse(
+                                jdbcCase.getJdbcUrl().replace(HOST, 
dbServer.getHost())),
+                        SQLSERVER_SCHEMA);
+        catalog.open();
+    }
 }

Reply via email to