[ https://issues.apache.org/jira/browse/FLINK-25305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-25305:
-----------------------------------
    Labels: pull-request-available  (was: )

> Always wait for input channel state and result partition state get completed in AsyncRunnable
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25305
>                 URL: https://issues.apache.org/jira/browse/FLINK-25305
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
>  
> {code:java}
> 29245 [jobmanager-io-thread-12] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 16 by task 07fea3eb73acb4898317b4aa2c9fea30 of job da6de908107aa847cde5e9e0beb4812b at 064277c9-73dc-4bf2-8729-91ab16bbe8c6 @ localhost (dataPort=-1).
> org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:321) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:158) ~[classes/:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_271]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_271]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
> Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 16 for operator keyed (1/5)#5.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:299) ~[classes/:?]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) ~[classes/:?]
>     at org.apache.flink.util.Preconditions.checkCompletedNormally(Preconditions.java:261) ~[classes/:?]
>     at org.apache.flink.util.concurrent.FutureUtils.checkStateAndGet(FutureUtils.java:1193) ~[classes/:?]
>     at org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder.build(CheckpointMetricsBuilder.java:133) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:248) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:139) ~[classes/:?]
>     ... 3 more
>  {code}
>  
> When both unaligned checkpoints and final checkpoints are enabled, some
> checkpoints fail in the asynchronous phase with the above exception,
> indicating that the checkpoint metric futures have not all been completed.
> The exception is most likely caused by the following: when a task is
> restored from a state in which it was already fully finished, we skip
> snapshotting the state of its operators when taking a checkpoint. In
> particular, we also do not include the InputChannelStates and the
> ResultPartitionState attached to the operator. With unaligned checkpoints,
> this leads to the following bad case:
> 1. The task receives the first barrier.
> 2. As part of the unaligned checkpoint, the task snapshots the state of the
> operators.
> 3. The checkpoint starts its asynchronous part.
> 4. Normally, the asynchronous part waits until all state futures are done,
> including the channel states and the result partition states. This ensures
> that the asynchronous part waits until the last barrier has arrived.
> However, if the task was already fully finished before the restore, these
> states are ignored and this assumption no longer holds.
> 5. The asynchronous part then fails: when it tries to build the
> CheckpointMetrics, the alignment for this checkpoint has in fact not
> completed yet.
>  
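To make the bad case concrete, here is a minimal, self-contained Java model of steps 1 to 5. It is a sketch under stated assumptions, not Flink's code: `FinishedOnRestoreSketch`, `asyncPart`, `simulate` and the `alwaysWaitForChannelState` flag are hypothetical names, plain `CompletableFuture`s stand in for the alignment result and the input channel / result partition state futures, and `checkStateAndGet` only imitates the precondition that fails in the stack trace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
 * Hypothetical, simplified model of the asynchronous checkpoint part from FLINK-25305.
 * None of these names are Flink APIs; the futures only stand in for Flink's state futures.
 */
public class FinishedOnRestoreSketch {

    /** Imitates the failing precondition: the future must already have completed normally. */
    static <T> T checkStateAndGet(CompletableFuture<T> future) {
        if (!future.isDone() || future.isCompletedExceptionally()) {
            throw new IllegalStateException("alignment not completed yet");
        }
        return future.join();
    }

    /**
     * Async part: waits for the collected state futures, then builds the "metrics"
     * (here just the alignment duration).
     *
     * @param finishedOnRestore         the task was restored as fully finished
     * @param alwaysWaitForChannelState hypothetical flag modelling the proposed fix
     */
    static CompletableFuture<Long> asyncPart(
            boolean finishedOnRestore,
            boolean alwaysWaitForChannelState,
            CompletableFuture<Long> alignmentDurationNanos,
            CompletableFuture<Void> inputChannelState,
            CompletableFuture<Void> resultPartitionState) {
        List<CompletableFuture<?>> toWaitFor = new ArrayList<>();
        // Bug: for a finished-on-restore task, the channel/partition state futures are skipped.
        if (!finishedOnRestore || alwaysWaitForChannelState) {
            toWaitFor.add(inputChannelState);
            toWaitFor.add(resultPartitionState);
        }
        // allOf() over an empty list is already complete, so the metrics are built at once.
        return CompletableFuture.allOf(toWaitFor.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> checkStateAndGet(alignmentDurationNanos));
    }

    /** Runs steps 1-5; throws CompletionException if building the metrics failed. */
    static long simulate(boolean finishedOnRestore, boolean alwaysWaitForChannelState) {
        CompletableFuture<Long> alignment = new CompletableFuture<>();
        CompletableFuture<Void> inputChannelState = new CompletableFuture<>();
        CompletableFuture<Void> resultPartitionState = new CompletableFuture<>();

        // Steps 1-3: first barrier received, operator state snapshotted, async part started.
        CompletableFuture<Long> metrics = asyncPart(
                finishedOnRestore, alwaysWaitForChannelState,
                alignment, inputChannelState, resultPartitionState);

        // The last barrier arrives later: alignment ends, then the channel states complete.
        alignment.complete(42L);
        inputChannelState.complete(null);
        resultPartitionState.complete(null);

        return metrics.join();
    }

    public static void main(String[] args) {
        System.out.println("regular task:            " + simulate(false, false));
        System.out.println("finished-on-restore+fix: " + simulate(true, true));
        try {
            simulate(true, false);
        } catch (CompletionException e) {
            System.out.println("finished-on-restore:     " + e.getCause());
        }
    }
}
```

In this model the regular task and the finished-on-restore task that always waits for the channel state futures both report the alignment value, while the finished-on-restore task that skips those futures fails with an `IllegalStateException`, analogous to the `checkState` failure in the trace. Waiting on the channel state futures unconditionally is what ties the asynchronous part to the arrival of the last barrier.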



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
