Hi ssmi,

No duplicates are produced: exactly-once is guaranteed on the source side, not downstream.

Before the `BinlogSplit` is suspended to accept newly-added tables, each existing table's finished snapshot-split `highWatermark` is **forwarded to the current binlog reading position** `P_current`.

Later, when the `startingOffset` is rewound to the minimum `highWatermark` `P_min` (so the new table can pick up its snapshot tail), `BinlogSplitReader.shouldEmit()` filters records per table: events from existing tables in `(P_min, P_current]` are dropped because their `highWatermark` is now `P_current`, while new-table events from `(P_new, ...)` pass through. The offset rewinds globally, but the per-table watermarks prevent re-emission.

*Key Code*

- `BinlogSplitReader.java <https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java#L321>` `shouldEmit()`: per-record, per-table filter against `highWatermark`.
- `MySqlBinlogSplit.java <https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java#L337>` `forwardHighWatermarkToStartingOffset()`: the dedup primitive; rewrites old splits' `highWatermark` to `P_current` at suspend time.
- `MySqlBinlogSplit.java <https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java#L209>` `appendFinishedSplitInfos()`: rewinds `startingOffset` to `min(old, new highWatermarks)` after the new-table snapshot completes.

If you have any further questions, feel free to discuss.

Best,
Yanquan

lec ssmi <[email protected]> wrote on Tuesday, June 9, 2026 at 22:07:

> Hi Flink CDC community,
>
> I have a question regarding incremental snapshot behavior when restoring a
> Flink CDC job from checkpoint.
>
> Suppose a CDC job is restored from a checkpoint and, at the same time, new
> tables are added into the capture list (whitelist).
>
> As I understand the current implementation:
>
> 1. Existing tables continue consuming binlog.
> 2. Newly added tables start incremental snapshot.
> 3. After all snapshot splits finish, the BinlogSplit obtains all finished
>    snapshot split infos from SplitEnumerator.
> 4. The new binlog split start offset is adjusted to the minimum watermark
>    among all completed snapshot splits.
>
> My confusion is about possible duplicate consumption.
>
> During the period when BinlogSplit runs independently (before switching to
> the adjusted start offset), existing tables may already consume and emit
> some binlog events.
>
> Later, after the binlog start offset moves backward to the minimum
> snapshot watermark, those previously consumed events appear to become
> readable again.
>
> From my reading, shouldEmit() seems to compare the current binlog
> position with snapshot offsets to decide whether to emit records.
>
> However, I could not fully understand how duplication is avoided for
> existing tables whose records were already emitted during that standalone
> binlog reading period.
>
> Specifically:
>
> - Is there additional state tracking to prevent re-emitting events
>   already processed before offset adjustment?
> - Or is duplication possible in this scenario and expected to be handled
>   downstream?
> - Which component guarantees exactly-once semantics here?
>
> I may have misunderstood some part of the snapshot/binlog coordination
> logic, so any clarification would be greatly appreciated.
>
> Thanks!
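
P.S. For readers who want something concrete, here is a toy model of the filtering described in the reply above. It is only a sketch, not the connector code: binlog positions are plain `long` values rather than real binlog offsets, the class `WatermarkFilterSketch` and all of its method names are hypothetical, the numbers (100, 120, 200) are made up, and it assumes the filter is a single per-table comparison, leaving out any finer-grained checks the real `shouldEmit()` may perform.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of per-table high-watermark filtering (hypothetical names throughout). */
class WatermarkFilterSketch {

    /** A binlog event reduced to (table, position). */
    static final class Event {
        final String table;
        final long position;

        Event(String table, long position) {
            this.table = table;
            this.position = position;
        }

        @Override
        public String toString() {
            return table + "@" + position;
        }
    }

    // One high watermark per table; the real connector tracks this per finished split.
    private final Map<String, Long> highWatermarks = new HashMap<>();

    /** Record the high watermark of a table whose snapshot has finished. */
    void finishSnapshot(String table, long highWatermark) {
        highWatermarks.put(table, highWatermark);
    }

    /** At suspend time: rewrite every known table's watermark to P_current. */
    void forwardHighWatermarks(long currentPosition) {
        highWatermarks.replaceAll((table, hw) -> currentPosition);
    }

    /** The rewound starting offset: the minimum watermark over all tables. */
    long rewoundStartingOffset() {
        return Collections.min(highWatermarks.values());
    }

    /** Emit only events strictly after their own table's watermark. */
    boolean shouldEmit(Event event) {
        Long hw = highWatermarks.get(event.table);
        return hw != null && event.position > hw;
    }

    /** Re-read the binlog after startingOffset and apply the per-table filter. */
    List<Event> replay(List<Event> binlog, long startingOffset) {
        List<Event> emitted = new ArrayList<>();
        for (Event e : binlog) {
            if (e.position > startingOffset && shouldEmit(e)) {
                emitted.add(e);
            }
        }
        return emitted;
    }

    static List<Event> demo() {
        WatermarkFilterSketch reader = new WatermarkFilterSketch();
        reader.finishSnapshot("orders", 100L);  // existing table
        // The reader has reached P_current = 200, so orders@150 and orders@180
        // were already emitted once before the suspension.
        reader.forwardHighWatermarks(200L);     // orders watermark becomes 200
        reader.finishSnapshot("users", 120L);   // new table, P_new = 120

        List<Event> binlog = List.of(
                new Event("orders", 150L), new Event("users", 130L),
                new Event("orders", 180L), new Event("users", 190L),
                new Event("orders", 210L), new Event("users", 220L));

        // P_min = min(200, 120) = 120, so the replay starts after position 120.
        return reader.replay(binlog, reader.rewoundStartingOffset());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [users@130, users@190, orders@210, users@220]
    }
}
```

In this model `orders@150` and `orders@180` are read a second time after the rewind but dropped, because the `orders` watermark was forwarded to 200, while every `users` event after 120 passes through.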
