Hi Paul,

I am also cc’ing Till and Gary, who may be able to help. To give them more 
context, it would help if you told us which Flink version you are using.
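
In the meantime, one thing worth checking: the "300000 ms" in the 
NoResourceAvailableException below is the slot request timeout. If I remember 
the configuration key correctly (please verify it against the configuration 
reference for your version), it can be raised in flink-conf.yaml, e.g.:

```yaml
# Tuning sketch -- key name and default assumed from memory; double-check
# against the configuration reference of your Flink version.
# Raise the time the JobManager waits for all required slots before
# failing the job (default 300000 ms = 5 minutes):
slot.request.timeout: 600000
```

That would only buy the recovery more time rather than explain the lost 
containers, of course, but it may help as a stopgap while we figure out why 
the containers never register.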

Cheers,
Kostas

> On Sep 27, 2018, at 1:24 PM, Paul Lam <paullin3...@gmail.com> wrote:
> 
> Hi,
> 
> One of my Flink on YARN jobs got into a weird situation after a fail-fast 
> restart. The restart was triggered by the loss of a TaskManager, but while 
> the job was recovering, one of its subtasks (1/24) could not be deployed and 
> eventually failed with a NoResourceAvailableException. 
> 
> Looking into the logs, I found that the JobManager was requesting containers 
> for the subtask again and again. It did in fact get a container in each round 
> of requests, but the container seemed to be lost silently (with no error 
> logs) or was never registered. 
> 
> In the end, I canceled the job and resubmitted it, and the problem was gone. 
> 
> I thought it might be something wrong with the TaskManager initialization, 
> but if that were the case there would be some errors in the JobManager’s 
> log, right? 
> 
> Sorry for not being able to reproduce the problem, but does anyone have any 
> ideas about this? Thanks a lot!
> 
> This is the cause of the job restart:
> ```
> 2018-09-27 12:33:11,639 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Closing TaskExecutor connection 
> container_1536852167599_809133_02_000016 because: Container released on a 
> *lost* node
> 2018-09-27 12:33:11,639 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - 
> Unregister TaskManager 554e53380a836229ecbc4ccae58f9b0f from the SlotManager.
> 2018-09-27 12:33:11,639 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: 
> LogKafkaSink (11/24) (51fedf378844bc7e5d2855f59baafcd0) switched from RUNNING 
> to FAILED.
> org.apache.flink.util.FlinkException: The assigned slot 
> container_1536852167599_809133_02_000016_0 was removed.
>       at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786)
>       at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756)
>       at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948)
>       at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372)
>       at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:802)
>       at 
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:339)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>       at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>       at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>       at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>       at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ```
> 
> And then the JobManager kept requesting and returning containers until the 
> timeout:
> ```
> 2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1536852167599_809133_02_000832 - Remaining pending container 
> requests: 1
> 2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Adding keytab 
> hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab
>  to the AM container local resource bucket
> 2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Creating container launch context for TaskManagers
> 2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Starting TaskManagers
> 2018-09-27 12:38:11,340 INFO  
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
> Opening proxy : gdc-dn254:8041
> 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1536852167599_809133_02_000833 - Remaining pending container 
> requests: 0
> 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Returning excess container 
> container_1536852167599_809133_02_000833.
> 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1536852167599_809133_02_000834 - Remaining pending container 
> requests: 0
> 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Returning excess container 
> container_1536852167599_809133_02_000834.
> 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1536852167599_809133_02_000835 - Remaining pending container 
> requests: 0
> 2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Returning excess container 
> container_1536852167599_809133_02_000835.
> ———————— skip some similar logs ——————— 
> 2018-09-27 12:43:15,449 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1536852167599_809133_02_002284 - Remaining pending container 
> requests: 1
> 2018-09-27 12:43:15,449 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Adding keytab 
> hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab
>  to the AM container local resource bucket
> 2018-09-27 12:43:15,725 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Creating container launch context for TaskManagers
> 2018-09-27 12:43:15,725 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Starting TaskManagers
> 2018-09-27 12:43:15,727 INFO  
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
> Opening proxy : gdc-dn521:8041
> 2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1536852167599_809133_02_002285 - Remaining pending container 
> requests: 0
> 2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Returning excess container 
> container_1536852167599_809133_02_002285.
> 2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1536852167599_809133_02_002286 - Remaining pending container 
> requests: 0
> 2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1536852167599_809133_02_002351 - Remaining pending container 
> requests: 0
> 2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1536852167599_809133_02_002352 - Remaining pending container 
> requests: 0
> 2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Returning excess container 
> container_1536852167599_809133_02_002352.
> 2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1536852167599_809133_02_002353 - Remaining pending container 
> requests: 0
> 2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Returning excess container 
> container_1536852167599_809133_02_002353.
> 2018-09-27 12:43:16,831 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 
> triggering task Source: Custom Source (1/24) of job 
> 4f6c0c7a0c09254ff6c1de9bc92dc9e7 is not in state RUNNING but SCHEDULED 
> instead. Aborting checkpoint.
> 2018-09-27 12:43:21,831 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 
> triggering task Source: Custom Source (1/24) of job 
> 4f6c0c7a0c09254ff6c1de9bc92dc9e7 is not in state RUNNING but SCHEDULED 
> instead. Aborting checkpoint.
> 2018-09-27 12:43:21,833 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Pending slot 
> request [SlotRequestId{7c611bb7a1cb28c061ada9c485b116ac}] timed out.
> 2018-09-27 12:43:21,837 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job LogStream 
> (4f6c0c7a0c09254ff6c1de9bc92dc9e7) switched from state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not allocate all requires slots within timeout of 300000 ms. Slots 
> required: 72, slots allocated: 69
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:984)
>       at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>       at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>       at 
> org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:534)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>       at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
>       at akka.dispatch.OnComplete.internal(Future.scala:258)
>       at akka.dispatch.OnComplete.internal(Future.scala:256)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>       at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>       at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>       at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>       at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>       at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>       at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>       at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>       at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>       at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>       at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>       at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>       at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>       at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>       at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ```
> 
> Best,
> Paul Lam
