Hi Sigalit,

To restart from an explicit savepoint, you need to set not only initialSavepointPath but also savepointRedeployNonce, which triggers the job to restart from that savepoint.
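
As a rough sketch, the job section of the FlinkDeployment could look something like the fragment below. This assumes your operator version supports savepointRedeployNonce; the nonce value is arbitrary, the savepoint path is the same placeholder as in your message, and the rest of the resource is omitted.

```yaml
# Sketch only: a fragment of a FlinkDeployment resource, not a complete manifest.
spec:
  job:
    upgradeMode: savepoint
    # Placeholder: the path of the manually triggered savepoint.
    initialSavepointPath: <the save point path>
    # Arbitrary value; what matters is that it changes (e.g. is incremented)
    # each time you want a redeploy from initialSavepointPath.
    savepointRedeployNonce: 1
```

Note that this is a different field from savepointTriggerNonce, which only asks the operator to take a new savepoint.
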
Try setting/incrementing that property and you should see the job run from your manual savepoint.

Hope that helps,
Rion

> On Aug 14, 2024, at 6:26 AM, Sigalit Eliazov <e.siga...@gmail.com> wrote:
>
> hi,
>
> We are trying to restart a pipeline from a save point we triggered manually
> via the job manager rest api.
>
> with the following configuration in the flinkdeployment crd:
> savepointTriggerNonce: 1
> initialSavepointPath: <the save point path>
> upgradeMode: savepoint
> this always fails with the following error
>
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'XXX' (33427c11cef9a7ff48edfd5341b34e28) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
> org.apache.flink.util.FlinkException: Failed to execute job ‘XXX’.
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2253) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2219) ~[flink-dist-1.18.1.jar:1.18.1]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
>     at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.18.1.jar:1.18.1]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>     at org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172) ~[?:?]
>     at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59) [flink-rpc-akkae0beeada-9e90-40ec-abe6-dc401bbf1f51.jar:1.18.1]
>     at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57) [flink-rpc-akkae0beeada-9e90-40ec-abe6-dc401bbf1f51.jar:1.18.1]
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
>     at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
>     at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]
> Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
>
> We are using flink operator 1.7 and flink 1.18
> When using the flink operator automatic savepoint mechanism everything works
> great.
>
> Any ideas?
>
> Thanks
> Sigalit