Hi,

We're running Flink 1.20 with native Kubernetes application-mode clusters, and we've hit an issue where clusters restart without the checkpoints referenced in their HA metadata.
To the best of our understanding, here's what's happening:

1) We run application-mode clusters on native Kubernetes with externalized checkpoints, retained on cancellation. We're attempting to restore a job from a checkpoint; the checkpoint reference is held in the Kubernetes HA ConfigMap.
2) We encounter an exception while mapping state to the new JobGraph.[1] We understand the cause of the exception; the question is not about the exception itself but about how it is handled.
3) The job goes to state FAILED.[2]
4) The HA ConfigMap containing the checkpoint reference is cleaned up.[3]
5) The Kubernetes pod exits. Because it is managed by a Kubernetes Deployment, the pod is immediately restarted.
6) Upon restart, the new JobManager finds no checkpoints to restore from.[4]

This behavior was unexpected, to say the least. For comparison, we recreated the scenario with a savepoint passed in via execution.savepoint.path. The cluster repeatedly failed with the state-mapping exception, which seems like reasonable behavior.

How can we keep the cluster from automatically restarting without checkpoint metadata in this scenario?

Logs Appendix

[1] org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
    at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:67)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state ad8761465be643c10db5fae153b87f68
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
    ... 4 more
Caused by: java.lang.IllegalStateException: There is no operator for the state ad8761465be643c10db5fae153b87f68
    at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:769)
    at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:101)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1829)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1749)
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:210)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:382)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:225)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:142)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:162)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:406)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:383)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    ... 4 more

[2] Job 00000000000000000000000000000000 reached terminal state FAILED
[3] Clean up the high availability data for job 00000000000000000000000000000000.
[4] No checkpoint found during restore.

Best,
Max Feng
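To make the reported sequence concrete, here is a toy Java model of steps 1 to 6 and of the savepoint comparison. It is a sketch, not Flink code: `ToyHaStore`, `simulate`, and the labels `chk-42` and `savepoint-7` are hypothetical. It also assumes that a checkpoint found in HA metadata is preferred over the configured savepoint path, and that every restore attempt hits the state-mapping exception from step 2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Toy model of the restart sequence described in this post.
 * NOT Flink code: every class, method, and label here is hypothetical.
 */
public class HaCleanupModel {

    /** Hypothetical stand-in for the checkpoint pointer kept in the Kubernetes HA ConfigMap. */
    static final class ToyHaStore {
        private String latestCheckpoint;

        ToyHaStore(String initialCheckpoint) {
            this.latestCheckpoint = initialCheckpoint;
        }

        Optional<String> latestCheckpoint() {
            return Optional.ofNullable(latestCheckpoint);
        }

        /** Step 4: HA data is cleaned up after the job reaches FAILED. */
        void cleanUp() {
            latestCheckpoint = null;
        }
    }

    /**
     * Simulates up to maxPodStarts JobManager starts. Assumptions: HA metadata is
     * preferred over the configured savepoint path, and every restore hits the
     * state-mapping exception from step 2.
     */
    static List<String> simulate(ToyHaStore ha, Optional<String> savepointPath, int maxPodStarts) {
        List<String> outcomes = new ArrayList<>();
        for (int start = 0; start < maxPodStarts; start++) {
            // Prefer the HA checkpoint pointer; fall back to the configured savepoint path.
            Optional<String> restoreFrom = ha.latestCheckpoint().or(() -> savepointPath);
            if (restoreFrom.isEmpty()) {
                // Step 6: nothing to restore, so the job runs with empty state and the loop ends.
                outcomes.add("RUNNING without state");
                break;
            }
            // Steps 2-4: state mapping fails, the job goes FAILED, HA data is cleaned up.
            outcomes.add("FAILED restoring " + restoreFrom.get());
            ha.cleanUp();
            // Step 5: the pod exits and Kubernetes starts a new one (next iteration).
        }
        return outcomes;
    }

    public static void main(String[] args) {
        // Checkpoint referenced only by HA metadata (steps 1-6).
        System.out.println(simulate(new ToyHaStore("chk-42"), Optional.empty(), 3));
        // Savepoint passed via configuration, re-read on every pod start.
        System.out.println(simulate(new ToyHaStore(null), Optional.of("savepoint-7"), 3));
    }
}
```

In the first run the model fails once and then starts with empty state, mirroring steps 1 to 6. In the second run the savepoint path comes from configuration rather than from the cleaned-up HA store, so every pod start retries it and fails again, mirroring the savepoint comparison.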