[ https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580261#comment-17580261 ]

ChangjiGuo edited comment on FLINK-28927 at 8/29/22 1:03 PM:
-------------------------------------------------------------

[~yunta] Thanks for your reply! I have sorted out my thoughts; we can clean up 
the uploaded files in two ways:
 # If the task is interrupted, or an unexpected exception occurs while waiting 
for the futures to complete, we can catch the exception, set a callback on each 
future, and discard the stream state handle (as mentioned above).
 # For the uninterrupted case, i.e. the files have already been uploaded but 
the AsyncSnapshotTask is still running when it is cancelled: first we need to 
collect all uploaded files, which can then be cleaned up depending on whether 
the AsyncSnapshotTask was cancelled. But there is an ordering problem: we 
cannot know whether reading {_}isCancelled(){_} (if true, we will delete these 
files) happens before or after the call to {_}stateFuture.cancel(true){_}. 
That is to say, we can't handle it in _FutureTask#run_ and have to wait until 
the run method finishes. My idea is to override the _FutureTask#set_ method 
and check there whether the result can be discarded:
{code:java}
public class AsyncSnapshotTask<T> extends FutureTask<T> {

    @Override
    protected void set(T t) {
        super.set(t);
        if (isCancelled()) {
            if (t instanceof SnapshotResult) {
                try {
                    ((SnapshotResult<?>) t).discardState();
                } catch (Exception e) {
                    LOG.warn("Failed to discard the state of the cancelled snapshot.", e);
                }
            }
        } else {
            // super.set(t) has already updated the internal state, so cancel()
            // will return false from now on and StateUtil will clean it up.
        }
    }
}{code}
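To illustrate the race, here is a self-contained sketch using plain java.util.concurrent, with a dummy result type standing in for SnapshotResult (all names here are hypothetical, not Flink's): cancel(false) arrives while the callable is still running, so super.set() drops the result silently and only the override can discard it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelRaceDemo {

    // Stand-in for SnapshotResult: just records whether it was discarded.
    static class DummyResult {
        final AtomicBoolean discarded = new AtomicBoolean(false);
        void discardState() { discarded.set(true); }
    }

    static class CleanupOnCancelTask<T> extends FutureTask<T> {
        CleanupOnCancelTask(Callable<T> callable) { super(callable); }

        @Override
        protected void set(T t) {
            // If cancel() already won the race, super.set() drops the result
            // silently, so it must be discarded here.
            super.set(t);
            if (isCancelled() && t instanceof DummyResult) {
                ((DummyResult) t).discardState();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch proceed = new CountDownLatch(1);
        DummyResult result = new DummyResult();

        CleanupOnCancelTask<DummyResult> task = new CleanupOnCancelTask<>(() -> {
            started.countDown();
            proceed.await();      // simulate a long-running upload
            return result;
        });

        Thread worker = new Thread(task);
        worker.start();
        started.await();
        task.cancel(false);       // cancel while the callable is still running
        proceed.countDown();      // let the "upload" finish anyway
        worker.join();

        if (!result.discarded.get()) throw new AssertionError("result leaked");
        System.out.println("cancelled result discarded");
    }
}
```

Since the task was already CANCELLED when set() ran, a plain FutureTask would have leaked the result; the override is the only place that still sees it.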

I have pasted part of _RocksDBIncrementalSnapshotOperation#get_ below:
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry 
snapshotCloseableRegistry)
        throws Exception {

    boolean completed = false;
    ....

    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {
        ....
        ....

        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);

        ....
        ....

        completed = true;

        return snapshotResult;
    } finally {
        if (!completed) {
            final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
}{code}
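For illustration, here is a minimal, self-contained sketch of the same "completed flag" pattern (hypothetical names, plain strings standing in for state handles): everything collected before the failure is discarded in the finally block, while a successful run keeps its files.

```java
import java.util.ArrayList;
import java.util.List;

class CompletedFlagDemo {

    // Stand-in for the discarded state objects.
    static final List<String> discarded = new ArrayList<>();

    static String snapshot(boolean interruptMidway) throws Exception {
        boolean completed = false;
        List<String> uploaded = new ArrayList<>();
        try {
            uploaded.add("sst-1");
            uploaded.add("sst-2");
            if (interruptMidway) {
                throw new InterruptedException("upload interrupted");
            }
            uploaded.add("misc-1");
            completed = true;
            return "snapshot-ok";
        } finally {
            if (!completed) {
                // Stand-in for cleanupIncompleteSnapshot(statesToDiscard).
                discarded.addAll(uploaded);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            snapshot(true);
            throw new AssertionError("expected interruption");
        } catch (InterruptedException expected) {
            // the partially-uploaded files must now be discarded
        }
        if (!discarded.contains("sst-1") || !discarded.contains("sst-2")) {
            throw new AssertionError("uploaded files not discarded");
        }

        discarded.clear();
        String ok = snapshot(false);
        if (!"snapshot-ok".equals(ok) || !discarded.isEmpty()) {
            throw new AssertionError("successful snapshot must keep its files");
        }
        System.out.println("incomplete snapshot cleaned up");
    }
}
```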
Here are the cases I can think of:
 # Interrupted while uploading sst files, before any misc file has been 
uploaded: the first way can clean up the already-uploaded sst files and, while 
waiting for the unfinished futures to complete, delete the file still being 
written.
 # Interrupted while uploading misc files: _sstFiles_ already contains all 
uploaded sst files, which will be cleaned up because _completed_ is false. The 
first way can likewise clean up the uploaded misc files and delete the file 
still being written while waiting for the unfinished futures to complete.
 # Both sst files and misc files have been uploaded, but the AsyncSnapshotTask 
is cancelled (it can't actually be interrupted at this point), so 
_FutureTask#run_ executes normally. The second way can clean up all state 
files.
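The first way could be sketched as a completion callback on each upload future. This is a hypothetical, simplified sketch using CompletableFuture rather than Flink's internal futures; the names are mine, not Flink's:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class UploadCallbackDemo {

    // Stand-in for a stream state handle of one uploaded file.
    static class Handle {
        final AtomicBoolean discarded = new AtomicBoolean(false);
        void discard() { discarded.set(true); }
    }

    public static void main(String[] args) {
        AtomicBoolean snapshotCancelled = new AtomicBoolean(false);

        CompletableFuture<Handle> upload = new CompletableFuture<>();
        upload.whenComplete((handle, error) -> {
            // If the snapshot was cancelled, a handle that completes
            // afterwards is orphaned and must be discarded.
            if (handle != null && snapshotCancelled.get()) {
                handle.discard();
            }
        });

        snapshotCancelled.set(true);  // checkpoint cancelled, upload in flight
        Handle h = new Handle();
        upload.complete(h);           // the upload finishes anyway

        if (!h.discarded.get()) throw new AssertionError("handle not discarded");
        System.out.println("orphaned handle discarded");
    }
}
```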

Looking forward to your reply! Thanks.
 
 


was (Author: changjiguo):
[~yunta] Thanks for your reply! I have sorted out my thoughts; we can clean up 
the uploaded files in two ways:
 # If the task is interrupted, or an unexpected exception occurs while waiting 
for the futures to complete, we can catch the exception, set a callback on each 
future, and discard the stream state handle (as mentioned above).
 # For the uninterrupted case, i.e. the files have already been uploaded but 
the AsyncSnapshotTask is still running when it is cancelled: first we need to 
collect all uploaded files, which can then be cleaned up depending on whether 
the AsyncSnapshotTask was cancelled. But there is an ordering problem: we 
cannot know whether reading {_}isCancelled(){_} (if true, we will delete these 
files) happens before or after the call to {_}stateFuture.cancel(true){_}, so 
we must clean up the state only after the future is really done. The solution 
that comes to my mind is to discard the state in the finally block of 
_AsyncCheckpointRunnable#run_, depending on whether _asyncCheckpointState_ is 
{_}AsyncCheckpointState.DISCARDED{_}.

I have pasted part of _RocksDBIncrementalSnapshotOperation#get_ below:
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry 
snapshotCloseableRegistry)
        throws Exception {

    boolean completed = false;
    ....

    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {
        ....
        ....

        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);

        ....
        ....

        completed = true;

        return snapshotResult;
    } finally {
        if (!completed) {
            final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
}{code}
Here are the cases I can think of:
 # Interrupted while uploading sst files, before any misc file has been 
uploaded: the first way can clean up the already-uploaded sst files and, while 
waiting for the unfinished futures to complete, delete the file still being 
written.
 # Interrupted while uploading misc files: _sstFiles_ already contains all 
uploaded sst files, which will be cleaned up because _completed_ is false. The 
first way can likewise clean up the uploaded misc files and delete the file 
still being written while waiting for the unfinished futures to complete.
 # Both sst files and misc files have been uploaded, but the AsyncSnapshotTask 
is cancelled (it can't actually be interrupted at this point), so the future 
will return normally. The second way can clean up all state files.

Looking forward to your reply! Thanks.
 
 

> Can not clean up the uploaded shared files when the checkpoint fails
> --------------------------------------------------------------------
>
>                 Key: FLINK-28927
>                 URL: https://issues.apache.org/jira/browse/FLINK-28927
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.6, 1.15.1
>         Environment: Flink-1.11
>            Reporter: ChangjiGuo
>            Assignee: ChangjiGuo
>            Priority: Major
>
> If a checkpoint times out, the task will cancel all snapshot futures and do 
> some cleanup work, including the following:
>  * Cancel all AsyncSnapshotTasks.
>  * If a future has finished, clean up all of its state objects.
>  * If a future has not completed, it may be interrupted.
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file uploads to 
> complete, but the files were not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It waits here for all files to be uploaded. Although the thread has been 
> interrupted, the already-uploaded files are not cleaned up. The leftover 
> files fall into two groups:
>  * Files whose upload finished before the thread was cancelled.
>  * Files for which outputStream.closeAndGetHandle() was called but 
> snapshotCloseableRegistry has not yet been closed.
> How to reproduce?
> Shorten the checkpoint timeout so that the checkpoint fails, then check 
> whether any files remain in the shared directory.
> I'm testing on Flink 1.11, but the code on the latest branch may have the 
> same problem. I have tried to fix it.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
