Hangxiang Yu created FLINK-37686:
------------------------------------

             Summary: ForSt consecutive failed checkpoint due to check deleted file status when enableFileDeletions
                 Key: FLINK-37686
                 URL: https://issues.apache.org/jira/browse/FLINK-37686
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 2.1.0
            Reporter: Hangxiang Yu
            Assignee: Hangxiang Yu


Currently, ForSt tries to reuse existing files when taking a checkpoint.

When a file is reused, its FileOwnership becomes NOT_OWNED:

 
{code:java}
2025-04-16 13:03:55,952 INFO  org.apache.flink.state.forst.fs.filemapping.FileMappingManager [] - Give up ownership for file: MappingEntry{source=HandleBackedSource{stateHandle=File State: hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e [52273557 bytes]}, fileOwnership=NOT_OWNED, isDirectory= false}, the source is now backed by: File State: hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e [52273557 bytes]
{code}
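To make the ownership handover concrete, here is a hypothetical, heavily simplified model of the file mapping. The names `FileMappingSketch`, `SketchEntry` and `SketchOwnership` are invented for illustration and are not the real `FileMappingManager` API. The sketch also assumes that ForSt only physically deletes files it still owns, leaving NOT_OWNED files to the JM.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical ownership states; the names mirror the log output, not a real API.
enum SketchOwnership { OWNED, NOT_OWNED }

// Hypothetical mapping entry: where a ForSt file is physically stored, and who owns it.
final class SketchEntry {
    final String backingPath;
    final SketchOwnership ownership;

    SketchEntry(String backingPath, SketchOwnership ownership) {
        this.backingPath = backingPath;
        this.ownership = ownership;
    }
}

// Simplified stand-in for the file mapping; NOT the real FileMappingManager.
final class FileMappingSketch {
    private final Map<String, SketchEntry> entries = new HashMap<>();

    // A file newly written by ForSt is owned by ForSt.
    void register(String fileName, String path) {
        entries.put(fileName, new SketchEntry(path, SketchOwnership.OWNED));
    }

    // A checkpoint reuses the file: from now on its lifecycle is managed by the JM.
    void giveUpOwnership(String fileName, String checkpointPath) {
        entries.put(fileName, new SketchEntry(checkpointPath, SketchOwnership.NOT_OWNED));
    }

    SketchOwnership ownershipOf(String fileName) {
        SketchEntry e = entries.get(fileName);
        if (e == null) {
            throw new IllegalArgumentException("Unknown file: " + fileName);
        }
        return e.ownership;
    }

    // Assumption: ForSt only physically deletes files it still owns.
    boolean mayPhysicallyDelete(String fileName) {
        return ownershipOf(fileName) == SketchOwnership.OWNED;
    }
}
```

In this model, once `giveUpOwnership` has run, ForSt no longer controls when the backing file disappears; that is exactly the window the JM cleaner can hit.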
 

This file may then be deleted by the JobManager (JM) in its asynchronous checkpoint cleaner thread:

 
{code:java}
2025-04-16 13:07:01,920 INFO org.apache.flink.runtime.state.SharedStateRegistryImpl       [] - Scheduled delete of state handle File State: hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e [52273557 bytes].
{code}
 

However, the file may still be recorded in ForSt's ObsoleteFiles. Every time enableFileDeletions is called, ForSt checks whether all obsolete files still exist, and the call fails if one of them has already been deleted by the JM, which makes the checkpoint fail:
{code:java}
2025-04-16 13:07:53,123 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 8 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1744780073120 for job e3fca74b64c70018276be5ab859a2ce2.
2025-04-16 13:08:04,848 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline checkpoint 8 by task 8bbfb2b8ab9d60131688ca9a1b38fa4c_90bea66de1c231edf33913ecd54406c1_0_0 of job e3fca74b64c70018276be5ab859a2ce2 at application-flink-k8s-1744779470362-5783573-taskmanager-1-1.
org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 8 for operator GroupAggregate[4] -> Calc[5] -> Sink: print_sink[6] (1/1)#0.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    ... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.FileNotFoundException: File does not exist: hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.FileNotFoundException: File does not exist: hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
    at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1417) ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
    at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1409) ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1425) ~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:320) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:356) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.forstdb.RocksDB.enableFileDeletions(Native Method) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.forstdb.RocksDB.enableFileDeletions(RocksDB.java:4312) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy.lambda$syncPrepareResources$3(ForStNativeFullSnapshotStrategy.java:188) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase$ForStNativeSnapshotResources.release(ForStSnapshotStrategyBase.java:247) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.state.forst.snapshot.ForStIncrementalSnapshotStrategy$ForStIncrementalSnapshotOperation.get(ForStIncrementalSnapshotStrategy.java:287) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    ... 3 more{code}
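The failure can be reduced to a small sketch, assuming a hypothetical in-memory stand-in (`InMemoryFs`) for HDFS and a plain list for ForSt's obsolete files. In the real stack trace the check runs through `ForStFlinkFileSystem.listStatus` and `getFileStatus` inside the native `enableFileDeletions` call; the sketch collapses that into a single loop that stats every tracked file. `ObsoleteFilesCheckSketch` is likewise an invented name.

```java
import java.io.FileNotFoundException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in for the remote file system (HDFS in the log above).
final class InMemoryFs {
    private final Set<String> files = new HashSet<>();

    void create(String path) {
        files.add(path);
    }

    // Models the JM's asynchronous checkpoint cleaner removing a shared file.
    void delete(String path) {
        files.remove(path);
    }

    // Like getFileStatus on HDFS, a missing path results in FileNotFoundException.
    void stat(String path) throws FileNotFoundException {
        if (!files.contains(path)) {
            throw new FileNotFoundException("File does not exist: " + path);
        }
    }
}

final class ObsoleteFilesCheckSketch {
    // Simplified model of the check reached from enableFileDeletions:
    // every tracked obsolete file is stat-ed, and the first missing one aborts the call.
    static void checkObsoleteFiles(InMemoryFs fs, List<String> obsoleteFiles)
            throws FileNotFoundException {
        for (String path : obsoleteFiles) {
            fs.stat(path);
        }
    }
}
```

Creating the file, running the check (passes), deleting it as the JM cleaner would, and running the check again reproduces the `FileNotFoundException` seen above.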
We should add a special check for NOT_OWNED files when enableFileDeletions is called, so that a NOT_OWNED file already deleted by the JM does not fail the checkpoint.
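One possible shape of that check, sketched under the assumption that each tracked obsolete file carries its ownership flag: a missing NOT_OWNED file is skipped, because its deletion is the JM's responsibility, while a missing OWNED file still fails. `TolerantObsoleteFilesCheck` and `TrackedOwnership` are hypothetical names; the actual fix may live elsewhere, for example inside `ForStFlinkFileSystem`.

```java
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical ownership flag for a tracked file; mirrors the NOT_OWNED state in the log.
enum TrackedOwnership { OWNED, NOT_OWNED }

final class TolerantObsoleteFilesCheck {
    // Checks obsolete files, but tolerates NOT_OWNED files that are already gone,
    // because their deletion is the JM's responsibility. Returns the skipped paths.
    static List<String> check(Set<String> existingFiles,
                              Map<String, TrackedOwnership> obsoleteFiles)
            throws FileNotFoundException {
        List<String> skipped = new ArrayList<>();
        for (Map.Entry<String, TrackedOwnership> e : obsoleteFiles.entrySet()) {
            String path = e.getKey();
            if (existingFiles.contains(path)) {
                continue; // file still present: nothing special to do
            }
            if (e.getValue() == TrackedOwnership.NOT_OWNED) {
                skipped.add(path); // deleted by the JM: not an error for ForSt
            } else {
                // A missing file that ForSt owns still indicates a real problem.
                throw new FileNotFoundException("File does not exist: " + path);
            }
        }
        return skipped;
    }
}
```

Failing only on OWNED files keeps genuine data loss visible while ignoring deletions that the JM performed on purpose.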



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
