Yun Gao created FLINK-25305:
-------------------------------

             Summary: Always wait for input channel state and result partition state get completed in AsyncRunnable
                 Key: FLINK-25305
                 URL: https://issues.apache.org/jira/browse/FLINK-25305
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Checkpointing
    Affects Versions: 1.15.0
            Reporter: Yun Gao


 
{code:java}
29245 [jobmanager-io-thread-12] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 16 by task 07fea3eb73acb4898317b4aa2c9fea30 of job da6de908107aa847cde5e9e0beb4812b at 064277c9-73dc-4bf2-8729-91ab16bbe8c6 @ localhost (dataPort=-1).
org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:321) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:158) ~[classes/:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_271]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_271]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 16 for operator keyed (1/5)#5.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:299) ~[classes/:?]
    ... 4 more
Caused by: org.apache.flink.util.SerializedThrowable
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) ~[classes/:?]
    at org.apache.flink.util.Preconditions.checkCompletedNormally(Preconditions.java:261) ~[classes/:?]
    at org.apache.flink.util.concurrent.FutureUtils.checkStateAndGet(FutureUtils.java:1193) ~[classes/:?]
    at org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder.build(CheckpointMetricsBuilder.java:133) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:248) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:139) ~[classes/:?]
    ... 3 more
{code}

When both unaligned checkpoints and final checkpoints are enabled, some checkpoints fail in the asynchronous phase with the above exception, which indicates that the checkpoint metric futures have not all been completed.
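The failing check can be reproduced in isolation. The following is a minimal standalone sketch, not Flink code: MetricsCheckSketch, checkStateAndGet and buildAlignmentDurationNanos are hypothetical names that only loosely mirror the call chain in the stack trace (CheckpointMetricsBuilder.build calling FutureUtils.checkStateAndGet, which fails in Preconditions.checkCompletedNormally). It assumes that the pending future is the one carrying the alignment duration; that is our reading of the description below, not something the trace itself states.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical re-creation of the failing check (not Flink's classes): building the
 * metrics reads a future that must already be completed normally, otherwise it fails fast.
 */
final class MetricsCheckSketch {

    /** Returns the future's value, or throws IllegalStateException if it is not completed normally yet. */
    static <T> T checkStateAndGet(CompletableFuture<T> future) {
        // Fail fast instead of blocking: the caller assumes the value is already available.
        if (!future.isDone() || future.isCompletedExceptionally()) {
            throw new IllegalStateException("Future is not completed normally");
        }
        return future.join(); // does not block, the future is already done
    }

    /** Hypothetical metrics step: reads the alignment duration, assumed to be the pending future. */
    static long buildAlignmentDurationNanos(CompletableFuture<Long> alignmentDurationNanos) {
        return checkStateAndGet(alignmentDurationNanos);
    }

    public static void main(String[] args) {
        CompletableFuture<Long> alignment = new CompletableFuture<>();
        try {
            // The last barrier has not arrived yet, so the alignment future is still pending.
            buildAlignmentDurationNanos(alignment);
        } catch (IllegalStateException e) {
            System.out.println("build failed: " + e.getMessage());
        }
        alignment.complete(42L); // the last barrier arrives and alignment finishes
        System.out.println("alignment took " + buildAlignmentDurationNanos(alignment) + " ns");
    }
}
```

Because the value is read without waiting, building the metrics before alignment has finished surfaces as an exception in the asynchronous part rather than as a delay.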

The exception is most likely caused by the following. When an operator has already been closed during its first run, or when a task is restored after having previously fully finished, we skip snapshotting the state of that operator when taking a checkpoint. In particular, we also do not include the InputChannelStates and ResultPartitionStates attached to the operator. With unaligned checkpoints, this leads to the following bad case:

1. The task receives the first barrier.
2. As part of the unaligned checkpoint, the task snapshots the state of its operators immediately, without waiting for the remaining barriers.
3. The checkpoint starts its asynchronous part.
4. Normally, the asynchronous part waits until all state futures are done, including the input channel states and result partition states. This ensures that the asynchronous part waits until the last barrier has arrived. However, for closed operators and fully finished tasks these channel states are not included, so this guarantee no longer holds.
5. The asynchronous part then fails when it tries to build the CheckpointMetrics, because the alignment for this checkpoint has in fact not finished yet.
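The bad case above, and the behaviour suggested by the issue summary (always waiting for the input channel state and result partition state in the asynchronous part), can be modelled as a choice of which futures the asynchronous part waits for. This is a hypothetical sketch under that simplification: AsyncWaitSketch, readyToBuildMetrics and the alwaysWaitForChannelState flag are illustrative names, not Flink APIs, and the sketch does not attempt to reproduce the actual structure of AsyncCheckpointRunnable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical model of which futures the asynchronous checkpoint part waits for
 * before building the metrics. None of these names are Flink APIs.
 */
final class AsyncWaitSketch {

    /**
     * @param operatorFinished          operator closed in its first run, or task restored as fully finished
     * @param alwaysWaitForChannelState false models the bad case, true the behaviour from the issue summary
     */
    static CompletableFuture<Void> readyToBuildMetrics(
            boolean operatorFinished,
            boolean alwaysWaitForChannelState,
            CompletableFuture<?> operatorStateFuture,
            CompletableFuture<?> inputChannelStateFuture,
            CompletableFuture<?> resultPartitionStateFuture) {
        List<CompletableFuture<?>> futures = new ArrayList<>();
        if (!operatorFinished) {
            // Regular case: the operator state plus the channel state attached to it.
            futures.add(operatorStateFuture);
            futures.add(inputChannelStateFuture);
            futures.add(resultPartitionStateFuture);
        } else if (alwaysWaitForChannelState) {
            // The operator snapshot is skipped, but the channel state is still awaited.
            futures.add(inputChannelStateFuture);
            futures.add(resultPartitionStateFuture);
        }
        // Otherwise (bad case) nothing is awaited: allOf() over an empty array is already complete.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> inputChannels = new CompletableFuture<>(); // last barrier not received yet
        CompletableFuture<Void> resultPartitions = CompletableFuture.completedFuture(null);

        CompletableFuture<Void> buggy =
                readyToBuildMetrics(true, false, null, inputChannels, resultPartitions);
        CompletableFuture<Void> fixed =
                readyToBuildMetrics(true, true, null, inputChannels, resultPartitions);
        System.out.println("buggy ready early: " + buggy.isDone()); // prints true
        System.out.println("fixed ready early: " + fixed.isDone()); // prints false

        inputChannels.complete(null); // last barrier arrives, input channel state is persisted
        System.out.println("fixed ready now: " + fixed.isDone()); // prints true
    }
}
```

With the flag off, a finished operator contributes no futures and `CompletableFuture.allOf` over an empty array is already complete, so the metrics would be built before alignment ends. With the flag on, readiness is tied to the input channel state future, which in this model only completes once the last barrier has arrived.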

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
