Hi, We're running into an issue where jobs appear to automatically cleaning up their state and restarting without it in certain failure modes.
Relevant setup details: Flink 1.15 execution.target: kubernetes-application high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATIONIn this scenario, we had a job with a retained checkpoint. Upon resubmitting the job, the job ran into a failure: Classifying stack trace: org.apache.flink.table.api.TableException: Failed to execute sql at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:821) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:784) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:861) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:701) at co.decodable.flink.environment.JobExecutionEnvironment.startJob(JobExecutionEnvironment.java:204) at co.decodable.flink.ReportingExecutionEnvironment.executeSqlAndReport(ReportingExecutionEnvironment.java:152) at co.decodable.flink.ReportingExecutionEnvironment.startSqlJob(ReportingExecutionEnvironment.java:100) at co.decodable.flink.FlinkApplication.main(FlinkApplication.java:133) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:567) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:295) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'jobname'. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2108) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188) at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:804) ... 28 more Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:319) at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75) at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) at java.base/java.lang.Thread.run(Thread.java:831) Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1769) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ... 1 more Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state cf4c9f432e13b8f187c4a2a73fe0ee6f at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1766) ... 3 more Caused by: java.lang.IllegalStateException: There is no operator for the state cf4c9f432e13b8f187c4a2a73fe0ee6f at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:733) at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:98) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1670) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1598) at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:177) at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:363) at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:191) at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:139) at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:135) at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345) at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:322) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94) at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ... 3 more After this happened, the job appeared to clean up its HA state, in particular removing the configmap reference to the retained checkpoint. After the job exited, because this is a Kubernetes deployment, another container was started which did not attempt to resume from previous state, as the checkpoint was no longer referenced. We understand the root cause of the operator error, but we would expect that the externalized checkpoint reference would be retained in this failure mode. Has anyone else run into this issue? Best, Max Feng