yzeng1618 commented on code in PR #9380:
URL: https://github.com/apache/seatunnel/pull/9380#discussion_r2131458719


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java:
##########
@@ -27,36 +27,224 @@
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceTableConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sourcetype.DatabaseTypeEnum;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import lombok.SneakyThrows;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 public class JdbcSource
         implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit, 
JdbcSourceState>,
                 SupportParallelism,
                 SupportColumnProjection {
     protected static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
-
+    private static final String POINT = ".";
     private final JdbcSourceConfig jdbcSourceConfig;
     private final Map<TablePath, JdbcSourceTable> jdbcSourceTables;
 
     @SneakyThrows
     public JdbcSource(JdbcSourceConfig jdbcSourceConfig) {
         this.jdbcSourceConfig = jdbcSourceConfig;
+        JdbcConnectionConfig jdbcConnectionConfig = 
jdbcSourceConfig.getJdbcConnectionConfig();
+        JdbcDialect jdbcDialect =
+                JdbcDialectLoader.load(
+                        jdbcConnectionConfig.getUrl(), 
jdbcConnectionConfig.getCompatibleMode());
+        if (!isSupportedDatabase(jdbcDialect.dialectName())) {
+            this.jdbcSourceTables =
+                    JdbcCatalogUtils.getTables(
+                            jdbcSourceConfig.getJdbcConnectionConfig(),
+                            jdbcSourceConfig.getTableConfigList());
+            return;
+        }
+        JdbcConnectionProvider connectionProvider =
+                
jdbcDialect.getJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
+        Connection connection = connectionProvider.getOrEstablishConnection();
+        List<JdbcSourceTableConfig> jdbcSourceTableConfigs = new ArrayList<>();
+        ResultSet rs = null;
+        PreparedStatement ps = null;
+        List<JdbcSourceTableConfig> tablePaths = 
jdbcSourceConfig.getTableConfigList();
+        try {
+            for (JdbcSourceTableConfig tableConfig : tablePaths) {
+                List<String> schemaTables = new ArrayList<>();
+                String tablePath = tableConfig.getTablePath();
+                String query = tableConfig.getQuery();
+                LOG.info("Processing table path: {}, custom query: {}", 
tablePath, query);
+                String sql;
+                if (StringUtils.isBlank(query)) {
+                    String schemaName;
+                    if 
(jdbcDialect.dialectName().startsWith(DatabaseTypeEnum.ORACLE.getValue())) {
+                        schemaName = tablePath.split("\\.")[0];
+                        sql = "SELECT OWNER, TABLE_NAME FROM dba_tables where 
OWNER=?";
+                        ps = connection.prepareStatement(sql);
+                        ps.setString(1, schemaName);
+                        rs = ps.executeQuery();
+                        while (rs.next()) {
+                            // For Oracle: schema.table
+                            String foundTable =
+                                    rs.getString("OWNER") + POINT + 
rs.getString("TABLE_NAME");
+                            schemaTables.add(foundTable);
+                            LOG.info("Found table in Oracle: {}", foundTable);
+                        }

Review Comment:
   The current approach of manually implementing table scanning in `JdbcSource` 
offers several key advantages:

   **Precise Control**
   
   1. Custom SQL
   
   - Queries tailored for each database type (Oracle, MySQL, SQLServer, 
PostgreSQL)
   - Direct access to system tables for accurate table information retrieval
   
   2. Detailed Logging
   
   - Comprehensive logging for debugging and troubleshooting
   - Database-specific logging (e.g., LOG.info("Found table in PostgreSQL: {}", 
foundTable))
   
   3. Specialized Format Handling
   
   - Dedicated handling for different database table path formats:
   
       MySQL: database.table
       PostgreSQL: database.schema.table
       Oracle: schema.table
       SQLServer: database.schema.table
   
   4. Flexible Pattern Matching
   
   - Two-step matching strategy in `filterCapturedTablesByRegrex`: it first 
attempts exact matches, then falls back to regex pattern matching
   
   5. Reduced Dependencies
   
   - Functions independently of complete `Catalog` implementations
   - Works even when some databases have incomplete `Catalog` implementations
   
   6. Compatibility Assurance
   
   - Explicitly defines supported databases via `isSupportedDatabase`
   - Provides a fallback to `JdbcCatalogUtils.getTables` for unsupported database 
types
   
   While using the `Catalog` interface might lead to more unified code, the 
current implementation provides greater flexibility and adaptability to 
specific database behaviors, especially when handling differences in system 
table structures and naming conventions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to