[ https://issues.apache.org/jira/browse/FLINK-25305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-25305:
-----------------------------------
    Labels: pull-request-available  (was: )

> Always wait for input channel state and result partition state get completed in AsyncRunnable
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25305
>                 URL: https://issues.apache.org/jira/browse/FLINK-25305
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> {code:java}
> 29245 [jobmanager-io-thread-12] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 16 by task 07fea3eb73acb4898317b4aa2c9fea30 of job da6de908107aa847cde5e9e0beb4812b at 064277c9-73dc-4bf2-8729-91ab16bbe8c6 @ localhost (dataPort=-1).
> org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:321) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:158) ~[classes/:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_271]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_271]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
> Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 16 for operator keyed (1/5)#5.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:299) ~[classes/:?]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) ~[classes/:?]
>     at org.apache.flink.util.Preconditions.checkCompletedNormally(Preconditions.java:261) ~[classes/:?]
>     at org.apache.flink.util.concurrent.FutureUtils.checkStateAndGet(FutureUtils.java:1193) ~[classes/:?]
>     at org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder.build(CheckpointMetricsBuilder.java:133) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:248) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:139) ~[classes/:?]
>     ... 3 more
> {code}
>
> When both unaligned checkpoints and final checkpoints are enabled, some
> checkpoints fail in the asynchronous phase with the above exception, which
> indicates that the checkpoint metrics futures have not all been completed.
> The likely cause is the following: when a task is restored as previously
> fully finished, taking a checkpoint skips snapshotting the state of its
> operators. In particular, the InputChannelStates and the
> ResultPartitionState attached to the operator are also not included.
> With unaligned checkpoints, this leads to the following bad case:
> 1. The task receives the first barrier.
> 2. Following the unaligned checkpoint process, the task snapshots the
> state of the operators.
> 3. The checkpoint starts its asynchronous part.
> 4. Normally, the asynchronous part waits until all the state futures are
> done, including the channel states and result partition states. This wait
> ensures that the asynchronous part does not proceed before the last barrier
> has arrived. But if the task was fully finished before, these states are
> ignored and the assumption is broken.
> 5. The asynchronous part then fails: when it tries to build the
> CheckpointMetrics, the alignment for this checkpoint is in fact not done yet.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
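
The bad case in steps 1 to 5 can be sketched with plain java.util.concurrent futures. This is only an illustration under stated assumptions: the class AlignmentRaceSketch, the method runAsyncPart, and the future names are hypothetical, and checkStateAndGet below is a simplified stand-in for the check seen in the stack trace, not Flink's actual implementation.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not Flink code: models why the async part must wait
// for the channel state future before reading the alignment metric.
class AlignmentRaceSketch {

    // Simplified stand-in for a non-blocking "must already be done" read.
    static long checkStateAndGet(CompletableFuture<Long> future) {
        if (!future.isDone() || future.isCompletedExceptionally()) {
            throw new IllegalStateException("future has not completed normally");
        }
        return future.getNow(0L);
    }

    // Models the asynchronous part of a checkpoint (steps 3 to 5).
    // channelStateDone completes only after the last barrier has arrived.
    static String runAsyncPart(
            CompletableFuture<Void> channelStateDone,
            CompletableFuture<Long> alignmentNanos,
            boolean waitForChannelState) {
        if (waitForChannelState) {
            channelStateDone.join(); // blocks until the last barrier arrived
        }
        try {
            return "metrics built, alignmentNanos=" + checkStateAndGet(alignmentNanos);
        } catch (IllegalStateException e) {
            return "declined: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Case 1: task restored as finished, channel state is skipped, so the
        // async part does not wait and the alignment future is still open.
        CompletableFuture<Void> channel1 = new CompletableFuture<>();
        CompletableFuture<Long> alignment1 = new CompletableFuture<>();
        System.out.println(runAsyncPart(channel1, alignment1, false));
        // prints "declined: future has not completed normally"

        // Case 2: the async part always waits for the channel state future.
        CompletableFuture<Void> channel2 = new CompletableFuture<>();
        CompletableFuture<Long> alignment2 = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            alignment2.complete(42L); // last barrier arrives, alignment ends
            channel2.complete(null);  // channel state is then finalized
        });
        System.out.println(runAsyncPart(channel2, alignment2, true));
        // prints "metrics built, alignmentNanos=42"
    }
}
```

In the second case the join on the channel state future is what orders the metric read after the last barrier, which is the property the issue title asks to always preserve.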