We're considering working around point 4 below (the HA configmap containing
the checkpoint reference is cleaned up) by patching our Flink deployment so
that flink-kubernetes never cleans up job configmaps, i.e. by commenting out
the body of KubernetesLeaderElectionHaServices#internalCleanupJobData. Are
there any obvious reasons we should not do this?
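
Concretely, the change we have in mind is roughly the sketch below (the
exact signature is our assumption and should be checked against the 1.20
sources; we believe the method overrides AbstractHaServices, which also
provides the logger):

    // Sketch only: no-op the per-job HA cleanup so the configmap holding
    // the last checkpoint reference survives a terminal FAILED state.
    @Override
    protected void internalCleanupJobData(JobID jobID) throws Exception {
        // Original body removed: do not delete the job's HA data here.
        logger.info("Skipping HA job data cleanup for job {}.", jobID);
    }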

- Max

On Sun, Mar 2, 2025 at 5:41 PM Max Feng <max.mf.f...@gmail.com> wrote:

> Hi,
>
> We're running Flink 1.20 native Kubernetes application-mode clusters, and
> we're running into an issue where clusters restart without restoring the
> checkpoints referenced in the HA metadata.
>
> To the best of our understanding, here's what's happening:
> 1) We're running application-mode clusters in native Kubernetes with
> externalized checkpoints, retained on cancellation (configuration
> sketched after this list). We're attempting to restore a job from a
> checkpoint; the checkpoint reference is held in the Kubernetes HA
> configmap.
> 2) We encounter an exception while mapping state to the new JobGraph.[1]
> We understand the nature of the exception; our question is not about the
> exception itself but about how it is handled.
> 3) The job goes to state FAILED.[2]
> 4) The HA configmap containing the checkpoint reference is cleaned up.[3]
> 5) The Kubernetes pod exits. Because the JobManager runs as a Kubernetes
> Deployment, the pod is immediately restarted.
> 6) Upon restart, the new JobManager finds no checkpoints to restore
> from.[4]
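>
> For context, the relevant parts of our configuration look roughly like
> this (storage paths are placeholders):
>
>     high-availability.type: kubernetes
>     high-availability.storageDir: s3://bucket/flink-ha/
>     state.checkpoints.dir: s3://bucket/checkpoints/
>     execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION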
>
> This behavior was unexpected, to say the least. For comparison, we
> recreated the scenario with a savepoint passed in via
> execution.savepoint.path (invocation sketched below). In that case the
> cluster repeatedly failed with the state-mapping exception, which seemed
> like reasonable behavior. How can we prevent the cluster from
> automatically restarting without checkpoint metadata in the checkpoint
> scenario?
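>
> For reference, the savepoint run was submitted roughly like this
> (cluster id, paths, and jar name are placeholders):
>
>     flink run-application \
>       --target kubernetes-application \
>       -Dkubernetes.cluster-id=my-cluster \
>       -Dexecution.savepoint.path=s3://bucket/savepoints/savepoint-aaaaaa \
>       local:///opt/flink/usrlib/job.jar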
>
> Logs Appendix
>
> [1] org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>     at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:67)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state ad8761465be643c10db5fae153b87f68
>     at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>     at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>     ... 4 more
> Caused by: java.lang.IllegalStateException: There is no operator for the state ad8761465be643c10db5fae153b87f68
>     at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:769)
>     at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:101)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1829)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1749)
>     at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:210)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:382)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:225)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:142)
>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:162)
>     at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:406)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:383)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>     ... 4 more
>
> [2] Job 00000000000000000000000000000000 reached terminal state FAILED
>
> [3] Clean up the high availability data for job
> 00000000000000000000000000000000.
>
> [4] No checkpoint found during restore.
>
> Best,
> Max Feng
>
