[ https://issues.apache.org/jira/browse/FLINK-20648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253385#comment-17253385 ]

Yang Wang commented on FLINK-20648:
-----------------------------------

Thanks [~xintongsong] for the suggestion. Maybe we do not need to store the
initial savepoint in the HA storage at all. We just need to check whether a
checkpoint is the initial savepoint and, if so, add it to a local list cache
{{initialSavepoints}}. When recovering, we merge {{initialSavepoints}} with
{{completedCheckpoints}}. This[1] is a simple PoC of the idea, and I am
verifying whether it works.

 

[1]. https://github.com/wangyang0918/flink/commit/ee75083487741b508579717644eca291632f98db
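
A minimal sketch of the idea follows (it is not the actual PoC in [1], which may differ; {{Checkpoint}} here is a hypothetical placeholder type): the initial savepoint is kept only in a local list and merged with the HA-recovered completed checkpoints on recovery, so no HA ConfigMap write is needed for it.

{code}
import java.util.ArrayList;
import java.util.List;

// Minimal sketch only, using a placeholder Checkpoint type parameter;
// the real PoC in [1] may be structured differently.
class InitialSavepointCacheSketch<Checkpoint> {

    // Initial savepoint(s), kept only in local memory, never written to the HA ConfigMap.
    private final List<Checkpoint> initialSavepoints = new ArrayList<>();

    // Checkpoints that go through the HA state handle store as before.
    private final List<Checkpoint> completedCheckpoints = new ArrayList<>();

    void addInitialSavepoint(Checkpoint savepoint) {
        // No HA write here, so no leader ConfigMap is required at this point.
        initialSavepoints.add(savepoint);
    }

    void addCompletedCheckpoint(Checkpoint checkpoint) {
        completedCheckpoints.add(checkpoint);
    }

    // On recovery, merge both lists; completed checkpoints are assumed to be newer
    // than the initial savepoint, so they come last and win as "latest".
    List<Checkpoint> recover() {
        List<Checkpoint> merged = new ArrayList<>(initialSavepoints);
        merged.addAll(completedCheckpoints);
        return merged;
    }
}
{code}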

> Unable to restore job from savepoint when using Kubernetes based HA services
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-20648
>                 URL: https://issues.apache.org/jira/browse/FLINK-20648
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.12.0
>            Reporter: David Morávek
>            Assignee: Yang Wang
>            Priority: Blocker
>             Fix For: 1.13.0, 1.12.1
>
>
> When restoring a job from a savepoint, we always end up with the following error:
> {code}
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
>       at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
>       ... 3 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Stopped retrying the operation because the error is not retryable.
>       at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
>       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2063) ~[?:?]
>       at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:150) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:211) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1479) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:325) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:266) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
>       ... 3 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Stopped retrying the operation because the error is not retryable.
>       at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperation$1(FutureUtils.java:166) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
>       at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
>       at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
>       ... 3 more
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot retry checkAndUpdateConfigMap with configMap pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader because it does not exist.
>       at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
>       at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
>       ... 3 more
> Caused by: org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot retry checkAndUpdateConfigMap with configMap pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader because it does not exist.
>       at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
>       at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
> {code}
> The cause of the issue is the following:
> - We construct `jobMasterServices` prior to starting the `leaderElectionService` (in `JobManagerRunnerImpl`).
> - During `jobMasterServices` initialization, `tryRestoreExecutionGraphFromSavepoint` gets called, which calls `KubernetesStateHandleStore.addAndLock` internally.
> - `KubernetesStateHandleStore.addAndLock` expects the ConfigMap for JM leadership to already be present, which is wrong, because the `leaderElectionService` responsible for its creation has not started yet.
> Possible fixes:
> - Start the `leaderElectionService` before constructing `jobMasterServices`.
> - Fix `KubernetesStateHandleStore` so it can handle the case when a leader hasn't been elected yet (see the sketch below).
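
Below is a rough, non-authoritative sketch of the second option. All types in it ({{KubeClient}}, {{ConfigMap}}) are hypothetical placeholders rather than the real {{Fabric8FlinkKubeClient}}/fabric8 API; it only illustrates the idea of {{addAndLock}} creating the leader ConfigMap when it does not exist yet, instead of failing with a non-retryable error.

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Rough sketch with hypothetical placeholder types; not the real KubernetesStateHandleStore.
class TolerantStateHandleStoreSketch {

    // Placeholder for a Kubernetes client; not the fabric8 or Flink kube client API.
    interface KubeClient {
        Optional<ConfigMap> getConfigMap(String name);
        void createConfigMap(ConfigMap configMap);
        void updateConfigMap(ConfigMap configMap);
    }

    // Placeholder ConfigMap holding simple key/value data.
    static final class ConfigMap {
        final String name;
        final Map<String, String> data = new HashMap<>();

        ConfigMap(String name) {
            this.name = name;
        }
    }

    private final KubeClient client;
    private final String configMapName;

    TolerantStateHandleStoreSketch(KubeClient client, String configMapName) {
        this.client = client;
        this.configMapName = configMapName;
    }

    // Adds a state entry; if leader election has not created the ConfigMap yet,
    // create it here instead of throwing a non-retryable error.
    void addAndLock(String key, String serializedStateHandle) {
        ConfigMap configMap = client.getConfigMap(configMapName).orElseGet(() -> {
            ConfigMap created = new ConfigMap(configMapName);
            client.createConfigMap(created);
            return created;
        });
        configMap.data.put(key, serializedStateHandle);
        client.updateConfigMap(configMap);
    }
}
{code}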



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
