hql0312 commented on code in PR #3808:
URL: https://github.com/apache/flink-cdc/pull/3808#discussion_r1898300731


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java:
##########
@@ -112,7 +112,13 @@ private List<TableChange> readTableSchema(List<TableId> 
tableIds) throws SQLExce
                     tables,
                     dbzConfig.databaseName(),
                     null,
-                    dbzConfig.getTableFilters().dataCollectionFilter(),
+                    // only check context tableIds
+                    (tb) ->

Review Comment:
   in PostgresDialect will cache the TableChange which had load. but first 
time, all the table is not loaded.
   you can trace JdbcSourceChunkSplitter.generateSplits method, which will call 
analyzeTable, then call PostgresqlDialect.queryTableSchema ,and finally call 
CustomPostgresSchema.getTableSchema, the code follow:
   ```java
    public TableChange getTableSchema(TableId tableId) {
           // read schema from cache first
           if (!schemasByTableId.containsKey(tableId)) {
               try {
                   readTableSchema(Collections.singletonList(tableId));
               } catch (SQLException e) {
                   throw new FlinkRuntimeException("Failed to read table 
schema", e);
               }
           }
           return schemasByTableId.get(tableId);
       }
   ```
   this will call readTableSchema for the tableId which had not loaded. 
   
   in readTableSchema method will call jdbcConnection.readSchema with 
dbzConfig.getTableFilters().dataCollectionFilter() 
   ```java
   public void readSchema(Tables tables, String databaseCatalog, String 
schemaNamePattern,
                              TableFilter tableFilter, ColumnNameFilter 
columnFilter, boolean removeTablesNotFoundInJdbc)
               throws SQLException {
           // Before we make any changes, get the copy of the set of table IDs 
...
           Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());
   
           // Read the metadata for the table columns ...
           DatabaseMetaData metadata = connection().getMetaData();
   
           // Find regular and materialized views as they cannot be snapshotted
           final Set<TableId> viewIds = new HashSet<>();
           final Set<TableId> tableIds = new HashSet<>();
   
           int totalTables = 0;
           // **the logic will load the tables which match 
dbzConfig.getTableFilters().dataCollectionFilter()**
           try (final ResultSet rs = metadata.getTables(databaseCatalog, 
schemaNamePattern, null, supportedTableTypes())) {
               while (rs.next()) {
                   final String catalogName = 
resolveCatalogName(rs.getString(1));
                   final String schemaName = rs.getString(2);
                   final String tableName = rs.getString(3);
                   final String tableType = rs.getString(4);
                   if (isTableType(tableType)) {
                       totalTables++;
                       TableId tableId = new TableId(catalogName, schemaName, 
tableName);
                       // ** match  
dbzConfig.getTableFilters().dataCollectionFilter() will add all tables ** 
                       if (tableFilter == null || 
tableFilter.isIncluded(tableId)) {
                           tableIds.add(tableId);
                       }
                   }
                   else {
                       TableId tableId = new TableId(catalogName, schemaName, 
tableName);
                       viewIds.add(tableId);
                   }
               }
           }
   
           Map<TableId, List<Column>> columnsByTable = new HashMap<>();
   
           if (totalTables == tableIds.size() || 
config.getBoolean(RelationalDatabaseConnectorConfig.SNAPSHOT_FULL_COLUMN_SCAN_FORCE))
 {
               columnsByTable = getColumnsDetails(databaseCatalog, 
schemaNamePattern, null, tableFilter, columnFilter, metadata, viewIds);
           }
           else {
               // **load the tables which match 
dbzConfig.getTableFilters().dataCollectionFilter() for each tableId**
               for (TableId includeTable : tableIds) {
                   LOGGER.debug("Retrieving columns of table {}", includeTable);
   
                   Map<TableId, List<Column>> cols = 
getColumnsDetails(databaseCatalog, schemaNamePattern, includeTable.table(), 
tableFilter,
                           columnFilter, metadata, viewIds);
                   columnsByTable.putAll(cols);
               }
           }
   
           // Read the metadata for the primary keys ...
           for (Entry<TableId, List<Column>> tableEntry : 
columnsByTable.entrySet()) {
               // First get the primary key information, which must be done for 
*each* table ...
               List<String> pkColumnNames = 
readPrimaryKeyOrUniqueIndexNames(metadata, tableEntry.getKey());
   
               // Then define the table ...
               List<Column> columns = tableEntry.getValue();
               Collections.sort(columns);
               String defaultCharsetName = null; // JDBC does not expose 
character sets
               tables.overwriteTable(tableEntry.getKey(), columns, 
pkColumnNames, defaultCharsetName);
           }
   
           if (removeTablesNotFoundInJdbc) {
               // Remove any definitions for tables that were not found in the 
database metadata ...
               tableIdsBefore.removeAll(columnsByTable.keySet());
               tableIdsBefore.forEach(tables::removeTable);
           }
       }
   ```
   
   
   ,this will load all tables which match 
dbzConfig.getTableFilters().dataCollectionFilter() . the logic will run once 
for each tableId which is the first to load, you can focus the comment content 
with **
   actually we should only load the table which is passed by readTableSchema 
method‘s parameter.
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to