[ https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580261#comment-17580261 ]
ChangjiGuo edited comment on FLINK-28927 at 8/18/22 3:31 AM:
-------------------------------------------------------------

[~yunta] Thanks for your reply! I have sorted out my thoughts. We can clean up the uploaded files in two ways:
# If the thread is interrupted or hits an unexpected exception while waiting for the futures to complete, we can catch the exception, then set a callback on each future that discards its stream state handle (as mentioned above).
# In the uninterrupted case, the files have all been uploaded, but the AsyncSnapshotTask is still running when it is canceled. Here we first need to collect all uploaded files, which can then be cleaned up depending on whether the AsyncSnapshotTask was canceled. The key problem is that we cannot guarantee the order between reading the value of _isCancelled()_ and the call to _stateFuture.cancel(true)_, so the state must be cleaned up after the future is done. In my opinion, we could discard the state in the finally block of _AsyncCheckpointRunnable#run_, depending on whether _asyncCheckpointState_ is _AsyncCheckpointState.DISCARDED_.

Part of the code of _RocksDBIncrementalSnapshotOperation#get_ is pasted below:
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {

    boolean completed = false;
    ....
    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {
        ....
        ....
        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
        ....
        ....
        completed = true;
        return snapshotResult;
    } finally {
        // The snapshot did not finish: discard every handle collected so far
        if (!completed) {
            final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
}
{code}

Here are the cases I can think of:
# Interrupted while uploading sst files, with no misc file uploaded yet: the first way cleans up the sst files that have already been uploaded, and deletes the files of the unfinished futures once those futures complete.
# Interrupted while uploading misc files: _sstFiles_ already contains all uploaded sst files, and they are cleaned up because _completed_ is false. The first way also cleans up the uploaded misc files, and deletes the files of the unfinished futures once those futures complete.
# Both sst files and misc files have been uploaded, but the future is cancelled (it can't actually be interrupted), so the future returns normally. The second way cleans up all the state.

Looking forward to your reply! Thx.
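To make the first way from the comment easier to picture, here is a minimal sketch that uses only the JDK. It is not the Flink code and not the actual fix: _Handle_ is a hypothetical stand-in for StreamStateHandle (only _discardState()_ is modeled), and _UploadCleanupSketch_, _handleFor_, _waitForUploads_ and _DISCARDED_ are names invented for this illustration. The assumption is that each upload is represented by a future that eventually yields a handle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class UploadCleanupSketch {

    /** Hypothetical stand-in for a stream state handle; only discardState() is modeled. */
    interface Handle {
        void discardState();
    }

    /** Records the names of discarded files so the behaviour can be observed. */
    static final List<String> DISCARDED = new CopyOnWriteArrayList<>();

    /** Creates a handle whose discardState() just records the file name. */
    static Handle handleFor(String file) {
        return () -> DISCARDED.add(file);
    }

    /**
     * Waits for all uploads. If waiting fails (interrupt or upload error), a callback is
     * set on every future so that each handle is discarded once it exists: finished
     * uploads are discarded right away, unfinished ones when they complete.
     */
    static List<Handle> waitForUploads(List<CompletableFuture<Handle>> uploads) throws Exception {
        try {
            CompletableFuture.allOf(uploads.toArray(new CompletableFuture[0])).get();
        } catch (Exception e) {
            for (CompletableFuture<Handle> upload : uploads) {
                upload.whenComplete((handle, error) -> {
                    if (handle != null) {
                        handle.discardState();
                    }
                });
            }
            throw e;
        }
        List<Handle> handles = new ArrayList<>();
        for (CompletableFuture<Handle> upload : uploads) {
            handles.add(upload.join());
        }
        return handles;
    }

    public static void main(String[] args) {
        CompletableFuture<Handle> done = CompletableFuture.completedFuture(handleFor("sst-1"));
        CompletableFuture<Handle> pending = new CompletableFuture<>();

        // Simulate the checkpoint being canceled while the thread waits for uploads.
        Thread.currentThread().interrupt();
        try {
            waitForUploads(Arrays.asList(done, pending));
        } catch (Exception expected) {
            // The interrupt surfaces here; cleanup callbacks are already registered.
        }

        // The unfinished upload completes later and is discarded by its callback.
        pending.complete(handleFor("sst-2"));
        System.out.println(DISCARDED); // prints [sst-1, sst-2]
    }
}
```

In this sketch the already finished upload is discarded as soon as the callback is set, and the pending one is discarded when it completes, which corresponds to cases 1 and 2 above. The second way (cleaning up after a future that returned normally) is not covered here.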

> Can not clean up the uploaded shared files when the checkpoint fails
> --------------------------------------------------------------------
>
>                 Key: FLINK-28927
>                 URL: https://issues.apache.org/jira/browse/FLINK-28927
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.6, 1.15.1
>         Environment: Flink-1.11
>            Reporter: ChangjiGuo
>            Assignee: ChangjiGuo
>            Priority: Major
>
> If a checkpoint times out, the task will cancel all snapshot futures and do
> some cleanup work, including the following:
> * Cancel all AsyncSnapshotTasks.
> * If the future has finished, it will clean up all state objects.
> * If the future has not completed, it will be interrupted (maybe).
> * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although the thread has been
> interrupted, the uploaded files will not be cleaned up. The remaining files
> fall mainly into two groups:
> * Files that finished uploading before the thread was canceled.
> * Files for which outputStream.closeAndGetHandle() has been called, but
> snapshotCloseableRegistry has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout so that the checkpoint fails. Then check
> whether there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found that the code on the latest branch may
> have the same problem. I tried to fix it.
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)