Hi Sigalit,

To redeploy from an explicit savepoint, you need to set not only your 
initialSavepointPath but also the savepointRedeployNonce, which is what 
actually triggers the job to restart from that savepoint.
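
For example, the job section of the spec would look roughly like this (the 
savepoint path is just a placeholder):

    spec:
      job:
        upgradeMode: savepoint
        # the manual savepoint you triggered via the REST API
        initialSavepointPath: <the savepoint path>
        # change/increment this value each time you want to redeploy
        # from initialSavepointPath
        savepointRedeployNonce: 1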

Try setting (or incrementing) that nonce and you should see the job start from 
your manual savepoint.
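
If you'd rather not edit the manifest by hand, patching the resource directly 
should work as well (untested sketch; substitute your own deployment name and 
savepoint path):

    kubectl patch flinkdeployment <deployment-name> --type merge \
      -p '{"spec":{"job":{"upgradeMode":"savepoint","initialSavepointPath":"<the savepoint path>","savepointRedeployNonce":2}}}'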

Hope that helps,

Rion

> On Aug 14, 2024, at 6:26 AM, Sigalit Eliazov <e.siga...@gmail.com> wrote:
> 
> 
> Hi,
> 
> We are trying to restart a pipeline from a savepoint we triggered manually 
> via the JobManager REST API, with the following configuration in the 
> FlinkDeployment CRD:
> 
>   savepointTriggerNonce: 1
>   initialSavepointPath: <the savepoint path>
>   upgradeMode: savepoint
> 
> This always fails with the following error:
> 
>   org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Ignoring JobGraph submission 'XXX' (33427c11cef9a7ff48edfd5341b34e28) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
> org.apache.flink.util.FlinkException: Failed to execute job 'XXX'.
>       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2253) ~[flink-dist-1.18.1.jar:1.18.1]
>       at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189) ~[flink-dist-1.18.1.jar:1.18.1]
>       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2219) ~[flink-dist-1.18.1.jar:1.18.1]
>       at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>       at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
>       at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
>       at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.18.1.jar:1.18.1]
>       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.18.1.jar:1.18.1]
>       at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.18.1.jar:1.18.1]
>       at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.18.1.jar:1.18.1]
>       at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.18.1.jar:1.18.1]
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172) ~[?:?]
>       at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.18.1.jar:1.18.1]
>       at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-dist-1.18.1.jar:1.18.1]
>       at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59) [flink-rpc-akkae0beeada-9e90-40ec-abe6-dc401bbf1f51.jar:1.18.1]
>       at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57) [flink-rpc-akkae0beeada-9e90-40ec-abe6-dc401bbf1f51.jar:1.18.1]
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
>       at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
>       at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
>       at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]
> Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
> 
> We are using Flink operator 1.7 and Flink 1.18.
> When using the Flink operator's automatic savepoint mechanism, everything 
> works great.
> 
> Any ideas?
> 
> Thanks
> Sigalit
> 
> 
