[ https://issues.apache.org/jira/browse/FLINK-20648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253417#comment-17253417 ]
Yang Wang commented on FLINK-20648:
-----------------------------------

I think we could have similar logic in {{CheckpointCoordinator}}: when restoring from a savepoint, we do not need to update the ConfigMap/ZooKeeper. Instead, we keep the restored checkpoint ID in a private long variable rather than in the HA storage, and compare it with {{checkpointIdCounter.getAndIncrement()}} in {{initializeCheckpoint}}. Does that make sense? Or do you prefer blocking the constructor of {{KubernetesLeaderElectionDriver}}?

IIUC, the checkpoint and counter from the initial savepoint do not need to be stored in the HA storage. In fact, they should not be stored there, since the leader has not been elected yet.
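Below is a minimal, hypothetical sketch of that idea, not the actual Flink code: the class name, the {{restoredSavepointId}} field, and {{persistToHighAvailabilityStore}} are made-up stand-ins for {{CheckpointCoordinator}}, its checkpoint ID counter, and the ConfigMap/ZooKeeper store. It only illustrates the "keep the savepoint ID in a local field and compare it against the counter" approach described above.

{code}
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: stands in for CheckpointCoordinator, not the real class.
public class SavepointRestoreSketch {

    // Stand-in for Flink's checkpoint ID counter.
    private final AtomicLong checkpointIdCounter = new AtomicLong(1);

    // Checkpoint ID of the savepoint we restored from; -1 means "none".
    private long restoredSavepointId = -1L;

    /** Restoring from a savepoint: remember the ID locally and do NOT write
     *  to the HA storage, because no leader has been elected yet. */
    public void restoreSavepoint(long savepointCheckpointId) {
        restoredSavepointId = savepointCheckpointId;
        // Ensure future checkpoint IDs are strictly larger than the savepoint's.
        checkpointIdCounter.accumulateAndGet(savepointCheckpointId + 1, Math::max);
    }

    /** Initializing the next checkpoint: compare the fresh counter value with
     *  the locally remembered savepoint ID before touching the HA storage. */
    public void initializeCheckpoint() {
        long newCheckpointId = checkpointIdCounter.getAndIncrement();
        if (newCheckpointId > restoredSavepointId) {
            persistToHighAvailabilityStore(newCheckpointId);
        }
    }

    // Hypothetical placeholder for the ConfigMap/ZooKeeper write.
    private void persistToHighAvailabilityStore(long checkpointId) {
        System.out.println("Persisting checkpoint " + checkpointId + " to the HA storage");
    }
}
{code}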
> Unable to restore job from savepoint when using Kubernetes based HA services
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-20648
>                 URL: https://issues.apache.org/jira/browse/FLINK-20648
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.12.0
>            Reporter: David Morávek
>            Assignee: Yang Wang
>            Priority: Blocker
>             Fix For: 1.13.0, 1.12.1
>
>
> When restoring a job from a savepoint, we always end up with the following error:
> {code}
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
>     ... 3 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Stopped retrying the operation because the error is not retryable.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2063) ~[?:?]
>     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:150) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:211) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1479) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:325) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:266) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
>     ... 3 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Stopped retrying the operation because the error is not retryable.
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperation$1(FutureUtils.java:166) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
>     ... 3 more
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot retry checkAndUpdateConfigMap with configMap pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader because it does not exist.
>     at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
>     at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
>     ... 3 more
> Caused by: org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot retry checkAndUpdateConfigMap with configMap pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader because it does not exist.
>     at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
>     at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
> {code}
> The cause of the issue is the following:
> - We construct `jobMasterServices` prior to starting `leaderElectionService` (in `JobManagerRunnerImpl`).
> - During `jobMasterServices` initialization, `tryRestoreExecutionGraphFromSavepoint` gets called. This calls `KubernetesStateHandleStore.addAndLock` internally.
> - `KubernetesStateHandleStore.addAndLock` expects the ConfigMap for JM leadership to already be present, which is wrong, because the `leaderElectionService` responsible for its creation has not been started yet.
> Possible fixes:
> - Start `leaderElectionService` before `jobMasterServices`.
> - Fix `KubernetesStateHandleStore` so that it can handle the case when the leader hasn't been elected yet (a sketch follows below).
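A minimal sketch of what the second fix could look like, assuming a simplified, made-up {{KubeClient}} interface rather than Flink's real {{FlinkKubeClient}}/{{Fabric8FlinkKubeClient}} API: instead of treating a missing leader ConfigMap as a fatal, non-retryable error, {{addAndLock}} could create the ConfigMap lazily.

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch only: KubeClient is a made-up, minimal interface standing in for the
// real Kubernetes client; it is NOT Flink's FlinkKubeClient API.
public class TolerantStateHandleStoreSketch {

    interface KubeClient {
        Optional<Map<String, String>> getConfigMapData(String name);
        void createConfigMap(String name, Map<String, String> data);
        void updateConfigMap(String name, Map<String, String> data);
    }

    private final KubeClient kubeClient;
    private final String leaderConfigMapName;

    public TolerantStateHandleStoreSketch(KubeClient kubeClient, String leaderConfigMapName) {
        this.kubeClient = kubeClient;
        this.leaderConfigMapName = leaderConfigMapName;
    }

    /** Adds a state handle entry. If the leader ConfigMap does not exist yet
     *  (leader election has not run), create it instead of failing with a
     *  non-retryable "does not exist" error. */
    public void addAndLock(String key, String serializedStateHandle) {
        Optional<Map<String, String>> existing = kubeClient.getConfigMapData(leaderConfigMapName);

        Map<String, String> data = new HashMap<>(existing.orElse(Map.of()));
        data.put(key, serializedStateHandle);

        if (existing.isPresent()) {
            kubeClient.updateConfigMap(leaderConfigMapName, data);
        } else {
            // Tolerate the "no leader elected yet" case from the bug report.
            kubeClient.createConfigMap(leaderConfigMapName, data);
        }
    }
}
{code}

Either approach removes the checkpoint store's dependency on a ConfigMap that only leader election creates; the first fix is purely an ordering change in the JobManagerRunner startup.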
--
This message was sent by Atlassian Jira
(v8.3.4#803005)