[ https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569936#comment-17569936 ]
Jinzhong Li commented on FLINK-28515:
-------------------------------------

Thanks for the reply, [~roman]. I'd like to explain the cause of this issue in more detail; please correct me if anything is wrong.

>> So if it's already running, cleanup() can be skipped by the current thread. closeSnapshotIO() only closes the registry, and I don't see that the folder is registered with it.

That's right. In this case, closeSnapshotIO() closes all the CheckpointStreams that belong to the folder, but it does not delete the folder.

1. However, even if cleanup() is skipped there, it is still invoked in the [AsyncSnapshotCallable.call() finally|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87] block.
2. AsyncSnapshotCallable.cleanup() can only delete an ONGOING folder, not a COMPLETED one (AsyncSnapshotTask.cleanup() -> IncrementalRocksDBSnapshotResources.release -> SnapshotDirectory.cleanUp):
{code:java}
public boolean cleanup() throws IOException {
    // Only a directory that is still ONGOING is deleted; a COMPLETED one stays on disk.
    if (state.compareAndSet(State.ONGOING, State.DELETED)) {
        FileUtils.deleteDirectory(directory.toFile());
    }
    return true;
}
{code}
3. AsyncSnapshotTask.callInternal() invokes RocksDBIncrementalSnapshotOperation.get(CloseableRegistry snapshotCloseableRegistry), in which the folder state is [transformed from ONGOING to COMPLETED|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L422]. As a result:
(1) the localSnapshot folder can't be cleaned up by AsyncSnapshotCallable.cleanup(), because the folder state is COMPLETED;
(2) the localSnapshot folder can't be cleaned up in the finally block of RocksDBIncrementalSnapshotOperation.get(CloseableRegistry), because the completed flag is true.
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
    try {
        .......
        completed = true;
        return snapshotResult;
    } finally {
        // The (elided) cleanup below only runs if the operation did not complete.
        if (!completed) {
            .........
        }
    }
}
{code}

> The files in local recovery directory hasn't be clean up properly after
> checkpoint abort
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-28515
> URL: https://issues.apache.org/jira/browse/FLINK-28515
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.15.1, 1.16.0
> Reporter: Jinzhong Li
> Assignee: Jinzhong Li
> Priority: Major
> Labels: pull-request-available
> Attachments: C7245668-CE31-4F56-B9CB-12E2F1E900C5.png, image-2022-07-19-18-28-20-239.png, image.png
>
>
> In my case, I found that some files in the local recovery directory were not
> cleaned up properly after a checkpoint abort (as shown in the attached picture).
> By analyzing the Flink log, I found that when the state backend completes the local
> snapshot but the task has not completed the whole snapshot, and the checkpoint
> is then aborted (caused by a checkpoint timeout or a network error), files in the
> local directory may not be cleaned up properly.
> I think the reason for the residual local snapshot files is:
> (1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable,
> the completed localSnapshot info can be registered into
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task has
> completed the whole snapshot
> ([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136]).
> (2) If the state backend completes the local snapshot but the task has not
> completed the entire snapshot, then when the checkpoint abort is triggered, the
> TaskLocalStateStore can't clean up the unregistered localSnapshot files
> ([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301]).
> (3) When SubtaskCheckpointCoordinatorImpl receives the abort notification,
> it cancels all the ongoing state snapshot future tasks in
> 'AsyncCheckpointRunnable.close()'. For RocksDBKeyedStateBackend,
> [AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L129] will
> be invoked during the checkpoint abort. After this,
> [RocksDBIncrementalSnapshotOperation.get|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L384] may
> still run until it completes.
> The localSnapshot files then can't be cleaned up in the finally block of
> RocksDBIncrementalSnapshotOperation.get, nor in
> AsyncSnapshotCallable.call ([finally-cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87]).
> The localSnapshot files also can't be cleaned up in
> [AsyncCheckpointRunnable.cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L391],
> because
> [AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java#L78]
> returns true.
>
> To fix this problem, I think that when TaskLocalStateStoreImpl aborts a checkpoint,
> we can try to delete the corresponding local recovery directory, even if the
> checkpoint has not been registered in TaskLocalStateStoreImpl.
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
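The leak described in the comment and in the issue can be replayed with a small, self-contained Java model. This is a sketch under stated assumptions, not Flink code: `SnapshotDir`, `LocalStore`, the `chk_<id>` directory name and the `abortRegisteredOnly`/`abortAndDeleteDir` methods are hypothetical stand-ins for SnapshotDirectory, TaskLocalStateStoreImpl and the proposed fix. It shows that once the directory is COMPLETED and the `completed` flag is set, neither cleanup path deletes it, that an abort which only consults registered checkpoints leaves it on disk, and that deleting the directory derived from the checkpoint ID removes it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LocalSnapshotLeakSketch {
    enum State { ONGOING, COMPLETED, DELETED }

    /** Hypothetical stand-in for Flink's SnapshotDirectory. */
    static final class SnapshotDir {
        final Path path;
        final AtomicReference<State> state = new AtomicReference<>(State.ONGOING);

        SnapshotDir(Path path) throws IOException {
            this.path = Files.createDirectories(path);
        }

        /** ONGOING -> COMPLETED, as when the local snapshot finishes. */
        void markCompleted() {
            state.compareAndSet(State.ONGOING, State.COMPLETED);
        }

        /** Same rule as SnapshotDirectory.cleanup(): only an ONGOING directory is deleted. */
        boolean cleanup() throws IOException {
            if (state.compareAndSet(State.ONGOING, State.DELETED)) {
                deleteRecursively(path);
            }
            return true;
        }
    }

    /** Hypothetical local store: knows a checkpoint only once it has been registered. */
    static final class LocalStore {
        final Path root;
        final Map<Long, Path> registered = new HashMap<>();

        LocalStore(Path root) { this.root = root; }

        Path dirFor(long checkpointId) { return root.resolve("chk_" + checkpointId); }

        /** Behavior described in the issue: abort only discards registered checkpoints. */
        void abortRegisteredOnly(long checkpointId) throws IOException {
            Path p = registered.remove(checkpointId);
            if (p != null) deleteRecursively(p);
        }

        /** Proposed fix (hypothetical form): also delete the directory for this checkpoint ID. */
        void abortAndDeleteDir(long checkpointId) throws IOException {
            registered.remove(checkpointId);
            deleteRecursively(dirFor(checkpointId));
        }
    }

    static void deleteRecursively(Path p) throws IOException {
        if (!Files.exists(p)) return;
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(p)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path q : paths) Files.delete(q); // children are deleted before their parents
    }

    /** Replays: local snapshot completes, task is cancelled, abort arrives before registration. */
    static boolean leaks(boolean useFix) {
        try {
            Path root = Files.createTempDirectory("local-recovery");
            LocalStore store = new LocalStore(root);
            long checkpointId = 42L;
            SnapshotDir dir = new SnapshotDir(store.dirFor(checkpointId));
            Files.write(dir.path.resolve("000001.sst"), new byte[] {1});
            boolean completed = false;
            try {
                dir.markCompleted(); // get() finishes the local snapshot
                completed = true;
            } finally {
                if (!completed) dir.cleanup(); // skipped because completed == true
            }
            dir.cleanup(); // call()'s finally: no-op because the state is COMPLETED
            if (useFix) store.abortAndDeleteDir(checkpointId);
            else store.abortRegisteredOnly(checkpointId); // nothing is registered yet
            boolean leaked = Files.exists(dir.path);
            deleteRecursively(root); // remove the temp directory used by the sketch
            return leaked;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("leaked without fix: " + leaks(false)); // true
        System.out.println("leaked with fix: " + leaks(true)); // false
    }
}
```

Running `main` prints `leaked without fix: true` and `leaked with fix: false`. Keying the deletion on the checkpoint ID means the abort path no longer depends on registration having happened; a real implementation would still need to make sure it only removes the directory belonging to the aborted checkpoint.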
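Point (3) of the issue relies on standard java.util.concurrent.FutureTask semantics, assuming (as the linked AsyncSnapshotCallable source suggests) that the snapshot runs inside a FutureTask: `cancel(true)` on a task that is already running returns true and interrupts the worker thread, but a body that does not stop on interruption keeps running to completion, and its result is then discarded. The class below is a hypothetical, stand-alone demonstration of that JDK behavior, not Flink code; the two latches only make the timing deterministic.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelRunningTaskSketch {

    /** Returns {cancel() result, body ran to the end, get() threw CancellationException}. */
    static boolean[] run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean ranToEnd = new AtomicBoolean(false);

        FutureTask<String> task = new FutureTask<>(() -> {
            started.countDown();
            // Simulated snapshot work that ignores interruption and keeps going.
            while (true) {
                try {
                    release.await();
                    break;
                } catch (InterruptedException ignored) {
                    // swallow the interrupt sent by cancel(true) and wait again
                }
            }
            ranToEnd.set(true); // e.g. the local snapshot directory is now COMPLETED
            return "snapshot-result";
        });

        try {
            Thread worker = new Thread(task);
            worker.start();
            started.await();

            boolean cancelResult = task.cancel(true); // task is running, so this returns true
            release.countDown();
            worker.join();

            boolean getThrewCancellation;
            try {
                task.get();
                getThrewCancellation = false;
            } catch (CancellationException e) {
                getThrewCancellation = true; // the body's result was discarded
            } catch (ExecutionException e) {
                getThrewCancellation = false;
            }
            return new boolean[] {cancelResult, ranToEnd.get(), getThrewCancellation};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("cancel returned " + r[0] + ", body finished " + r[1]
                + ", get() cancelled " + r[2]);
    }
}
```

Running `main` prints `cancel returned true, body finished true, get() cancelled true`: the cancelling side sees a successful cancel and never receives the result, while the body has already done all of its work, which matches how the completed local snapshot ends up with no owner to clean it up.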