Hangxiang Yu created FLINK-37686:
------------------------------------

Summary: ForSt consecutive failed checkpoint due to check deleted file status when enableFileDeletions
Key: FLINK-37686
URL: https://issues.apache.org/jira/browse/FLINK-37686
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 2.1.0
Reporter: Hangxiang Yu
Assignee: Hangxiang Yu
Currently, ForSt tries to reuse files when taking a checkpoint. When a file is reused, its FileOwnership becomes NOT_OWNED:
{code:java}
2025-04-16 13:03:55,952 INFO  org.apache.flink.state.forst.fs.filemapping.FileMappingManager [] - Give up ownership for file: MappingEntry{source=HandleBackedSource{stateHandle=File State: hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e [52273557 bytes]}, fileOwnership=NOT_OWNED, isDirectory= false}, the source is now backed by: File State: hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e [52273557 bytes]
{code}
From then on, the JM's asynchronous checkpoint cleaner thread may delete this file:
{code:java}
2025-04-16 13:07:01,920 INFO  org.apache.flink.runtime.state.SharedStateRegistryImpl [] - Scheduled delete of state handle File State: hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e [52273557 bytes].
{code}
However, ForSt may still record the file among its obsolete files. Every time enableFileDeletions is called, ForSt checks whether all obsolete files exist. Once the JM has deleted the file, this check fails and the checkpoint is declined:
{code:java}
2025-04-16 13:07:53,123 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 8 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1744780073120 for job e3fca74b64c70018276be5ab859a2ce2.
2025-04-16 13:08:04,848 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 8 by task 8bbfb2b8ab9d60131688ca9a1b38fa4c_90bea66de1c231edf33913ecd54406c1_0_0 of job e3fca74b64c70018276be5ab859a2ce2 at application-flink-k8s-1744779470362-5783573-taskmanager-1-1.
org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 8 for operator GroupAggregate[4] -> Calc[5] -> Sink: print_sink[6] (1/1)#0.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.FileNotFoundException: File does not exist: hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
	at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.FileNotFoundException: File does not exist: hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
	at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1417) ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
	at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1409) ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1425) ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:320) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:356) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.forstdb.RocksDB.enableFileDeletions(Native Method) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.forstdb.RocksDB.enableFileDeletions(RocksDB.java:4312) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy.lambda$syncPrepareResources$3(ForStNativeFullSnapshotStrategy.java:188) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase$ForStNativeSnapshotResources.release(ForStSnapshotStrategyBase.java:247) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.state.forst.snapshot.ForStIncrementalSnapshotStrategy$ForStIncrementalSnapshotOperation.get(ForStIncrementalSnapshotStrategy.java:287) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
	... 3 more
{code}
We should handle NOT_OWNED files specially when enableFileDeletions checks the obsolete files.
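The following is a minimal, self-contained Java model of the proposed special case. It is a sketch under stated assumptions, not the actual ForSt fix.

- It uses no ForSt or Flink APIs. `Ownership`, `Entry`, and `purgeObsoleteFiles` are hypothetical names.
- The file system is an in-memory set of paths.
- Purging an obsolete file is simplified to "check it exists, then delete it".

The sketch shows two behaviors:

- An entry marked NOT_OWNED is dropped from the obsolete list without touching the file system. Its lifecycle now belongs to the JM's shared state registry.
- A missing file that the DB still owns surfaces as a FileNotFoundException, like the one in the stack trace above.

{code:java}
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ObsoleteFileCheckSketch {

    // Hypothetical, simplified ownership flag; ForSt's real FileOwnership enum may differ.
    enum Ownership { OWNED_BY_DB, NOT_OWNED }

    // Hypothetical stand-in for an entry tracked by the file mapping layer.
    record Entry(String path, Ownership ownership) {}

    /**
     * Simplified model of purging obsolete files when file deletions are re-enabled.
     * NOT_OWNED files are skipped entirely: the JM may already have deleted them,
     * so the DB neither checks their existence nor deletes them.
     *
     * @return the paths actually deleted by the DB
     */
    static List<String> purgeObsoleteFiles(List<Entry> obsolete, Set<String> fileSystem)
            throws FileNotFoundException {
        List<String> deleted = new ArrayList<>();
        for (Entry e : obsolete) {
            if (e.ownership() == Ownership.NOT_OWNED) {
                continue; // special case: do not require a file we no longer own to exist
            }
            if (!fileSystem.contains(e.path())) {
                // A DB-owned file going missing is still treated as an error.
                throw new FileNotFoundException("File does not exist: " + e.path());
            }
            fileSystem.remove(e.path());
            deleted.add(e.path());
        }
        return deleted;
    }

    public static void main(String[] args) throws Exception {
        Set<String> fs = new HashSet<>(List.of("db/local-1.sst"));
        // "db/shared-1.sst" was reused from a checkpoint and has already been deleted by the JM.
        List<Entry> obsolete = List.of(
                new Entry("db/local-1.sst", Ownership.OWNED_BY_DB),
                new Entry("db/shared-1.sst", Ownership.NOT_OWNED));
        System.out.println(purgeObsoleteFiles(obsolete, fs)); // prints [db/local-1.sst]
    }
}
{code}

In this model, the key choice is to skip NOT_OWNED entries rather than delete them. Once ForSt has given up ownership, whether the file still exists is no longer its concern. This sketch does not say where the real check should live, for example in ForStFlinkFileSystem.listStatus or the FileMappingManager.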