Hi Kai,

The rejection you are seeing should not be serious. It can happen as
follows: if Yarn restarts the application master, Flink tries to recover
the previously started containers. If this is not possible, or if Yarn only
reports a subset of the previously allocated containers, then a container
that has not been reported to the new ResourceManager is rejected when it
tries to register, because it is not known. The idea behind this behaviour
is to only accept those resources which one has knowingly requested, so
that other resources, which might belong to another Yarn application, are
freed.

In any case, the newly started Flink ResourceManager should request new
containers so that there are enough TaskManagers available to run your job
(assuming that the Yarn cluster has enough resources). Hence, the cluster
should recover from this situation and there should not be a lot to worry
about.
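
To make the behaviour described above concrete, here is a minimal sketch of
the idea. It is not Flink's actual ResourceManager code: the class
SketchResourceManager and all of its methods are hypothetical names chosen
for illustration, and container IDs are simplified to plain strings. It only
shows the two rules: a registration is accepted only for containers the new
ResourceManager knows about, and any shortfall is covered by requesting new
containers.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; these are not Flink's real classes or APIs.
class SketchResourceManager {
    // Containers Yarn reported back after the application master restart,
    // plus any containers this ResourceManager has requested since then.
    private final Set<String> knownContainers = new HashSet<>();
    private final Set<String> registeredTaskManagers = new HashSet<>();
    private final int requiredTaskManagers;

    SketchResourceManager(List<String> recoveredFromYarn, int requiredTaskManagers) {
        this.knownContainers.addAll(recoveredFromYarn);
        this.requiredTaskManagers = requiredTaskManagers;
    }

    // Accept a registration only if the container was knowingly requested
    // or recovered; an unknown container is rejected.
    boolean register(String containerId) {
        if (!knownContainers.contains(containerId)) {
            return false;
        }
        registeredTaskManagers.add(containerId);
        return true;
    }

    // Number of additional containers to request so the job can run.
    int containersToRequest() {
        return Math.max(0, requiredTaskManagers - knownContainers.size());
    }

    // Called when Yarn allocates a newly requested container.
    void onContainerAllocated(String containerId) {
        knownContainers.add(containerId);
    }
}
```

In this sketch, a TaskManager running in a container that Yarn did not
report after the restart gets `false` from `register`, which corresponds to
the "does not recognize this TaskExecutor" rejection in your log, while
`containersToRequest` covers the gap with fresh containers.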

Cheers,
Till

On Sun, May 30, 2021 at 7:36 AM Kai Fu <zzfu...@gmail.com> wrote:

> Hi team,
>
> We encountered an issue during recovery from a checkpoint. The job is
> recovering because the downstream Kafka sink was full for a while (about
> 4 hours), which caused the job to fail and keep trying to recover. Even
> after we scaled up the Kafka cluster, the job cannot recover from the
> checkpoint and shows the following exception. Is there any guidance on how
> to locate and avoid this kind of issue?
>
> 2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).
> 2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Resolved ResourceManager address, beginning registration
> 2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
> org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error occurred while executing the TaskManager. Shutting it down...
>
> --
> *Best wishes,*
> *- Kai*
>
