[
https://issues.apache.org/jira/browse/FLINK-37686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hangxiang Yu updated FLINK-37686:
---------------------------------
Attachment: EnableFileDeletions.png
> ForSt consecutive failed checkpoint due to check deleted file status when
> enableFileDeletions
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-37686
> URL: https://issues.apache.org/jira/browse/FLINK-37686
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 2.1.0
> Reporter: Hangxiang Yu
> Assignee: Hangxiang Yu
> Priority: Major
> Attachments: EnableFileDeletions.png, Full Scan Obsolete Files.png
>
>
> Currently, ForSt tries to reuse existing files when taking a checkpoint.
> When a file is reused, its FileOwnership is NOT_OWNED:
>
> {code:java}
> 2025-04-16 13:03:55,952 INFO
> org.apache.flink.state.forst.fs.filemapping.FileMappingManager [] - Give up
> ownership for file: MappingEntry{source=HandleBackedSource{stateHandle=File
> State:
> hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
> [52273557 bytes]}, fileOwnership=NOT_OWNED, isDirectory= false}, the source
> is now backed by: File State:
> hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
> [52273557 bytes] {code}
>
> This file may then be deleted by the JM in the async checkpoint cleaner thread:
>
> {code:java}
> 2025-04-16 13:07:01,920 INFO
> org.apache.flink.runtime.state.SharedStateRegistryImpl [] - Scheduled
> delete of state handle File State:
> hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
> [52273557 bytes].{code}
>
> However, the file may still be recorded in ForSt's ObsoleteFiles. Every time
> enableFileDeletions is called, ForSt checks whether all obsolete files
> exist, and the call may fail if a file has already been deleted by the JM:
> {code:java}
> 2025-04-16 13:07:53,123 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
> checkpoint 8 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1744780073120 for job
> e3fca74b64c70018276be5ab859a2ce2.
> 2025-04-16 13:08:04,848 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline
> checkpoint 8 by task
> 8bbfb2b8ab9d60131688ca9a1b38fa4c_90bea66de1c231edf33913ecd54406c1_0_0 of job
> e3fca74b64c70018276be5ab859a2ce2 at
> application-flink-k8s-1744779470362-5783573-taskmanager-1-1.
> org.apache.flink.util.SerializedThrowable:
> org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task
> checkpoint failed.
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
> at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception:
> Could not materialize checkpoint 8 for operator GroupAggregate[4] -> Calc[5]
> -> Sink: print_sink[6] (1/1)#0.
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable:
> java.util.concurrent.ExecutionException: java.io.FileNotFoundException: File
> does not exist:
> hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
> at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
> at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
> at
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable:
> java.io.FileNotFoundException: File does not exist:
> hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1417)
> ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1409)
> ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1425)
> ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:320)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:356)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at org.forstdb.RocksDB.enableFileDeletions(Native Method)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at org.forstdb.RocksDB.enableFileDeletions(RocksDB.java:4312)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy.lambda$syncPrepareResources$3(ForStNativeFullSnapshotStrategy.java:188)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase$ForStNativeSnapshotResources.release(ForStSnapshotStrategyBase.java:247)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.state.forst.snapshot.ForStIncrementalSnapshotStrategy$ForStIncrementalSnapshotOperation.get(ForStIncrementalSnapshotStrategy.java:287)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
> at
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
> ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
> ... 3 more{code}
> We should add a special check for NOT_OWNED files when enableFileDeletions is called.
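
The proposed special check can be illustrated with a minimal, self-contained sketch. It is not the actual ForSt code or the eventual fix: `MappingEntry`, `strictSize`, `tolerantSize`, and the `OWNED_BY_DB` enum value are hypothetical names chosen for illustration, and the local file system stands in for HDFS. The assumed idea is that a missing file is tolerated only when its ownership is NOT_OWNED, since such a file may legitimately have been removed by the JM.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Optional;

public class NotOwnedFileStatusSketch {

    /** Hypothetical ownership flag; only NOT_OWNED appears in the logs above. */
    enum FileOwnership { OWNED_BY_DB, NOT_OWNED }

    /** Hypothetical stand-in for a file mapping entry (not the real ForSt class). */
    static final class MappingEntry {
        final Path source;
        final FileOwnership ownership;

        MappingEntry(Path source, FileOwnership ownership) {
            this.source = source;
            this.ownership = ownership;
        }
    }

    /** Strict lookup: a missing file always raises FileNotFoundException. */
    static long strictSize(MappingEntry entry) throws IOException {
        try {
            return Files.size(entry.source);
        } catch (NoSuchFileException e) {
            throw new FileNotFoundException("File does not exist: " + entry.source);
        }
    }

    /**
     * Tolerant lookup: a missing NOT_OWNED file yields an empty result,
     * because another component may already have deleted it.
     * A missing file with any other ownership is still reported as an error.
     */
    static Optional<Long> tolerantSize(MappingEntry entry) throws IOException {
        try {
            return Optional.of(strictSize(entry));
        } catch (FileNotFoundException e) {
            if (entry.ownership == FileOwnership.NOT_OWNED) {
                return Optional.empty();
            }
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate a reused file that was deleted externally.
        Path deleted = Files.createTempFile("reused-sst", ".tmp");
        Files.delete(deleted);

        MappingEntry notOwned = new MappingEntry(deleted, FileOwnership.NOT_OWNED);
        System.out.println("NOT_OWNED file present: " + tolerantSize(notOwned).isPresent());

        MappingEntry owned = new MappingEntry(deleted, FileOwnership.OWNED_BY_DB);
        try {
            tolerantSize(owned);
        } catch (FileNotFoundException e) {
            System.out.println("Owned file still fails: " + e.getMessage());
        }
    }
}
```

Keeping the strict behavior for files the DB still owns means a genuinely lost file is still surfaced as an error, while only the expected NOT_OWNED case is skipped.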
--
This message was sent by Atlassian Jira
(v8.20.10#820010)