[ https://issues.apache.org/jira/browse/FLINK-20648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253266#comment-17253266 ]
Yang Wang commented on FLINK-20648:
-----------------------------------

Yes, {{KubernetesLeaderElector#run}} is started in a separate thread. Once started, it tries to create the ConfigMap immediately. Since this involves an HTTP I/O operation, we cannot guarantee that the ConfigMap is created within a given time. In most cases, it is done within one round of the loop (5 seconds).

I quickly went through the PR for FLINK-11719. We actually have a similar implementation [1]. I verified that it works and that the job recovers from the savepoint successfully. But, as you said, we introduce some behavior changes, and it seems that we need to fix some more tests.

[1] https://github.com/wangyang0918/flink/commit/3be9df9063280693526c1b4446ec499a024a9059

> Unable to restore job from savepoint when using Kubernetes based HA services
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-20648
>                 URL: https://issues.apache.org/jira/browse/FLINK-20648
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.12.0
>            Reporter: David Morávek
>            Assignee: Yang Wang
>            Priority: Critical
>             Fix For: 1.13.0, 1.12.1
>
>
> When restoring a job from a savepoint, we always end up with the following error:
> {code}
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
> 	... 3 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Stopped retrying the operation because the error is not retryable.
> 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
> 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2063) ~[?:?]
> 	at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:150) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:211) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1479) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:325) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:266) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
> 	... 3 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Stopped retrying the operation because the error is not retryable.
> 	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperation$1(FutureUtils.java:166) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
> 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
> 	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
> 	... 3 more
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot retry checkAndUpdateConfigMap with configMap pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader because it does not exist.
> 	at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
> 	at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
> 	... 3 more
> Caused by: org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot retry checkAndUpdateConfigMap with configMap pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader because it does not exist.
> 	at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
> 	at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
> {code}
> The cause of the issue is the following:
> - We construct `jobMasterServices` prior to starting `leaderElectionService` (in `JobManagerRunnerImpl`).
> - During `jobMasterServices` initialization, `tryRestoreExecutionGraphFromSavepoint` gets called. This calls `KubernetesStateHandleStore.addAndLock` internally.
> - `KubernetesStateHandleStore.addAndLock` expects the ConfigMap for JM leadership to be already present. This assumption is wrong, because `leaderElectionService`, which is responsible for creating it, has not started yet.
> Possible fixes:
> - Start `leaderElectionService` before `jobMasterServices`.
> - Fix `KubernetesStateHandleStore` so that it can handle the case where the leader hasn't been elected yet.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
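The ordering problem described in the issue, and the first proposed fix, can be illustrated with a small Java model. This is a sketch under stated assumptions, not Flink code: `FakeKubeClient`, `startLeaderElection`, `restoreThenElect`, `electThenRestore`, and the `KEY`/`VALUE` strings are hypothetical names; an in-memory map stands in for the Kubernetes ConfigMap, and a short sleep stands in for the HTTP round trip that creates it. Adding the savepoint before the elector thread has created the ConfigMap fails, while starting leader election first and waiting for the ConfigMap with a bounded timeout succeeds.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of the ordering problem; none of these types are Flink APIs. */
final class LeaderConfigMapRace {

    static final String KEY = "checkpointID-1";     // hypothetical entry key
    static final String VALUE = "savepoint-handle"; // hypothetical serialized state handle

    /** Stand-in for the Kubernetes API server: ConfigMap name -> data entries. */
    static final class FakeKubeClient {
        private final Map<String, Map<String, String>> configMaps = new ConcurrentHashMap<>();

        void createConfigMap(String name) {
            configMaps.putIfAbsent(name, new ConcurrentHashMap<>());
        }

        /** Mimics the failing call in the trace: it refuses to update a missing ConfigMap. */
        void checkAndUpdateConfigMap(String name, String key, String value) {
            Map<String, String> data = configMaps.get(name);
            if (data == null) {
                throw new IllegalStateException("ConfigMap " + name + " does not exist");
            }
            data.put(key, value);
        }

        boolean exists(String name) {
            return configMaps.containsKey(name);
        }

        String read(String name, String key) {
            Map<String, String> data = configMaps.get(name);
            return data == null ? null : data.get(key);
        }
    }

    /** Leader election on another thread: after a simulated HTTP delay, the ConfigMap exists. */
    static CompletableFuture<Void> startLeaderElection(FakeKubeClient client, String name, long ioDelayMillis) {
        return CompletableFuture.runAsync(() -> {
            sleepQuietly(ioDelayMillis);
            client.createConfigMap(name);
        });
    }

    /** Order described in the issue: store the savepoint first, start leader election afterwards. */
    static boolean restoreThenElect(FakeKubeClient client, String name) {
        boolean stored;
        try {
            client.checkAndUpdateConfigMap(name, KEY, VALUE);
            stored = true;
        } catch (IllegalStateException e) {
            stored = false; // corresponds to "... because it does not exist."
        }
        startLeaderElection(client, name, 0).join();
        return stored;
    }

    /** First proposed fix: start leader election, wait (bounded) for the ConfigMap, then store. */
    static boolean electThenRestore(FakeKubeClient client, String name, long timeoutMillis) {
        try {
            startLeaderElection(client, name, 10).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false; // ConfigMap not created in time; a caller could retry
        }
        client.checkAndUpdateConfigMap(name, KEY, VALUE);
        return true;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the ConfigMap is created over HTTP, the wait in `electThenRestore` is bounded by a timeout rather than assumed to finish; a caller might pass a value on the order of the 5-second loop mentioned in the comment, e.g. `electThenRestore(client, "my-job-jobmanager-leader", 5_000)`.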