fredia commented on code in PR #21822: URL: https://github.com/apache/flink/pull/21822#discussion_r1184493394
##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java:
##########
@@ -394,18 +394,14 @@ public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) {
                 .forEach(
                         localHandle -> {
                             changelogRegistry.stopTracking(localHandle);
-                            localChangelogRegistry.register(localHandle, checkpointId);
                         });
-    }
-
-    @Override
-    public void subsume(long checkpointId) {
         localChangelogRegistry.discardUpToCheckpoint(checkpointId);
     }
 
     @Override
     public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) {
-        localChangelogRegistry.prune(checkpointId);
+        // delete all accumulated local dstl files when abort
+        localChangelogRegistry.discardUpToCheckpoint(checkpointId + 1);

Review Comment:
   As we discussed offline: because the changelog state backend is incremental, once a checkpoint is aborted, all subsequent checkpoints that depend on it are incomplete. In effect, none of the previous local checkpoints are useful any longer. Therefore, we broaden the responsibility of `#abort`/`#prune` and delete all of the previous, now useless, local checkpoints.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
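
To illustrate the abort handling described in the review comment, here is a minimal sketch. `LocalRegistrySketch` is a hypothetical stand-in, not Flink's actual local changelog registry, and it assumes that `discardUpToCheckpoint(upTo)` treats `upTo` as an exclusive bound. That assumption would explain why the diff passes `checkpointId + 1` when a checkpoint is aborted: the aborted checkpoint's own local files are discarded as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical, simplified registry for illustration only; not Flink's implementation. */
class LocalRegistrySketch {
    // checkpointId -> names of local files registered for that checkpoint
    private final NavigableMap<Long, List<String>> filesByCheckpoint = new TreeMap<>();

    void register(String file, long checkpointId) {
        filesByCheckpoint.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(file);
    }

    /** Discards all files registered for checkpoints strictly below upTo (exclusive bound assumed). */
    List<String> discardUpToCheckpoint(long upTo) {
        NavigableMap<Long, List<String>> stale = filesByCheckpoint.headMap(upTo, false);
        List<String> discarded = new ArrayList<>();
        stale.values().forEach(discarded::addAll);
        stale.clear(); // headMap returns a view, so clearing it removes the entries from the backing map
        return discarded;
    }

    /** Mirrors the intent of the changed reset(): on abort, drop everything up to and including the aborted checkpoint. */
    List<String> onAbort(long abortedCheckpointId) {
        return discardUpToCheckpoint(abortedCheckpointId + 1);
    }

    int trackedCheckpoints() {
        return filesByCheckpoint.size();
    }
}
```

With files registered for checkpoints 1, 2, and 3, calling `onAbort(2)` in this sketch discards the files of checkpoints 1 and 2 and keeps only those of checkpoint 3.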