We're considering avoiding step 4 of the sequence quoted below (the HA ConfigMap containing the checkpoint reference being cleaned up) by patching our deployment of Flink so that flink-kubernetes never cleans up job ConfigMaps, i.e. by commenting out the body of KubernetesLeaderElectionHaServices#internalCleanupJobData, as sketched below. Are there any obvious reasons we should not do this?
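For concreteness, here is roughly what we have in mind. This is a sketch only: the signature is the one we believe the class inherits from AbstractHaServices in 1.20, the logger field name is assumed, and the real body (which we'd drop) deletes the job-scoped HA ConfigMaps.

    // In flink-kubernetes:
    // org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionHaServices
    @Override
    protected void internalCleanupJobData(JobID jobID) throws Exception {
        // Patched to a no-op: skip deleting this job's HA ConfigMaps so the
        // checkpoint reference survives a terminal FAILED state and a
        // restarted JobManager can still find it.
        logger.warn("Skipping HA job data cleanup for job {}; ConfigMaps retained.", jobID);
    }

The obvious cost we see is that ConfigMaps for terminated jobs accumulate and would need out-of-band cleanup; we can live with that if nothing else breaks.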
- Max

On Sun, Mar 2, 2025 at 5:41 PM Max Feng <max.mf.f...@gmail.com> wrote:

> Hi,
>
> We're running Flink 1.20, native Kubernetes application-mode clusters, and
> we're running into an issue where clusters are restarting without
> checkpoints from HA metadata.
>
> To the best of our understanding, here's what's happening:
> 1) We're running application-mode clusters in native Kubernetes with
> externalized checkpoints, retained on cancellation. We're attempting to
> restore a job from a checkpoint; the checkpoint reference is held in the
> Kubernetes HA ConfigMap.
> 2) We encounter an exception while mapping state to the new JobGraph.[1]
> We understand the nature of the exception; the question is not about the
> exception itself but about how it is handled.
> 3) The job goes to state FAILED.[2]
> 4) The HA ConfigMap containing the checkpoint reference is cleaned up.[3]
> 5) The Kubernetes pod exits. Because it is a Kubernetes Deployment, the
> pod is immediately restarted.
> 6) Upon restart, the new JobManager finds no checkpoint to restore
> from.[4]
>
> This behavior was unexpected, to say the least. For comparison, we
> recreated the scenario with a savepoint, passed in via
> execution.savepoint.path. The cluster repeatedly failed with the state
> mapping exception, which appeared to be reasonable behavior. How can we
> make the cluster not automatically restart without checkpoint metadata in
> this scenario?
>
> Logs Appendix
>
> [1] org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>     at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:67)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state ad8761465be643c10db5fae153b87f68
>     at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>     at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>     ... 4 more
> Caused by: java.lang.IllegalStateException: There is no operator for the state ad8761465be643c10db5fae153b87f68
>     at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:769)
>     at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:101)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1829)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1749)
>     at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:210)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:382)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:225)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:142)
>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:162)
>     at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:406)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:383)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>     ... 4 more
>
> [2] Job 00000000000000000000000000000000 reached terminal state FAILED
>
> [3] Clean up the high availability data for job
> 00000000000000000000000000000000.
>
> [4] No checkpoint found during restore.
>
> Best,
> Max Feng
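For extra context on step 1, the relevant parts of our configuration look roughly like this (storage paths are placeholders, not our real values):

    high-availability.type: kubernetes
    high-availability.storageDir: s3://<bucket>/flink/ha
    state.checkpoints.dir: s3://<bucket>/flink/checkpoints
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION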