Hi,

We're running Flink 1.20 application-mode clusters on native Kubernetes, and
we're hitting an issue where a cluster restarts without restoring from the
checkpoint referenced in its HA metadata.

To the best of our understanding, here's what's happening:
1) We're running application-mode clusters in native kubernetes with
externalized checkpoints, retained on cancellation. We're attempting to
restore a job from a checkpoint; the checkpoint reference is held in the
Kubernetes HA configmap.
2) We encounter an exception while mapping state to the new jobgraph.[1] We
understand the nature of the exception; our question is about how the
exception is handled, not about the exception itself.
3) The job goes to state FAILED.[2]
4) The HA configmap containing the checkpoint reference is cleaned up.[3]
5) The Kubernetes pod exits. Because it is a Kubernetes deployment, the pod
is immediately restarted.
6) Upon restart, the new JobManager finds no checkpoints to restore from.[4]

This behavior was unexpected to say the least. For comparison, we recreated
the scenario with a savepoint, passed in via execution.savepoint.path. The
cluster repeatedly failed with the state mapping exception, which appeared
to be reasonable behavior. How can we prevent the cluster from automatically
restarting without its checkpoint metadata in this scenario?

Logs Appendix

[1] org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
    at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:67)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state ad8761465be643c10db5fae153b87f68
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
    ... 4 more
Caused by: java.lang.IllegalStateException: There is no operator for the state ad8761465be643c10db5fae153b87f68
    at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:769)
    at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:101)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1829)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1749)
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:210)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:382)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:225)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:142)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:162)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:406)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:383)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    ... 4 more

[2] Job 00000000000000000000000000000000 reached terminal state FAILED

[3] Clean up the high availability data for job 00000000000000000000000000000000.

[4] No checkpoint found during restore.

Best,
Max Feng
