Hi! I would consider using the Flink Kubernetes Operator, which already avoids scenarios like this.
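
For illustration, a minimal operator-managed deployment can look roughly like this (a sketch only; the image, names, paths and resources are placeholders, not a recommendation):

# minimal sketch; image, names, paths and resources are placeholders
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-job
spec:
  image: flink:1.20
  flinkVersion: v1_20
  serviceAccount: flink
  flinkConfiguration:
    state.checkpoints.dir: s3://my-bucket/flink/checkpoints
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/my-job.jar
    parallelism: 2
    upgradeMode: last-state
    state: running

With upgradeMode: last-state the operator drives upgrades and restarts from the latest checkpoint information it tracks for the job, rather than leaving recovery to a bare Kubernetes Deployment restart loop.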
One possible workaround we use there: when we expect a job to restart from
HA metadata, we still set execution.savepoint.path to a DUMMY path, so that
if this issue happens the job won't start without a checkpoint but instead
fails with DUMMY not found.

Cheers,
Gyula

On Thu, Mar 6, 2025 at 4:37 AM Max Feng <max.mf.f...@gmail.com> wrote:

> We're considering avoiding point 4 below (the HA configmap containing the
> checkpoint reference is cleaned up) by patching our deployment of Flink so
> that flink-kubernetes never cleans up job configmaps, i.e. by commenting
> out the contents of KubernetesLeaderElectionHaServices#internalCleanupJobData.
> Are there any obvious reasons we should not do this?
>
> - Max
>
> On Sun, Mar 2, 2025 at 5:41 PM Max Feng <max.mf.f...@gmail.com> wrote:
>
>> Hi,
>>
>> We're running Flink 1.20, native Kubernetes application-mode clusters,
>> and we're running into an issue where clusters are restarting without
>> checkpoints from HA metadata.
>>
>> To the best of our understanding, here's what's happening:
>> 1) We're running application-mode clusters in native Kubernetes with
>> externalized checkpoints, retained on cancellation (see the config
>> sketch below). We're attempting to restore a job from a checkpoint;
>> the checkpoint reference is held in the Kubernetes HA configmap.
>> 2) We encounter an exception while mapping state to the new job graph.[1]
>> We understand the nature of the exception; the question is not about the
>> exception itself but about how it is handled.
>> 3) The job goes to state FAILED.[2]
>> 4) The HA configmap containing the checkpoint reference is cleaned up.[3]
>> 5) The Kubernetes pod exits. Because it is a Kubernetes deployment, the
>> pod is immediately restarted.
>> 6) Upon restart, the new JobManager finds no checkpoints to restore
>> from.[4]
>>
>> This behavior was unexpected, to say the least. For comparison, we
>> recreated the scenario with a savepoint, passed in via
>> execution.savepoint.path. The cluster repeatedly failed with the state
>> mapping exception, which appeared to be reasonable behavior. How can we
>> make the cluster not automatically restart without checkpoint metadata
>> in this scenario?
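>>
>> For reference, the relevant parts of our configuration look roughly like
>> this (the bucket and paths are illustrative):
>>
>>     high-availability.type: kubernetes
>>     high-availability.storageDir: s3://my-bucket/flink/ha            # illustrative path
>>     state.checkpoints.dir: s3://my-bucket/flink/checkpoints          # illustrative path
>>     execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION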
>>
>> Logs Appendix
>>
>> [1] org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>>     at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
>>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>>     at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:67)
>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>>     at java.base/java.lang.Thread.run(Thread.java:840)
>> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state ad8761465be643c10db5fae153b87f68
>>     at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>>     at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>>     ... 4 more
>> Caused by: java.lang.IllegalStateException: There is no operator for the state ad8761465be643c10db5fae153b87f68
>>     at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:769)
>>     at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:101)
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1829)
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1749)
>>     at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:210)
>>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:382)
>>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:225)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:142)
>>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:162)
>>     at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:406)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:383)
>>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
>>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
>>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>>     ... 4 more
>>
>> [2] Job 00000000000000000000000000000000 reached terminal state FAILED
>>
>> [3] Clean up the high availability data for job 00000000000000000000000000000000.
>>
>> [4] No checkpoint found during restore.
>>
>> Best,
>> Max Feng