This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dde3104f76 [Improve][Connector-v2][Jdbc] Refactor AbstractJdbcCatalog
(#5096)
dde3104f76 is described below
commit dde3104f761f9e3f4a298f2ba9a61e2e0dcdbee1
Author: He Wang <[email protected]>
AuthorDate: Wed Sep 6 11:13:10 2023 +0800
[Improve][Connector-v2][Jdbc] Refactor AbstractJdbcCatalog (#5096)
* refactor jdbc connector AbstractJdbcCatalog
* add testCatalog for pg
---
.../jdbc/catalog/AbstractJdbcCatalog.java | 342 +++++++++++++++++----
.../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 309 ++++---------------
.../catalog/mysql/MysqlCreateTableSqlBuilder.java | 2 +-
.../jdbc/catalog/oracle/OracleCatalog.java | 224 +++-----------
.../jdbc/catalog/psql/PostgresCatalog.java | 311 +++----------------
.../psql/PostgresCreateTableSqlBuilder.java | 2 +-
.../jdbc/catalog/sqlserver/SqlServerCatalog.java | 295 ++++--------------
.../jdbc/catalog/oracle/OracleCatalogTest.java | 19 +-
.../jdbc/catalog/psql/PostgresCatalogTest.java | 4 +-
.../sql/MysqlCreateTableSqlBuilderTest.java | 2 +-
.../connectors/seatunnel/jdbc/AbstractJdbcIT.java | 65 +++-
.../connectors/seatunnel/jdbc/JdbcCase.java | 5 +
.../connectors/seatunnel/jdbc/JdbcMysqlIT.java | 17 +
.../connectors/seatunnel/jdbc/JdbcOracleIT.java | 47 ++-
.../src/test/resources/sql/oracle_init.sql | 22 ++
.../connectors/seatunnel/jdbc/JdbcPostgresIT.java | 43 +++
.../connectors/seatunnel/jdbc/JdbcSqlServerIT.java | 34 +-
17 files changed, 732 insertions(+), 1011 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 22752ba501..ddc327fbc3 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -20,9 +20,12 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
@@ -40,14 +43,19 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@@ -56,6 +64,8 @@ import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
public abstract class AbstractJdbcCatalog implements Catalog {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
+ protected static final Set<String> SYS_DATABASES = new HashSet<>();
+
protected final String catalogName;
protected final String defaultDatabase;
protected final String username;
@@ -66,7 +76,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
protected final Optional<String> defaultSchema;
- protected Connection defaultConnection;
+ protected final Map<String, Connection> connectionMap;
public AbstractJdbcCatalog(
String catalogName,
@@ -88,6 +98,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
this.defaultUrl = urlInfo.getOrigin();
this.suffix = urlInfo.getSuffix();
this.defaultSchema = Optional.ofNullable(defaultSchema);
+ this.connectionMap = new ConcurrentHashMap<>();
}
@Override
@@ -95,51 +106,101 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
return defaultDatabase;
}
- public String getCatalogName() {
- return catalogName;
+ protected Connection getConnection(String url) {
+ if (connectionMap.containsKey(url)) {
+ return connectionMap.get(url);
+ }
+ try {
+ Connection connection = DriverManager.getConnection(url, username,
pwd);
+ connectionMap.put(url, connection);
+ return connection;
+ } catch (SQLException e) {
+ throw new CatalogException(String.format("Failed connecting to %s
via JDBC.", url), e);
+ }
}
- public String getUsername() {
- return username;
+ @Override
+ public void open() throws CatalogException {
+ getConnection(defaultUrl);
+ LOG.info("Catalog {} established connection to {}", catalogName,
defaultUrl);
}
- public String getPassword() {
- return pwd;
+ @Override
+ public void close() throws CatalogException {
+ for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
+ try {
+ entry.getValue().close();
+ } catch (SQLException e) {
+ throw new CatalogException(
+ String.format("Failed to close %s via JDBC.",
entry.getKey()), e);
+ }
+ }
+ connectionMap.clear();
+ LOG.info("Catalog {} closing", catalogName);
}
- public String getBaseUrl() {
- return baseUrl;
+ protected String getSelectColumnsSql(TablePath tablePath) {
+ throw new UnsupportedOperationException();
}
- @Override
- public void open() throws CatalogException {
- try {
- defaultConnection = DriverManager.getConnection(defaultUrl,
username, pwd);
- } catch (SQLException e) {
- throw new CatalogException(
- String.format("Failed connecting to %s via JDBC.",
defaultUrl), e);
- }
+ protected Column buildColumn(ResultSet resultSet) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
- LOG.info("Catalog {} established connection to {}", catalogName,
defaultUrl);
+ protected TableIdentifier getTableIdentifier(TablePath tablePath) {
+ return TableIdentifier.of(
+ catalogName,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
}
- @Override
- public void close() throws CatalogException {
- if (defaultConnection == null) {
- return;
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(catalogName, tablePath);
}
+
+ String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+ Connection conn = getConnection(dbUrl);
try {
- defaultConnection.close();
- } catch (SQLException e) {
+ DatabaseMetaData metaData = conn.getMetaData();
+ Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData,
tablePath);
+ List<ConstraintKey> constraintKeys = getConstraintKeys(metaData,
tablePath);
+ try (PreparedStatement ps =
conn.prepareStatement(getSelectColumnsSql(tablePath));
+ ResultSet resultSet = ps.executeQuery()) {
+
+ TableSchema.Builder builder = TableSchema.builder();
+ while (resultSet.next()) {
+ builder.column(buildColumn(resultSet));
+ }
+ // add primary key
+ primaryKey.ifPresent(builder::primaryKey);
+ // add constraint key
+ constraintKeys.forEach(builder::constraintKey);
+ TableIdentifier tableIdentifier =
getTableIdentifier(tablePath);
+ return CatalogTable.of(
+ tableIdentifier,
+ builder.build(),
+ buildConnectorOptions(tablePath),
+ Collections.emptyList(),
+ "",
+ catalogName);
+ }
+
+ } catch (Exception e) {
throw new CatalogException(
- String.format("Failed to close %s via JDBC.", defaultUrl),
e);
+ String.format("Failed getting table %s",
tablePath.getFullName()), e);
}
- LOG.info("Catalog {} closing", catalogName);
}
- protected Optional<PrimaryKey> getPrimaryKey(
- DatabaseMetaData metaData, String database, String table) throws
SQLException {
- return getPrimaryKey(metaData, database, table, table);
+ protected Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData,
TablePath tablePath)
+ throws SQLException {
+ return getPrimaryKey(
+ metaData,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
}
protected Optional<PrimaryKey> getPrimaryKey(
@@ -174,9 +235,13 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
return Optional.of(PrimaryKey.of(pkName, pkFields));
}
- protected List<ConstraintKey> getConstraintKeys(
- DatabaseMetaData metaData, String database, String table) throws
SQLException {
- return getConstraintKeys(metaData, database, table, table);
+ protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData,
TablePath tablePath)
+ throws SQLException {
+ return getConstraintKeys(
+ metaData,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
}
protected List<ConstraintKey> getConstraintKeys(
@@ -217,16 +282,24 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
return new ArrayList<>(constraintKeyMap.values());
}
- protected Optional<String> getColumnDefaultValue(
- DatabaseMetaData metaData, String database, String schema, String
table, String column)
- throws SQLException {
- try (ResultSet resultSet = metaData.getColumns(database, schema,
table, column)) {
- while (resultSet.next()) {
- String defaultValue = resultSet.getString("COLUMN_DEF");
- return Optional.ofNullable(defaultValue);
- }
+ protected String getListDatabaseSql() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ try {
+ return queryString(
+ defaultUrl,
+ getListDatabaseSql(),
+ rs -> {
+ String s = rs.getString(1);
+ return SYS_DATABASES.contains(s) ? null : s;
+ });
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed listing database in catalog %s",
this.catalogName), e);
}
- return Optional.empty();
}
@Override
@@ -236,11 +309,44 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
return listDatabases().contains(databaseName);
}
+ protected String getListTableSql(String databaseName) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected String getTableName(ResultSet rs) throws SQLException {
+ String schemaName = rs.getString(1);
+ String tableName = rs.getString(2);
+ if (StringUtils.isNotBlank(schemaName) &&
!SYS_DATABASES.contains(schemaName)) {
+ return schemaName + "." + tableName;
+ }
+ return null;
+ }
+
+ protected String getTableName(TablePath tablePath) {
+ return tablePath.getSchemaAndTableName();
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(this.catalogName,
databaseName);
+ }
+
+ String dbUrl = getUrlFromDatabaseName(databaseName);
+ try {
+ return queryString(dbUrl, getListTableSql(databaseName),
this::getTableName);
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed listing database in catalog %s",
catalogName), e);
+ }
+ }
+
@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return databaseExists(tablePath.getDatabaseName())
- &&
listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+ &&
listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
} catch (DatabaseNotExistException e) {
return false;
}
@@ -261,24 +367,61 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
defaultSchema.get(),
tablePath.getTableName());
}
- if (!createTableInternal(tablePath, table) && !ignoreIfExists) {
+
+ if (tableExists(tablePath)) {
+ if (ignoreIfExists) {
+ return;
+ }
throw new TableAlreadyExistException(catalogName, tablePath);
}
+
+ createTableInternal(tablePath, table);
+ }
+
+ protected String getCreateTableSql(TablePath tablePath, CatalogTable
table) {
+ throw new UnsupportedOperationException();
}
- protected abstract boolean createTableInternal(TablePath tablePath,
CatalogTable table)
- throws CatalogException;
+ protected void createTableInternal(TablePath tablePath, CatalogTable table)
+ throws CatalogException {
+ String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+ try {
+ executeInternal(dbUrl, getCreateTableSql(tablePath, table));
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed creating table %s",
tablePath.getFullName()), e);
+ }
+ }
@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null");
- if (!dropTableInternal(tablePath) && !ignoreIfNotExists) {
+
+ if (!tableExists(tablePath)) {
+ if (ignoreIfNotExists) {
+ return;
+ }
throw new TableNotExistException(catalogName, tablePath);
}
+
+ dropTableInternal(tablePath);
}
- protected abstract boolean dropTableInternal(TablePath tablePath) throws
CatalogException;
+ protected String getDropTableSql(TablePath tablePath) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void dropTableInternal(TablePath tablePath) throws
CatalogException {
+ String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+ try {
+ // Will there exist concurrent drop for one table?
+ executeInternal(dbUrl, getDropTableSql(tablePath));
+ } catch (SQLException e) {
+ throw new CatalogException(
+ String.format("Failed dropping table %s",
tablePath.getFullName()), e);
+ }
+ }
@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
@@ -287,14 +430,42 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
checkNotNull(tablePath.getDatabaseName(), "Database name cannot be
null");
if (databaseExists(tablePath.getDatabaseName())) {
+ if (ignoreIfExists) {
+ return;
+ }
throw new DatabaseAlreadyExistException(catalogName,
tablePath.getDatabaseName());
}
- if (!createDatabaseInternal(tablePath.getDatabaseName()) &&
!ignoreIfExists) {
- throw new DatabaseAlreadyExistException(catalogName,
tablePath.getDatabaseName());
+
+ createDatabaseInternal(tablePath.getDatabaseName());
+ }
+
+ protected String getCreateDatabaseSql(String databaseName) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void createDatabaseInternal(String databaseName) {
+ try {
+ executeInternal(defaultUrl, getCreateDatabaseSql(databaseName));
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "Failed creating database %s in catalog %s",
+ databaseName, this.catalogName),
+ e);
}
}
- protected abstract boolean createDatabaseInternal(String databaseName);
+ protected void closeDatabaseConnection(String databaseName) {
+ String dbUrl = getUrlFromDatabaseName(databaseName);
+ try {
+ Connection connection = connectionMap.remove(dbUrl);
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (SQLException e) {
+ throw new CatalogException(String.format("Failed to close %s via
JDBC.", dbUrl), e);
+ }
+ }
@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
@@ -302,10 +473,77 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
checkNotNull(tablePath, "Table path cannot be null");
checkNotNull(tablePath.getDatabaseName(), "Database name cannot be
null");
- if (!dropDatabaseInternal(tablePath.getDatabaseName()) &&
!ignoreIfNotExists) {
+ if (!databaseExists(tablePath.getDatabaseName())) {
+ if (ignoreIfNotExists) {
+ return;
+ }
throw new DatabaseNotExistException(catalogName,
tablePath.getDatabaseName());
}
+
+ dropDatabaseInternal(tablePath.getDatabaseName());
+ }
+
+ protected String getDropDatabaseSql(String databaseName) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void dropDatabaseInternal(String databaseName) throws
CatalogException {
+ try {
+ executeInternal(defaultUrl, getDropDatabaseSql(databaseName));
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "Failed dropping database %s in catalog %s",
+ databaseName, this.catalogName),
+ e);
+ }
+ }
+
+ protected String getUrlFromDatabaseName(String databaseName) {
+ String url = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+ return url + databaseName + suffix;
+ }
+
+ protected String getOptionTableName(TablePath tablePath) {
+ return tablePath.getFullName();
+ }
+
+ @SuppressWarnings("MagicNumber")
+ protected Map<String, String> buildConnectorOptions(TablePath tablePath) {
+ Map<String, String> options = new HashMap<>(8);
+ options.put("connector", "jdbc");
+ options.put("url",
getUrlFromDatabaseName(tablePath.getDatabaseName()));
+ options.put("table-name", getOptionTableName(tablePath));
+ options.put("username", username);
+ options.put("password", pwd);
+ return options;
+ }
+
+ @FunctionalInterface
+ public interface ResultSetConsumer<T> {
+ T apply(ResultSet rs) throws SQLException;
+ }
+
+ protected List<String> queryString(String url, String sql,
ResultSetConsumer<String> consumer)
+ throws SQLException {
+ try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) {
+ List<String> result = new ArrayList<>();
+ ResultSet rs = ps.executeQuery();
+ while (rs.next()) {
+ String value = consumer.apply(rs);
+ if (value != null) {
+ result.add(value);
+ }
+ }
+ return result;
+ }
}
- protected abstract boolean dropDatabaseInternal(String databaseName)
throws CatalogException;
+ // If sql is DDL, the execute() method always returns false, so the return
value
+ // should not be used to determine whether changes were made in database.
+ protected boolean executeInternal(String url, String sql) throws
SQLException {
+ try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) {
+ return ps.execute();
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 267a68f0ee..b558926e45 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -19,47 +19,34 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import com.mysql.cj.MysqlType;
-import com.mysql.cj.jdbc.result.ResultSetImpl;
-import com.mysql.cj.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
-import java.sql.Connection;
import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class MySqlCatalog extends AbstractJdbcCatalog {
- protected static final Set<String> SYS_DATABASES = new HashSet<>(4);
- private final String SELECT_COLUMNS =
+ private static final MysqlDataTypeConvertor DATA_TYPE_CONVERTOR = new
MysqlDataTypeConvertor();
+
+ private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA =
'%s' AND TABLE_NAME ='%s'";
static {
@@ -69,137 +56,65 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
SYS_DATABASES.add("sys");
}
- protected final Map<String, Connection> connectionMap;
-
public MySqlCatalog(
String catalogName, String username, String pwd,
JdbcUrlUtil.UrlInfo urlInfo) {
super(catalogName, username, pwd, urlInfo, null);
- this.connectionMap = new ConcurrentHashMap<>();
}
- public Connection getConnection(String url) {
- if (connectionMap.containsKey(url)) {
- return connectionMap.get(url);
- }
- try {
- Connection connection = DriverManager.getConnection(url, username,
pwd);
- connectionMap.put(url, connection);
- return connection;
- } catch (SQLException e) {
- throw new CatalogException(String.format("Failed connecting to %s
via JDBC.", url), e);
- }
+ @Override
+ protected String getListDatabaseSql() {
+ return "SHOW DATABASES;";
}
@Override
- public void close() throws CatalogException {
- for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
- try {
- entry.getValue().close();
- } catch (SQLException e) {
- throw new CatalogException(
- String.format("Failed to close %s via JDBC.",
entry.getKey()), e);
- }
- }
- super.close();
+ protected String getListTableSql(String databaseName) {
+ return "SHOW TABLES;";
}
@Override
- public List<String> listDatabases() throws CatalogException {
- try (PreparedStatement ps = defaultConnection.prepareStatement("SHOW
DATABASES;")) {
-
- List<String> databases = new ArrayList<>();
- ResultSet rs = ps.executeQuery();
-
- while (rs.next()) {
- String databaseName = rs.getString(1);
- if (!SYS_DATABASES.contains(databaseName)) {
- databases.add(rs.getString(1));
- }
- }
-
- return databases;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s",
this.catalogName), e);
- }
+ protected String getTableName(ResultSet rs) throws SQLException {
+ return rs.getString(1);
}
@Override
- public List<String> listTables(String databaseName)
- throws CatalogException, DatabaseNotExistException {
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(this.catalogName,
databaseName);
- }
-
- String dbUrl = getUrlFromDatabaseName(databaseName);
- Connection connection = getConnection(dbUrl);
- try (PreparedStatement ps = connection.prepareStatement("SHOW
TABLES;")) {
-
- ResultSet rs = ps.executeQuery();
-
- List<String> tables = new ArrayList<>();
-
- while (rs.next()) {
- tables.add(rs.getString(1));
- }
-
- return tables;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s",
catalogName), e);
- }
+ protected String getTableName(TablePath tablePath) {
+ return tablePath.getTableName();
}
@Override
- public CatalogTable getTable(TablePath tablePath)
- throws CatalogException, TableNotExistException {
- if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
- }
-
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- Connection conn = getConnection(dbUrl);
- try {
- DatabaseMetaData metaData = conn.getMetaData();
+ protected String getSelectColumnsSql(TablePath tablePath) {
+ return String.format(
+ SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
- Optional<PrimaryKey> primaryKey =
- getPrimaryKey(metaData, tablePath.getDatabaseName(),
tablePath.getTableName());
- List<ConstraintKey> constraintKeys =
- getConstraintKeys(
- metaData, tablePath.getDatabaseName(),
tablePath.getTableName());
- String sql =
- String.format(
- SELECT_COLUMNS, tablePath.getDatabaseName(),
tablePath.getTableName());
- try (PreparedStatement ps = conn.prepareStatement(sql);
- ResultSet resultSet = ps.executeQuery(); ) {
+ @Override
+ protected TableIdentifier getTableIdentifier(TablePath tablePath) {
+ return TableIdentifier.of(
+ catalogName, tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
- TableSchema.Builder builder = TableSchema.builder();
- while (resultSet.next()) {
- buildTable(resultSet, builder);
- }
- // add primary key
- primaryKey.ifPresent(builder::primaryKey);
- // add constraint key
- constraintKeys.forEach(builder::constraintKey);
- TableIdentifier tableIdentifier =
- TableIdentifier.of(
- catalogName, tablePath.getDatabaseName(),
tablePath.getTableName());
- return CatalogTable.of(
- tableIdentifier,
- builder.build(),
- buildConnectorOptions(tablePath),
- Collections.emptyList(),
- "",
- "mysql");
- }
+ @Override
+ protected Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData,
TablePath tablePath)
+ throws SQLException {
+ return getPrimaryKey(
+ metaData,
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ tablePath.getTableName());
+ }
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed getting table %s",
tablePath.getFullName()), e);
- }
+ @Override
+ protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData,
TablePath tablePath)
+ throws SQLException {
+ return getConstraintKeys(
+ metaData,
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ tablePath.getTableName());
}
- private void buildTable(ResultSet resultSet, TableSchema.Builder builder)
throws SQLException {
+ @Override
+ protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
String sourceType = resultSet.getString("COLUMN_TYPE");
String typeName = resultSet.getString("DATA_TYPE").toUpperCase();
@@ -243,121 +158,39 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
break;
}
- PhysicalColumn physicalColumn =
- PhysicalColumn.of(
- columnName,
- type,
- 0,
- isNullable,
- defaultValue,
- comment,
- sourceType,
- sourceType.contains("unsigned"),
- sourceType.contains("zerofill"),
- bitLen,
- null,
- columnLength);
- builder.column(physicalColumn);
+ return PhysicalColumn.of(
+ columnName,
+ type,
+ 0,
+ isNullable,
+ defaultValue,
+ comment,
+ sourceType,
+ sourceType.contains("unsigned"),
+ sourceType.contains("zerofill"),
+ bitLen,
+ null,
+ columnLength);
}
- public static Map<String, Object> getColumnsDefaultValue(TablePath
tablePath, Connection conn) {
- StringBuilder queryBuf = new StringBuilder("SHOW FULL COLUMNS FROM ");
- queryBuf.append(StringUtils.quoteIdentifier(tablePath.getTableName(),
"`", false));
- queryBuf.append(" FROM ");
-
queryBuf.append(StringUtils.quoteIdentifier(tablePath.getDatabaseName(), "`",
false));
- try (PreparedStatement ps2 =
conn.prepareStatement(queryBuf.toString())) {
- ResultSet rs = ps2.executeQuery();
- Map<String, Object> result = new HashMap<>();
- while (rs.next()) {
- String field = rs.getString("Field");
- Object defaultValue = rs.getObject("Default");
- result.put(field, defaultValue);
- }
- return result;
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "Failed getting table(%s) columns default value",
- tablePath.getFullName()),
- e);
- }
- }
-
- // todo: If the origin source is mysql, we can directly use create table
like to create the
@Override
- protected boolean createTableInternal(TablePath tablePath, CatalogTable
table)
- throws CatalogException {
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-
- String createTableSql =
- MysqlCreateTableSqlBuilder.builder(tablePath,
table).build(table.getCatalogName());
- Connection connection = getConnection(dbUrl);
- log.info("create table sql: {}", createTableSql);
- try (PreparedStatement ps =
connection.prepareStatement(createTableSql)) {
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed creating table %s",
tablePath.getFullName()), e);
- }
+ protected String getCreateTableSql(TablePath tablePath, CatalogTable
table) {
+ return MysqlCreateTableSqlBuilder.builder(tablePath,
table).build(table.getCatalogName());
}
@Override
- protected boolean dropTableInternal(TablePath tablePath) throws
CatalogException {
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- Connection connection = getConnection(dbUrl);
- try (PreparedStatement ps =
- connection.prepareStatement(
- String.format("DROP TABLE IF EXISTS %s;",
tablePath.getFullName()))) {
- // Will there exist concurrent drop for one table?
- return ps.execute();
- } catch (SQLException e) {
- throw new CatalogException(
- String.format("Failed dropping table %s",
tablePath.getFullName()), e);
- }
+ protected String getDropTableSql(TablePath tablePath) {
+ return String.format("DROP TABLE %s;", tablePath.getFullName());
}
@Override
- protected boolean createDatabaseInternal(String databaseName) throws
CatalogException {
- try (PreparedStatement ps =
- defaultConnection.prepareStatement(
- String.format("CREATE DATABASE `%s`;", databaseName)))
{
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "Failed creating database %s in catalog %s",
- databaseName, this.catalogName),
- e);
- }
+ protected String getCreateDatabaseSql(String databaseName) {
+ return String.format("CREATE DATABASE `%s`;", databaseName);
}
@Override
- protected boolean dropDatabaseInternal(String databaseName) throws
CatalogException {
- try (PreparedStatement ps =
- defaultConnection.prepareStatement(
- String.format("DROP DATABASE `%s`;", databaseName))) {
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "Failed dropping database %s in catalog %s",
- databaseName, this.catalogName),
- e);
- }
- }
-
- /**
- * @see com.mysql.cj.MysqlType
- * @see ResultSetImpl#getObjectStoredProc(int, int)
- */
- @SuppressWarnings("unchecked")
- private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int
colIndex)
- throws SQLException {
- MysqlType mysqlType =
MysqlType.getByName(metadata.getColumnTypeName(colIndex));
- Map<String, Object> dataTypeProperties = new HashMap<>();
- dataTypeProperties.put(MysqlDataTypeConvertor.PRECISION,
metadata.getPrecision(colIndex));
- dataTypeProperties.put(MysqlDataTypeConvertor.SCALE,
metadata.getScale(colIndex));
- return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType,
dataTypeProperties);
+ protected String getDropDatabaseSql(String databaseName) {
+ return String.format("DROP DATABASE `%s`;", databaseName);
}
private SeaTunnelDataType<?> fromJdbcType(String typeName, int precision,
int scale) {
@@ -365,22 +198,6 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
Map<String, Object> dataTypeProperties = new HashMap<>();
dataTypeProperties.put(MysqlDataTypeConvertor.PRECISION, precision);
dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, scale);
- return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType,
dataTypeProperties);
- }
-
- @SuppressWarnings("MagicNumber")
- private Map<String, String> buildConnectorOptions(TablePath tablePath) {
- Map<String, String> options = new HashMap<>(8);
- options.put("connector", "jdbc");
- options.put("url", baseUrl + tablePath.getDatabaseName());
- options.put("table-name", tablePath.getFullName());
- options.put("username", username);
- options.put("password", pwd);
- return options;
- }
-
- private String getUrlFromDatabaseName(String databaseName) {
- String url = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
- return url + databaseName + suffix;
+ return DATA_TYPE_CONVERTOR.toSeaTunnelType(mysqlType,
dataTypeProperties);
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index 608062fc99..490ecd30ff 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -119,7 +119,7 @@ public class MysqlCreateTableSqlBuilder {
List<String> sqls = new ArrayList<>();
sqls.add(
String.format(
- "CREATE TABLE IF NOT EXISTS %s (\n%s\n)",
+ "CREATE TABLE %s (\n%s\n)",
tableName, buildColumnsIdentifySql(catalogName)));
if (engine != null) {
sqls.add("ENGINE = " + engine);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index 261f4f7fb6..b90a86a7ab 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -18,33 +18,22 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import lombok.extern.slf4j.Slf4j;
-import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleDataTypeConvertor.ORACLE_BFILE;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleDataTypeConvertor.ORACLE_BLOB;
@@ -61,8 +50,10 @@ import static
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.Orac
@Slf4j
public class OracleCatalog extends AbstractJdbcCatalog {
+
private static final OracleDataTypeConvertor DATA_TYPE_CONVERTOR =
new OracleDataTypeConvertor();
+
private static final List<String> EXCLUDED_SCHEMAS =
Collections.unmodifiableList(
Arrays.asList(
@@ -87,7 +78,7 @@ public class OracleCatalog extends AbstractJdbcCatalog {
"EXFSYS",
"SYSMAN"));
- private static final String SELECT_COLUMNS_SQL =
+ private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT\n"
+ " cols.COLUMN_NAME,\n"
+ " CASE \n"
@@ -127,158 +118,50 @@ public class OracleCatalog extends AbstractJdbcCatalog {
}
@Override
- public List<String> listDatabases() throws CatalogException {
- try (PreparedStatement ps =
- defaultConnection.prepareStatement("SELECT name FROM
v$database")) {
-
- List<String> databases = new ArrayList<>();
- ResultSet rs = ps.executeQuery();
-
- while (rs.next()) {
- String databaseName = rs.getString(1);
- databases.add(databaseName);
- }
- return databases;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s",
this.catalogName), e);
- }
+ protected String getListDatabaseSql() {
+ return "SELECT name FROM v$database";
}
@Override
- protected boolean createTableInternal(TablePath tablePath, CatalogTable
table)
- throws CatalogException {
- String createTableSql = new
OracleCreateTableSqlBuilder(table).build(tablePath);
- String[] createTableSqls = createTableSql.split(";");
- for (String sql : createTableSqls) {
- log.info("create table sql: {}", sql);
- try (PreparedStatement ps =
defaultConnection.prepareStatement(sql)) {
- ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed creating table %s",
tablePath.getFullName()), e);
- }
- }
- return true;
+ protected String getCreateTableSql(TablePath tablePath, CatalogTable
table) {
+ return new OracleCreateTableSqlBuilder(table).build(tablePath);
}
@Override
- protected boolean dropTableInternal(TablePath tablePath) throws
CatalogException {
- return false;
+ protected String getDropTableSql(TablePath tablePath) {
+ return String.format("DROP TABLE %s", getTableName(tablePath));
}
@Override
- protected boolean createDatabaseInternal(String databaseName) {
- return false;
+ protected String getTableName(TablePath tablePath) {
+ return tablePath.getSchemaAndTableName().toUpperCase();
}
@Override
- protected boolean dropDatabaseInternal(String databaseName) throws
CatalogException {
- return false;
+ protected String getListTableSql(String databaseName) {
+ return "SELECT OWNER, TABLE_NAME FROM ALL_TABLES"
+ + " WHERE TABLE_NAME NOT LIKE 'MDRT_%'"
+ + " AND TABLE_NAME NOT LIKE 'MDRS_%'"
+ + " AND TABLE_NAME NOT LIKE 'MDXT_%'"
+ + " AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%' AND IOT_NAME IS
NULL)";
}
@Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName())
-
.contains(tablePath.getSchemaAndTableName().toUpperCase());
- } catch (DatabaseNotExistException e) {
- return false;
+ protected String getTableName(ResultSet rs) throws SQLException {
+ if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
+ return null;
}
+ return rs.getString(1) + "." + rs.getString(2);
}
@Override
- public List<String> listTables(String databaseName)
- throws CatalogException, DatabaseNotExistException {
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(this.catalogName,
databaseName);
- }
-
- try (PreparedStatement ps =
- defaultConnection.prepareStatement(
- "SELECT OWNER, TABLE_NAME FROM ALL_TABLES\n"
- + "WHERE TABLE_NAME NOT LIKE 'MDRT_%'\n"
- + " AND TABLE_NAME NOT LIKE 'MDRS_%'\n"
- + " AND TABLE_NAME NOT LIKE 'MDXT_%'\n"
- + " AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%'
AND IOT_NAME IS NULL)")) {
-
- ResultSet rs = ps.executeQuery();
- List<String> tables = new ArrayList<>();
- while (rs.next()) {
- if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
- continue;
- }
- tables.add(rs.getString(1) + "." + rs.getString(2));
- }
-
- return tables;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s",
catalogName), e);
- }
+ protected String getSelectColumnsSql(TablePath tablePath) {
+ return String.format(
+ SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(),
tablePath.getTableName());
}
@Override
- public CatalogTable getTable(TablePath tablePath)
- throws CatalogException, TableNotExistException {
- if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
- }
-
- try {
- DatabaseMetaData metaData = defaultConnection.getMetaData();
- Optional<PrimaryKey> primaryKey =
- getPrimaryKey(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
- List<ConstraintKey> constraintKeys =
- getConstraintKeys(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
-
- String sql =
- String.format(
- SELECT_COLUMNS_SQL,
- tablePath.getSchemaName(),
- tablePath.getTableName());
- try (PreparedStatement ps =
defaultConnection.prepareStatement(sql);
- ResultSet resultSet = ps.executeQuery()) {
- TableSchema.Builder builder = TableSchema.builder();
- // add column
- while (resultSet.next()) {
- buildColumn(resultSet, builder);
- }
-
- // add primary key
- primaryKey.ifPresent(builder::primaryKey);
- // add constraint key
- constraintKeys.forEach(builder::constraintKey);
- TableIdentifier tableIdentifier =
- TableIdentifier.of(
- catalogName,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
- return CatalogTable.of(
- tableIdentifier,
- builder.build(),
- buildConnectorOptions(tablePath),
- Collections.emptyList(),
- "");
- }
-
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed getting table %s",
tablePath.getFullName()), e);
- }
- }
-
- private void buildColumn(ResultSet resultSet, TableSchema.Builder builder)
throws SQLException {
+ protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
String typeName = resultSet.getString("TYPE_NAME");
String fullTypeName = resultSet.getString("FULL_TYPE_NAME");
@@ -314,31 +197,19 @@ public class OracleCatalog extends AbstractJdbcCatalog {
break;
}
- PhysicalColumn physicalColumn =
- PhysicalColumn.of(
- columnName,
- type,
- 0,
- isNullable,
- defaultValue,
- columnComment,
- fullTypeName,
- false,
- false,
- bitLen,
- null,
- columnLength);
- builder.column(physicalColumn);
- }
-
- @SuppressWarnings("unchecked")
- private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int
colIndex)
- throws SQLException {
- String columnType = metadata.getColumnTypeName(colIndex);
- Map<String, Object> dataTypeProperties = new HashMap<>();
- dataTypeProperties.put(OracleDataTypeConvertor.PRECISION,
metadata.getPrecision(colIndex));
- dataTypeProperties.put(OracleDataTypeConvertor.SCALE,
metadata.getScale(colIndex));
- return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnType,
dataTypeProperties);
+ return PhysicalColumn.of(
+ columnName,
+ type,
+ 0,
+ isNullable,
+ defaultValue,
+ columnComment,
+ fullTypeName,
+ false,
+ false,
+ bitLen,
+ null,
+ columnLength);
}
private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision,
long scale) {
@@ -348,14 +219,13 @@ public class OracleCatalog extends AbstractJdbcCatalog {
return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName,
dataTypeProperties);
}
- @SuppressWarnings("MagicNumber")
- private Map<String, String> buildConnectorOptions(TablePath tablePath) {
- Map<String, String> options = new HashMap<>(8);
- options.put("connector", "jdbc");
- options.put("url", baseUrl);
- options.put("table-name", tablePath.getSchemaAndTableName());
- options.put("username", username);
- options.put("password", pwd);
- return options;
+ @Override
+ protected String getUrlFromDatabaseName(String databaseName) {
+ return defaultUrl;
+ }
+
+ @Override
+ protected String getOptionTableName(TablePath tablePath) {
+ return tablePath.getSchemaAndTableName();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
index e3507666d0..2769d09ebb 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
@@ -18,39 +18,20 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
-import com.mysql.cj.MysqlType;
-import com.mysql.cj.jdbc.result.ResultSetImpl;
import lombok.extern.slf4j.Slf4j;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresDataTypeConvertor.PG_BIT;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresDataTypeConvertor.PG_BYTEA;
@@ -65,7 +46,10 @@ import static
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.Postgr
@Slf4j
public class PostgresCatalog extends AbstractJdbcCatalog {
- private static final String SELECT_COLUMNS_SQL =
+ private static final PostgresDataTypeConvertor DATA_TYPE_CONVERTOR =
+ new PostgresDataTypeConvertor();
+
+ private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT \n"
+ " a.attname AS column_name, \n"
+ "\t\tt.typname as type_name,\n"
@@ -102,8 +86,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
+ "ORDER BY \n"
+ " a.attnum;";
- protected static final Set<String> SYS_DATABASES = new HashSet<>(9);
-
static {
SYS_DATABASES.add("information_schema");
SYS_DATABASES.add("pg_catalog");
@@ -116,8 +98,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
SYS_DATABASES.add("template1");
}
- protected final Map<String, Connection> connectionMap;
-
public PostgresCatalog(
String catalogName,
String username,
@@ -125,154 +105,26 @@ public class PostgresCatalog extends AbstractJdbcCatalog
{
JdbcUrlUtil.UrlInfo urlInfo,
String defaultSchema) {
super(catalogName, username, pwd, urlInfo, defaultSchema);
- this.connectionMap = new ConcurrentHashMap<>();
- }
-
- public Connection getConnection(String url) {
- if (connectionMap.containsKey(url)) {
- return connectionMap.get(url);
- }
- try {
- Connection connection = DriverManager.getConnection(url, username,
pwd);
- connectionMap.put(url, connection);
- return connection;
- } catch (SQLException e) {
- throw new CatalogException(String.format("Failed connecting to %s
via JDBC.", url), e);
- }
}
@Override
- public void close() throws CatalogException {
- for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
- try {
- entry.getValue().close();
- } catch (SQLException e) {
- throw new CatalogException(
- String.format("Failed to close %s via JDBC.",
entry.getKey()), e);
- }
- }
- super.close();
+ protected String getListDatabaseSql() {
+ return "select datname from pg_database;";
}
@Override
- public List<String> listDatabases() throws CatalogException {
- try (PreparedStatement ps =
- defaultConnection.prepareStatement("select datname from
pg_database;")) {
-
- List<String> databases = new ArrayList<>();
- ResultSet rs = ps.executeQuery();
-
- while (rs.next()) {
- String databaseName = rs.getString(1);
- if (!SYS_DATABASES.contains(databaseName)) {
- databases.add(rs.getString(1));
- }
- }
-
- return databases;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s",
this.catalogName), e);
- }
+ protected String getListTableSql(String databaseName) {
+ return "SELECT table_schema, table_name FROM
information_schema.tables;";
}
@Override
- public List<String> listTables(String databaseName)
- throws CatalogException, DatabaseNotExistException {
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(this.catalogName,
databaseName);
- }
-
- String dbUrl = getUrlFromDatabaseName(databaseName);
- Connection connection = getConnection(dbUrl);
- try (PreparedStatement ps =
- connection.prepareStatement(
- "SELECT table_schema, table_name FROM
information_schema.tables;")) {
-
- ResultSet rs = ps.executeQuery();
-
- List<String> tables = new ArrayList<>();
-
- while (rs.next()) {
- String schemaName = rs.getString("table_schema");
- String tableName = rs.getString("table_name");
- if (org.apache.commons.lang3.StringUtils.isNotBlank(schemaName)
- && !SYS_DATABASES.contains(schemaName)) {
- tables.add(schemaName + "." + tableName);
- }
- }
-
- return tables;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s",
catalogName), e);
- }
+ protected String getSelectColumnsSql(TablePath tablePath) {
+ return String.format(
+ SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(),
tablePath.getTableName());
}
@Override
- public CatalogTable getTable(TablePath tablePath)
- throws CatalogException, TableNotExistException {
- if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
- }
-
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- Connection conn = getConnection(dbUrl);
- try {
- DatabaseMetaData metaData = conn.getMetaData();
- Optional<PrimaryKey> primaryKey =
- getPrimaryKey(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
- List<ConstraintKey> constraintKeys =
- getConstraintKeys(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
-
- String sql =
- String.format(
- SELECT_COLUMNS_SQL,
- tablePath.getSchemaName(),
- tablePath.getTableName());
- try (PreparedStatement ps = conn.prepareStatement(sql);
- ResultSet resultSet = ps.executeQuery()) {
- TableSchema.Builder builder = TableSchema.builder();
-
- // add column
- while (resultSet.next()) {
- buildColumn(resultSet, builder);
- }
-
- // add primary key
- primaryKey.ifPresent(builder::primaryKey);
- // add constraint key
- constraintKeys.forEach(builder::constraintKey);
- TableIdentifier tableIdentifier =
- TableIdentifier.of(
- catalogName,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
- return CatalogTable.of(
- tableIdentifier,
- builder.build(),
- buildConnectorOptions(tablePath),
- Collections.emptyList(),
- "",
- "postgres");
- }
-
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed getting table %s",
tablePath.getFullName()), e);
- }
- }
-
- private void buildColumn(ResultSet resultSet, TableSchema.Builder builder)
throws SQLException {
+ protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("column_name");
String typeName = resultSet.getString("type_name");
String fullTypeName = resultSet.getString("full_type_name");
@@ -282,8 +134,9 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
Object defaultValue = resultSet.getObject("default_value");
boolean isNullable = resultSet.getString("is_nullable").equals("YES");
- if (defaultValue != null &&
defaultValue.toString().contains("regclass"))
+ if (defaultValue != null &&
defaultValue.toString().contains("regclass")) {
defaultValue = null;
+ }
SeaTunnelDataType<?> type = fromJdbcType(typeName, columnLength,
columnScale);
long bitLen = 0;
@@ -311,131 +164,55 @@ public class PostgresCatalog extends AbstractJdbcCatalog
{
break;
}
- PhysicalColumn physicalColumn =
- PhysicalColumn.of(
- columnName,
- type,
- 0,
- isNullable,
- defaultValue,
- columnComment,
- fullTypeName,
- false,
- false,
- bitLen,
- null,
- columnLength);
- builder.column(physicalColumn);
+ return PhysicalColumn.of(
+ columnName,
+ type,
+ 0,
+ isNullable,
+ defaultValue,
+ columnComment,
+ fullTypeName,
+ false,
+ false,
+ bitLen,
+ null,
+ columnLength);
}
@Override
- protected boolean createTableInternal(TablePath tablePath, CatalogTable
table)
- throws CatalogException {
- String createTableSql = new
PostgresCreateTableSqlBuilder(table).build(tablePath);
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- Connection conn = getConnection(dbUrl);
- log.info("create table sql: {}", createTableSql);
- try (PreparedStatement ps = conn.prepareStatement(createTableSql)) {
- ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed creating table %s",
tablePath.getFullName()), e);
- }
- return true;
+ protected String getCreateTableSql(TablePath tablePath, CatalogTable
table) {
+ return new PostgresCreateTableSqlBuilder(table).build(tablePath);
}
@Override
- protected boolean dropTableInternal(TablePath tablePath) throws
CatalogException {
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-
- String schemaName = tablePath.getSchemaName();
- String tableName = tablePath.getTableName();
-
- String sql = "DROP TABLE IF EXISTS \"" + schemaName + "\".\"" +
tableName + "\"";
- Connection connection = getConnection(dbUrl);
- try (PreparedStatement ps = connection.prepareStatement(sql)) {
- // Will there exist concurrent drop for one table?
- return ps.execute();
- } catch (SQLException e) {
- throw new CatalogException(
- String.format("Failed dropping table %s",
tablePath.getFullName()), e);
- }
+ protected String getDropTableSql(TablePath tablePath) {
+ return "DROP TABLE \""
+ + tablePath.getSchemaName()
+ + "\".\""
+ + tablePath.getTableName()
+ + "\"";
}
@Override
- protected boolean createDatabaseInternal(String databaseName) throws
CatalogException {
- String sql = "CREATE DATABASE \"" + databaseName + "\"";
- try (PreparedStatement ps = defaultConnection.prepareStatement(sql)) {
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "Failed creating database %s in catalog %s",
- databaseName, this.catalogName),
- e);
- }
+ protected String getCreateDatabaseSql(String databaseName) {
+ return "CREATE DATABASE \"" + databaseName + "\"";
}
@Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName())
- .contains(tablePath.getSchemaAndTableName());
- } catch (DatabaseNotExistException e) {
- return false;
- }
+ protected String getDropDatabaseSql(String databaseName) {
+ return "DROP DATABASE \"" + databaseName + "\"";
}
@Override
- protected boolean dropDatabaseInternal(String databaseName) throws
CatalogException {
- String sql = "DROP DATABASE IF EXISTS \"" + databaseName + "\"";
- try (PreparedStatement ps = defaultConnection.prepareStatement(sql)) {
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "Failed dropping database %s in catalog %s",
- databaseName, this.catalogName),
- e);
- }
- }
-
- /**
- * @see MysqlType
- * @see ResultSetImpl#getObjectStoredProc(int, int)
- */
- @SuppressWarnings("unchecked")
- private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int
colIndex)
- throws SQLException {
- String columnTypeName = metadata.getColumnTypeName(colIndex);
- Map<String, Object> dataTypeProperties = new HashMap<>();
- dataTypeProperties.put(
- PostgresDataTypeConvertor.PRECISION,
metadata.getPrecision(colIndex));
- dataTypeProperties.put(PostgresDataTypeConvertor.SCALE,
metadata.getScale(colIndex));
- return new PostgresDataTypeConvertor().toSeaTunnelType(columnTypeName,
dataTypeProperties);
+ protected void dropDatabaseInternal(String databaseName) throws
CatalogException {
+ closeDatabaseConnection(databaseName);
+ super.dropDatabaseInternal(databaseName);
}
private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision,
long scale) {
Map<String, Object> dataTypeProperties = new HashMap<>();
dataTypeProperties.put(PostgresDataTypeConvertor.PRECISION, precision);
dataTypeProperties.put(PostgresDataTypeConvertor.SCALE, scale);
- return new PostgresDataTypeConvertor().toSeaTunnelType(typeName,
dataTypeProperties);
- }
-
- @SuppressWarnings("MagicNumber")
- private Map<String, String> buildConnectorOptions(TablePath tablePath) {
- Map<String, String> options = new HashMap<>(8);
- options.put("connector", "jdbc");
- options.put("url", baseUrl + tablePath.getDatabaseName());
- options.put("table-name", tablePath.getFullName());
- options.put("username", username);
- options.put("password", pwd);
- return options;
- }
-
- private String getUrlFromDatabaseName(String databaseName) {
- String url = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
- return url + databaseName + suffix;
+ return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName,
dataTypeProperties);
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
index 85f4468bef..d423f18301 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
@@ -48,7 +48,7 @@ public class PostgresCreateTableSqlBuilder {
public String build(TablePath tablePath) {
StringBuilder createTableSql = new StringBuilder();
createTableSql
- .append("CREATE TABLE IF NOT EXISTS ")
+ .append("CREATE TABLE ")
.append(tablePath.getSchemaAndTableName())
.append(" (\n");
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index ea04c60bff..7d18ed2d90 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -19,15 +19,10 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -37,33 +32,35 @@ import org.apache.commons.lang3.tuple.Pair;
import lombok.extern.slf4j.Slf4j;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
@Slf4j
public class SqlServerCatalog extends AbstractJdbcCatalog {
- private static final Set<String> SYS_DATABASES = new HashSet<>(4);
-
- static {
- SYS_DATABASES.add("master");
- SYS_DATABASES.add("tempdb");
- SYS_DATABASES.add("model");
- SYS_DATABASES.add("msdb");
- }
+ private static final SqlServerDataTypeConvertor DATA_TYPE_CONVERTOR =
+ new SqlServerDataTypeConvertor();
+
+ private static final String SELECT_COLUMNS_SQL_TEMPLATE =
+ "SELECT tbl.name AS table_name,\n"
+ + " col.name AS column_name,\n"
+ + " ext.value AS comment,\n"
+ + " col.column_id AS column_id,\n"
+ + " types.name AS type,\n"
+ + " col.max_length AS max_length,\n"
+ + " col.precision AS precision,\n"
+ + " col.scale AS scale,\n"
+ + " col.is_nullable AS is_nullable,\n"
+ + " def.definition AS default_value\n"
+ + "FROM sys.tables tbl\n"
+ + " INNER JOIN sys.columns col ON tbl.object_id =
col.object_id\n"
+ + " LEFT JOIN sys.types types ON col.user_type_id =
types.user_type_id\n"
+ + " LEFT JOIN sys.extended_properties ext ON
ext.major_id = col.object_id AND ext.minor_id = col.column_id\n"
+ + " LEFT JOIN sys.default_constraints def ON
col.default_object_id = def.object_id AND ext.minor_id = col.column_id AND
ext.name = 'MS_Description'\n"
+ + "WHERE schema_name(tbl.schema_id) = '%s' %s\n"
+ + "ORDER BY tbl.name, col.column_id";
public SqlServerCatalog(
String catalogName,
@@ -75,133 +72,29 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
}
@Override
- public List<String> listDatabases() throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd);
- PreparedStatement ps = conn.prepareStatement("SELECT NAME FROM
sys.databases")) {
-
- List<String> databases = new ArrayList<>();
- ResultSet rs = ps.executeQuery();
-
- while (rs.next()) {
- String databaseName = rs.getString(1);
- if (!SYS_DATABASES.contains(databaseName)) {
- databases.add(databaseName);
- }
- }
-
- return databases;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s",
this.catalogName), e);
- }
- }
-
- @Override
- public List<String> listTables(String databaseName)
- throws CatalogException, DatabaseNotExistException {
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(this.catalogName,
databaseName);
- }
-
- String dbUrl = getUrlFromDatabaseName(databaseName);
- try (Connection conn = DriverManager.getConnection(dbUrl, username,
pwd);
- PreparedStatement ps =
- conn.prepareStatement(
- "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
- + databaseName
- + ".INFORMATION_SCHEMA.TABLES WHERE
TABLE_TYPE = 'BASE TABLE'")) {
-
- ResultSet rs = ps.executeQuery();
-
- List<String> tables = new ArrayList<>();
-
- while (rs.next()) {
- tables.add(rs.getString(1) + "." + rs.getString(2));
- }
-
- return tables;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s",
catalogName), e);
- }
+ protected String getListDatabaseSql() {
+ return "SELECT NAME FROM sys.databases";
}
@Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName())
- .contains(tablePath.getSchemaAndTableName());
- } catch (DatabaseNotExistException e) {
- return false;
- }
+ protected String getListTableSql(String databaseName) {
+ return "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
+ + databaseName
+ + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'";
}
@Override
- public CatalogTable getTable(TablePath tablePath)
- throws CatalogException, TableNotExistException {
- if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
- }
+ protected String getSelectColumnsSql(TablePath tablePath) {
String tableSql =
StringUtils.isNotEmpty(tablePath.getTableName())
? "AND tbl.name = '" + tablePath.getTableName() + "'"
: "";
- String columnSql =
- String.format(
- " SELECT tbl.name AS table_name, \n
col.name AS column_name, \n ext.value AS comment, \n
col.column_id AS column_id, \n types.name AS type, \n
col.max_length AS max_length, \n col.precision AS precision, \n
col.scale AS scale, \n col.is_nullable AS is_nullable, \n
def.definition AS default_value\n FROM sys.tables tbl \nINNER JOIN
sys.columns col \n ON tbl.object_id = col. [...]
- tablePath.getSchemaName(), tableSql);
-
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- try (Connection conn = DriverManager.getConnection(dbUrl, username,
pwd)) {
- DatabaseMetaData metaData = conn.getMetaData();
- Optional<PrimaryKey> primaryKey =
- getPrimaryKey(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
- List<ConstraintKey> constraintKeys =
- getConstraintKeys(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
-
- try (PreparedStatement ps = conn.prepareStatement(columnSql);
- ResultSet resultSet = ps.executeQuery(); ) {
- TableSchema.Builder builder = TableSchema.builder();
- while (resultSet.next()) {
- buildTable(resultSet, builder);
- }
-
- // add primary key
- primaryKey.ifPresent(builder::primaryKey);
- // add constraint key
- constraintKeys.forEach(builder::constraintKey);
- TableIdentifier tableIdentifier =
- TableIdentifier.of(
- catalogName,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
- return CatalogTable.of(
- tableIdentifier,
- builder.build(),
- buildConnectorOptions(tablePath),
- Collections.emptyList(),
- "",
- "sqlserver");
- }
-
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed getting table %s",
tablePath.getFullName()), e);
- }
+ return String.format(SELECT_COLUMNS_SQL_TEMPLATE,
tablePath.getSchemaName(), tableSql);
}
- private void buildTable(ResultSet resultSet, TableSchema.Builder builder)
throws SQLException {
+ @Override
+ protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("column_name");
String sourceType = resultSet.getString("type");
// String typeName =
resultSet.getString("DATA_TYPE").toUpperCase();
@@ -266,21 +159,19 @@ public class SqlServerCatalog extends AbstractJdbcCatalog
{
default:
break;
}
- PhysicalColumn physicalColumn =
- PhysicalColumn.of(
- columnName,
- type,
- 0,
- isNullable,
- defaultValue,
- comment,
- sourceType,
- false,
- false,
- bitLen,
- null,
- columnLength);
- builder.column(physicalColumn);
+ return PhysicalColumn.of(
+ columnName,
+ type,
+ 0,
+ isNullable,
+ defaultValue,
+ comment,
+ sourceType,
+ false,
+ false,
+ bitLen,
+ null,
+ columnLength);
}
private SeaTunnelDataType<?> fromJdbcType(String typeName, int precision,
int scale) {
@@ -288,103 +179,37 @@ public class SqlServerCatalog extends
AbstractJdbcCatalog {
Map<String, Object> dataTypeProperties = new HashMap<>();
dataTypeProperties.put(SqlServerDataTypeConvertor.PRECISION,
precision);
dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, scale);
- return new
SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(),
dataTypeProperties);
+ return DATA_TYPE_CONVERTOR.toSeaTunnelType(pair.getLeft(),
dataTypeProperties);
}
@Override
- protected boolean createTableInternal(TablePath tablePath, CatalogTable
table)
- throws CatalogException {
-
- String createTableSql =
- SqlServerCreateTableSqlBuilder.builder(tablePath,
table).build(tablePath, table);
- log.info("create table sql: {}", createTableSql);
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd);
- PreparedStatement ps = conn.prepareStatement(createTableSql)) {
- System.out.println(createTableSql);
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed creating table %s",
tablePath.getFullName()), e);
- }
+ protected String getCreateTableSql(TablePath tablePath, CatalogTable
table) {
+ return SqlServerCreateTableSqlBuilder.builder(tablePath,
table).build(tablePath, table);
}
@Override
- protected boolean dropTableInternal(TablePath tablePath) throws
CatalogException {
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- try (Connection conn = DriverManager.getConnection(dbUrl, username,
pwd);
- PreparedStatement ps =
- conn.prepareStatement(
- String.format(
- "DROP TABLE IF EXISTS %s",
tablePath.getFullName()))) {
- // Will there exist concurrent drop for one table?
- return ps.execute();
- } catch (SQLException e) {
- throw new CatalogException(
- String.format("Failed dropping table %s",
tablePath.getFullName()), e);
- }
+ protected String getDropTableSql(TablePath tablePath) {
+ return String.format("DROP TABLE %s", tablePath.getFullName());
}
@Override
- protected boolean createDatabaseInternal(String databaseName) throws
CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(
- String.format("CREATE DATABASE `%s`",
databaseName))) {
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "Failed creating database %s in catalog %s",
- databaseName, this.catalogName),
- e);
- }
+ protected String getCreateDatabaseSql(String databaseName) {
+ return String.format("CREATE DATABASE %s", databaseName);
}
@Override
- protected boolean dropDatabaseInternal(String databaseName) throws
CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(
- String.format("DROP DATABASE IF EXISTS `%s`;",
databaseName))) {
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "Failed dropping database %s in catalog %s",
- databaseName, this.catalogName),
- e);
- }
+ protected String getDropDatabaseSql(String databaseName) {
+ return String.format("DROP DATABASE %s;", databaseName);
}
- @SuppressWarnings("unchecked")
- private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int
colIndex)
- throws SQLException {
- Pair<SqlServerType, Map<String, Object>> pair =
- SqlServerType.parse(metadata.getColumnTypeName(colIndex));
- Map<String, Object> dataTypeProperties = new HashMap<>();
- dataTypeProperties.put(
- SqlServerDataTypeConvertor.PRECISION,
metadata.getPrecision(colIndex));
- dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE,
metadata.getScale(colIndex));
- return new
SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(),
dataTypeProperties);
- }
-
- @SuppressWarnings("MagicNumber")
- private Map<String, String> buildConnectorOptions(TablePath tablePath) {
- Map<String, String> options = new HashMap<>(8);
- options.put("connector", "jdbc");
- options.put("url",
getUrlFromDatabaseName(tablePath.getDatabaseName()));
- options.put("table-name", tablePath.getFullName());
- options.put("username", username);
- options.put("password", pwd);
- return options;
+ @Override
+ protected void dropDatabaseInternal(String databaseName) throws
CatalogException {
+ closeDatabaseConnection(databaseName);
+ super.dropDatabaseInternal(databaseName);
}
- private String getUrlFromDatabaseName(String databaseName) {
+ @Override
+ protected String getUrlFromDatabaseName(String databaseName) {
return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
}
-
- private String getCreateTableSql(TablePath tablePath, CatalogTable table) {
-
- return "";
- }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
index 6b8c49bc0a..1c5fb5a2b2 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
@@ -19,8 +19,6 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.common.utils.JdbcUrlUtil;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -41,25 +39,10 @@ class OracleCatalogTest {
catalog.open();
- MySqlCatalog mySqlCatalog =
- new MySqlCatalog(
- "mysql",
- "root",
- "root@123",
-
JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33062/mingdongtest"));
-
- mySqlCatalog.open();
-
- CatalogTable table1 =
- mySqlCatalog.getTable(TablePath.of("mingdongtest",
"all_types_table_02"));
-
List<String> strings = catalog.listDatabases();
- System.out.println(strings);
-
- List<String> strings1 = catalog.listTables("XE");
CatalogTable table = catalog.getTable(TablePath.of("XE", "TEST",
"PG_TYPES_TABLE_CP1"));
- catalog.createTableInternal(new TablePath("XE", "TEST", "TEST003"),
table);
+ catalog.createTable(new TablePath("XE", "TEST", "TEST003"), table,
false);
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
index badab864fc..6ef4d9e654 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
@@ -53,7 +53,7 @@ class PostgresCatalogTest {
catalog.getTable(TablePath.of("st_test", "public",
"all_types_table_02"));
System.out.println("find table: " + table);
- catalog.createTableInternal(
- new TablePath("liulitest", "public", "all_types_table_02"),
table);
+ catalog.createTable(
+ new TablePath("liulitest", "public", "all_types_table_02"),
table, false);
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
index 3de5c65bf8..b75ac68223 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
@@ -93,7 +93,7 @@ public class MysqlCreateTableSqlBuilderTest {
MysqlCreateTableSqlBuilder.builder(tablePath,
catalogTable).build("mysql");
// create table sql is change; The old unit tests are no longer
applicable
String expect =
- "CREATE TABLE IF NOT EXISTS test_table (\n"
+ "CREATE TABLE test_table (\n"
+ "\tid null NOT NULL COMMENT 'id', \n"
+ "\tname null NOT NULL COMMENT 'name', \n"
+ "\tage null NULL COMMENT 'age', \n"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 6528be0e1f..a38fb2217f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
@@ -31,6 +34,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
@@ -76,6 +80,7 @@ public abstract class AbstractJdbcIT extends TestSuiteBase
implements TestResour
protected GenericContainer<?> dbServer;
protected JdbcCase jdbcCase;
protected Connection connection;
+ protected Catalog catalog;
abstract JdbcCase getJdbcCase();
@@ -141,12 +146,16 @@ public abstract class AbstractJdbcIT extends
TestSuiteBase implements TestResour
String.format(
createTemplate,
buildTableInfoWithSchema(
- jdbcCase.getDatabase(),
jdbcCase.getSourceTable()));
+ jdbcCase.getDatabase(),
+ jdbcCase.getSchema(),
+ jdbcCase.getSourceTable()));
String createSink =
String.format(
createTemplate,
buildTableInfoWithSchema(
- jdbcCase.getDatabase(),
jdbcCase.getSinkTable()));
+ jdbcCase.getDatabase(),
+ jdbcCase.getSchema(),
+ jdbcCase.getSinkTable()));
statement.execute(createSource);
statement.execute(createSink);
@@ -173,6 +182,14 @@ public abstract class AbstractJdbcIT extends TestSuiteBase
implements TestResour
+ ")";
}
+ protected void clearTable(String database, String schema, String table) {
+ clearTable(database, table);
+ }
+
+ protected String buildTableInfoWithSchema(String database, String schema,
String table) {
+ return buildTableInfoWithSchema(database, table);
+ }
+
public void clearTable(String schema, String table) {
try (Statement statement = connection.createStatement()) {
statement.execute("TRUNCATE TABLE " +
buildTableInfoWithSchema(schema, table));
@@ -215,6 +232,7 @@ public abstract class AbstractJdbcIT extends TestSuiteBase
implements TestResour
createSchemaIfNeeded();
createNeededTables();
insertTestData();
+ initCatalog();
}
@Override
@@ -226,6 +244,10 @@ public abstract class AbstractJdbcIT extends TestSuiteBase
implements TestResour
if (connection != null) {
connection.close();
}
+
+ if (catalog != null) {
+ catalog.close();
+ }
}
@TestTemplate
@@ -238,6 +260,43 @@ public abstract class AbstractJdbcIT extends TestSuiteBase
implements TestResour
}
compareResult();
- clearTable(jdbcCase.getDatabase(), jdbcCase.getSinkTable());
+ clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSinkTable());
+ }
+
+ protected void initCatalog() {}
+
+ @Test
+ public void testCatalog() {
+ if (catalog == null) {
+ return;
+ }
+
+ TablePath sourceTablePath =
+ new TablePath(
+ jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSourceTable());
+ TablePath targetTablePath =
+ new TablePath(
+ jdbcCase.getCatalogDatabase(),
+ jdbcCase.getCatalogSchema(),
+ jdbcCase.getCatalogTable());
+ boolean createdDb = false;
+
+ if (!catalog.databaseExists(targetTablePath.getDatabaseName())) {
+ catalog.createDatabase(targetTablePath, false);
+
Assertions.assertTrue(catalog.databaseExists(targetTablePath.getDatabaseName()));
+ createdDb = true;
+ }
+
+ CatalogTable catalogTable = catalog.getTable(sourceTablePath);
+ catalog.createTable(targetTablePath, catalogTable, false);
+ Assertions.assertTrue(catalog.tableExists(targetTablePath));
+
+ catalog.dropTable(targetTablePath, false);
+ Assertions.assertFalse(catalog.tableExists(targetTablePath));
+
+ if (createdDb) {
+ catalog.dropDatabase(targetTablePath, false);
+
Assertions.assertFalse(catalog.databaseExists(targetTablePath.getDatabaseName()));
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
index 805fcbd16b..5f17eacc51 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
@@ -41,6 +41,7 @@ public class JdbcCase {
private int port;
private int localPort;
private String database;
+ private String schema;
private String sourceTable;
private String sinkTable;
private String jdbcTemplate;
@@ -50,4 +51,8 @@ public class JdbcCase {
private List<String> configFile;
private Pair<String[], List<SeaTunnelRow>> testData;
private Map<String, String> containerEnv;
+
+ private String catalogDatabase;
+ private String catalogSchema;
+ private String catalogTable;
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index f4b1338b15..b10aa0c222 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -19,6 +19,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
import org.apache.commons.lang3.tuple.Pair;
@@ -48,6 +50,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
private static final String MYSQL_DATABASE = "seatunnel";
private static final String MYSQL_SOURCE = "source";
private static final String MYSQL_SINK = "sink";
+ private static final String CATALOG_DATABASE = "catalog_database";
private static final String MYSQL_USERNAME = "root";
private static final String MYSQL_PASSWORD = "Abc!@#135_seatunnel";
@@ -138,6 +141,8 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
.configFile(CONFIG_FILE)
.insertSql(insertSql)
.testData(testDataSet)
+ .catalogDatabase(CATALOG_DATABASE)
+ .catalogTable(MYSQL_SINK)
.build();
}
@@ -282,4 +287,16 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
return container;
}
+
+ @Override
+ protected void initCatalog() {
+ catalog =
+ new MySqlCatalog(
+ "mysql",
+ jdbcCase.getUserName(),
+ jdbcCase.getPassword(),
+ JdbcUrlUtil.getUrlInfo(
+ jdbcCase.getJdbcUrl().replace(HOST,
dbServer.getHost())));
+ catalog.open();
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index d0f8ce3b68..75bdffbd6c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -19,6 +19,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLParser;
import org.apache.commons.lang3.tuple.Pair;
@@ -27,6 +29,7 @@ import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
import com.google.common.collect.Lists;
@@ -47,11 +50,13 @@ public class JdbcOracleIT extends AbstractJdbcIT {
private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";
private static final int ORACLE_PORT = 1521;
private static final String ORACLE_URL = "jdbc:oracle:thin:@" + HOST +
":%s/%s";
- private static final String USERNAME = "testUser";
+ private static final String USERNAME = "TESTUSER";
private static final String PASSWORD = "testPassword";
- private static final String DATABASE = "TESTUSER";
+ private static final String DATABASE = "XE";
+ private static final String SCHEMA = USERNAME;
private static final String SOURCE_TABLE = "E2E_TABLE_SOURCE";
private static final String SINK_TABLE = "E2E_TABLE_SINK";
+ private static final String CATALOG_TABLE = "E2E_TABLE_CATALOG";
private static final List<String> CONFIG_FILE =
Lists.newArrayList("/jdbc_oracle_source_to_sink.conf");
@@ -78,11 +83,11 @@ public class JdbcOracleIT extends AbstractJdbcIT {
containerEnv.put("ORACLE_PASSWORD", PASSWORD);
containerEnv.put("APP_USER", USERNAME);
containerEnv.put("APP_USER_PASSWORD", PASSWORD);
- String jdbcUrl = String.format(ORACLE_URL, ORACLE_PORT, DATABASE);
+ String jdbcUrl = String.format(ORACLE_URL, ORACLE_PORT, SCHEMA);
Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
String[] fieldNames = testDataSet.getKey();
- String insertSql = insertTable(DATABASE, SOURCE_TABLE, fieldNames);
+ String insertSql = insertTable(SCHEMA, SOURCE_TABLE, fieldNames);
return JdbcCase.builder()
.dockerImage(ORACLE_IMAGE)
@@ -97,8 +102,12 @@ public class JdbcOracleIT extends AbstractJdbcIT {
.userName(USERNAME)
.password(PASSWORD)
.database(DATABASE)
+ .schema(SCHEMA)
.sourceTable(SOURCE_TABLE)
.sinkTable(SINK_TABLE)
+ .catalogDatabase(DATABASE)
+ .catalogSchema(SCHEMA)
+ .catalogTable(CATALOG_TABLE)
.createSql(CREATE_SQL)
.configFile(CONFIG_FILE)
.insertSql(insertSql)
@@ -162,9 +171,10 @@ public class JdbcOracleIT extends AbstractJdbcIT {
GenericContainer<?> container =
new OracleContainer(imageName)
- .withDatabaseName(DATABASE)
- .withUsername(USERNAME)
- .withPassword(PASSWORD)
+ .withDatabaseName(SCHEMA)
+ .withCopyFileToContainer(
+
MountableFile.forClasspathResource("sql/oracle_init.sql"),
+ "/container-entrypoint-startdb.d/init.sql")
.withNetwork(NETWORK)
.withNetworkAliases(ORACLE_NETWORK_ALIASES)
.withExposedPorts(ORACLE_PORT)
@@ -181,4 +191,27 @@ public class JdbcOracleIT extends AbstractJdbcIT {
public String quoteIdentifier(String field) {
return "\"" + field + "\"";
}
+
+ @Override
+ protected void clearTable(String database, String schema, String table) {
+ clearTable(schema, table);
+ }
+
+ @Override
+ protected String buildTableInfoWithSchema(String database, String schema,
String table) {
+ return buildTableInfoWithSchema(schema, table);
+ }
+
+ @Override
+ protected void initCatalog() {
+ String jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST,
dbServer.getHost());
+ catalog =
+ new OracleCatalog(
+ "oracle",
+ jdbcCase.getUserName(),
+ jdbcCase.getPassword(),
+ OracleURLParser.parse(jdbcUrl),
+ SCHEMA);
+ catalog.open();
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/sql/oracle_init.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/sql/oracle_init.sql
new file mode 100644
index 0000000000..ba77de271e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/sql/oracle_init.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+ALTER SESSION SET CONTAINER = TESTUSER;
+
+CREATE USER TESTUSER IDENTIFIED BY testPassword;
+
+GRANT DBA TO TESTUSER;
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index f66ef615d7..a5796c1aaa 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -17,6 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -26,6 +31,7 @@ import
org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -252,6 +258,43 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
}
}
+ @Test
+ public void testCatalog() {
+ String schema = "public";
+ String databaseName = POSTGRESQL_CONTAINER.getDatabaseName();
+ String tableName = "pg_e2e_sink_table";
+ String catalogDatabaseName = "pg_e2e_catalog_database";
+ String catalogTableName = "pg_e2e_catalog_table";
+
+ Catalog catalog =
+ new PostgresCatalog(
+ "postgres",
+ POSTGRESQL_CONTAINER.getUsername(),
+ POSTGRESQL_CONTAINER.getPassword(),
+
JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()),
+ schema);
+ catalog.open();
+
+ TablePath tablePath = new TablePath(databaseName, schema, tableName);
+ TablePath catalogTablePath = new TablePath(catalogDatabaseName,
schema, catalogTableName);
+
+
Assertions.assertFalse(catalog.databaseExists(catalogTablePath.getDatabaseName()));
+ catalog.createDatabase(catalogTablePath, false);
+
Assertions.assertTrue(catalog.databaseExists(catalogTablePath.getDatabaseName()));
+
+ CatalogTable catalogTable = catalog.getTable(tablePath);
+ catalog.createTable(catalogTablePath, catalogTable, false);
+ Assertions.assertTrue(catalog.tableExists(catalogTablePath));
+
+ catalog.dropTable(catalogTablePath, false);
+ Assertions.assertFalse(catalog.tableExists(catalogTablePath));
+
+ catalog.dropDatabase(catalogTablePath, false);
+
Assertions.assertFalse(catalog.databaseExists(catalogTablePath.getDatabaseName()));
+
+ catalog.close();
+ }
+
private void initializeJdbcTable() {
try (Connection connection = getJdbcConnection()) {
Statement statement = connection.createStatement();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index f615b6656e..0a170ff4be 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.commons.lang3.tuple.Pair;
@@ -44,9 +46,16 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
private static final String SQLSERVER_CONTAINER_HOST = "sqlserver";
private static final String SQLSERVER_SOURCE = "source";
private static final String SQLSERVER_SINK = "sink";
+ private static final String SQLSERVER_DATABASE = "master";
+ private static final String SQLSERVER_SCHEMA = "dbo";
+ private static final String SQLSERVER_CATALOG_DATABASE = "catalog_test";
+
private static final int SQLSERVER_CONTAINER_PORT = 1433;
private static final String SQLSERVER_URL =
- "jdbc:sqlserver://" + AbstractJdbcIT.HOST + ":%s;encrypt=false;";
+ "jdbc:sqlserver://"
+ + AbstractJdbcIT.HOST
+ + ":%s;encrypt=false;databaseName="
+ + SQLSERVER_DATABASE;
private static final String DRIVER_CLASS =
"com.microsoft.sqlserver.jdbc.SQLServerDriver";
private static final List<String> CONFIG_FILE =
Lists.newArrayList("/jdbc_sqlserver_source_to_sink.conf");
@@ -81,8 +90,13 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
.jdbcUrl(jdbcUrl)
.userName(username)
.password(password)
+ .database(SQLSERVER_DATABASE)
+ .schema(SQLSERVER_SCHEMA)
.sourceTable(SQLSERVER_SOURCE)
.sinkTable(SQLSERVER_SINK)
+ .catalogDatabase(SQLSERVER_CATALOG_DATABASE)
+ .catalogSchema(SQLSERVER_SCHEMA)
+ .catalogTable(SQLSERVER_SINK)
.createSql(CREATE_SQL)
.configFile(CONFIG_FILE)
.insertSql(insertSql)
@@ -158,4 +172,22 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
public void clearTable(String schema, String table) {
// do nothing.
}
+
+ @Override
+ protected String buildTableInfoWithSchema(String database, String schema,
String table) {
+ return buildTableInfoWithSchema(schema, table);
+ }
+
+ @Override
+ protected void initCatalog() {
+ catalog =
+ new SqlServerCatalog(
+ "sqlserver",
+ jdbcCase.getUserName(),
+ jdbcCase.getPassword(),
+ SqlServerURLParser.parse(
+ jdbcCase.getJdbcUrl().replace(HOST,
dbServer.getHost())),
+ SQLSERVER_SCHEMA);
+ catalog.open();
+ }
}