[
https://issues.apache.org/jira/browse/FLINK-38786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
laixiong updated FLINK-38786:
-----------------------------
Description:
Does oracle-cdc support partitioned tables in the current versions (Flink 1.20,
flink-cdc 3.5.0)?
I found the following SQL in this source file:
flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java
{code:java}
public static List<TableId> listTables(
        JdbcConnection jdbcConnection, RelationalTableFilters tableFilters)
        throws SQLException {
    final List<TableId> capturedTableIds = new ArrayList<>();
    Set<TableId> tableIdSet = new HashSet<>();
    String queryTablesSql =
            "SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES \n"
                    + "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME "
                    + "NOT IN ('SYSTEM','SYSAUX') AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT "
                    + "PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)";
    try {
        jdbcConnection.query(
                queryTablesSql,
                rs -> {
                    while (rs.next()) {
                        String schemaName = rs.getString(1);
                        String tableName = rs.getString(2);
                        TableId tableId =
                                new TableId(jdbcConnection.database(), schemaName, tableName);
                        tableIdSet.add(tableId);
                    }
                });
    } catch (SQLException e) {
        LOG.warn(" SQL execute error, sql:{}", queryTablesSql, e);
    }
    for (TableId tableId : tableIdSet) {
        if (tableFilters.dataCollectionFilter().isIncluded(tableId)) {
            capturedTableIds.add(tableId);
            LOG.info("\t including '{}' for further processing", tableId);
        } else {
            LOG.debug("\t '{}' is filtered out of capturing", tableId);
        }
    }
    return capturedTableIds;
}
{code}
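This SQL filters out partitioned tables: in ALL_TABLES, TABLESPACE_NAME is NULL
for partitioned tables, so the "TABLESPACE_NAME IS NOT NULL" condition excludes
them. Can this query be improved so that partitioned tables are captured?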
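One possible direction, as a minimal sketch only: keep the existing tablespace check for ordinary tables, but also let through any row whose PARTITIONED column in ALL_TABLES is 'YES'. The class name PartitionAwareTableQuery and the method buildListTablesSql are hypothetical and are not part of flink-cdc. The sketch only builds the SQL string and has not been run against the connector or a database. Partitioned tables owned by system schemas would also pass this condition, so it assumes the existing table filters exclude them.

```java
// Hypothetical helper, not part of flink-cdc: builds a variant of the
// listTables query that does not drop partitioned tables.
final class PartitionAwareTableQuery {

    private PartitionAwareTableQuery() {}

    static String buildListTablesSql() {
        // ALL_TABLES reports TABLESPACE_NAME as NULL for partitioned tables,
        // so accept a row when it is partitioned OR when it lives in a
        // non-system tablespace. The other conditions are kept unchanged.
        return "SELECT OWNER, TABLE_NAME, TABLESPACE_NAME FROM ALL_TABLES "
                + "WHERE (PARTITIONED = 'YES' "
                + "OR (TABLESPACE_NAME IS NOT NULL "
                + "AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX'))) "
                + "AND NESTED = 'NO' "
                + "AND TABLE_NAME NOT IN "
                + "(SELECT PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)";
    }

    public static void main(String[] args) {
        // Print the query so it can be tried manually in a SQL client.
        System.out.println(buildListTablesSql());
    }
}
```

The string returned by buildListTablesSql() could stand in for queryTablesSql in listTables, since the first two result columns (OWNER, TABLE_NAME) are the same ones the method reads.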
was:does support oracle-cdc partitioned table for current version flink 1.20
, flink-cdc 3.5.0
> Does oracle-cdc support partitioned tables now?
> -----------------------------------------------
>
> Key: FLINK-38786
> URL: https://issues.apache.org/jira/browse/FLINK-38786
> Project: Flink
> Issue Type: Improvement
> Affects Versions: cdc-3.5.0
> Reporter: laixiong
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)