Hi Till,

Thank you for your response. From what I observed, the recovery process went
on for ~1 day without succeeding, so we finally killed the cluster.

On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Kai,
>
> The rejection you are seeing should not be serious. The way this can
> happen is the following: If Yarn restarts the application master, Flink
> will try to recover previously started containers. If this is not possible
> or Yarn only reports a subset of the previously allocated containers,
> then it can happen that a container that has not been reported to the
> new ResourceManager tries to register and is rejected because it is not known.
> The idea behind this behaviour is to only accept those resources which one
> has knowingly requested in order to free other resources which might belong
> to another Yarn application.
>
> In any case, the newly started Flink ResourceManager should request new
> containers so that there are enough TaskManagers available to run your job
> (assuming that the Yarn cluster has enough resources). Hence, the cluster
> should recover from this situation and there should not be a lot to worry
> about.
>
> Cheers,
> Till
>
> On Sun, May 30, 2021 at 7:36 AM Kai Fu <zzfu...@gmail.com> wrote:
>
>> Hi team,
>>
>> We encountered an issue during recovery from a checkpoint. The job was
>> recovering because the downstream Kafka sink was full for a while (about
>> 4 hours), so the job failed and kept trying to recover. The job could not
>> recover from the checkpoint successfully even after we scaled up the Kafka
>> cluster, and it shows the following exception. Is there any guidance on
>> how to locate and avoid this kind of issue?
>>
>> 2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).
>> 2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
>> 2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
>> org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
>>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
>> 2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
>>
>> --
>> *Best wishes,*
>> *- Kai*
>>
>

-- 
*Best wishes,*
*- Kai*
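
The rejection Till describes in the quoted reply can be illustrated with a small sketch. This is only a toy model under stated assumptions, not Flink's actual implementation: the class `SketchResourceManager`, its methods `recordContainer` and `register`, and the container IDs are all hypothetical names chosen for illustration. The idea is that a restarted ResourceManager only accepts registrations from containers it knows about, either because Yarn reported them as recovered or because it requested them itself.

```java
import java.util.HashSet;
import java.util.Set;

class RegistrationSketch {

    /** Hypothetical stand-in for a ResourceManager's bookkeeping (not a Flink class). */
    static class SketchResourceManager {
        // Containers this ResourceManager instance knows about: those Yarn reported
        // after the application master restart, plus those it requested itself.
        private final Set<String> knownContainers = new HashSet<>();

        void recordContainer(String containerId) {
            knownContainers.add(containerId);
        }

        // Accept a registration only if the container is known; otherwise reject it,
        // since the resource might belong to another application.
        boolean register(String containerId) {
            return knownContainers.contains(containerId);
        }
    }

    public static void main(String[] args) {
        SketchResourceManager rm = new SketchResourceManager();

        // Assumption: after the restart, Yarn reports only one of two old containers.
        rm.recordContainer("container_01");

        System.out.println("container_01 accepted: " + rm.register("container_01"));
        System.out.println("container_02 accepted: " + rm.register("container_02"));

        // The new ResourceManager then requests a replacement, which it will accept.
        rm.recordContainer("container_03");
        System.out.println("container_03 accepted: " + rm.register("container_03"));
    }
}
```

In this model the unreported container is turned away while the replacement is accepted, which corresponds to the "does not recognize this TaskExecutor" message in the log followed by new containers being requested.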
