Hi Kai,

The rejection you are seeing should not be serious. It can happen in the following way: if Yarn restarts the application master, Flink tries to recover the previously started containers. If this is not possible, or if Yarn only reports a subset of the previously allocated containers, then a container that has not been reported to the new ResourceManager is rejected when it tries to register, because it is not known. The idea behind this behaviour is to accept only those resources that were knowingly requested, so that other resources, which might belong to another Yarn application, are freed.
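
To make the rule above concrete, here is a minimal sketch of it in Java. It is only an illustration under assumptions: the class, the method names and the container ids are all hypothetical and are not Flink's actual classes or API. The sketch keeps a set of containers that were either reported by Yarn after the restart or requested by the new ResourceManager itself, and rejects every registration from a container outside that set.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration only; these are not Flink's real classes. */
public class RegistrationSketch {

    /** Outcome of a registration attempt. */
    enum Decision { ACCEPTED, REJECTED }

    /** Stand-in for a restarted resource manager that only trusts known containers. */
    static final class SketchResourceManager {
        private final Set<String> knownContainers = new HashSet<>();

        /** Remember the containers that Yarn reported after the restart. */
        void recoverContainers(List<String> reportedByYarn) {
            knownContainers.addAll(reportedByYarn);
        }

        /** Remember a container that this resource manager requested itself. */
        void onContainerAllocated(String containerId) {
            knownContainers.add(containerId);
        }

        /** Accept a registration only if the container is known. */
        Decision register(String containerId) {
            return knownContainers.contains(containerId)
                    ? Decision.ACCEPTED
                    : Decision.REJECTED;
        }
    }

    public static void main(String[] args) {
        SketchResourceManager rm = new SketchResourceManager();

        // Assumption: after the restart Yarn reports only one of two old containers.
        rm.recoverContainers(Arrays.asList("container_01"));

        System.out.println(rm.register("container_01")); // reported, so known
        System.out.println(rm.register("container_02")); // not reported, so unknown

        // The new resource manager requests a replacement container.
        rm.onContainerAllocated("container_03");
        System.out.println(rm.register("container_03"));
    }
}
```

In this sketch the unreported container is the one that gets rejected, which corresponds to the TaskExecutor that logs the fatal registration error, while the replacement container registers normally.
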
In any case, the newly started Flink ResourceManager should request new containers so that there are enough TaskManagers available to run your job (assuming that the Yarn cluster has enough resources). Hence, the cluster should recover from this situation and there should not be a lot to worry about.

Cheers,
Till

On Sun, May 30, 2021 at 7:36 AM Kai Fu <zzfu...@gmail.com> wrote:

> Hi team,
>
> We encountered an issue during recovery from checkpoint. It's recovering
> because the downstream Kafka sink is full for a while and the job is failed
> and keeps trying to recover (The downstream is full for about 4 hours). The
> job cannot recover from checkpoint successfully even if after we scaled up
> the Kafka cluster and shows the following exception. Is there any guidance
> on how to locate and avoid this kind of issue?
>
> 2021-05-30 01:31:21,419 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).
> 2021-05-30 01:31:21,422 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
> 2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
> org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
>
> --
> Best wishes,
> - Kai