Hi Alexey,

From your attached logs, it seems that the leader-related config map is being reused, so the Flink application is recovered instead of being submitted as a new one. This is the root cause: it is trying to recover from the wrong savepoint, the one specified in your previous submission.
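To double-check this from a JobManager log, a small script could compare the savepoint passed via --fromSavepoint with the one the CheckpointCoordinator reports restoring. The following is only a rough sketch: the function name `savepoint_paths` is made up for illustration, and it assumes the raw log has one JSON object per line with a "message" field, as in the quoted logs.

```python
import json
import re


def savepoint_paths(log_lines):
    """Return (requested, restored) savepoint paths found in JSON log lines.

    requested: the value logged right after the "--fromSavepoint" argument.
    restored:  the path in the "Starting job ... from savepoint ..." message.
    Either value is None if the corresponding message is not present.
    """
    requested = None
    restored = None
    expect_arg = False
    for raw in log_lines:
        try:
            msg = json.loads(raw)["message"].strip()
        except (ValueError, KeyError, TypeError, AttributeError):
            # Skip lines that are not JSON objects with a string "message".
            continue
        if expect_arg:
            # The entry after "--fromSavepoint" carries the argument value.
            requested = msg
            expect_arg = False
        elif msg == "--fromSavepoint":
            expect_arg = True
        else:
            match = re.match(r"Starting job \S+ from savepoint (\S+)", msg)
            if match:
                restored = match.group(1)
    return requested, restored
```

Calling `savepoint_paths(open("jobmanager.log"))` (the file name is a placeholder) and getting two different paths back would match the behaviour described here: the arguments name one savepoint, but the recovered job restores another.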
So how to fix this? If you want to stop the application, I strongly suggest cancelling the Flink job with a savepoint instead of directly deleting all the K8s resources. After that, you will find that the leader-related config maps are deleted automatically once the job is cancelled.

Best,
Yang

Alexey Trenikhun <yen...@msn.com> wrote on Wednesday, March 10, 2021 at 12:16 PM:

> Hi Yang,
> The problem has re-occurred; the full JM log is attached.
>
> Thanks,
> Alexey
> ------------------------------
> *From:* Yang Wang <danrtsey...@gmail.com>
> *Sent:* Sunday, February 28, 2021 10:04 PM
> *To:* Alexey Trenikhun <yen...@msn.com>
> *Cc:* Flink User Mail List <user@flink.apache.org>
> *Subject:* Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint
>
> Hi Alexey,
>
> It seems that the KubernetesHAService works well, since all the checkpoints were cleaned up when the job was canceled. We can find the related log "Found 0 checkpoints in KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.".
>
> However, it is a little strange that the CheckpointCoordinator is recovering from a wrong savepoint path. Could you share the full JobManager logs? One possible reason I can think of is that the application cluster entrypoint is not creating a new JobGraph from the specified arguments.
>
> Best,
> Yang
>
> Alexey Trenikhun <yen...@msn.com> wrote on Saturday, February 27, 2021 at 1:48 AM:
>
> Hello,
> We have a Flink job running in Kubernetes with Kubernetes HA enabled (JM is deployed as a Job, single TM as a StatefulSet). We took a savepoint with cancel=true.
> Now, when we try to start the job using --fromSavepoint *A*, where *A* is the path we got from taking the savepoint (ClusterEntrypoint reports *A* in the log), it looks like the job for some reason ignores the given *A* and actually tries to restore from some path *B* (CheckpointCoordinator logs *B*):
>
> {"ts":"2021-02-26T17:09:52.500Z","message":" Program Arguments:","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":" --configDir","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":" /opt/flink/conf","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":" --fromSavepoint","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":" wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-e8a201008f2c","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":" --job-classname","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":" com.App","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":" --job-id","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.502Z","message":" 00000000000000000000000000000000","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> ...
> {"ts":"2021-02-26T17:09:59.176Z","message":"Recovering checkpoints from KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.181Z","message":"Found 0 checkpoints in KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.183Z","message":"All 0 checkpoints found are already downloaded.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.183Z","message":"No checkpoint found during restore.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.183Z","message":"Starting job 00000000000000000000000000000000 from savepoint wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-fbcd58f66685 (allowing non restored state)","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.191Z","message":"0.0.7+g9bb29061\n build 2021-02-21T21:13:31-0800\n tag: 0.0.0.7\n id: 0.0.0.7\n","logger_name":"com.Fsp","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.273Z","message":"Fatal error occurred in the cluster entrypoint.","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"flink-akka.actor.default-dispatcher-4","level":"ERROR","level_value":40000,"stack_trace":"org.apache.flink.util.FlinkException: JobMaster for job 00000000000000000000000000000000 failed.\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:890)\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.dispatcherJobFailed(Dispatcher.java:465)\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.handleDispatcherJobResult(Dispatcher.java:444)\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:423)\n\tat java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)\n\tat org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.FileNotFoundException: Cannot find checkpoint or savepoint file/directory 'wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-fbcd58f66685' on file system 'wasbs'.\n\tat org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:250)\n\tat org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpoint(AbstractFsCheckpointStorageAccess.java:111)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1632)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:358)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:288)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:245)\n\tat org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)\n\tat org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330)\n\tat org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)\n\tat org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)\n\tat org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)\n\tat org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)\n\t... 4 common frames omitted\n"}
>
> Any suggestions?
>
> Thanks,
> Alexey