[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28927:
-------------------------------
    Description: 
If a checkpoint times out, the task will cancel all snapshot futures and do 
some cleanup work, including the following:
 * Cancel all AsyncSnapshotTasks.
 * If the future has finished, it will clean up all state object.
 * If the future has not completed, it will be interrupted(maybe).
 * Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}
It will wait for all files to be uploaded here. Although it has been 
interrupted, the uploaded files will not be cleaned up. The remaining files are 
mainly divided into:
 * Files that have finished uploading before the thread is canceled.
 * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
has not been closed.

How to reproduce?
Shorten the checkpoint timeout time, making the checkpoint fail. Then check if 
there are any files in the shared directory.

I'm testing on Flink-1.11, but I found the code from the latest branch may have 
the same problem. I tried to fix it.
 
 

  was:
If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
thread and do some cleanup work, including the following:
 * Cancel all AsyncSnapshotTasks.
 * If the future has finished, it will clean up all state object.
 * If the future has not completed, it will be interrupted(maybe).
 * Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}
It will wait for all files to be uploaded here. Although it has been 
interrupted, the uploaded files will not be cleaned up. The remaining files are 
mainly divided into:
 * Files that have finished uploading before the thread is canceled.
 * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
has not been closed.

How to reproduce?
Shorten the checkpoint timeout time, making the checkpoint fail. Then check if 
there are any files in the shared directory.

I'm testing on Flink-1.11, but I found the code from the latest branch may have 
the same problem. I tried to fix it.
 
 


> Can not clean up the uploaded shared files when the checkpoint fails
> --------------------------------------------------------------------
>
>                 Key: FLINK-28927
>                 URL: https://issues.apache.org/jira/browse/FLINK-28927
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.6, 1.15.1
>         Environment: Flink-1.11
>            Reporter: ChangjiGuo
>            Assignee: ChangjiGuo
>            Priority: Major
>
> If a checkpoint times out, the task will cancel all snapshot futures and do 
> some cleanup work, including the following:
>  * Cancel all AsyncSnapshotTasks.
>  * If the future has finished, it will clean up all state object.
>  * If the future has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to