Hi Kai,

The reason the job cannot be recovered may not be directly related to the exception you mentioned in your email. Could you provide the complete jobmanager.log and taskmanager.log? Maybe we can find some hints there.

Best regards,
JING ZHANG

Kai Fu <zzfu...@gmail.com> wrote on Wed, Jun 2, 2021 at 7:23 AM:

> Hi Till,
>
> Thank you for your response. Per my observation, the recovery process
> lasted for ~1 day without succeeding, and we finally killed the cluster.
>
> On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Kai,
>>
>> The rejection you are seeing should not be serious. The way this can
>> happen is the following: If Yarn restarts the application master, Flink
>> will try to recover previously started containers. If this is not possible
>> or Yarn only tells about a subset of the previously allocated containers,
>> then a container that has not been reported to the new ResourceManager
>> can be rejected when it tries to register, because it is not known.
>> The idea behind this behaviour is to only accept those resources which one
>> has knowingly requested in order to free other resources which might belong
>> to another Yarn application.
>>
>> In any case, the newly started Flink ResourceManager should request new
>> containers so that there are enough TaskManagers available to run your job
>> (assuming that the Yarn cluster has enough resources). Hence, the cluster
>> should recover from this situation and there should not be a lot to worry
>> about.
>>
>> Cheers,
>> Till
>>
>> On Sun, May 30, 2021 at 7:36 AM Kai Fu <zzfu...@gmail.com> wrote:
>>
>>> Hi team,
>>>
>>> We encountered an issue during recovery from a checkpoint. The job is
>>> recovering because the downstream Kafka sink was full for a while (about
>>> 4 hours), so the job failed and keeps trying to recover. The job cannot
>>> recover from the checkpoint even after we scaled up the Kafka cluster,
>>> and it shows the following exception. Is there any guidance on how to
>>> locate and avoid this kind of issue?
>>>
>>> 2021-05-30 01:31:21,419 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).
>>> 2021-05-30 01:31:21,422 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
>>> 2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
>>> org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
>>>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
>>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>> 2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
>>>
>>> --
>>> *Best wishes,*
>>> *- Kai*
>>>
>>
>
> --
> *Best wishes,*
> *- Kai*
>