Hi Jing,

Thank you for your reply. That cluster has already been terminated; I will provide the logs if the issue occurs again.

On Wed, Jun 2, 2021 at 11:17 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Kai,
>
> The reason why the job cannot be recovered may not be directly related to the exception you mentioned in your email. Would you like to provide the complete jobmanager.log and taskmanager.log? Maybe we could find some hints there.
>
> Best regards,
> JING ZHANG
>
> Kai Fu <zzfu...@gmail.com> wrote on Wed, Jun 2, 2021 at 7:23 AM:
>
>> Hi Till,
>>
>> Thank you for your response. As far as I observed, the process lasted for about a day without recovering, and we finally killed the cluster.
>>
>> On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Kai,
>>>
>>> The rejection you are seeing should not be serious. The way this can happen is the following: if Yarn restarts the application master, Flink will try to recover the previously started containers. If this is not possible, or if Yarn only reports a subset of the previously allocated containers, then a container that has not been reported to the new ResourceManager is rejected when it tries to register, because it is not known. The idea behind this behaviour is to only accept those resources which one has knowingly requested, in order to free other resources which might belong to another Yarn application.
>>>
>>> In any case, the newly started Flink ResourceManager should request new containers so that there are enough TaskManagers available to run your job (assuming that the Yarn cluster has enough resources). Hence, the cluster should recover from this situation and there should not be a lot to worry about.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, May 30, 2021 at 7:36 AM Kai Fu <zzfu...@gmail.com> wrote:
>>>
>>>> Hi team,
>>>>
>>>> We encountered an issue during recovery from checkpoint.
>>>> The job was recovering because the downstream Kafka sink had been full for a while (about 4 hours), which made the job fail and keep trying to recover. Even after we scaled up the Kafka cluster, the job could not recover from the checkpoint successfully and showed the following exception. Is there any guidance on how to locate and avoid this kind of issue?
>>>>
>>>> 2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).
>>>> 2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
>>>> 2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
>>>> org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
>>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
>>>>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
>>>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>> 2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
>>>>
>>>> --
>>>> Best wishes,
>>>> - Kai
>>>>
>>>
>>
>> --
>> Best wishes,
>> - Kai
>>
>
--
Best wishes,
- Kai
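
The two rules in Till's explanation (after an application-master restart, reject registrations from containers the new ResourceManager was not told about, then request new containers until enough TaskManagers exist) can be illustrated with a toy model. This is a hypothetical sketch, not Flink's ResourceManager code: `ToyResourceManager`, its methods and the container IDs are invented for illustration, and it ignores Yarn callbacks, slots and timeouts entirely.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model; none of these names are Flink APIs.
public class RegistrationSketch {

    static final class ToyResourceManager {
        private final Set<String> knownContainers = new HashSet<>();
        private final Set<String> registered = new HashSet<>();
        private final int requiredTaskManagers;
        private int nextContainerId = 0;

        ToyResourceManager(Set<String> recoveredContainers, int requiredTaskManagers) {
            // After a restart, only the containers reported back are known.
            this.knownContainers.addAll(recoveredContainers);
            this.requiredTaskManagers = requiredTaskManagers;
        }

        // Accepts a registration only from a container this RM knows about.
        boolean register(String containerId) {
            if (!knownContainers.contains(containerId)) {
                return false; // mirrors "does not recognize this TaskExecutor"
            }
            registered.add(containerId);
            return true;
        }

        // "Requests" new containers until enough are known; returns how many.
        int requestMissingContainers() {
            int requested = 0;
            while (knownContainers.size() < requiredTaskManagers) {
                knownContainers.add("new-container-" + nextContainerId++);
                requested++;
            }
            return requested;
        }

        int registeredCount() {
            return registered.size();
        }
    }

    public static void main(String[] args) {
        // Only container-1 is reported after the restart; container-2 is unknown.
        ToyResourceManager rm = new ToyResourceManager(Collections.singleton("container-1"), 2);
        System.out.println(rm.register("container-1"));     // true
        System.out.println(rm.register("container-2"));     // false (rejected)
        System.out.println(rm.requestMissingContainers());  // 1
        System.out.println(rm.register("new-container-0")); // true
        System.out.println(rm.registeredCount());           // 2
    }
}
```

In this model the rejected container simply never joins, and the job can still get its two TaskManagers from the replacement request, which matches Till's point that the rejection itself should not block recovery as long as Yarn has spare resources.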