hql0312 commented on code in PR #3808: URL: https://github.com/apache/flink-cdc/pull/3808#discussion_r1898300731
########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java:
##########
@@ -112,7 +112,13 @@ private List<TableChange> readTableSchema(List<TableId> tableIds) throws SQLExce
                         tables,
                         dbzConfig.databaseName(),
                         null,
-                        dbzConfig.getTableFilters().dataCollectionFilter(),
+                        // only check context tableIds
+                        (tb) ->

Review Comment:
PostgresDialect caches each TableChange after it has been loaded, but on the first access none of the tables are loaded yet. You can trace JdbcSourceChunkSplitter.generateSplits: it calls analyzeTable, which calls PostgresDialect.queryTableSchema, which in turn calls CustomPostgresSchema.getTableSchema:

```java
public TableChange getTableSchema(TableId tableId) {
    // read schema from cache first
    if (!schemasByTableId.containsKey(tableId)) {
        try {
            readTableSchema(Collections.singletonList(tableId));
        } catch (SQLException e) {
            throw new FlinkRuntimeException("Failed to read table schema", e);
        }
    }
    return schemasByTableId.get(tableId);
}
```

This calls readTableSchema for any tableId that has not been loaded yet, and readTableSchema then calls jdbcConnection.readSchema with dbzConfig.getTableFilters().dataCollectionFilter():

```java
public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern,
                       TableFilter tableFilter, ColumnNameFilter columnFilter,
                       boolean removeTablesNotFoundInJdbc) throws SQLException {
    // Before we make any changes, get the copy of the set of table IDs ...
    Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());

    // Read the metadata for the table columns ...
    DatabaseMetaData metadata = connection().getMetaData();

    // Find regular and materialized views as they cannot be snapshotted
    final Set<TableId> viewIds = new HashSet<>();
    final Set<TableId> tableIds = new HashSet<>();
    int totalTables = 0;
    // **the logic will load the tables which match dbzConfig.getTableFilters().dataCollectionFilter()**
    try (final ResultSet rs = metadata.getTables(databaseCatalog, schemaNamePattern, null, supportedTableTypes())) {
        while (rs.next()) {
            final String catalogName = resolveCatalogName(rs.getString(1));
            final String schemaName = rs.getString(2);
            final String tableName = rs.getString(3);
            final String tableType = rs.getString(4);
            if (isTableType(tableType)) {
                totalTables++;
                TableId tableId = new TableId(catalogName, schemaName, tableName);
                // ** match dbzConfig.getTableFilters().dataCollectionFilter() will add all tables **
                if (tableFilter == null || tableFilter.isIncluded(tableId)) {
                    tableIds.add(tableId);
                }
            } else {
                TableId tableId = new TableId(catalogName, schemaName, tableName);
                viewIds.add(tableId);
            }
        }
    }

    Map<TableId, List<Column>> columnsByTable = new HashMap<>();
    if (totalTables == tableIds.size()
            || config.getBoolean(RelationalDatabaseConnectorConfig.SNAPSHOT_FULL_COLUMN_SCAN_FORCE)) {
        columnsByTable = getColumnsDetails(databaseCatalog, schemaNamePattern, null,
                tableFilter, columnFilter, metadata, viewIds);
    } else {
        // **load the tables which match dbzConfig.getTableFilters().dataCollectionFilter() for each tableId**
        for (TableId includeTable : tableIds) {
            LOGGER.debug("Retrieving columns of table {}", includeTable);
            Map<TableId, List<Column>> cols = getColumnsDetails(databaseCatalog, schemaNamePattern,
                    includeTable.table(), tableFilter, columnFilter, metadata, viewIds);
            columnsByTable.putAll(cols);
        }
    }

    // Read the metadata for the primary keys ...
    for (Entry<TableId, List<Column>> tableEntry : columnsByTable.entrySet()) {
        // First get the primary key information, which must be done for *each* table ...
        List<String> pkColumnNames = readPrimaryKeyOrUniqueIndexNames(metadata, tableEntry.getKey());

        // Then define the table ...
        List<Column> columns = tableEntry.getValue();
        Collections.sort(columns);
        String defaultCharsetName = null; // JDBC does not expose character sets
        tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, defaultCharsetName);
    }

    if (removeTablesNotFoundInJdbc) {
        // Remove any definitions for tables that were not found in the database metadata ...
        tableIdsBefore.removeAll(columnsByTable.keySet());
        tableIdsBefore.forEach(tables::removeTable);
    }
}
```

This loads metadata for every table that matches dbzConfig.getTableFilters().dataCollectionFilter(), and that full scan runs once for each tableId that is loaded for the first time (see the lines marked with ** above). We should only load the tables that are actually passed in through readTableSchema's parameter.
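To see why the broad filter hurts, here is a toy model (not the actual Flink CDC or Debezium classes; all names are stand-ins) of the caching behaviour described above: each first-time lookup of a single table rescans metadata for every captured table, because only the requested table ends up cached.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class OverloadDemo {
    static final List<String> ALL_CAPTURED = List.of("orders", "customers", "products");
    static final Map<String, String> cache = new HashMap<>(); // like schemasByTableId
    static int tablesScanned = 0; // counts simulated metadata round-trips

    // Stand-in for readTableSchema -> jdbcConnection.readSchema: it visits every
    // table the filter accepts, but only the requested table is cached.
    static void readTableSchema(String requested, Predicate<String> filter) {
        for (String t : ALL_CAPTURED) {
            if (filter.test(t)) {
                tablesScanned++;
            }
        }
        cache.put(requested, "schema-of-" + requested);
    }

    // Stand-in for CustomPostgresSchema.getTableSchema: cache first, then load.
    static String getTableSchema(String tableId, Predicate<String> filter) {
        if (!cache.containsKey(tableId)) {
            readTableSchema(tableId, filter);
        }
        return cache.get(tableId);
    }

    public static void main(String[] args) {
        // Broad filter, like dataCollectionFilter(): every first-time lookup
        // rescans all three captured tables.
        Predicate<String> broad = ALL_CAPTURED::contains;
        getTableSchema("orders", broad);
        getTableSchema("customers", broad);
        System.out.println(tablesScanned); // 6

        // Scoped filter: each first-time lookup scans only the requested table.
        tablesScanned = 0;
        cache.clear();
        getTableSchema("orders", Set.of("orders")::contains);
        getTableSchema("customers", Set.of("customers")::contains);
        System.out.println(tablesScanned); // 2
    }
}
```

With N captured tables this model does N scans per first-time lookup under the broad filter versus one under the scoped filter, which matches the reviewer's point that the cost repeats for every tableId loaded for the first time.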
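The `(tb) ->` lambda in the diff above is truncated, so the exact fix is not shown here. A rough sketch of the idea, restricting the filter to the tableIds passed into readTableSchema, using a hypothetical record in place of Debezium's TableId and a plain Predicate in place of Tables.TableFilter, might look like this:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class ScopedFilterSketch {
    // Hypothetical stand-in for Debezium's io.debezium.relational.TableId.
    record TableId(String catalog, String schema, String table) {}

    // Build a filter that accepts only the tableIds actually passed to
    // readTableSchema, instead of dataCollectionFilter(), which matches
    // every table in the capture list.
    static Predicate<TableId> onlyContextTableIds(List<TableId> tableIds) {
        Set<TableId> scope = new HashSet<>(tableIds);
        return scope::contains;
    }

    public static void main(String[] args) {
        TableId wanted = new TableId(null, "public", "orders");
        TableId other = new TableId(null, "public", "customers");
        Predicate<TableId> filter = onlyContextTableIds(List.of(wanted));
        System.out.println(filter.test(wanted)); // true
        System.out.println(filter.test(other));  // false
    }
}
```

Backing the filter with a Set gives O(1) membership checks, so readSchema would skip the column scan for every captured table outside the requested batch.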