Hi Till,

Thank you for your response. From what I observed, the recovery process lasted for ~1 day and the job could not recover, so we finally killed the cluster.

On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Kai,
>
> The rejection you are seeing should not be serious. The way this can
> happen is the following: If Yarn restarts the application master, Flink
> will try to recover previously started containers. If this is not possible
> or Yarn only tells about a subset of the previously allocated containers,
> then it can happen that a container that has not been reported to the
> new ResourceManager tries to register and is rejected because it is not
> known. The idea behind this behaviour is to only accept those resources
> which one has knowingly requested in order to free other resources which
> might belong to another Yarn application.
>
> In any case, the newly started Flink ResourceManager should request new
> containers so that there are enough TaskManagers available to run your job
> (assuming that the Yarn cluster has enough resources). Hence, the cluster
> should recover from this situation and there should not be a lot to worry
> about.
>
> Cheers,
> Till
>
> On Sun, May 30, 2021 at 7:36 AM Kai Fu <zzfu...@gmail.com> wrote:
>
>> Hi team,
>>
>> We encountered an issue during recovery from a checkpoint. The job is
>> recovering because the downstream Kafka sink was full for a while (about
>> 4 hours), so the job failed and keeps trying to recover. The job cannot
>> recover from the checkpoint successfully even after we scaled up the
>> Kafka cluster, and it shows the following exception. Is there any
>> guidance on how to locate and avoid this kind of issue?
>>
>> 2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).
>> 2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
>> 2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
>> org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
>>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
>> 2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
>>
>> --
>> *Best wishes,*
>> *- Kai*
>>
>

--
*Best wishes,*
*- Kai*