Hi,

One of my Flink on YARN jobs got into a weird situation after a fail-fast 
restart. The restart was triggered by loss of a TaskManager, but when the job 
was recovering, one of its subtask (1/24) couldn’t be deployed, and finally 
failed with NoResourceAvailableException. 

Looking into the logs, I find the JobManager was requesting containers for the 
subtask again and again, but indeed it got a container in each round of 
requests, which seemed to be lost silently (with no error logs) or never 
registered. 

At last, I canceled the job and resubmitted it, and the problem was gone. 

I thought it might be something wrong with the TaskManager initialization, but 
if it is the case there would be some errors in JobManagar’s log, right? 

Sorry for not being able to reproduce the problem, but does anyone have any 
idea on this? Thanks a lot!

This is the cause of the job restart:
```
2018-09-27 12:33:11,639 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Closing TaskExecutor connection 
container_1536852167599_809133_02_000016 because: Container released on a 
*lost* node
2018-09-27 12:33:11,639 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister 
TaskManager 554e53380a836229ecbc4ccae58f9b0f from the SlotManager.
2018-09-27 12:33:11,639 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: 
LogKafkaSink (11/24) (51fedf378844bc7e5d2855f59baafcd0) switched from RUNNING 
to FAILED.
org.apache.flink.util.FlinkException: The assigned slot 
container_1536852167599_809133_02_000016_0 was removed.
        at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786)
        at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756)
        at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948)
        at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372)
        at 
org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:802)
        at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:339)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

And then JobManager kept requesting and returning containers before timeout:
```
2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Received new container: container_1536852167599_809133_02_000832 
- Remaining pending container requests: 1
2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Adding keytab 
hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab
 to the AM container local resource bucket
2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Creating container launch context for TaskManagers
2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Starting TaskManagers
2018-09-27 12:38:11,340 INFO  
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
Opening proxy : gdc-dn254:8041
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Received new container: container_1536852167599_809133_02_000833 
- Remaining pending container requests: 0
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Returning excess container 
container_1536852167599_809133_02_000833.
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Received new container: container_1536852167599_809133_02_000834 
- Remaining pending container requests: 0
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Returning excess container 
container_1536852167599_809133_02_000834.
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Received new container: container_1536852167599_809133_02_000835 
- Remaining pending container requests: 0
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Returning excess container 
container_1536852167599_809133_02_000835.
———————— skip some similar logs ——————— 
2018-09-27 12:43:15,449 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Received new container: container_1536852167599_809133_02_002284 
- Remaining pending container requests: 1
2018-09-27 12:43:15,449 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Adding keytab 
hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab
 to the AM container local resource bucket
2018-09-27 12:43:15,725 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Creating container launch context for TaskManagers
2018-09-27 12:43:15,725 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Starting TaskManagers
2018-09-27 12:43:15,727 INFO  
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
Opening proxy : gdc-dn521:8041
2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Received new container: container_1536852167599_809133_02_002285 
- Remaining pending container requests: 0
2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Returning excess container 
container_1536852167599_809133_02_002285.
2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Received new container: container_1536852167599_809133_02_002286 
- Remaining pending container requests: 0
2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Received new container: container_1536852167599_809133_02_002351 
- Remaining pending container requests: 0
2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Received new container: container_1536852167599_809133_02_002352 
- Remaining pending container requests: 0
2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Returning excess container 
container_1536852167599_809133_02_002352.
2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Received new container: container_1536852167599_809133_02_002353 
- Remaining pending container requests: 0
2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Returning excess container 
container_1536852167599_809133_02_002353.
2018-09-27 12:43:16,831 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 
triggering task Source: Custom Source (1/24) of job 
4f6c0c7a0c09254ff6c1de9bc92dc9e7 is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.
2018-09-27 12:43:21,831 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 
triggering task Source: Custom Source (1/24) of job 
4f6c0c7a0c09254ff6c1de9bc92dc9e7 is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.
2018-09-27 12:43:21,833 INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Pending slot 
request [SlotRequestId{7c611bb7a1cb28c061ada9c485b116ac}] timed out.
2018-09-27 12:43:21,837 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job LogStream 
(4f6c0c7a0c09254ff6c1de9bc92dc9e7) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 300000 ms. Slots 
required: 72, slots allocated: 69
        at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:984)
        at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at 
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:534)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
        at akka.dispatch.OnComplete.internal(Future.scala:258)
        at akka.dispatch.OnComplete.internal(Future.scala:256)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
        at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
        at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
        at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
        at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
        at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
        at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

Best,
Paul Lam

Reply via email to