yzeng1618 commented on code in PR #9380:
URL: https://github.com/apache/seatunnel/pull/9380#discussion_r2131458719
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java:
##########
@@ -27,36 +27,224 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceTableConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sourcetype.DatabaseTypeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
+import org.apache.commons.lang3.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.SneakyThrows;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class JdbcSource
implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit,
JdbcSourceState>,
SupportParallelism,
SupportColumnProjection {
protected static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
-
+ private static final String POINT = ".";
private final JdbcSourceConfig jdbcSourceConfig;
private final Map<TablePath, JdbcSourceTable> jdbcSourceTables;
@SneakyThrows
public JdbcSource(JdbcSourceConfig jdbcSourceConfig) {
this.jdbcSourceConfig = jdbcSourceConfig;
+ JdbcConnectionConfig jdbcConnectionConfig =
jdbcSourceConfig.getJdbcConnectionConfig();
+ JdbcDialect jdbcDialect =
+ JdbcDialectLoader.load(
+ jdbcConnectionConfig.getUrl(),
jdbcConnectionConfig.getCompatibleMode());
+ if (!isSupportedDatabase(jdbcDialect.dialectName())) {
+ this.jdbcSourceTables =
+ JdbcCatalogUtils.getTables(
+ jdbcSourceConfig.getJdbcConnectionConfig(),
+ jdbcSourceConfig.getTableConfigList());
+ return;
+ }
+ JdbcConnectionProvider connectionProvider =
+
jdbcDialect.getJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
+ Connection connection = connectionProvider.getOrEstablishConnection();
+ List<JdbcSourceTableConfig> jdbcSourceTableConfigs = new ArrayList<>();
+ ResultSet rs = null;
+ PreparedStatement ps = null;
+ List<JdbcSourceTableConfig> tablePaths =
jdbcSourceConfig.getTableConfigList();
+ try {
+ for (JdbcSourceTableConfig tableConfig : tablePaths) {
+ List<String> schemaTables = new ArrayList<>();
+ String tablePath = tableConfig.getTablePath();
+ String query = tableConfig.getQuery();
+ LOG.info("Processing table path: {}, custom query: {}",
tablePath, query);
+ String sql;
+ if (StringUtils.isBlank(query)) {
+ String schemaName;
+ if
(jdbcDialect.dialectName().startsWith(DatabaseTypeEnum.ORACLE.getValue())) {
+ schemaName = tablePath.split("\\.")[0];
+ sql = "SELECT OWNER, TABLE_NAME FROM dba_tables where
OWNER=?";
+ ps = connection.prepareStatement(sql);
+ ps.setString(1, schemaName);
+ rs = ps.executeQuery();
+ while (rs.next()) {
+ // For Oracle: schema.table
+ String foundTable =
+ rs.getString("OWNER") + POINT +
rs.getString("TABLE_NAME");
+ schemaTables.add(foundTable);
+ LOG.info("Found table in Oracle: {}", foundTable);
+ }
Review Comment:
The current approach of manually implementing table scanning in JdbcSource
offers several key advantages:
1. Precise Control
- Custom SQL queries tailored to each database type (Oracle, MySQL, SQL Server,
PostgreSQL)
- Direct access to system tables for accurate retrieval of table information
2. Detailed Logging
- Comprehensive logging for debugging and troubleshooting
- Database-specific logging (e.g., LOG.info("Found table in PostgreSQL: {}",
foundTable))
3. Specialized Format Handling
- Dedicated handling for each database's table path format:
  - MySQL: database.table
  - PostgreSQL: database.schema.table
  - Oracle: schema.table
  - SQL Server: database.schema.table
4. Flexible Pattern Matching
- A two-step matching strategy in filterCapturedTablesByRegrex: it first attempts
exact matches, then falls back to regex pattern matching
5. Reduced Dependencies
- Functions independently of complete Catalog implementations
- Works even when some databases have incomplete Catalog implementations
6. Compatibility Assurance
- Explicitly defines the supported databases via isSupportedDatabase
- Falls back to JdbcCatalogUtils.getTables for unsupported database types
While using the Catalog interface might lead to more unified code, the
current implementation provides greater flexibility and adaptability to
specific database behaviors, especially when handling differences in system
table structures and naming conventions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]