[ https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ChangjiGuo updated FLINK-28927:
-------------------------------
    Description: 
If a checkpoint times out, the task cancels all snapshot futures and does some cleanup work, including the following:
* Cancel all AsyncSnapshotTasks.
* If a future has already completed, clean up all of its state objects.
* If a future has not completed, it may be interrupted.
* Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file uploads to complete, but the uploaded files were not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}

This line waits for all files to be uploaded. Even though the waiting thread is interrupted, the files that have already been uploaded are not cleaned up. The leftover files fall mainly into two groups:
* Files that finished uploading before the thread was cancelled.
* Files for which outputStream.closeAndGetHandle() has already been called, but snapshotCloseableRegistry has not yet been closed.

How to reproduce:
Shorten the checkpoint timeout so that the checkpoint fails, then check whether any files remain in the shared directory.

I am testing on Flink 1.11, but the code on the latest branch appears to have the same problem. I have tried to fix it.

  was:
If a checkpoint times out, the task cancels the AsyncCheckpointRunnable thread and does some cleanup work, including the following:
* Cancel all AsyncSnapshotTasks.
* If a future has already completed, clean up all of its state objects.
* If a future has not completed, it may be interrupted.
* Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file uploads to complete, but the uploaded files were not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}

This line waits for all files to be uploaded. Even though the waiting thread is interrupted, the files that have already been uploaded are not cleaned up. The leftover files fall mainly into two groups:
* Files that finished uploading before the thread was cancelled.
* Files for which outputStream.closeAndGetHandle() has already been called, but snapshotCloseableRegistry has not yet been closed.

How to reproduce:
Shorten the checkpoint timeout so that the checkpoint fails, then check whether any files remain in the shared directory.

I am testing on Flink 1.11, but the code on the latest branch appears to have the same problem. I have tried to fix it.


> Can not clean up the uploaded shared files when the checkpoint fails
> --------------------------------------------------------------------
>
>                 Key: FLINK-28927
>                 URL: https://issues.apache.org/jira/browse/FLINK-28927
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.6, 1.15.1
>         Environment: Flink-1.11
>            Reporter: ChangjiGuo
>            Assignee: ChangjiGuo
>            Priority: Major
>
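The cleanup this report asks for can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Flink's code or its eventual fix: UploadCleanupSketch, waitForAllOrCleanUp and UploadedFile are hypothetical names (UploadedFile stands in for StreamStateHandle, whose discardState() deletes the underlying file), and CompletableFuture.allOf(...).get() replaces Flink's FutureUtils.waitForAll. If waiting is interrupted or an upload fails, the sketch cancels the pending futures and discards every file whose upload had already completed, which covers the first group of leftover files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical stand-in for Flink's StreamStateHandle: a handle that can delete its uploaded file. */
interface UploadedFile {
    void discardState() throws Exception;
}

/** Hypothetical helper (not Flink API): wait for all uploads, clean up on interruption or failure. */
final class UploadCleanupSketch {

    private UploadCleanupSketch() {}

    static <T extends UploadedFile> List<T> waitForAllOrCleanUp(List<CompletableFuture<T>> uploads)
            throws InterruptedException, ExecutionException {
        try {
            // Block until every upload has completed (normally or exceptionally).
            CompletableFuture.allOf(uploads.toArray(new CompletableFuture<?>[0])).get();
            List<T> handles = new ArrayList<>(uploads.size());
            for (CompletableFuture<T> upload : uploads) {
                handles.add(upload.join());
            }
            return handles;
        } catch (InterruptedException | ExecutionException e) {
            // Catching InterruptedException cleared the interrupt flag, so the deletes below
            // are not cut short; rethrowing it afterwards still reports the interruption.
            for (CompletableFuture<T> upload : uploads) {
                // Completes a pending future with CancellationException; no effect on a future
                // that is already done. It does NOT stop a task that is still uploading.
                upload.cancel(true);
                // A future that completed normally holds a fully uploaded file: delete it.
                if (upload.isDone() && !upload.isCompletedExceptionally()) {
                    try {
                        upload.join().discardState();
                    } catch (Exception discardFailure) {
                        e.addSuppressed(discardFailure);
                    }
                }
            }
            throw e;
        }
    }
}
```

Note that CompletableFuture.cancel only marks the future as cancelled; it does not stop the task that is still writing. A file whose upload finishes after cancellation (the second group above, where closeAndGetHandle() has already been called) would therefore still need separate handling, for example by having the upload task itself discard its handle once it notices the cancellation (a hypothetical approach, not taken from the report).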
--
This message was sent by Atlassian Jira
(v8.20.10#820010)