Hisoka-X commented on code in PR #9380:
URL: https://github.com/apache/seatunnel/pull/9380#discussion_r2131432439


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java:
##########
@@ -27,36 +27,224 @@
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceTableConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sourcetype.DatabaseTypeEnum;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import lombok.SneakyThrows;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 public class JdbcSource
         implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit, 
JdbcSourceState>,
                 SupportParallelism,
                 SupportColumnProjection {
     protected static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
-
+    private static final String POINT = ".";
     private final JdbcSourceConfig jdbcSourceConfig;
     private final Map<TablePath, JdbcSourceTable> jdbcSourceTables;
 
     @SneakyThrows
     public JdbcSource(JdbcSourceConfig jdbcSourceConfig) {
         this.jdbcSourceConfig = jdbcSourceConfig;
+        JdbcConnectionConfig jdbcConnectionConfig = 
jdbcSourceConfig.getJdbcConnectionConfig();
+        JdbcDialect jdbcDialect =
+                JdbcDialectLoader.load(
+                        jdbcConnectionConfig.getUrl(), 
jdbcConnectionConfig.getCompatibleMode());
+        if (!isSupportedDatabase(jdbcDialect.dialectName())) {
+            this.jdbcSourceTables =
+                    JdbcCatalogUtils.getTables(
+                            jdbcSourceConfig.getJdbcConnectionConfig(),
+                            jdbcSourceConfig.getTableConfigList());
+            return;
+        }
+        JdbcConnectionProvider connectionProvider =
+                
jdbcDialect.getJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
+        Connection connection = connectionProvider.getOrEstablishConnection();
+        List<JdbcSourceTableConfig> jdbcSourceTableConfigs = new ArrayList<>();
+        ResultSet rs = null;
+        PreparedStatement ps = null;
+        List<JdbcSourceTableConfig> tablePaths = 
jdbcSourceConfig.getTableConfigList();
+        try {
+            for (JdbcSourceTableConfig tableConfig : tablePaths) {
+                List<String> schemaTables = new ArrayList<>();
+                String tablePath = tableConfig.getTablePath();
+                String query = tableConfig.getQuery();
+                LOG.info("Processing table path: {}, custom query: {}", 
tablePath, query);
+                String sql;
+                if (StringUtils.isBlank(query)) {
+                    String schemaName;
+                    if 
(jdbcDialect.dialectName().startsWith(DatabaseTypeEnum.ORACLE.getValue())) {
+                        schemaName = tablePath.split("\\.")[0];
+                        sql = "SELECT OWNER, TABLE_NAME FROM dba_tables where 
OWNER=?";
+                        ps = connection.prepareStatement(sql);
+                        ps.setString(1, schemaName);
+                        rs = ps.executeQuery();
+                        while (rs.next()) {
+                            // For Oracle: schema.table
+                            String foundTable =
+                                    rs.getString("OWNER") + POINT + 
rs.getString("TABLE_NAME");
+                            schemaTables.add(foundTable);
+                            LOG.info("Found table in Oracle: {}", foundTable);
+                        }

Review Comment:
   Why not reuse the JDBC catalog to scan for all the tables that should be 
read? That way, every database type that has a catalog would be supported. 
https://github.com/apache/seatunnel/blob/dev/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java#L102



##########
docs/en/connector-v2/source/Jdbc.md:
##########
@@ -73,6 +73,49 @@ supports query SQL and can achieve projection effect.
 | split.string_split_mode                    | String  | No       | sample     
     | Supports different string splitting algorithms. By default, `sample` is 
used to determine the split by sampling the string value. You can switch to 
`charset_based` to enable charset-based string splitting algorithm. When set to 
`charset_based`, the algorithm assumes characters of partition_column are 
within ASCII range 32-126, which covers most character-based splitting 
scenarios.                                                                      
                                                                                
                                                                                
              |
 | split.string_split_mode_collate            | String  | No       | -          
     | Specifies the collation to use when string_split_mode is set to 
`charset_based` and the table has a special collation. If not specified, the 
database's default collation will be used.                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
      |
 
+### Table Matching
+
+The JDBC Source connector supports two ways to specify tables:
+
+1. **Exact Table Path**: Use `table_path` to specify a single table with its 
full path.
+   ```hocon
+   table_path = "testdb.table1"
+   ```
+
+2. **Regular Expression**: Use `table_path` with a regex pattern to match 
multiple tables.
+   ```hocon
+   table_path = "testdb.table\\d+"  # Matches table1, table2, table3, etc.
+   ```
+
+#### Regular Expression Support
+
+The JDBC Source connector supports regular expressions for table matching. 
This is useful when you want to read multiple tables that follow a specific 
naming pattern.
+
+Examples:
+- `testdb.table\\d+` - Matches all tables whose names are "table" followed by 
one or more digits
+- `testdb.*` - Matches all tables in the `testdb` database

Review Comment:
   Will this expression also match the database `testdb2`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to