ruanhang1993 commented on code in PR #3548:
URL: https://github.com/apache/flink-cdc/pull/3548#discussion_r1722636805


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java:
##########
@@ -183,17 +183,29 @@ public static MySqlBinlogSplit appendFinishedSplitInfos(
      *
      * <p>When restore from a checkpoint, the finished split infos may contain some splits from the
      * deleted tables. We need to remove these splits from the total finished split infos and update
-     * the size.
+     * the size, while also removing the outdated tables from the table schemas of binlog split.
      */
     public static MySqlBinlogSplit filterOutdatedSplitInfos(
             MySqlBinlogSplit binlogSplit, Tables.TableFilter currentTableFilter) {
+        Map<TableId, TableChange> newTableSchemas =

Review Comment:
   ```suggestion
           Map<TableId, TableChange> filteredTableSchemas =
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java:
##########
@@ -183,17 +183,29 @@ public static MySqlBinlogSplit appendFinishedSplitInfos(
      *
      * <p>When restore from a checkpoint, the finished split infos may contain some splits from the
      * deleted tables. We need to remove these splits from the total finished split infos and update
-     * the size.
+     * the size, while also removing the outdated tables from the table schemas of binlog split.
      */
     public static MySqlBinlogSplit filterOutdatedSplitInfos(
             MySqlBinlogSplit binlogSplit, Tables.TableFilter currentTableFilter) {
+        Map<TableId, TableChange> newTableSchemas =
+                binlogSplit.getTableSchemas().entrySet().stream()
+                        .filter(entry -> currentTableFilter.isIncluded(entry.getKey()))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
         Set<TableId> tablesToRemove =

Review Comment:
   ```suggestion
           Set<TableId> tablesToRemoveInFinishedSnapshotSplitInfos =
   ```
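
For illustration only, here is a minimal, self-contained sketch of the schema-filtering step from the hunk above with the suggested name `filteredTableSchemas` applied. It is not the PR's code: the class `FilterOutdatedSketch` and the method `filterTableSchemas` are hypothetical, plain `String` keys and values stand in for `TableId` and `TableChange`, and a `java.util.function.Predicate` stands in for `Tables.TableFilter`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-alone sketch; not part of MySqlBinlogSplit.
public class FilterOutdatedSketch {

    // Keeps only the entries whose key is still accepted by the filter.
    // The Predicate is an assumed stand-in for Tables.TableFilter#isIncluded.
    public static <K, V> Map<K, V> filterTableSchemas(
            Map<K, V> tableSchemas, Predicate<K> isIncluded) {
        return tableSchemas.entrySet().stream()
                .filter(entry -> isIncluded.test(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        // Strings stand in for TableId (keys) and TableChange (values).
        Map<String, String> tableSchemas = new HashMap<>();
        tableSchemas.put("db.orders", "schema-of-orders");
        tableSchemas.put("db.dropped", "schema-of-dropped");

        // Assumed filter: the table "db.dropped" is no longer captured.
        Map<String, String> filteredTableSchemas =
                filterTableSchemas(tableSchemas, id -> !id.equals("db.dropped"));

        System.out.println(filteredTableSchemas.keySet());
    }
}
```

With both renames, each variable says which structure it refers to: `filteredTableSchemas` for the schema map, and `tablesToRemoveInFinishedSnapshotSplitInfos` for the tables dropped from the finished snapshot split infos.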



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

