Hi team, We encountered an issue during recovery from checkpoint. It's recovering because the downstream Kafka sink is full for a while and the job is failed and keeps trying to recover(The downstream is full for about 4 hours). The job cannot recover from checkpoint successfully even if after we scaled up the Kafka cluster and shows the following exception. Is there any guidance on how to locate and avoid this kind of issue?
*2021-05-30 01:31:21,419 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).2021-05-30 01:31:21,422 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor. at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...* -- *Best wishes,* *- Kai*