loserwang1024 commented on code in PR #3349: URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1613185257
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java:
##########
@@ -104,6 +108,10 @@ public List<SourceSplitBase> snapshotState(long checkpointId) {
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        this.checkpointCount = (this.checkpointCount + 1) % this.checkpointCycle;

Review Comment:
   @morazow, great job. I generally agree with your approach, but I'd suggest a different way to pick the checkpoint to commit. Instead of committing at every third checkpoint (rolling window), I prefer to commit the offsets of the checkpoint that is three checkpoints behind the current one (sliding window).
   
   For a detailed design, we can store successful checkpoint IDs in a min heap whose size is three (set by configuration). When a checkpoint completes successfully, we push its ID onto the heap, take the minimum checkpoint ID, and commit its offsets. By doing this, we always have three checkpoints whose offsets have not been recycled.
   
   (P.S.: Let's log the heap at each checkpoint, so users know which checkpoint IDs they can restore from.)
   
   @leonardBang, @ruanhang1993, CC, WDYT?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
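
For illustration, the sliding-window idea in the review comment could be sketched roughly as follows. This is only a minimal sketch and not code from the PR: the class `SlidingCheckpointWindow` and its methods are hypothetical names, and it assumes one reading of the proposal, namely that once the heap holds more than the configured number of IDs, the minimum is removed and its offsets become the ones to commit, so the heap always holds the newest uncommitted checkpoints.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.PriorityQueue;

/** Hypothetical helper (not part of flink-cdc) that tracks completed checkpoint IDs. */
class SlidingCheckpointWindow {
    // Number of completed checkpoints whose offsets stay uncommitted (assumed configurable).
    private final int windowSize;
    // Min heap of completed checkpoint IDs that have not been committed yet.
    private final PriorityQueue<Long> retained = new PriorityQueue<>();

    SlidingCheckpointWindow(int windowSize) {
        if (windowSize < 1) {
            throw new IllegalArgumentException("windowSize must be at least 1");
        }
        this.windowSize = windowSize;
    }

    /**
     * Records a completed checkpoint. Returns the ID whose offsets may now be
     * committed, or empty while fewer than windowSize + 1 checkpoints are known.
     */
    OptionalLong onCheckpointComplete(long checkpointId) {
        retained.add(checkpointId);
        if (retained.size() > windowSize) {
            // The oldest tracked checkpoint falls out of the window.
            return OptionalLong.of(retained.poll());
        }
        return OptionalLong.empty();
    }

    /** Sorted snapshot of the retained IDs, e.g. for logging at each checkpoint. */
    List<Long> retainedCheckpointIds() {
        List<Long> ids = new ArrayList<>(retained);
        Collections.sort(ids);
        return ids;
    }

    public static void main(String[] args) {
        SlidingCheckpointWindow window = new SlidingCheckpointWindow(3);
        for (long id = 1; id <= 5; id++) {
            OptionalLong toCommit = window.onCheckpointComplete(id);
            String commit = toCommit.isPresent() ? String.valueOf(toCommit.getAsLong()) : "none";
            System.out.println("checkpoint " + id + " completed, commit=" + commit
                    + ", retained=" + window.retainedCheckpointIds());
        }
    }
}
```

Under these assumptions, `notifyCheckpointComplete` would call `onCheckpointComplete(checkpointId)` and commit offsets only when a value is returned. Logging `retainedCheckpointIds()` at that point would cover the P.S. about showing users which checkpoints they can still restore from.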