[ https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580261#comment-17580261 ]
ChangjiGuo edited comment on FLINK-28927 at 8/29/22 1:03 PM:
-------------------------------------------------------------

[~yunta] Thanks for your reply! I've sorted out my thoughts; we can clean up the uploaded files in two ways:
# If the thread is interrupted, or gets an unexpected exception, while waiting for the upload futures to complete, we can catch the exception and then register a callback on each future that discards its stream state handle (as mentioned above).
# For the uninterrupted case, i.e. all files have been uploaded but the AsyncSnapshotTask is still running when it is canceled: first we collect all uploaded files, then clean them up depending on whether the AsyncSnapshotTask was canceled. The problem is that we cannot guarantee the order between reading {_}isCancelled(){_} (if it returns true, we delete these files) and the call to {_}stateFuture.cancel(true){_}. In other words, we can't handle it inside _FutureTask#run_ and have to wait until the run method finishes. My idea is to override _FutureTask#set_ and check there whether the result has to be discarded.

{code:java}
// Assumed to be declared inside the generic AsyncSnapshotCallable<T>, which supplies T.
public class AsyncSnapshotTask extends FutureTask<T> {
    @Override
    protected void set(T t) {
        // FutureTask#set only publishes the result while the task is still NEW.
        // If cancel() won the race, super.set(t) is a no-op and nobody else will clean up t.
        super.set(t);
        if (isCancelled()) {
            if (t instanceof SnapshotResult) {
                try {
                    ((SnapshotResult<?>) t).discardState();
                } catch (Exception e) {
                    LOG.warn("Failed to discard the state of a cancelled snapshot.", e);
                }
            }
        } else {
            // super.set(t) has already published the result, so a later cancel() returns
            // false and StateUtil will clean it up.
        }
    }
}
{code}

I pasted part of the code of _RocksDBIncrementalSnapshotOperation#get_ below:
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {
    boolean completed = false;
    ....
    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
    try {
        ....
        ....
        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
        ....
        ....
        completed = true;
        return snapshotResult;
    } finally {
        if (!completed) {
            final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
}
{code}

Here are the cases I can think of:
# Interrupted while uploading sst files, with no misc files uploaded yet: the first way cleans up the already uploaded sst files, and the callbacks delete the files of the unfinished futures once they complete.
# Interrupted while uploading misc files: _sstFiles_ already contains all uploaded sst files, and they will be cleaned up because _completed_ is false. The first way also cleans up the uploaded misc files, and likewise deletes the files of the unfinished futures once they complete.
# Both sst and misc files have been uploaded, but the AsyncSnapshotTask is cancelled (it can't actually be interrupted at this point), so _FutureTask#run_ completes normally. The second way cleans up all state files.

Looking forward to your reply! Thanks.

was (Author: changjiguo):
[~yunta] Thanks for your reply! I've sorted out my thoughts; we can clean up the uploaded files in two ways:
# If the thread is interrupted, or gets an unexpected exception, while waiting for the upload futures to complete, we can catch the exception and then register a callback on each future that discards its stream state handle (as mentioned above).
# For the uninterrupted case, i.e. all files have been uploaded but the AsyncSnapshotTask is still running when it is canceled: first we collect all uploaded files, then clean them up depending on whether the AsyncSnapshotTask was canceled.
But there is an ordering problem: we cannot guarantee the order between reading {_}isCancelled(){_} (if it returns true, we delete these files) and the call to {_}stateFuture.cancel(true){_}, so we must clean up the state only after the future is really done. My idea is to discard the state in the finally block of _AsyncCheckpointRunnable#run_, depending on whether _asyncCheckpointState_ is {_}AsyncCheckpointState.DISCARDED{_}.

I pasted part of the code of _RocksDBIncrementalSnapshotOperation#get_ below:
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {
    boolean completed = false;
    ....
    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
    try {
        ....
        ....
        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
        ....
        ....
        completed = true;
        return snapshotResult;
    } finally {
        if (!completed) {
            final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
}
{code}

Here are the cases I can think of:
# Interrupted while uploading sst files, with no misc files uploaded yet: the first way cleans up the already uploaded sst files, and the callbacks delete the files of the unfinished futures once they complete.
# Interrupted while uploading misc files: _sstFiles_ already contains all uploaded sst files, and they will be cleaned up because _completed_ is false. The first way also cleans up the uploaded misc files, and likewise deletes the files of the unfinished futures once they complete.
# Both sst and misc files have been uploaded, but the AsyncSnapshotTask is cancelled (it can't actually be interrupted at this point), so the future returns normally. The second way cleans up all state files.

Looking forward to your reply! Thanks.

> Can not clean up the uploaded shared files when the checkpoint fails
> --------------------------------------------------------------------
>
>                 Key: FLINK-28927
>                 URL: https://issues.apache.org/jira/browse/FLINK-28927
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.6, 1.15.1
>         Environment: Flink-1.11
>            Reporter: ChangjiGuo
>            Assignee: ChangjiGuo
>            Priority: Major
>
> If a checkpoint times out, the task cancels all snapshot futures and does
> some cleanup work, including the following:
> * Cancel all AsyncSnapshotTasks.
> * If a future has finished, clean up all of its state objects.
> * If a future has not completed, it will (maybe) be interrupted.
> * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It waits here for all files to be uploaded. Although the thread has been
> interrupted, the uploaded files are not cleaned up. The remaining files
> fall mainly into two groups:
> * Files that finished uploading before the thread was canceled.
> * Files for which outputStream.closeAndGetHandle() was called, but
> snapshotCloseableRegistry had not been closed yet.
> How to reproduce?
> Shorten the checkpoint timeout so that the checkpoint fails. Then check
> whether any files are left in the shared directory.
> I'm testing on Flink-1.11, but I found that the code on the latest branch may
> have the same problem. I tried to fix it.
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
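The two cleanup ways discussed in this comment can be tried out in plain Java, without Flink. This is only a minimal, hypothetical sketch. `UploadCleanup`, `DiscardOnCancelFutureTask` and `CleanupDemo` are made-up names. `String` handles stand in for `StreamStateHandle` / `SnapshotResult`, and a `Consumer` stands in for `discardState()`. `CompletableFuture.allOf(...).get()` stands in for `FutureUtils.waitForAll(futures.values()).get()`. Way 2 relies on the documented `FutureTask#set` contract: `run()` invokes it after `call()` returns, and it does nothing if the future was already cancelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.function.Consumer;

// Way 1 (hypothetical helper): handles of type H stand in for StreamStateHandle,
// and `discard` stands in for StreamStateHandle#discardState.
final class UploadCleanup {
    static <H> void waitForUploads(List<CompletableFuture<H>> uploads, Consumer<H> discard)
            throws InterruptedException, ExecutionException {
        try {
            // Stand-in for FutureUtils.waitForAll(futures.values()).get()
            CompletableFuture.allOf(uploads.toArray(new CompletableFuture<?>[0])).get();
        } catch (InterruptedException | ExecutionException e) {
            // Finished uploads are discarded right away; unfinished ones are
            // discarded by the callback once they eventually complete.
            for (CompletableFuture<H> upload : uploads) {
                upload.thenAccept(discard);
            }
            throw e;
        }
    }
}

// Way 2 (hypothetical stand-in for AsyncSnapshotTask): `discard` plays the role
// of SnapshotResult#discardState.
class DiscardOnCancelFutureTask<T> extends FutureTask<T> {
    private final Consumer<T> discard;
    DiscardOnCancelFutureTask(Callable<T> callable, Consumer<T> discard) {
        super(callable);
        this.discard = discard;
    }

    @Override
    protected void set(T result) {
        // FutureTask#set only publishes the result while the task is still NEW.
        // If cancel() got there first, super.set() is a no-op and nobody else
        // will ever see this result, so it has to be discarded here.
        super.set(result);
        if (isCancelled() && result != null) {
            discard.accept(result);
        }
    }
}

final class CleanupDemo {
    // Way 1: interrupted while one upload is done and another is still pending.
    static List<String> interruptedWait() {
        List<String> discarded = new ArrayList<>();
        CompletableFuture<String> done = CompletableFuture.completedFuture("sst-1");
        CompletableFuture<String> pending = new CompletableFuture<>();
        Thread.currentThread().interrupt(); // simulates the interrupt from cancel(true)
        try {
            UploadCleanup.waitForUploads(Arrays.asList(done, pending), discarded::add);
        } catch (InterruptedException | ExecutionException expected) {
            Thread.interrupted(); // make sure the demo thread is not left interrupted
        }
        pending.complete("sst-2"); // the late upload finishes and is discarded too
        return discarded;
    }

    // Way 2: cancel() wins the race against the already running snapshot.
    static List<String> cancelBeforeSet() {
        List<String> discarded = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        DiscardOnCancelFutureTask<String> task = new DiscardOnCancelFutureTask<>(() -> {
            started.countDown();
            release.await(); // the "uploads" finish only after the cancel below
            return "snapshot-result";
        }, discarded::add);
        Thread runner = new Thread(task);
        runner.start();
        try {
            started.await();
            task.cancel(false); // no interrupt, so call() still returns normally
            release.countDown();
            runner.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return discarded;
    }

    public static void main(String[] args) {
        System.out.println(interruptedWait()); // prints [sst-1, sst-2]
        System.out.println(cancelBeforeSet()); // prints [snapshot-result]
    }
}
```

The demo uses `cancel(false)` so that `call()` returns normally, mirroring the case where all files are already uploaded. If a blocking `call()` observes a `cancel(true)` interrupt and throws, `run()` goes through `setException` instead, and only way 1 can clean up.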