loserwang1024 commented on code in PR #3808:
URL: https://github.com/apache/flink-cdc/pull/3808#discussion_r1898288855
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java:
##########
@@ -112,7 +112,13 @@ private List<TableChange> readTableSchema(List<TableId> tableIds) throws SQLExce
                     tables,
                     dbzConfig.databaseName(),
                     null,
-                    dbzConfig.getTableFilters().dataCollectionFilter(),
+                    // only check context tableIds
+                    (tb) ->

Review Comment:
   > each chunk will load a table schema

   `PostgresDialect` already caches the table schema.

   > dbzConfig.getTableFilters().dataCollectionFilter(),

   But your modification does the same thing. `dbzConfig.getTableFilters().dataCollectionFilter()` is built as follows:
   ```java
   Predicate<TableId> tablePredicate =
           eligibleTables
                   .includeTables(
                           config.getFallbackStringProperty(
                                   RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST,
                                   RelationalDatabaseConnectorConfig.TABLE_WHITELIST),
                           tableIdMapper)
                   .excludeTables(
                           config.getFallbackStringProperty(
                                   RelationalDatabaseConnectorConfig.TABLE_EXCLUDE_LIST,
                                   RelationalDatabaseConnectorConfig.TABLE_BLACKLIST),
                           tableIdMapper)
                   .build();
   ```

   Maybe what you need to do is:
   - In the JM, use `getTableSchema(List<TableId> tableIds)` to load the schemas of all tables only once (they can also be loaded on demand).
   - In the TM, use `TableChange getTableSchema(TableId tableId)` to load only the one schema that is needed. After use, clear the cached schema of the old table.
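
To make the JM/TM suggestion concrete, here is a rough, standalone sketch of the caching behaviour I have in mind. It is not flink-cdc code: `SchemaCacheSketch`, `getAll`, `getOne`, and the `bulkLoader` function are hypothetical names, and the generic `K`/`V` types stand in for `TableId` and `TableChange`. The real change would live in the dialect and schema classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch only; none of these names exist in flink-cdc. */
final class SchemaCacheSketch<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    // Assumed stand-in for the JDBC schema read: takes table ids, returns their schemas.
    private final Function<List<K>, Map<K, V>> bulkLoader;
    private int loadCalls = 0;

    SchemaCacheSketch(Function<List<K>, Map<K, V>> bulkLoader) {
        this.bulkLoader = bulkLoader;
    }

    /** JM-style access: read every missing schema in a single loader call, then reuse. */
    Map<K, V> getAll(List<K> ids) {
        List<K> missing = new ArrayList<>();
        for (K id : ids) {
            if (!cache.containsKey(id)) {
                missing.add(id);
            }
        }
        if (!missing.isEmpty()) {
            loadCalls++;
            cache.putAll(bulkLoader.apply(missing));
        }
        Map<K, V> result = new LinkedHashMap<>();
        for (K id : ids) {
            result.put(id, cache.get(id));
        }
        return result;
    }

    /** TM-style access: keep only the schema in use; a miss drops the old table's schema. */
    V getOne(K id) {
        V schema = cache.get(id);
        if (schema == null) {
            cache.clear(); // clean the schema of the previously used table
            loadCalls++;
            schema = bulkLoader.apply(Collections.singletonList(id)).get(id);
            if (schema != null) {
                cache.put(id, schema);
            }
        }
        return schema;
    }

    int loadCalls() {
        return loadCalls;
    }

    int size() {
        return cache.size();
    }
}
```

With something like this, repeated chunks of the same table hit the cache instead of re-reading the schema, and a TM holds at most one table schema at a time. Whether eviction should happen on a miss (as here) or explicitly when a split finishes is a design choice I am not sure about.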