[ https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jinzhong Li updated FLINK-28515:
--------------------------------
    Description:
In my case, I found that some files in the local recovery directory were not cleaned up properly after a checkpoint was aborted (as shown in the attached picture).

By analyzing the Flink log, I found that if the state backend has completed the local snapshot but the task has not yet completed the whole snapshot when the checkpoint is aborted (caused by a checkpoint timeout or a network error), the files in the local recovery directory may not be cleaned up properly.

I think the local snapshot files are left behind for the following reasons:

(1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, the completed local snapshot info is registered with org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task has completed the whole snapshot. ([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136])

(2) If the state backend has completed the local snapshot but the task has not completed the entire snapshot when the checkpoint abort is triggered, the TaskLocalStateStore cannot clean up the unregistered local snapshot files. ([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301])

(3) When SubtaskCheckpointCoordinatorImpl receives the abort notification, it cancels all ongoing state snapshot FutureTasks in 'AsyncCheckpointRunnable.close()'. For RocksDBKeyedStateBackend, [AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L129] is invoked during the checkpoint abort. After this, [RocksDBIncrementalSnapshotOperation.get|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L384] may still run until it completes, and the local snapshot files are cleaned up neither in RocksDBIncrementalSnapshotOperation.get (finally block) nor in AsyncSnapshotCallable.call ([finally-cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87]). The local snapshot files are then also not cleaned up in [AsyncCheckpointRunnable.cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L391], because [AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java#L78] returns true.

To fix this problem, I think that when TaskLocalStateStoreImpl aborts a checkpoint, it can try to delete the corresponding local recovery directory, even if the checkpoint has not been registered with TaskLocalStateStoreImpl.

was:
In my case, I found that some files in the local recovery directory were not cleaned up properly after a checkpoint was aborted (as shown in the attached picture).

By analyzing the Flink log, I found that if the state backend has completed the local snapshot but the task has not yet completed the whole snapshot when the checkpoint is aborted (caused by a checkpoint timeout or a network error), the files in the local recovery directory may not be cleaned up properly.
I think the local snapshot files are left behind for the following reasons:

(1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, the completed local snapshot info is registered with org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task has completed the whole snapshot. ([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136])

(2) If the state backend has completed the local snapshot but the task has not completed the entire snapshot when the checkpoint abort is triggered, the TaskLocalStateStore cannot clean up the unregistered local snapshot files. ([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301])

(3) When SubtaskCheckpointCoordinatorImpl receives the abort notification, it cancels all ongoing state snapshot FutureTasks in 'AsyncCheckpointRunnable.close()'.

To fix this problem, I think that when TaskLocalStateStoreImpl aborts a checkpoint, it can try to delete the corresponding local recovery directory, even if the checkpoint has not been registered with TaskLocalStateStoreImpl.
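Points (1) and (2) and the proposed fix can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not Flink code: `LocalStoreSketch`, its methods, and the `chk_<checkpointId>` directory layout are hypothetical stand-ins for TaskLocalStateStoreImpl and the local recovery directory, and the real abort path may differ. Local files are written before anything is registered; without the fix, aborting a checkpoint that was never registered leaves its directory behind, while with `deleteUnregistered` enabled the directory is derived from the checkpoint ID and deleted anyway.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Toy model of points (1), (2) and the proposed fix. NOT Flink's TaskLocalStateStoreImpl;
// all names and the "chk_<checkpointId>" directory layout are hypothetical.
class LocalStoreSketch {
    private final Path baseDir;
    private final boolean deleteUnregistered; // true = apply the proposed fix
    // Checkpoint ID -> local snapshot directory; filled only after the whole snapshot completed.
    private final Map<Long, Path> registered = new HashMap<>();

    LocalStoreSketch(Path baseDir, boolean deleteUnregistered) {
        this.baseDir = baseDir;
        this.deleteUnregistered = deleteUnregistered;
    }

    Path checkpointDir(long checkpointId) {
        return baseDir.resolve("chk_" + checkpointId);
    }

    // The state backend writes local files before the task registers anything (point 1).
    Path writeLocalSnapshot(long checkpointId) throws IOException {
        Path dir = Files.createDirectories(checkpointDir(checkpointId));
        Files.write(dir.resolve("000001.sst"), new byte[] {1, 2, 3});
        return dir;
    }

    // Called only after the task has completed the whole snapshot (point 1).
    void storeLocalState(long checkpointId, Path dir) {
        registered.put(checkpointId, dir);
    }

    // Without the fix, abort can only delete what was registered (point 2).
    void abortCheckpoint(long checkpointId) throws IOException {
        Path dir = registered.remove(checkpointId);
        if (dir == null && deleteUnregistered) {
            dir = checkpointDir(checkpointId); // fix: derive the directory from the checkpoint ID
        }
        if (dir != null) {
            deleteRecursively(dir);
        }
    }

    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order visits children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    // Writes a local snapshot, aborts before registration, and reports whether files remain.
    static boolean leftoverAfterAbort(boolean deleteUnregistered) throws IOException {
        Path base = Files.createTempDirectory("local-recovery");
        try {
            LocalStoreSketch store = new LocalStoreSketch(base, deleteUnregistered);
            store.writeLocalSnapshot(42L);
            store.abortCheckpoint(42L);
            return Files.exists(store.checkpointDir(42L));
        } finally {
            deleteRecursively(base);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("without fix, leftover=" + leftoverAfterAbort(false));
        System.out.println("with fix, leftover=" + leftoverAfterAbort(true));
    }
}
```

Running `main` prints `without fix, leftover=true` followed by `with fix, leftover=false`. Deriving the directory from the checkpoint ID keeps the abort path independent of whether registration ever happened, which is the essence of the proposal.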
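The cancellation race in point (3) comes down to `java.util.concurrent.FutureTask` semantics: `cancel(true)` returns `true` for a task that is still running, the task body may keep running to completion if it does not react to the interrupt, and the value it returns afterwards is dropped. The sketch below is a simplified model, not Flink code: `CancelLeakDemo`, `LocalSnapshot`, and `discardStateFuture` are hypothetical names, and the discard helper assumes the behavior described in point (3), namely that the result is only fetched and discarded when `cancel` returns `false`. `awaitIgnoringInterrupts` stands in, as an assumption, for a snapshot step that keeps running after cancellation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of point (3); none of these classes are Flink classes.
class CancelLeakDemo {

    // Stands in for the local snapshot files; discardState() models deleting them.
    static final class LocalSnapshot {
        final AtomicBoolean discarded = new AtomicBoolean(false);

        void discardState() {
            discarded.set(true);
        }
    }

    // Hypothetical discard helper: the result is fetched and discarded only if
    // cancel() fails, i.e. only if the task had already finished.
    static void discardStateFuture(FutureTask<LocalSnapshot> future) throws Exception {
        if (!future.cancel(true)) {
            LocalSnapshot snapshot = future.get();
            if (snapshot != null) {
                snapshot.discardState();
            }
        }
    }

    // Waits on the latch while ignoring interrupts, then restores the interrupt flag.
    static void awaitIgnoringInterrupts(CountDownLatch latch) {
        boolean interrupted = false;
        while (true) {
            try {
                latch.await();
                break;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    // Cancels the snapshot task while it is running and returns the snapshot it still produces.
    static LocalSnapshot abortWhileRunning() throws Exception {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<LocalSnapshot> produced = new AtomicReference<>();

        FutureTask<LocalSnapshot> task = new FutureTask<>(() -> {
            started.countDown();
            awaitIgnoringInterrupts(release);
            LocalSnapshot snapshot = new LocalSnapshot(); // the "files" are written here
            produced.set(snapshot);
            return snapshot; // dropped by FutureTask because it was already cancelled
        });

        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(task);
        started.await();          // the task is now running
        discardStateFuture(task); // cancel(true) returns true, so nothing is discarded
        release.countDown();      // the body keeps running until it completes
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("snapshot task did not finish");
        }
        return produced.get();
    }

    public static void main(String[] args) throws Exception {
        LocalSnapshot leaked = abortWhileRunning();
        System.out.println("produced=" + (leaked != null) + ", discarded=" + leaked.discarded.get());
    }
}
```

Running `main` prints `produced=true, discarded=false`: the snapshot is created after the successful `cancel`, but no code path ever discards it, which mirrors why the files end up relying on TaskLocalStateStoreImpl for cleanup.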
> The files in local recovery directory hasn't be clean up properly after
> checkpoint abort
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-28515
>                 URL: https://issues.apache.org/jira/browse/FLINK-28515
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.15.1, 1.16.0
>            Reporter: Jinzhong Li
>            Assignee: Jinzhong Li
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: C7245668-CE31-4F56-B9CB-12E2F1E900C5.png,
> image-2022-07-19-18-28-20-239.png, image.png

--
This message was sent by Atlassian Jira
(v8.20.10#820010)