[ 
https://issues.apache.org/jira/browse/FLINK-38786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

laixiong updated FLINK-38786:
-----------------------------
    Description: 
does support oracle-cdc partitioned table for current version flink 1.20  , 
flink-cdc 3.5.0。

 

I found a sql in source code file: 

flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java

 
{code:java}
public static List<TableId> listTables(
        JdbcConnection jdbcConnection, RelationalTableFilters tableFilters)
        throws SQLException {
    final List<TableId> capturedTableIds = new ArrayList<>();

    Set<TableId> tableIdSet = new HashSet<>();
    String queryTablesSql =
            "SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES \n"
                    + "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME 
NOT IN ('SYSTEM','SYSAUX') AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT 
PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)";
    try {
        jdbcConnection.query(
                queryTablesSql,
                rs -> {
                    while (rs.next()) {
                        String schemaName = rs.getString(1);
                        String tableName = rs.getString(2);
                        TableId tableId =
                                new TableId(jdbcConnection.database(), 
schemaName, tableName);
                        tableIdSet.add(tableId);
                    }
                });
    } catch (SQLException e) {
        LOG.warn(" SQL execute error, sql:{}", queryTablesSql, e);
    }

    for (TableId tableId : tableIdSet) {
        if (tableFilters.dataCollectionFilter().isIncluded(tableId)) {
            capturedTableIds.add(tableId);
            LOG.info("\t including '{}' for further processing", tableId);
        } else {
            LOG.debug("\t '{}' is filtered out of capturing", tableId);
        }
    }

    return capturedTableIds;
} {code}
this sql will filter out partitioned table, can it be optimized here?

  was:does support oracle-cdc partitioned table for current version flink 1.20  
, flink-cdc 3.5.0


> Does support oracle-cdc partitioned table now ?
> -----------------------------------------------
>
>                 Key: FLINK-38786
>                 URL: https://issues.apache.org/jira/browse/FLINK-38786
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: cdc-3.5.0
>            Reporter: laixiong
>            Priority: Major
>
> does support oracle-cdc partitioned table for current version flink 1.20  , 
> flink-cdc 3.5.0。
>  
> I found a sql in source code file: 
> flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java
>  
> {code:java}
> public static List<TableId> listTables(
>         JdbcConnection jdbcConnection, RelationalTableFilters tableFilters)
>         throws SQLException {
>     final List<TableId> capturedTableIds = new ArrayList<>();
>     Set<TableId> tableIdSet = new HashSet<>();
>     String queryTablesSql =
>             "SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES \n"
>                     + "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME 
> NOT IN ('SYSTEM','SYSAUX') AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT 
> PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)";
>     try {
>         jdbcConnection.query(
>                 queryTablesSql,
>                 rs -> {
>                     while (rs.next()) {
>                         String schemaName = rs.getString(1);
>                         String tableName = rs.getString(2);
>                         TableId tableId =
>                                 new TableId(jdbcConnection.database(), 
> schemaName, tableName);
>                         tableIdSet.add(tableId);
>                     }
>                 });
>     } catch (SQLException e) {
>         LOG.warn(" SQL execute error, sql:{}", queryTablesSql, e);
>     }
>     for (TableId tableId : tableIdSet) {
>         if (tableFilters.dataCollectionFilter().isIncluded(tableId)) {
>             capturedTableIds.add(tableId);
>             LOG.info("\t including '{}' for further processing", tableId);
>         } else {
>             LOG.debug("\t '{}' is filtered out of capturing", tableId);
>         }
>     }
>     return capturedTableIds;
> } {code}
> this sql will filter out partitioned table, can it be optimized here?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to