PeterZh6 commented on code in PR #11130: URL: https://github.com/apache/inlong/pull/11130#discussion_r1771308552
##########
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java:
##########
@@ -496,6 +524,12 @@ public void notifyCheckpointComplete(long checkpointId) {
                 schema.flushAudit();
                 schema.updateLastCheckpointId(checkpointId);
             }
+            // get the start time of the currently completed checkpoint
+            Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);

Review Comment:
   Thank you for the question! From my reading of the code, `put` runs first: `snapshotState` records the checkpoint's start time in `checkpointStartTimeMap` as soon as the checkpoint is initiated. So by the time `notifyCheckpointComplete` is called, the map should already contain the start time for that checkpoint ID.

   In `notifyCheckpointComplete`, `remove` then does two things at once: it returns the start time of the completed checkpoint and deletes that entry. This lets us track the start time of each checkpoint while keeping entries for completed checkpoints from accumulating in the map.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
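
For illustration, the put-then-remove pairing described in the review comment can be sketched roughly as follows. This is a minimal standalone sketch, not the actual `DebeziumSourceFunction` code: the class `CheckpointStartTimes` and the methods `onSnapshot`, `onCheckpointComplete`, and `pendingCount` are hypothetical names, and the use of a `ConcurrentHashMap` is an assumption. Only `checkpointStartTimeMap` and the `remove(checkpointId)` call come from the diff.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the bookkeeping discussed above; not the InLong implementation.
class CheckpointStartTimes {

    // checkpointId -> start time in epoch milliseconds (map type is an assumption)
    private final Map<Long, Long> checkpointStartTimeMap = new ConcurrentHashMap<>();

    // Assumed to be called from snapshotState(): record when the checkpoint began.
    void onSnapshot(long checkpointId, long startTimeMillis) {
        checkpointStartTimeMap.put(checkpointId, startTimeMillis);
    }

    // Assumed to be called from notifyCheckpointComplete(): remove() returns the
    // recorded start time and deletes the entry in a single step.
    // Returns null if no start time was recorded for this checkpoint ID.
    Long onCheckpointComplete(long checkpointId) {
        return checkpointStartTimeMap.remove(checkpointId);
    }

    // Number of checkpoints that were started but not yet reported complete.
    int pendingCount() {
        return checkpointStartTimeMap.size();
    }
}
```

Because `remove` returns `null` for an unknown ID, a caller would presumably want to null-check the boxed `Long` before using it. Note also that in this sketch only completed checkpoints are cleaned up; an entry for a checkpoint that never completes would stay in the map unless it is removed elsewhere.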