Hi Kostas,

Sorry, I forgot to mention that. I'm using Flink 1.5.3.

Best,
Paul Lam

On Thu, Sep 27, 2018 at 8:22 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Hi Paul,
>
> I am also cc’ing Till and Gary, who may be able to help. To give them more
> context, it would help if you told us which Flink version you are using.
>
> Cheers,
> Kostas
>
> > On Sep 27, 2018, at 1:24 PM, Paul Lam <paullin3...@gmail.com> wrote:
> >
> > Hi,
> >
> > One of my Flink on YARN jobs got into a weird situation after a
> > fail-fast restart. The restart was triggered by the loss of a TaskManager,
> > but while the job was recovering, one of its subtasks (1/24) could not be
> > deployed, and the job finally failed with a NoResourceAvailableException.
> >
> > Looking into the logs, I found that the JobManager was requesting
> > containers for the subtask again and again. It did receive a container in
> > each round of requests, but each one seemed to be lost silently (with no
> > error logs) or was never registered.
> >
> > At last, I canceled the job and resubmitted it, and the problem was
> gone.
> >
> > I thought something might be wrong with the TaskManager initialization,
> > but if that were the case there would be some errors in the JobManager’s
> > log, right?
> >
> > Sorry for not being able to reproduce the problem, but does anyone have
> > any idea about this? Thanks a lot!
> >
> > This is the cause of the job restart:
> > ```
> > 2018-09-27 12:33:11,639 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Closing TaskExecutor connection
> container_1536852167599_809133_02_000016 because: Container released on a
> *lost* node
> > 2018-09-27 12:33:11,639 INFO
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  -
> Unregister TaskManager 554e53380a836229ecbc4ccae58f9b0f from the
> SlotManager.
> > 2018-09-27 12:33:11,639 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink:
> LogKafkaSink (11/24) (51fedf378844bc7e5d2855f59baafcd0) switched from
> RUNNING to FAILED.
> > org.apache.flink.util.FlinkException: The assigned slot
> container_1536852167599_809133_02_000016_0 was removed.
> >       at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786)
> >       at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756)
> >       at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948)
> >       at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372)
> >       at
> org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:802)
> >       at
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:339)
> >       at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> >       at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> >       at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> >       at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> >       at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> >       at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> >       at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> >       at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> >       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> >       at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> >       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> >       at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> >       at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> >       at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >       at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >       at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >       at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > ```
> >
> > And then JobManager kept requesting and returning containers before
> timeout:
> > ```
> > 2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Received new container:
> container_1536852167599_809133_02_000832 - Remaining pending container
> requests: 1
> > 2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Adding keytab
> hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab
> to the AM container local resource bucket
> > 2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Creating container launch context for TaskManagers
> > 2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Starting TaskManagers
> > 2018-09-27 12:38:11,340 INFO
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  -
> Opening proxy : gdc-dn254:8041
> > 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Received new container:
> container_1536852167599_809133_02_000833 - Remaining pending container
> requests: 0
> > 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Returning excess container
> container_1536852167599_809133_02_000833.
> > 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Received new container:
> container_1536852167599_809133_02_000834 - Remaining pending container
> requests: 0
> > 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Returning excess container
> container_1536852167599_809133_02_000834.
> > 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Received new container:
> container_1536852167599_809133_02_000835 - Remaining pending container
> requests: 0
> > 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Returning excess container
> container_1536852167599_809133_02_000835.
> > ———————— skip some similar logs ———————
> > 2018-09-27 12:43:15,449 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Received new container:
> container_1536852167599_809133_02_002284 - Remaining pending container
> requests: 1
> > 2018-09-27 12:43:15,449 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Adding keytab
> hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab
> to the AM container local resource bucket
> > 2018-09-27 12:43:15,725 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Creating container launch context for TaskManagers
> > 2018-09-27 12:43:15,725 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Starting TaskManagers
> > 2018-09-27 12:43:15,727 INFO
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  -
> Opening proxy : gdc-dn521:8041
> > 2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Received new container:
> container_1536852167599_809133_02_002285 - Remaining pending container
> requests: 0
> > 2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Returning excess container
> container_1536852167599_809133_02_002285.
> > 2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Received new container:
> container_1536852167599_809133_02_002286 - Remaining pending container
> requests: 0
> > 2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Received new container:
> container_1536852167599_809133_02_002351 - Remaining pending container
> requests: 0
> > 2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Received new container:
> container_1536852167599_809133_02_002352 - Remaining pending container
> requests: 0
> > 2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Returning excess container
> container_1536852167599_809133_02_002352.
> > 2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Received new container:
> container_1536852167599_809133_02_002353 - Remaining pending container
> requests: 0
> > 2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager
>                    - Returning excess container
> container_1536852167599_809133_02_002353.
> > 2018-09-27 12:43:16,831 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint
> triggering task Source: Custom Source (1/24) of job
> 4f6c0c7a0c09254ff6c1de9bc92dc9e7 is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.
> > 2018-09-27 12:43:21,831 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint
> triggering task Source: Custom Source (1/24) of job
> 4f6c0c7a0c09254ff6c1de9bc92dc9e7 is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.
> > 2018-09-27 12:43:21,833 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Pending
> slot request [SlotRequestId{7c611bb7a1cb28c061ada9c485b116ac}] timed out.
> > 2018-09-27 12:43:21,837 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> LogStream (4f6c0c7a0c09254ff6c1de9bc92dc9e7) switched from state RUNNING to
> FAILING.
> >
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 300000 ms. Slots
> required: 72, slots allocated: 69
> >       at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:984)
> >       at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> >       at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> >       at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >       at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >       at
> org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:534)
> >       at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> >       at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> >       at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >       at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >       at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
> >       at akka.dispatch.OnComplete.internal(Future.scala:258)
> >       at akka.dispatch.OnComplete.internal(Future.scala:256)
> >       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> >       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> >       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> >       at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> >       at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> >       at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> >       at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
> >       at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
> >       at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
> >       at
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> >       at
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> >       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> >       at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> >       at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> >       at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> >       at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> >       at
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> >       at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> >       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> >       at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> >       at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >       at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >       at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >       at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > ```
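> >
> > (For what it’s worth, the 300000 ms in the exception should be the slot
> > request timeout, which can be raised in flink-conf.yaml. A minimal sketch,
> > assuming the key in this Flink version is `slot.request.timeout` with a
> > default of 300000 ms:
> > ```
> > # Assumption: slot.request.timeout is the relevant key; its default
> > # is 300000 ms (5 minutes). Raise it to 10 minutes as a workaround.
> > slot.request.timeout: 600000
> > ```
> > But since the containers seem to be lost rather than merely slow to start,
> > raising the timeout would probably only mask the problem.)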
> >
> > Best,
> > Paul Lam
>
>