[ https://issues.apache.org/jira/browse/FLINK-18785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17172120#comment-17172120 ]
Till Rohrmann commented on FLINK-18785:
---------------------------------------

The logs show that the {{Dispatcher}} is granted leadership and eventually fails. This is the intended behavior. The problem seems to be that we don't distinguish between recoverable and non-recoverable errors, and that Yarn simply restarts the process whenever it fails. That's why you see all these application attempts: Yarn restarts the process, which in turn fails immediately after trying to read the savepoint. What we would need is a way to tell Yarn to restart the process only for recoverable failures and to terminate the application for non-recoverable ones. I am not 100% sure whether this is supported in Yarn.

> flink goes into dead lock leader election when restoring from a do-not-exist
> checkpoint/savepoint path
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18785
>                 URL: https://issues.apache.org/jira/browse/FLINK-18785
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN, Runtime / Checkpointing, Runtime / Coordination
>    Affects Versions: 1.10.0, 1.10.1
>         Environment: flink on yarn
> flink-1.10.x
> jdk8
> flink-conf.yaml yarn.application-attempts: 2 (or just delete this config)
> yarn-2.7.2
>            Reporter: Kai Chen
>            Priority: Major
>         Attachments: flink_savepoint_path_do_not_exits.jpg, image-2020-07-31-19-04-19-241.png, jobmanager.log.attemp1, jobmanager.log.attemp2-13
>
>
> Flink goes into a leader-election deadlock when restoring from a non-existent
> checkpoint/savepoint path.
> I just ran this command:
> bin/flink run -m yarn-cluster -s "hdfs:///do/not/exist/path" examples/streaming/WindowJoin.jar
> When I visit the UI, I see this:
> !image-2020-07-31-19-04-19-241.png!
> In flink-1.9.3, the program just exits.
> But in 1.10.x, it gets stuck in leader election.
>
> Here is the stack trace in `jobmanager.err`:
> ERROR ConnectionState Authentication failed
> ERROR ClusterEntrypoint Fatal error occurred in the cluster entrypoint.
> org.apache.flink.runtime.dispatcher.DispatcherException: Could not start recovered job 94b0911af12b61d3ee905xxxxxxxxbaf1.
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$handleRecoveredJobStartError$0(Dispatcher.java:218)
>     at org.apache.flink.runtime.dispatcher.Dispatcher$$Lambda$128/130098676.apply(Unknown Source)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$$Lambda$60/278409878.apply(Unknown Source)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1584)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>     ... 4 more
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>     at org.apache.flink.util.function.CheckedSupplier$$Lambda$125/1775358927.get(Unknown Source)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1582)
>     ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
>     at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
>     at org.apache.flink.runtime.dispatcher.Dispatcher$$Lambda$124/874035545.get(Unknown Source)
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>     ... 8 more
> Caused by: java.io.FileNotFoundException: Cannot find checkpoint or savepoint file/directory 'hdfs:///path/do/not/exist' on file system 'hdfs'.
>     at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:243)
>     at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1152)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:307)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:240)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>     at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
>     ... 12 more

-- 
This message was sent by Atlassian Jira
(v8.3.4#803005)
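
Editorial note: the root cause in the trace, a `java.io.FileNotFoundException` four `Caused by` levels below the `DispatcherException`, fails identically on every application attempt, which is why restarting cannot help. The recoverable/non-recoverable distinction the comment asks for can be sketched in Java as below. This is a minimal sketch under stated assumptions: `RestartPolicySketch`, `Decision`, `rootCause` and `classify` are hypothetical names, not Flink or YARN APIs; JDK exception types stand in for Flink's `DispatcherException` and `JobExecutionException`; and treating only a missing file as non-recoverable is an illustrative rule, not Flink's actual policy.

```java
import java.io.FileNotFoundException;
import java.util.concurrent.CompletionException;

// Hypothetical sketch, not a Flink or YARN API: decide whether a failed
// JobManager process is worth restarting, based on the root cause of its error.
final class RestartPolicySketch {

    // What a hypothetical supervisor (e.g. a retry loop over application attempts) should do.
    enum Decision { RESTART, TERMINATE }

    // Follows getCause() to the innermost throwable; the depth bound stops
    // the walk on pathological cause cycles.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        for (int depth = 0; depth < 100 && current.getCause() != null; depth++) {
            current = current.getCause();
        }
        return current;
    }

    // Illustrative rule (an assumption): a missing checkpoint/savepoint path will
    // not appear between attempts, so retrying cannot help; anything else is
    // treated as possibly transient.
    static Decision classify(Throwable t) {
        return rootCause(t) instanceof FileNotFoundException
                ? Decision.TERMINATE
                : Decision.RESTART;
    }

    public static void main(String[] args) {
        // Same nesting as the trace above; JDK types stand in for Flink's
        // DispatcherException and JobExecutionException.
        Throwable missing = new FileNotFoundException(
                "Cannot find checkpoint or savepoint file/directory 'hdfs:///path/do/not/exist' on file system 'hdfs'.");
        Throwable setup = new Exception("Could not set up JobManager", missing);
        Throwable async = new CompletionException(new RuntimeException(setup));
        Throwable top = new IllegalStateException("Could not start recovered job", async);

        System.out.println(rootCause(top).getClass().getSimpleName()); // FileNotFoundException
        System.out.println(classify(top));                                // TERMINATE
        System.out.println(classify(new RuntimeException("connection reset"))); // RESTART
    }
}
```

The sketch covers only the classification step. How such a decision would actually be communicated to Yarn is the open question raised in the comment, and is not answered here.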