This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f012b2a6f0 [Improve][Connector-v2] Optimize the way of databases and tables are checked for existence (#7261)
f012b2a6f0 is described below

commit f012b2a6f093bcad49fbd59946e75ce6d6a97838
Author: dailai <dai...@chinatelecom.cn>
AuthorDate: Mon Jul 29 21:51:59 2024 +0800

    [Improve][Connector-v2] Optimize the way of databases and tables are checked for existence (#7261)
---
 .../seatunnel/common/exception/CommonError.java    |  8 +++
 .../common/exception/CommonErrorCode.java          |  1 +
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 70 +++++++++++++++++++---
 .../seatunnel/jdbc/catalog/dm/DamengCatalog.java   | 30 +++++-----
 .../seatunnel/jdbc/catalog/iris/IrisCatalog.java   | 29 +++++----
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 17 ++++++
 .../catalog/oceanbase/OceanBaseOracleCatalog.java  | 33 +++++-----
 .../jdbc/catalog/oracle/OracleCatalog.java         | 39 ++++++------
 .../jdbc/catalog/psql/PostgresCatalog.java         | 35 +++++------
 .../jdbc/catalog/redshift/RedshiftCatalog.java     | 40 +++++--------
 .../jdbc/catalog/saphana/SapHanaCatalog.java       | 30 ++++------
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   | 29 +++++----
 .../seatunnel/jdbc/catalog/xugu/XuguCatalog.java   | 32 +++++-----
 .../jdbc/catalog/mysql/MySqlCatalogTest.java       | 15 ++++-
 .../jdbc/catalog/oracle/OracleCatalogTest.java     | 33 ++++++++--
 .../jdbc/catalog/psql/PostgresCatalogTest.java     | 32 ++++++++--
 .../catalog/sqlserver/SqlServerCatalogTest.java    | 14 +++--
 ..._mysql_source_and_sink_with_multiple_tables.sql |  4 +-
 18 files changed, 309 insertions(+), 182 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index 782a071d01..e9adf4d70a 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -285,4 +285,12 @@ public class CommonError {
         params.put("field", field);
         return new 
SeaTunnelRuntimeException(CommonErrorCode.FORMAT_DATE_ERROR, params);
     }
+
+    public static SeaTunnelRuntimeException unsupportedMethod(
+            String identifier, String methodName) {
+        Map<String, String> params = new HashMap<>();
+        params.put("identifier", identifier);
+        params.put("methodName", methodName);
+        return new 
SeaTunnelRuntimeException(CommonErrorCode.UNSUPPORTED_METHOD, params);
+    }
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 5893924848..79621c4216 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -77,6 +77,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
     FORMAT_DATETIME_ERROR(
             "COMMON-33",
             "The datetime format '<datetime>' of field '<field>' is not 
supported. Please check the datetime format."),
+    UNSUPPORTED_METHOD("COMMON-34", "'<identifier>' unsupported the method 
'<methodName>'"),
     ;
 
     private final String code;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index a033d0eaac..8d0301b492 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -44,6 +44,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
@@ -63,11 +65,14 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_METHOD;
 
+@Slf4j
 public abstract class AbstractJdbcCatalog implements Catalog {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
 
     protected static final Set<String> SYS_DATABASES = new HashSet<>();
+    protected static final Set<String> EXCLUDED_SCHEMAS = new HashSet<>();
 
     protected final String catalogName;
     protected final String defaultDatabase;
@@ -259,6 +264,10 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         throw new UnsupportedOperationException();
     }
 
+    protected String getDatabaseWithConditionSql(String databaseName) {
+        throw CommonError.unsupportedMethod(this.catalogName, 
"getDatabaseWithConditionSql");
+    }
+
     @Override
     public List<String> listDatabases() throws CatalogException {
         try {
@@ -277,15 +286,35 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
 
     @Override
     public boolean databaseExists(String databaseName) throws CatalogException 
{
-        checkArgument(StringUtils.isNotBlank(databaseName));
-
-        return listDatabases().contains(databaseName);
+        if (StringUtils.isBlank(databaseName)) {
+            return false;
+        }
+        if (SYS_DATABASES.contains(databaseName)) {
+            return false;
+        }
+        try {
+            return querySQLResultExists(
+                    getUrlFromDatabaseName(databaseName),
+                    getDatabaseWithConditionSql(databaseName));
+        } catch (SeaTunnelRuntimeException e) {
+            if 
(e.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) {
+                log.warn(
+                        "The catalog: {} is not supported the 
getDatabaseWithConditionSql for databaseExists",
+                        this.catalogName);
+                return listDatabases().contains(databaseName);
+            }
+            throw e;
+        }
     }
 
     protected String getListTableSql(String databaseName) {
         throw new UnsupportedOperationException();
     }
 
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        throw CommonError.unsupportedMethod(this.catalogName, 
"getTableWithConditionSql");
+    }
+
     protected String getTableName(ResultSet rs) throws SQLException {
         String schemaName = rs.getString(1);
         String tableName = rs.getString(2);
@@ -317,12 +346,28 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
 
     @Override
     public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            return databaseExists(tablePath.getDatabaseName())
-                    && 
listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
-        } catch (DatabaseNotExistException e) {
+        String databaseName = tablePath.getDatabaseName();
+        if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
             return false;
         }
+        try {
+            return querySQLResultExists(
+                    this.getUrlFromDatabaseName(databaseName), 
getTableWithConditionSql(tablePath));
+        } catch (SeaTunnelRuntimeException e1) {
+            if 
(e1.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) {
+                log.warn(
+                        "The catalog: {} is not supported the 
getTableWithConditionSql for tableExists ",
+                        this.catalogName);
+                try {
+                    return databaseExists(tablePath.getDatabaseName())
+                            && listTables(tablePath.getDatabaseName())
+                                    .contains(getTableName(tablePath));
+                } catch (DatabaseNotExistException e2) {
+                    return false;
+                }
+            }
+            throw e1;
+        }
     }
 
     @Override
@@ -528,6 +573,17 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         }
     }
 
+    protected boolean querySQLResultExists(String dbUrl, String sql) {
+        try (PreparedStatement stmt = 
getConnection(dbUrl).prepareStatement(sql)) {
+            try (ResultSet rs = stmt.executeQuery()) {
+                return rs.next();
+            }
+        } catch (Exception e) {
+            log.info("query exists error", e);
+            return false;
+        }
+    }
+
     // If sql is DDL, the execute() method always returns false, so the return 
value
     // should not be used to determine whether changes were made in database.
     protected boolean executeInternal(String url, String sql) throws 
SQLException {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
index 3796a76025..ede65bc8a0 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
@@ -30,8 +30,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeMapper;
 
-import org.apache.commons.lang3.StringUtils;
-
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
@@ -70,6 +68,20 @@ public class DamengCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo, defaultSchema);
     }
 
+    @Override
+    protected String getDatabaseWithConditionSql(String databaseName) {
+        return String.format(getListDatabaseSql() + " where name = '%s'", 
databaseName);
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return String.format(
+                getListTableSql(tablePath.getDatabaseName())
+                        + " where OWNER = '%s' and TABLE_NAME = '%s'",
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
+    }
+
     @Override
     protected String getListDatabaseSql() {
         return "SELECT name FROM v$database";
@@ -145,20 +157,6 @@ public class DamengCatalog extends AbstractJdbcCatalog {
         return tablePath.getSchemaAndTableName();
     }
 
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
-                return databaseExists(tablePath.getDatabaseName())
-                        && listTables(tablePath.getDatabaseName())
-                                .contains(tablePath.getSchemaAndTableName());
-            }
-            return listTables().contains(tablePath.getSchemaAndTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
-
     private List<String> listTables() {
         List<String> databases = listDatabases();
         return listTables(databases.get(0));
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java
index 40f08dc50b..02e58ea857 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java
@@ -57,7 +57,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 public class IrisCatalog extends AbstractJdbcCatalog {
 
     private static final String LIST_TABLES_SQL_TEMPLATE =
-            "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables 
WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE != 
'SYSTEM VIEW';";
+            "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables 
WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE != 
'SYSTEM VIEW'";
 
     public IrisCatalog(
             String catalogName, String username, String password, 
JdbcUrlUtil.UrlInfo urlInfo) {
@@ -101,13 +101,6 @@ public class IrisCatalog extends AbstractJdbcCatalog {
         return schemaName + "." + tableName;
     }
 
-    //    @Override
-    //    protected String getSelectColumnsSql(TablePath tablePath) {
-    //        return String.format(
-    //                SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(),
-    // tablePath.getTableName());
-    //    }
-
     @Override
     protected Column buildColumn(ResultSet resultSet) throws SQLException {
         String columnName = resultSet.getString("COLUMN_NAME");
@@ -144,12 +137,24 @@ public class IrisCatalog extends AbstractJdbcCatalog {
 
     @Override
     public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            return listTables(tablePath.getSchemaName())
-                    .contains(tablePath.getSchemaAndTableName());
-        } catch (DatabaseNotExistException e) {
+        if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
             return false;
         }
+        return querySQLResultExists(
+                this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
+                getTableWithConditionSql(tablePath));
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return String.format(
+                getListTableSql(tablePath.getSchemaName()) + " and TABLE_NAME 
= '%s'",
+                tablePath.getTableName());
+    }
+
+    @Override
+    protected String getUrlFromDatabaseName(String databaseName) {
+        return defaultUrl;
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 6b263b0fd4..e2df8ab24b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -51,6 +51,12 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
     private static final String SELECT_COLUMNS_SQL_TEMPLATE =
             "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 
'%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC";
 
+    private static final String SELECT_DATABASE_EXISTS =
+            "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE 
SCHEMA_NAME = '%s'";
+
+    private static final String SELECT_TABLE_EXISTS =
+            "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables 
WHERE table_schema = '%s' AND table_name = '%s'";
+
     static {
         SYS_DATABASES.add("information_schema");
         SYS_DATABASES.add("mysql");
@@ -68,6 +74,17 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         this.typeConverter = new MySqlTypeConverter(version);
     }
 
+    @Override
+    protected String getDatabaseWithConditionSql(String databaseName) {
+        return String.format(SELECT_DATABASE_EXISTS, databaseName);
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return String.format(
+                SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
     @Override
     protected String getListDatabaseSql() {
         return "SHOW DATABASES;";
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
index b4ece7db9c..b98f4c4c2b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
@@ -25,8 +25,6 @@ import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistExcepti
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
@@ -34,9 +32,10 @@ import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
 public class OceanBaseOracleCatalog extends OracleCatalog {
 
     static {
-        EXCLUDED_SCHEMAS =
-                Collections.unmodifiableList(
-                        Arrays.asList("oceanbase", "LBACSYS", "ORAAUDITOR", 
"SYS"));
+        EXCLUDED_SCHEMAS.add("oceanbase");
+        EXCLUDED_SCHEMAS.add("LBACSYS");
+        EXCLUDED_SCHEMAS.add("ORAAUDITOR");
+        EXCLUDED_SCHEMAS.add("SYS");
     }
 
     public OceanBaseOracleCatalog(
@@ -53,6 +52,21 @@ public class OceanBaseOracleCatalog extends OracleCatalog {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    protected String getDatabaseWithConditionSql(String databaseName) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
+            return false;
+        }
+        return querySQLResultExists(
+                this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
+                getTableWithConditionSql(tablePath));
+    }
+
     @Override
     public List<String> listTables(String databaseName)
             throws CatalogException, DatabaseNotExistException {
@@ -65,15 +79,6 @@ public class OceanBaseOracleCatalog extends OracleCatalog {
         }
     }
 
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            return 
listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
-
     @Override
     public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index b51369e3f5..1430cb387a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -21,8 +21,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -30,8 +28,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeMapper;
 
-import org.apache.commons.lang3.StringUtils;
-
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
@@ -46,7 +42,7 @@ import java.util.List;
 @Slf4j
 public class OracleCatalog extends AbstractJdbcCatalog {
 
-    protected static List<String> EXCLUDED_SCHEMAS =
+    protected static List<String> EXCLUDED_SCHEMAS_ALL =
             Collections.unmodifiableList(
                     Arrays.asList(
                             "APPQOSSYS",
@@ -101,6 +97,10 @@ public class OracleCatalog extends AbstractJdbcCatalog {
                     + "ORDER BY \n"
                     + "    cols.column_id \n";
 
+    static {
+        EXCLUDED_SCHEMAS.addAll(EXCLUDED_SCHEMAS_ALL);
+    }
+
     public OracleCatalog(
             String catalogName,
             String username,
@@ -110,6 +110,21 @@ public class OracleCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo, defaultSchema);
     }
 
+    @Override
+    protected String getDatabaseWithConditionSql(String databaseName) {
+        return String.format(getListDatabaseSql() + " where name = '%s'", 
databaseName);
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return getListTableSql(tablePath.getDatabaseName())
+                + "  and  OWNER = '"
+                + tablePath.getSchemaName()
+                + "' and table_name = '"
+                + tablePath.getTableName()
+                + "'";
+    }
+
     @Override
     protected String getListDatabaseSql() {
         return "SELECT name FROM v$database";
@@ -191,20 +206,6 @@ public class OracleCatalog extends AbstractJdbcCatalog {
         return tablePath.getSchemaAndTableName();
     }
 
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
-                return databaseExists(tablePath.getDatabaseName())
-                        && listTables(tablePath.getDatabaseName())
-                                .contains(tablePath.getSchemaAndTableName());
-            }
-            return listTables().contains(tablePath.getSchemaAndTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
-
     private List<String> listTables() {
         List<String> databases = listDatabases();
         return listTables(databases.get(0));
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
index 4697d1999e..d5261e16d5 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -30,7 +29,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.Post
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeMapper;
 
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -104,14 +102,28 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo, defaultSchema);
     }
 
+    @Override
+    protected String getDatabaseWithConditionSql(String databaseName) {
+        return String.format(getListDatabaseSql() + " where datname = '%s'", 
databaseName);
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return String.format(
+                getListTableSql(tablePath.getDatabaseName())
+                        + " where table_schema = '%s' and table_name= '%s'",
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
+    }
+
     @Override
     protected String getListDatabaseSql() {
-        return "select datname from pg_database;";
+        return "select datname from pg_database";
     }
 
     @Override
     protected String getListTableSql(String databaseName) {
-        return "SELECT table_schema, table_name FROM 
information_schema.tables;";
+        return "SELECT table_schema, table_name FROM 
information_schema.tables";
     }
 
     @Override
@@ -231,21 +243,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
         super.dropDatabaseInternal(databaseName);
     }
 
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
-                return databaseExists(tablePath.getDatabaseName())
-                        && listTables(tablePath.getDatabaseName())
-                                .contains(tablePath.getSchemaAndTableName());
-            }
-
-            return 
listTables(defaultDatabase).contains(tablePath.getSchemaAndTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
-
     @Override
     public CatalogTable getTable(String sqlQuery) throws SQLException {
         Connection defaultConnection = getConnection(defaultUrl);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
index 7b29bbb8ea..064b247337 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -31,23 +30,17 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeMapper;
 
-import org.apache.commons.lang3.StringUtils;
-
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 @Slf4j
 public class RedshiftCatalog extends AbstractJdbcCatalog {
 
-    protected static final Set<String> EXCLUDED_SCHEMAS = new HashSet<>(4);
-
     private final String SELECT_COLUMNS =
             "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 
'%s' AND TABLE_NAME ='%s' ORDER BY ordinal_position ASC";
 
@@ -80,6 +73,20 @@ public class RedshiftCatalog extends AbstractJdbcCatalog {
         this.connectionMap = new ConcurrentHashMap<>();
     }
 
+    @Override
+    protected String getDatabaseWithConditionSql(String databaseName) {
+        return String.format(getListDatabaseSql() + " where datname = '%s'", 
databaseName);
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return String.format(
+                getListTableSql(tablePath.getDatabaseName())
+                        + " where table_schema = '%s' and table_name = '%s'",
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
+    }
+
     @Override
     public void close() throws CatalogException {
         for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
@@ -95,12 +102,12 @@ public class RedshiftCatalog extends AbstractJdbcCatalog {
 
     @Override
     protected String getListDatabaseSql() {
-        return "select datname from pg_database;";
+        return "select datname from pg_database";
     }
 
     @Override
     protected String getListTableSql(String databaseName) {
-        return "SELECT table_schema, table_name FROM 
information_schema.tables;";
+        return "SELECT table_schema, table_name FROM 
information_schema.tables";
     }
 
     @Override
@@ -144,21 +151,6 @@ public class RedshiftCatalog extends AbstractJdbcCatalog {
         return String.format("DROP DATABASE `%s`;", databaseName);
     }
 
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
-                return databaseExists(tablePath.getDatabaseName())
-                        && listTables(tablePath.getDatabaseName())
-                                
.contains(tablePath.getSchemaAndTableName().toLowerCase());
-            }
-            return listTables(defaultDatabase)
-                    .contains(tablePath.getSchemaAndTableName().toLowerCase());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
-
     @Override
     protected String getSelectColumnsSql(TablePath tablePath) {
         return String.format(SELECT_COLUMNS, tablePath.getSchemaName(), 
tablePath.getTableName());
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
index df6f4b3c24..19b8f668af 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
@@ -22,8 +22,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -31,8 +29,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper;
 
-import org.apache.commons.lang3.StringUtils;
-
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
@@ -113,6 +109,18 @@ public class SapHanaCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo, defaultSchema);
     }
 
+    @Override
+    protected String getDatabaseWithConditionSql(String databaseName) {
+        return String.format(getListDatabaseSql() + " where SCHEMA_NAME = '%s'", databaseName);
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return String.format(
+                getListTableSql(tablePath.getDatabaseName()) + " and TABLE_NAME = '%s'",
+                tablePath.getTableName());
+    }
+
     @Override
     protected String getListDatabaseSql() {
         return "SELECT SCHEMA_NAME FROM SCHEMAS";
@@ -203,20 +211,6 @@ public class SapHanaCatalog extends AbstractJdbcCatalog {
         return tablePath.getTableName();
     }
 
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
-                return databaseExists(tablePath.getDatabaseName())
-                        && listTables(tablePath.getDatabaseName())
-                                .contains(tablePath.getTableName());
-            }
-            return listTables().contains(tablePath.getSchemaAndTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
-
     private List<String> listTables() {
         List<String> databases = listDatabases();
         return listTables(databases.get(0));
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 55660b36a2..e4c6351522 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -69,6 +68,20 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo, defaultSchema);
     }
 
+    @Override
+    protected String getDatabaseWithConditionSql(String databaseName) {
+        return String.format(getListDatabaseSql() + "  where name = '%s'", databaseName);
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return String.format(
+                getListTableSql(tablePath.getDatabaseName())
+                        + "  and  TABLE_SCHEMA = '%s' and TABLE_NAME = '%s'",
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
+    }
+
     @Override
     protected String getListDatabaseSql() {
         return "SELECT NAME FROM sys.databases";
@@ -147,20 +160,6 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
     }
 
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
-                return databaseExists(tablePath.getDatabaseName())
-                        && listTables(tablePath.getDatabaseName())
-                                .contains(tablePath.getSchemaAndTableName());
-            }
-            return listTables(defaultDatabase).contains(tablePath.getSchemaAndTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
-
     @Override
     public CatalogTable getTable(String sqlQuery) throws SQLException {
         Connection defaultConnection = getConnection(defaultUrl);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
index 462e109c76..a0b28e49ab 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
@@ -21,8 +21,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -30,8 +28,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper;
 
-import org.apache.commons.lang3.StringUtils;
-
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
@@ -128,6 +124,20 @@ public class XuguCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo, defaultSchema);
     }
 
+    @Override
+    protected String getDatabaseWithConditionSql(String databaseName) {
+        return String.format(getListDatabaseSql() + "  where DB_NAME = '%s'", databaseName);
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return String.format(
+                getListTableSql(tablePath.getDatabaseName())
+                        + "  where user_name = '%s' and table_name = '%s'",
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
+    }
+
     @Override
     protected String getListDatabaseSql() {
         return "SELECT DB_NAME FROM dba_databases";
@@ -210,20 +220,6 @@ public class XuguCatalog extends AbstractJdbcCatalog {
         return tablePath.getSchemaAndTableName();
     }
 
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
-                return databaseExists(tablePath.getDatabaseName())
-                        && listTables(tablePath.getDatabaseName())
-                                .contains(tablePath.getSchemaAndTableName());
-            }
-            return listTables().contains(tablePath.getSchemaAndTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
-
     private List<String> listTables() {
         List<String> databases = listDatabases();
         return listTables(databases.get(0));
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
index daf87b3693..bc89d4c8c3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
@@ -25,6 +25,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServe
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;
 
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.MethodOrderer;
@@ -39,7 +40,8 @@ class MySqlCatalogTest {
     static JdbcUrlUtil.UrlInfo sqlParse =
             
SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1434;database=TestDB");
     static JdbcUrlUtil.UrlInfo MysqlUrlInfo =
-            
JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33061/liuliTest?useSSL=false");
+            JdbcUrlUtil.getUrlInfo(
+                    
"jdbc:mysql://127.0.0.1:3306/test?useSSL=false&allowPublicKeyRetrieval=true");
     static JdbcUrlUtil.UrlInfo pg =
             
JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/liulitest");
     static TablePath tablePathSQL;
@@ -74,13 +76,22 @@ class MySqlCatalogTest {
         tablePathPG = TablePath.of(databaseName, "pg_to_mysql");
         tablePathOracle = TablePath.of(databaseName, "oracle_to_mysql");
         sqlServerCatalog = new SqlServerCatalog("sqlserver", "sa", "root@123", 
sqlParse, null);
-        mySqlCatalog = new MySqlCatalog("mysql", "root", "root@123", 
MysqlUrlInfo);
+        mySqlCatalog = new MySqlCatalog("mysql", "root", "123456", 
MysqlUrlInfo);
         postgresCatalog = new PostgresCatalog("postgres", "postgres", 
"postgres", pg, null);
         mySqlCatalog.open();
         sqlServerCatalog.open();
         postgresCatalog.open();
     }
 
+    @Test
+    void exists() {
+        Assertions.assertTrue(mySqlCatalog.databaseExists("test"));
+        Assertions.assertTrue(mySqlCatalog.tableExists(TablePath.of("test", "MY_TABLE")));
+        Assertions.assertTrue(mySqlCatalog.tableExists(TablePath.of("test", "my_table")));
+        Assertions.assertFalse(mySqlCatalog.tableExists(TablePath.of("test", "test")));
+        Assertions.assertFalse(mySqlCatalog.databaseExists("mysql"));
+    }
+
     @Test
     @Order(1)
     void getTable() {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
index 1c5fb5a2b2..75b22ec24d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
@@ -20,6 +20,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
@@ -27,17 +29,24 @@ import java.util.List;
 
 @Disabled("Please Test it in your local environment")
 class OracleCatalogTest {
-    @Test
-    void testCatalog() {
-        OracleCatalog catalog =
+
+    static OracleCatalog catalog;
+
+    @BeforeAll
+    static void before() {
+        catalog =
                 new OracleCatalog(
                         "oracle",
-                        "test",
-                        "oracle",
-                        
OracleURLParser.parse("jdbc:oracle:thin:@127.0.0.1:1521:xe"),
+                        "c##gguser",
+                        "testdb",
+                        
OracleURLParser.parse("jdbc:oracle:thin:@127.0.0.1:1521/CDC_PDB"),
                         null);
 
         catalog.open();
+    }
+
+    @Test
+    void testCatalog() {
 
         List<String> strings = catalog.listDatabases();
 
@@ -45,4 +54,16 @@ class OracleCatalogTest {
 
         catalog.createTable(new TablePath("XE", "TEST", "TEST003"), table, 
false);
     }
+
+    @Test
+    void exist() {
+        Assertions.assertTrue(catalog.databaseExists("ORCLCDB"));
+        Assertions.assertTrue(catalog.tableExists(TablePath.of("ORCLCDB", 
"C##GGUSER", "myTable")));
+        Assertions.assertFalse(catalog.databaseExists("ORCL"));
+        Assertions.assertTrue(
+                catalog.tableExists(
+                        TablePath.of("ORCLCDB", "CDC_PDB", 
"ads_index_public_health_data")));
+        Assertions.assertTrue(
+                catalog.tableExists(TablePath.of("ORCLCDB", "CDC_PDB", 
"ADS_INDEX_DISEASE_DATA")));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
index c04c1941b0..05a013ef69 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
 
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
@@ -31,15 +33,23 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 class PostgresCatalogTest {
 
-    @Test
-    void testCatalog() {
-        JdbcUrlUtil.UrlInfo urlInfo =
-                
JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/liulitest");
-        PostgresCatalog catalog =
-                new PostgresCatalog("postgres", "postgres", "postgres", 
urlInfo, null);
+    static PostgresCatalog catalog;
+
+    @BeforeAll
+    static void before() {
+        catalog =
+                new PostgresCatalog(
+                        "postgres",
+                        "pg",
+                        "pg#2024",
+                        
JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/postgres"),
+                        null);
 
         catalog.open();
+    }
 
+    @Test
+    void testCatalog() {
         MySqlCatalog mySqlCatalog =
                 new MySqlCatalog(
                         "mysql",
@@ -59,4 +69,14 @@ class PostgresCatalogTest {
         catalog.createTable(
                 new TablePath("liulitest", "public", "all_types_table_02"), 
table, false);
     }
+
+    @Test
+    void exists() {
+        Assertions.assertFalse(catalog.databaseExists("postgres"));
+        Assertions.assertFalse(
+                catalog.tableExists(TablePath.of("postgres", "pg_catalog", 
"pg_aggregate")));
+        Assertions.assertTrue(catalog.databaseExists("zdykdb"));
+        Assertions.assertTrue(
+                catalog.tableExists(TablePath.of("zdykdb", "pg_catalog", 
"pg_class")));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
index ea305ca0c1..a18cc4abd9 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog;
 
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.MethodOrderer;
@@ -38,7 +39,7 @@ import java.util.List;
 class SqlServerCatalogTest {
 
     static JdbcUrlUtil.UrlInfo sqlParse =
-            
SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1434;database=TestDB");
+            
SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1433;database=master");
     static JdbcUrlUtil.UrlInfo MysqlUrlInfo =
             
JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33061/liuliTest?useSSL=false");
     static JdbcUrlUtil.UrlInfo pg =
@@ -84,9 +85,14 @@ class SqlServerCatalogTest {
     }
 
     @Test
-    void tableExists() {
-
-        //        boolean b = sqlServerCatalog.tableExists(tablePath);
+    void exists() {
+        Assertions.assertTrue(sqlServerCatalog.databaseExists("master"));
+        Assertions.assertTrue(
+                sqlServerCatalog.tableExists(
+                        TablePath.of("master", "dbo", 
"MSreplication_options")));
+        Assertions.assertTrue(
+                sqlServerCatalog.tableExists(TablePath.of("master", "dbo", 
"spt_fallback_db")));
+        Assertions.assertFalse(sqlServerCatalog.tableExists(TablePath.of("master", "dbo", "xxx")));
     }
 
     @Test
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql
index a9b02e2ae3..8c624959f8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql
@@ -55,10 +55,10 @@ CREATE TABLE sink_table WITH (
   'user' = 'root',
   'password' = 'Abc!@#135_seatunnel',
   'generate_sink_sql' = 'true',
-  'database' = 'sink'
+  'database' = 'sink',
   'table' = '${table_name}'
 );
 
 -- If it's multi-table synchronization, there's no need to set select columns.
 -- You can directly use the syntax 'INSERT INTO sink_table SELECT source_table'.
-INSERT INTO sink_table SELECT source_table;
\ No newline at end of file
+INSERT INTO sink_table SELECT source_table;

Reply via email to