[ 
https://issues.apache.org/jira/browse/FLINK-20648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253266#comment-17253266
 ] 

Yang Wang commented on FLINK-20648:
-----------------------------------

Yes, {{KubernetesLeaderElector#run}} is started in a separate thread. Once 
started, it tries to create the ConfigMap immediately. Since this involves an 
HTTP I/O operation, we cannot guarantee that the ConfigMap is created within a 
given time. In most cases, it should be done within one round of the loop (5 seconds).
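
As a rough illustration of this race, here is a minimal sketch. It does not use 
Flink or fabric8 code: {{FakeConfigMapStore}} and {{LeaderElectorRace}} are 
hypothetical names, the in-memory map stands in for the Kubernetes API server, 
and a latch stands in for the HTTP latency. It only shows that an update issued 
before the elector thread has finished creating the ConfigMap is rejected, 
while the same update succeeds afterwards.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/** Hypothetical in-memory stand-in for the Kubernetes API server; not a Flink class. */
class FakeConfigMapStore {
    private final Map<String, String> configMaps = new ConcurrentHashMap<>();

    /** Creates the ConfigMap if it does not exist yet. */
    void create(String name) {
        configMaps.putIfAbsent(name, "");
    }

    /** Mimics the failure mode in the reported stack trace: a missing ConfigMap is an error. */
    void checkAndUpdate(String name, String value) {
        if (configMaps.computeIfPresent(name, (key, old) -> value) == null) {
            throw new IllegalStateException(
                    "Cannot retry checkAndUpdateConfigMap with configMap " + name
                            + " because it does not exist.");
        }
    }
}

class LeaderElectorRace {
    /** Returns true if an update attempted before the elector created the ConfigMap failed. */
    static boolean updateBeforeCreationFails(FakeConfigMapStore store, String name)
            throws InterruptedException {
        CountDownLatch apiCallMayFinish = new CountDownLatch(1);
        CountDownLatch created = new CountDownLatch(1);

        // The elector runs in its own thread, as described above.
        Thread elector = new Thread(() -> {
            try {
                apiCallMayFinish.await(); // assumption: stands in for HTTP latency
                store.create(name);
                created.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "leader-elector");
        elector.start();

        // The caller does not wait for the elector, so the ConfigMap is still missing.
        boolean failedEarly = false;
        try {
            store.checkAndUpdate(name, "checkpoint-1");
        } catch (IllegalStateException expected) {
            failedEarly = true;
        }

        // Once the creation has completed, the same update goes through.
        apiCallMayFinish.countDown();
        created.await();
        store.checkAndUpdate(name, "checkpoint-1");
        elector.join();
        return failedEarly;
    }
}
```

Calling {{LeaderElectorRace.updateBeforeCreationFails(new FakeConfigMapStore(), "jobmanager-leader")}} 
exercises both the failing and the succeeding path.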

 

I quickly went through the PR for FLINK-11719, and we actually have a similar 
implementation[1]. I verified that it works and that the job recovers from the 
savepoint successfully. But, just as you said, it introduces some behavior 
changes, and it seems that we need to fix some more tests.

 

[1]. 
https://github.com/wangyang0918/flink/commit/3be9df9063280693526c1b4446ec499a024a9059

> Unable to restore job from savepoint when using Kubernetes based HA services
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-20648
>                 URL: https://issues.apache.org/jira/browse/FLINK-20648
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.12.0
>            Reporter: David Morávek
>            Assignee: Yang Wang
>            Priority: Critical
>             Fix For: 1.13.0, 1.12.1
>
>
> When restoring a job from a savepoint, we always end up with the following error:
> {code}
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
> not instantiate JobManager.
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764)
>  ~[?:?]
>       ... 3 more
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Stopped 
> retrying the operation because the error is not retryable.
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
> ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2063) ~[?:?]
>       at 
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:150)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:211)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1479)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:325)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:266)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764)
>  ~[?:?]
>       ... 3 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Stopped retrying the operation because the error is not retryable.
>       at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperation$1(FutureUtils.java:166)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>  ~[?:?]
>       ... 3 more
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot 
> retry checkAndUpdateConfigMap with configMap 
> pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader
>  because it does not exist.
>       at 
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
>       at 
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764)
>  ~[?:?]
>       ... 3 more
> Caused by: 
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot 
> retry checkAndUpdateConfigMap with configMap 
> pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader
>  because it does not exist.
>       at 
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
>       at 
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764)
>  ~[?:?]
> {code}
> The cause of the issue is as follows:
> - We construct `jobMasterServices` prior to starting `leaderElectionService` 
> (in `JobManagerRunnerImpl`)
> - During `jobMasterServices` initialization, 
> `tryRestoreExecutionGraphFromSavepoint` gets called. This calls 
> `KubernetesStateHandleStore.addAndLock` internally.
> - `KubernetesStateHandleStore.addAndLock` expects the ConfigMap for JM 
> leadership to be already present, which is wrong, because 
> `leaderElectionService`, which is responsible for its creation, has not 
> started yet
> Possible fixes:
> - Start `leaderElectionService` before `jobMasterServices`
> - Fix `KubernetesStateHandleStore` so it can handle the case when the leader 
> hasn't been elected
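
The first of the possible fixes above can be sketched roughly as follows. This 
is only an illustration of the ordering, not the actual Flink change: 
`StartupOrderSketch` and the event strings are hypothetical, and a 
`CompletableFuture` stands in for the leader election service creating its 
ConfigMap on another thread. The point is that the savepoint restore step is 
only reached after the creation has been confirmed, with a bounded wait 
because the API call has no guaranteed completion time.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the startup ordering; not the actual JobManagerRunnerImpl code. */
class StartupOrderSketch {
    static List<String> startInFixedOrder() throws Exception {
        List<String> events = new CopyOnWriteArrayList<>();

        // Step 1: start leader election first. It runs on another thread and
        // creates the leader ConfigMap as a side effect.
        CompletableFuture<Void> configMapCreated =
                CompletableFuture.runAsync(() -> events.add("leader ConfigMap created"));

        // Step 2: wait for the creation, with a timeout chosen arbitrarily here.
        configMapCreated.get(30, TimeUnit.SECONDS);

        // Step 3: only now construct the job master services, which restore the
        // savepoint and therefore need the ConfigMap to exist.
        events.add("job master services constructed");
        return events;
    }
}
```

The trade-off of this ordering is that startup blocks on the Kubernetes API 
call. The second option avoids that by making the state handle store tolerate 
a missing ConfigMap instead.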



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
