Hi,

We're running into an issue where jobs appear to automatically cleaning up
their state and restarting without it in certain failure modes.

Relevant setup details:
Flink 1.15
execution.target: kubernetes-application
high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
execution.checkpointing.externalized-checkpoint-retention:
RETAIN_ON_CANCELLATIONIn this scenario, we had a job with a retained
checkpoint. Upon resubmitting the job, the job ran into a failure:

Classifying stack trace:
 org.apache.flink.table.api.TableException: Failed to execute sql
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:821)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:784)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:861)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:701)
at
co.decodable.flink.environment.JobExecutionEnvironment.startJob(JobExecutionEnvironment.java:204)
at
co.decodable.flink.ReportingExecutionEnvironment.executeSqlAndReport(ReportingExecutionEnvironment.java:152)
at
co.decodable.flink.ReportingExecutionEnvironment.startSqlJob(ReportingExecutionEnvironment.java:100)
at co.decodable.flink.FlinkApplication.main(FlinkApplication.java:133)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291)
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:295)
at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
at
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
at
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job
'jobname'.
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2108)
at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
at
org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:804)
... 28 more
Caused by: java.lang.RuntimeException:
org.apache.flink.runtime.client.JobInitializationException: Could not start
the JobMaster.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:319)
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
at
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.flink.runtime.client.JobInitializationException:
Could not start the JobMaster.
at
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1769)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
... 1 more
Caused by: java.util.concurrent.CompletionException:
java.lang.IllegalStateException: There is no operator for the state
cf4c9f432e13b8f187c4a2a73fe0ee6f
at
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1766)
... 3 more
Caused by: java.lang.IllegalStateException: There is no operator for the
state cf4c9f432e13b8f187c4a2a73fe0ee6f
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:733)
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:98)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1670)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1598)
at
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:177)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:363)
at
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:191)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:139)
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:135)
at
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115)
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:322)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94)
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764)
... 3 more


After this happened, the job appeared to clean up its HA state, in
particular removing the configmap reference to the retained checkpoint.
After the job exited, because this is a Kubernetes deployment, another
container was started which did not attempt to resume from previous state,
as the checkpoint was no longer referenced.

We understand the root cause of the operator error, but we would expect
that the externalized checkpoint reference would be retained in this
failure mode. Has anyone else run into this issue?

Best,
Max Feng

Reply via email to