zoltar9264 commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1170220547
##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java:
##########
@@ -51,14 +52,15 @@
 * <li>Store the meta of files into {@link ChangelogTaskLocalStateStore} by
 * AsyncCheckpointRunnable#reportCompletedSnapshotStates().
 * <li>Pass control of the file to {@link LocalChangelogRegistry#register} when
- * ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of the previous
- * checkpoint will be deleted by {@link LocalChangelogRegistry#discardUpToCheckpoint} at
- * the same time.
+ * FsStateChangelogWriter#persist , files of the previous checkpoint will be deleted by
+ * {@link LocalChangelogRegistry#discardUpToCheckpoint} when the previous checkpoint is
+ * confirmed.

Review Comment:
Yes, cleaning up local files depends on a newly completed checkpoint, so local files will accumulate if no checkpoint can complete. IIUC, remote DSTL files have the same problem. I have actually run into this in our environment: remote DSTL files accumulated until they reached the HDFS per-directory file limit.

<img width="683" alt="image" src="https://user-images.githubusercontent.com/16664055/232822869-ae8ccf9f-3caf-42d2-9d5b-1d19523612cd.png">

I suggest opening a new ticket to solve this problem for both remote and local files. WDYT, @Myasuka?
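To make the accumulation concrete, here is a minimal sketch of checkpoint-driven cleanup. It is a hypothetical, simplified model, not Flink's `LocalChangelogRegistry`: the class `CheckpointScopedFileRegistry` and its methods are illustrative names only. It assumes files are registered per checkpoint ID when persisted, and that confirming checkpoint N makes the files of all checkpoints below N discardable. As long as no checkpoint is confirmed, nothing is ever discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of checkpoint-driven file cleanup.
 * NOT Flink's LocalChangelogRegistry; names and signatures are assumptions.
 */
public class CheckpointScopedFileRegistry {
    // checkpoint ID -> files persisted for that checkpoint
    private final TreeMap<Long, List<String>> filesByCheckpoint = new TreeMap<>();
    // "deleted" files are only recorded here; the filesystem is never touched
    private final List<String> deleted = new ArrayList<>();

    /** Hypothetical hook: called when a file is persisted for a checkpoint. */
    public void register(String file, long checkpointId) {
        filesByCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(file);
    }

    /** Hypothetical hook: called only once checkpointId is confirmed. */
    public void discardUpToCheckpoint(long checkpointId) {
        // headMap(..., false) is a live view of all strictly older checkpoints
        Map<Long, List<String>> older = filesByCheckpoint.headMap(checkpointId, false);
        older.values().forEach(deleted::addAll);
        older.clear(); // removes those entries from the backing map
    }

    public int retainedFileCount() {
        return filesByCheckpoint.values().stream().mapToInt(List::size).sum();
    }

    public List<String> deletedFiles() {
        return deleted;
    }

    public static void main(String[] args) {
        CheckpointScopedFileRegistry registry = new CheckpointScopedFileRegistry();
        // Checkpoints 1..5 persist files, but none is confirmed yet.
        for (long cp = 1; cp <= 5; cp++) {
            registry.register("changelog-" + cp, cp);
        }
        System.out.println("retained without confirmation: " + registry.retainedFileCount());
        // Once checkpoint 5 completes, files of checkpoints 1..4 are dropped.
        registry.discardUpToCheckpoint(5);
        System.out.println("retained after confirming 5: " + registry.retainedFileCount());
    }
}
```

Running `main` should report 5 retained files before any confirmation and 1 afterwards. Because cleanup is only triggered by a confirmation, a job whose checkpoints keep failing leaves every registered file in place. That matches the unbounded growth described above for both local and remote DSTL files.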