Can anyone help us with this issue?

Best Regards,
Brian

From: Zhou, Brian
Sent: Wednesday, July 15, 2020 18:26
To: 'user@flink.apache.org'
Subject: Pravega connector cannot recover from the checkpoint due to "Failure to finalize checkpoint"

Hi community,

To give some background, https://github.com/pravega/flink-connectors is a Pravega connector for Flink. The ReaderCheckpointHook[1] class uses the Flink `MasterTriggerRestoreHook` interface to trigger a Pravega checkpoint during each Flink checkpoint, so that data can be recovered consistently.

After upgrading to Flink 1.11, we see failures in checkpoint recovery: some of our test cases time out because checkpoints fail continuously.

Error stacktrace:

2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint acknowledgement message
org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
    at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has not been fully acknowledged yet
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
    ... 9 common frames omitted

After some investigation, we found the main problem, which is in checkpoint recovery. When the Flink CheckpointCoordinator finalizes a checkpoint, it first checks that everything has been acknowledged. For some reason, the master state still lists our ReaderCheckpointHook as unacknowledged at that point, so the checkpoint fails in the completion stage.

In PendingCheckpoint::snapshotMasterState, there is an async call that acknowledges the master state for each hook, but the method returns before the acknowledgement has happened. I think this might be related to the recent changes to the threading model of the checkpoint coordinator. Can someone help to verify?

Steps to reproduce:
Check out the branch https://github.com/crazyzhou/flink-connectors/tree/brian-1.11-test and run the following test case:
FlinkPravegaReaderSavepointITCase::testPravegaWithSavepoint

[1] https://github.com/pravega/flink-connectors/blob/e15cfe5b7917431ce8672cf9f232cb4603d8143a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java

Best Regards,
Brian
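
For illustration only, the suspected ordering problem can be modeled in isolation. The sketch below is not Flink's code: `PendingCheckpointSketch`, its methods, and the identifiers `task-1` and `reader-hook` are hypothetical names chosen for this example. It assumes that the master hook's acknowledgement is registered as an async callback, and that finalization requires both task and master-state acknowledgements, as described above. Only the exception message is copied from the stack trace.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of a pending checkpoint (NOT Flink's PendingCheckpoint).
public class PendingCheckpointSketch {
    private final Set<String> notYetAcknowledgedTasks;
    private final Set<String> notYetAcknowledgedMasterStates;

    public PendingCheckpointSketch(Set<String> tasks, Set<String> masterHooks) {
        this.notYetAcknowledgedTasks = new HashSet<>(tasks);
        this.notYetAcknowledgedMasterStates = new HashSet<>(masterHooks);
    }

    public void acknowledgeTask(String taskId) {
        notYetAcknowledgedTasks.remove(taskId);
    }

    public void acknowledgeMasterState(String hookId) {
        notYetAcknowledgedMasterStates.remove(hookId);
    }

    public boolean isFullyAcknowledged() {
        return notYetAcknowledgedTasks.isEmpty() && notYetAcknowledgedMasterStates.isEmpty();
    }

    // Fails if any task or master hook is still unacknowledged.
    public String finalizeCheckpoint() {
        if (!isFullyAcknowledged()) {
            throw new IllegalStateException("Pending checkpoint has not been fully acknowledged yet");
        }
        return "completed";
    }

    public static void main(String[] args) {
        PendingCheckpointSketch pending = new PendingCheckpointSketch(
                Collections.singleton("task-1"), Collections.singleton("reader-hook"));

        // The hook's snapshot completes later; its acknowledgement is only a callback.
        CompletableFuture<String> hookSnapshot = new CompletableFuture<>();
        CompletableFuture<Void> hookAck =
                hookSnapshot.thenAccept(state -> pending.acknowledgeMasterState("reader-hook"));

        // All tasks acknowledge before the hook does, and finalization is attempted too early.
        pending.acknowledgeTask("task-1");
        try {
            pending.finalizeCheckpoint();
        } catch (IllegalStateException e) {
            System.out.println("premature finalize failed: " + e.getMessage());
        }

        // Once the hook's acknowledgement has run, finalization succeeds.
        hookSnapshot.complete("pravega-checkpoint");
        hookAck.join();
        System.out.println(pending.finalizeCheckpoint());
    }
}
```

In this model the first `finalizeCheckpoint()` call throws because the hook has not acknowledged yet, and the second call succeeds after `hookAck` completes. Whether Flink 1.11 actually follows this ordering is the open question in this thread.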