This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new f012b2a6f0 [Improve][Connector-v2] Optimize the way of databases and tables are checked for existence (#7261) f012b2a6f0 is described below commit f012b2a6f093bcad49fbd59946e75ce6d6a97838 Author: dailai <dai...@chinatelecom.cn> AuthorDate: Mon Jul 29 21:51:59 2024 +0800 [Improve][Connector-v2] Optimize the way of databases and tables are checked for existence (#7261) --- .../seatunnel/common/exception/CommonError.java | 8 +++ .../common/exception/CommonErrorCode.java | 1 + .../jdbc/catalog/AbstractJdbcCatalog.java | 70 +++++++++++++++++++--- .../seatunnel/jdbc/catalog/dm/DamengCatalog.java | 30 +++++----- .../seatunnel/jdbc/catalog/iris/IrisCatalog.java | 29 +++++---- .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 17 ++++++ .../catalog/oceanbase/OceanBaseOracleCatalog.java | 33 +++++----- .../jdbc/catalog/oracle/OracleCatalog.java | 39 ++++++------ .../jdbc/catalog/psql/PostgresCatalog.java | 35 +++++------ .../jdbc/catalog/redshift/RedshiftCatalog.java | 40 +++++-------- .../jdbc/catalog/saphana/SapHanaCatalog.java | 30 ++++------ .../jdbc/catalog/sqlserver/SqlServerCatalog.java | 29 +++++---- .../seatunnel/jdbc/catalog/xugu/XuguCatalog.java | 32 +++++----- .../jdbc/catalog/mysql/MySqlCatalogTest.java | 15 ++++- .../jdbc/catalog/oracle/OracleCatalogTest.java | 33 ++++++++-- .../jdbc/catalog/psql/PostgresCatalogTest.java | 32 ++++++++-- .../catalog/sqlserver/SqlServerCatalogTest.java | 14 +++-- ..._mysql_source_and_sink_with_multiple_tables.sql | 4 +- 18 files changed, 309 insertions(+), 182 deletions(-) diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java index 782a071d01..e9adf4d70a 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java @@ -285,4 +285,12 @@ public class CommonError { params.put("field", field); return new SeaTunnelRuntimeException(CommonErrorCode.FORMAT_DATE_ERROR, params); } + + public static SeaTunnelRuntimeException unsupportedMethod( + String identifier, String methodName) { + Map<String, String> params = new HashMap<>(); + params.put("identifier", identifier); + params.put("methodName", methodName); + return new SeaTunnelRuntimeException(CommonErrorCode.UNSUPPORTED_METHOD, params); + } } diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java index 5893924848..79621c4216 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java @@ -77,6 +77,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode { FORMAT_DATETIME_ERROR( "COMMON-33", "The datetime format '<datetime>' of field '<field>' is not supported. Please check the datetime format."), + UNSUPPORTED_METHOD("COMMON-34", "'<identifier>' unsupported the method '<methodName>'"), ; private final String code; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java index a033d0eaac..8d0301b492 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java @@ -44,6 +44,8 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; + import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; @@ -63,11 +65,14 @@ import java.util.concurrent.ConcurrentHashMap; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_METHOD; +@Slf4j public abstract class AbstractJdbcCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class); protected static final Set<String> SYS_DATABASES = new HashSet<>(); + protected static final Set<String> EXCLUDED_SCHEMAS = new HashSet<>(); protected final String catalogName; protected final String defaultDatabase; @@ -259,6 +264,10 @@ public abstract class AbstractJdbcCatalog implements Catalog { throw new UnsupportedOperationException(); } + protected String getDatabaseWithConditionSql(String databaseName) { + throw CommonError.unsupportedMethod(this.catalogName, "getDatabaseWithConditionSql"); + } + @Override public List<String> listDatabases() throws CatalogException { try { @@ -277,15 +286,35 @@ public abstract class AbstractJdbcCatalog implements Catalog { @Override public boolean databaseExists(String databaseName) throws CatalogException { - checkArgument(StringUtils.isNotBlank(databaseName)); - - return listDatabases().contains(databaseName); + if (StringUtils.isBlank(databaseName)) { + return false; + } + if (SYS_DATABASES.contains(databaseName)) { + return false; + } + try { + return querySQLResultExists( + getUrlFromDatabaseName(databaseName), + getDatabaseWithConditionSql(databaseName)); + } catch (SeaTunnelRuntimeException e) { + if (e.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) { + log.warn( + "The catalog: {} is not supported the getDatabaseWithConditionSql for databaseExists", + this.catalogName); + return listDatabases().contains(databaseName); + } + throw e; + } } protected String getListTableSql(String databaseName) { throw new UnsupportedOperationException(); } + protected String getTableWithConditionSql(TablePath tablePath) { + throw CommonError.unsupportedMethod(this.catalogName, "getTableWithConditionSql"); + } + protected String getTableName(ResultSet rs) throws SQLException { String schemaName = rs.getString(1); String tableName = rs.getString(2); @@ -317,12 +346,28 @@ public abstract class AbstractJdbcCatalog implements Catalog { @Override public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath)); - } catch (DatabaseNotExistException e) { + String databaseName = tablePath.getDatabaseName(); + if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) { return false; } + try { + return querySQLResultExists( + this.getUrlFromDatabaseName(databaseName), getTableWithConditionSql(tablePath)); + } catch (SeaTunnelRuntimeException e1) { + if (e1.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) { + log.warn( + "The catalog: {} is not supported the getTableWithConditionSql for tableExists ", + this.catalogName); + try { + return databaseExists(tablePath.getDatabaseName()) + && listTables(tablePath.getDatabaseName()) + .contains(getTableName(tablePath)); + } catch (DatabaseNotExistException e2) { + return false; + } + } + throw e1; + } } @Override @@ -528,6 +573,17 @@ public abstract class AbstractJdbcCatalog implements Catalog { } } + protected boolean querySQLResultExists(String dbUrl, String sql) { + try (PreparedStatement stmt = getConnection(dbUrl).prepareStatement(sql)) { + try (ResultSet rs = stmt.executeQuery()) { + return rs.next(); + } + } catch (Exception e) { + log.info("query exists error", e); + return false; + } + } + // If sql is DDL, the execute() method always returns false, so the return value // should not be used to determine whether changes were made in database. protected boolean executeInternal(String url, String sql) throws SQLException { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java index 3796a76025..ede65bc8a0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java @@ -30,8 +30,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -70,6 +68,20 @@ public class DamengCatalog extends AbstractJdbcCatalog { super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where name = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " where OWNER = '%s' and TABLE_NAME = '%s'", + tablePath.getSchemaName(), + tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { return "SELECT name FROM v$database"; @@ -145,20 +157,6 @@ public class DamengCatalog extends AbstractJdbcCatalog { return tablePath.getSchemaAndTableName(); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } - return listTables().contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - private List<String> listTables() { List<String> databases = listDatabases(); return listTables(databases.get(0)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java index 40f08dc50b..02e58ea857 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java @@ -57,7 +57,7 @@ import static com.google.common.base.Preconditions.checkNotNull; public class IrisCatalog extends AbstractJdbcCatalog { private static final String LIST_TABLES_SQL_TEMPLATE = - "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE != 'SYSTEM VIEW';"; + "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE != 'SYSTEM VIEW'"; public IrisCatalog( String catalogName, String username, String password, JdbcUrlUtil.UrlInfo urlInfo) { @@ -101,13 +101,6 @@ public class IrisCatalog extends AbstractJdbcCatalog { return schemaName + "." + tableName; } - // @Override - // protected String getSelectColumnsSql(TablePath tablePath) { - // return String.format( - // SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), - // tablePath.getTableName()); - // } - @Override protected Column buildColumn(ResultSet resultSet) throws SQLException { String columnName = resultSet.getString("COLUMN_NAME"); @@ -144,12 +137,24 @@ public class IrisCatalog extends AbstractJdbcCatalog { @Override public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return listTables(tablePath.getSchemaName()) - .contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { + if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) { return false; } + return querySQLResultExists( + this.getUrlFromDatabaseName(tablePath.getDatabaseName()), + getTableWithConditionSql(tablePath)); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getSchemaName()) + " and TABLE_NAME = '%s'", + tablePath.getTableName()); + } + + @Override + protected String getUrlFromDatabaseName(String databaseName) { + return defaultUrl; } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java index 6b263b0fd4..e2df8ab24b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java @@ -51,6 +51,12 @@ public class MySqlCatalog extends AbstractJdbcCatalog { private static final String SELECT_COLUMNS_SQL_TEMPLATE = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC"; + private static final String SELECT_DATABASE_EXISTS = + "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'"; + + private static final String SELECT_TABLE_EXISTS = + "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'"; + static { SYS_DATABASES.add("information_schema"); SYS_DATABASES.add("mysql"); @@ -68,6 +74,17 @@ public class MySqlCatalog extends AbstractJdbcCatalog { this.typeConverter = new MySqlTypeConverter(version); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(SELECT_DATABASE_EXISTS, databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { return "SHOW DATABASES;"; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java index b4ece7db9c..b98f4c4c2b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java @@ -25,8 +25,6 @@ import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistExcepti import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; @@ -34,9 +32,10 @@ import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch public class OceanBaseOracleCatalog extends OracleCatalog { static { - EXCLUDED_SCHEMAS = - Collections.unmodifiableList( - Arrays.asList("oceanbase", "LBACSYS", "ORAAUDITOR", "SYS")); + EXCLUDED_SCHEMAS.add("oceanbase"); + EXCLUDED_SCHEMAS.add("LBACSYS"); + EXCLUDED_SCHEMAS.add("ORAAUDITOR"); + EXCLUDED_SCHEMAS.add("SYS"); } public OceanBaseOracleCatalog( @@ -53,6 +52,21 @@ public class OceanBaseOracleCatalog extends OracleCatalog { throw new UnsupportedOperationException(); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) { + return false; + } + return querySQLResultExists( + this.getUrlFromDatabaseName(tablePath.getDatabaseName()), + getTableWithConditionSql(tablePath)); + } + @Override public List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException { @@ -65,15 +79,6 @@ public class OceanBaseOracleCatalog extends OracleCatalog { } } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath)); - } catch (DatabaseNotExistException e) { - return false; - } - } - @Override public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java index b51369e3f5..1430cb387a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java @@ -21,8 +21,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -30,8 +28,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -46,7 +42,7 @@ import java.util.List; @Slf4j public class OracleCatalog extends AbstractJdbcCatalog { - protected static List<String> EXCLUDED_SCHEMAS = + protected static List<String> EXCLUDED_SCHEMAS_ALL = Collections.unmodifiableList( Arrays.asList( "APPQOSSYS", @@ -101,6 +97,10 @@ public class OracleCatalog extends AbstractJdbcCatalog { + "ORDER BY \n" + " cols.column_id \n"; + static { + EXCLUDED_SCHEMAS.addAll(EXCLUDED_SCHEMAS_ALL); + } + public OracleCatalog( String catalogName, String username, @@ -110,6 +110,21 @@ public class OracleCatalog extends AbstractJdbcCatalog { super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where name = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return getListTableSql(tablePath.getDatabaseName()) + + " and OWNER = '" + + tablePath.getSchemaName() + + "' and table_name = '" + + tablePath.getTableName() + + "'"; + } + @Override protected String getListDatabaseSql() { return "SELECT name FROM v$database"; @@ -191,20 +206,6 @@ public class OracleCatalog extends AbstractJdbcCatalog { return tablePath.getSchemaAndTableName(); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } - return listTables().contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - private List<String> listTables() { List<String> databases = listDatabases(); return listTables(databases.get(0)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java index 4697d1999e..d5261e16d5 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -30,7 +29,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.Post import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeMapper; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import lombok.extern.slf4j.Slf4j; @@ -104,14 +102,28 @@ public class PostgresCatalog extends AbstractJdbcCatalog { super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where datname = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " where table_schema = '%s' and table_name= '%s'", + tablePath.getSchemaName(), + tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { - return "select datname from pg_database;"; + return "select datname from pg_database"; } @Override protected String getListTableSql(String databaseName) { - return "SELECT table_schema, table_name FROM information_schema.tables;"; + return "SELECT table_schema, table_name FROM information_schema.tables"; } @Override @@ -231,21 +243,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog { super.dropDatabaseInternal(databaseName); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } - - return listTables(defaultDatabase).contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - @Override public CatalogTable getTable(String sqlQuery) throws SQLException { Connection defaultConnection = getConnection(defaultUrl); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java index 7b29bbb8ea..064b247337 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java @@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -31,23 +30,17 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @Slf4j public class RedshiftCatalog extends AbstractJdbcCatalog { - protected static final Set<String> EXCLUDED_SCHEMAS = new HashSet<>(4); - private final String SELECT_COLUMNS = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ordinal_position ASC"; @@ -80,6 +73,20 @@ public class RedshiftCatalog extends AbstractJdbcCatalog { this.connectionMap = new ConcurrentHashMap<>(); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where datname = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " where table_schema = '%s' and table_name = '%s'", + tablePath.getSchemaName(), + tablePath.getTableName()); + } + @Override public void close() throws CatalogException { for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) { @@ -95,12 +102,12 @@ public class RedshiftCatalog extends AbstractJdbcCatalog { @Override protected String getListDatabaseSql() { - return "select datname from pg_database;"; + return "select datname from pg_database"; } @Override protected String getListTableSql(String databaseName) { - return "SELECT table_schema, table_name FROM information_schema.tables;"; + return "SELECT table_schema, table_name FROM information_schema.tables"; } @Override @@ -144,21 +151,6 @@ public class RedshiftCatalog extends AbstractJdbcCatalog { return String.format("DROP DATABASE `%s`;", databaseName); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName().toLowerCase()); - } - return listTables(defaultDatabase) - .contains(tablePath.getSchemaAndTableName().toLowerCase()); - } catch (DatabaseNotExistException e) { - return false; - } - } - @Override protected String getSelectColumnsSql(TablePath tablePath) { return String.format(SELECT_COLUMNS, tablePath.getSchemaName(), tablePath.getTableName()); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java index df6f4b3c24..19b8f668af 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java @@ -22,8 +22,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -31,8 +29,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -113,6 +109,18 @@ public class SapHanaCatalog extends AbstractJdbcCatalog { super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where SCHEMA_NAME = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + " and TABLE_NAME = '%s'", + tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { return "SELECT SCHEMA_NAME FROM SCHEMAS"; @@ -203,20 +211,6 @@ public class SapHanaCatalog extends AbstractJdbcCatalog { return tablePath.getTableName(); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getTableName()); - } - return listTables().contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - private List<String> listTables() { List<String> databases = listDatabases(); return listTables(databases.get(0)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java index 55660b36a2..e4c6351522 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java @@ -22,7 +22,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -69,6 +68,20 @@ public class SqlServerCatalog extends AbstractJdbcCatalog { super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where name = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " and TABLE_SCHEMA = '%s' and TABLE_NAME = '%s'", + tablePath.getSchemaName(), + tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { return "SELECT NAME FROM sys.databases"; @@ -147,20 +160,6 @@ public class SqlServerCatalog extends AbstractJdbcCatalog { return baseUrl + ";databaseName=" + databaseName + ";" + suffix; } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } - return listTables(defaultDatabase).contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - @Override public CatalogTable getTable(String sqlQuery) throws SQLException { Connection defaultConnection = getConnection(defaultUrl); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java index 462e109c76..a0b28e49ab 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java @@ -21,8 +21,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -30,8 +28,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -128,6 +124,20 @@ public class XuguCatalog extends AbstractJdbcCatalog { super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where DB_NAME = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " where user_name = '%s' and table_name = '%s'", + tablePath.getSchemaName(), + tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { return "SELECT DB_NAME FROM dba_databases"; @@ -210,20 +220,6 @@ public class XuguCatalog extends AbstractJdbcCatalog { return tablePath.getSchemaAndTableName(); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } - return listTables().contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - private List<String> listTables() { List<String> databases = listDatabases(); return listTables(databases.get(0)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java index daf87b3693..bc89d4c8c3 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServe import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.MethodOrderer; @@ -39,7 +40,8 @@ class MySqlCatalogTest { static JdbcUrlUtil.UrlInfo sqlParse = SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1434;database=TestDB"); static JdbcUrlUtil.UrlInfo MysqlUrlInfo = - JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33061/liuliTest?useSSL=false"); + JdbcUrlUtil.getUrlInfo( + "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&allowPublicKeyRetrieval=true"); static JdbcUrlUtil.UrlInfo pg = JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/liulitest"); static TablePath tablePathSQL; @@ -74,13 +76,22 @@ class MySqlCatalogTest { tablePathPG = TablePath.of(databaseName, "pg_to_mysql"); tablePathOracle = TablePath.of(databaseName, "oracle_to_mysql"); sqlServerCatalog = new SqlServerCatalog("sqlserver", "sa", "root@123", sqlParse, null); - mySqlCatalog = new MySqlCatalog("mysql", "root", "root@123", MysqlUrlInfo); + mySqlCatalog = new MySqlCatalog("mysql", "root", "123456", MysqlUrlInfo); postgresCatalog = new PostgresCatalog("postgres", "postgres", "postgres", pg, null); mySqlCatalog.open(); sqlServerCatalog.open(); postgresCatalog.open(); } + @Test + void exists() { + Assertions.assertTrue(mySqlCatalog.databaseExists("test")); + Assertions.assertTrue(mySqlCatalog.tableExists(TablePath.of("test", "MY_TABLE"))); + Assertions.assertTrue(mySqlCatalog.tableExists(TablePath.of("test", "my_table"))); + Assertions.assertFalse(mySqlCatalog.tableExists(TablePath.of("test", "test"))); + Assertions.assertFalse(mySqlCatalog.databaseExists("mysql")); + } + @Test @Order(1) void getTable() { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java index 1c5fb5a2b2..75b22ec24d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java @@ -20,6 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -27,17 +29,24 @@ import java.util.List; @Disabled("Please Test it in your local environment") class OracleCatalogTest { - @Test - void testCatalog() { - OracleCatalog catalog = + + static OracleCatalog catalog; + + @BeforeAll + static void before() { + catalog = new OracleCatalog( "oracle", - "test", - "oracle", - OracleURLParser.parse("jdbc:oracle:thin:@127.0.0.1:1521:xe"), + "c##gguser", + "testdb", + OracleURLParser.parse("jdbc:oracle:thin:@127.0.0.1:1521/CDC_PDB"), null); catalog.open(); + } + + @Test + void testCatalog() { List<String> strings = catalog.listDatabases(); @@ -45,4 +54,16 @@ class OracleCatalogTest { catalog.createTable(new TablePath("XE", "TEST", "TEST003"), table, false); } + + @Test + void exist() { + Assertions.assertTrue(catalog.databaseExists("ORCLCDB")); + Assertions.assertTrue(catalog.tableExists(TablePath.of("ORCLCDB", "C##GGUSER", "myTable"))); + Assertions.assertFalse(catalog.databaseExists("ORCL")); + Assertions.assertTrue( + catalog.tableExists( + TablePath.of("ORCLCDB", "CDC_PDB", "ads_index_public_health_data"))); + Assertions.assertTrue( + catalog.tableExists(TablePath.of("ORCLCDB", "CDC_PDB", "ADS_INDEX_DISEASE_DATA"))); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java index c04c1941b0..05a013ef69 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java @@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -31,15 +33,23 @@ import lombok.extern.slf4j.Slf4j; @Slf4j class PostgresCatalogTest { - @Test - void testCatalog() { - JdbcUrlUtil.UrlInfo urlInfo = - JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/liulitest"); - PostgresCatalog catalog = - new PostgresCatalog("postgres", "postgres", "postgres", urlInfo, null); + static PostgresCatalog catalog; + + @BeforeAll + static void before() { + catalog = + new PostgresCatalog( + "postgres", + "pg", + "pg#2024", + JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/postgres"), + null); catalog.open(); + } + @Test + void testCatalog() { MySqlCatalog mySqlCatalog = new MySqlCatalog( "mysql", @@ -59,4 +69,14 @@ class PostgresCatalogTest { catalog.createTable( new TablePath("liulitest", "public", "all_types_table_02"), table, false); } + + @Test + void exists() { + Assertions.assertFalse(catalog.databaseExists("postgres")); + Assertions.assertFalse( + catalog.tableExists(TablePath.of("postgres", "pg_catalog", "pg_aggregate"))); + Assertions.assertTrue(catalog.databaseExists("zdykdb")); + Assertions.assertTrue( + catalog.tableExists(TablePath.of("zdykdb", "pg_catalog", "pg_class"))); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java index ea305ca0c1..a18cc4abd9 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.MethodOrderer; @@ -38,7 +39,7 @@ import java.util.List; class SqlServerCatalogTest { static JdbcUrlUtil.UrlInfo sqlParse = - SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1434;database=TestDB"); + SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1433;database=master"); static JdbcUrlUtil.UrlInfo MysqlUrlInfo = JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33061/liuliTest?useSSL=false"); static JdbcUrlUtil.UrlInfo pg = @@ -84,9 +85,14 @@ class SqlServerCatalogTest { } @Test - void tableExists() { - - // boolean b = sqlServerCatalog.tableExists(tablePath); + void exists() { + Assertions.assertTrue(sqlServerCatalog.databaseExists("master")); + Assertions.assertTrue( + sqlServerCatalog.tableExists( + TablePath.of("master", "dbo", "MSreplication_options"))); + Assertions.assertTrue( + sqlServerCatalog.tableExists(TablePath.of("master", "dbo", "spt_fallback_db"))); + Assertions.assertFalse(sqlServerCatalog.tableExists(TablePath.of("master", "dbo", "xxx"))); } @Test diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql index a9b02e2ae3..8c624959f8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql @@ -55,10 +55,10 @@ CREATE TABLE sink_table WITH ( 'user' = 'root', 'password' = 'Abc!@#135_seatunnel', 'generate_sink_sql' = 'true', - 'database' = 'sink' + 'database' = 'sink', 'table' = '${table_name}' ); -- If it's multi-table synchronization, there's no need to set select columns. -- You can directly use the syntax 'INSERT INTO sink_table SELECT source_table'. -INSERT INTO sink_table SELECT source_table; \ No newline at end of file +INSERT INTO sink_table SELECT source_table;