PeterZh6 commented on code in PR #11130:
URL: https://github.com/apache/inlong/pull/11130#discussion_r1771308552


##########
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java:
##########
@@ -496,6 +524,12 @@ public void notifyCheckpointComplete(long checkpointId) {
                 schema.flushAudit();
                 schema.updateLastCheckpointId(checkpointId);
             }
+            // get the start time of the currently completed checkpoint
+            Long snapShotStartTimeById = 
checkpointStartTimeMap.remove(checkpointId);

Review Comment:
   Thank you for the insightful question! Upon reviewing the code, to the best 
of my knowledge, it seems that the put method is actually executed first in the 
`snapshotState` method, which records the checkpoint start time as soon as a 
checkpoint is initiated. By the time `notifyCheckpointComplete` is called, the 
`checkpointStartTimeMap` should already contain the start time for that 
checkpoint.
   
   The `remove` method is then used in `notifyCheckpointComplete` to clean up 
the entry after the checkpoint has been successfully completed while also 
getting the start time of the corresponding checkpoint. This approach ensures 
that we track the start time for each checkpoint and prevent stale entries from 
accumulating in the map.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to