Hi Kostas,

Sorry, I forgot to mention that. I'm using Flink 1.5.3.

Best,
Paul Lam

Kostas Kloudas <k.klou...@data-artisans.com> wrote on Thursday, September 27, 2018, at 8:22 PM:
> Hi Paul,
>
> I am also cc’ing Till and Gary, who may be able to help. To give them more information, it would help if you told us which Flink version you are using.
>
> Cheers,
> Kostas
>
> > On Sep 27, 2018, at 1:24 PM, Paul Lam <paullin3...@gmail.com> wrote:
> >
> > Hi,
> >
> > One of my Flink on YARN jobs got into a weird situation after a fail-fast restart. The restart was triggered by the loss of a TaskManager, but when the job was recovering, one of its subtasks (1/24) couldn’t be deployed and finally failed with a NoResourceAvailableException.
> >
> > Looking into the logs, I found that the JobManager requested containers for the subtask again and again. It did get a container in each round of requests, but each container seemed to be lost silently (with no error logs) or never registered.
> >
> > In the end, I canceled the job and resubmitted it, and the problem was gone.
> >
> > I thought something might have gone wrong during TaskManager initialization, but if that were the case, there would be some errors in the JobManager’s log, right?
> >
> > Sorry for not being able to reproduce the problem, but does anyone have any idea about this? Thanks a lot!
> >
> > This is the cause of the job restart:
> > ```
> > 2018-09-27 12:33:11,639 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_1536852167599_809133_02_000016 because: Container released on a *lost* node
> > 2018-09-27 12:33:11,639 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Unregister TaskManager 554e53380a836229ecbc4ccae58f9b0f from the SlotManager.
> > 2018-09-27 12:33:11,639 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (11/24) (51fedf378844bc7e5d2855f59baafcd0) switched from RUNNING to FAILED.
> > org.apache.flink.util.FlinkException: The assigned slot container_1536852167599_809133_02_000016_0 was removed.
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786)
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756)
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948)
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372)
> >     at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:802)
> >     at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:339)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> >     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> >     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> >     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> >     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> >     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> >     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> >     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> >     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > ```
> >
> > After that, the JobManager kept requesting and returning containers until the timeout:
> > ```
> > 2018-09-27 12:38:11,072 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1536852167599_809133_02_000832 - Remaining pending container requests: 1
> > 2018-09-27 12:38:11,072 INFO org.apache.flink.yarn.YarnResourceManager - Adding keytab hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab to the AM container local resource bucket
> > 2018-09-27 12:38:11,338 INFO org.apache.flink.yarn.YarnResourceManager - Creating container launch context for TaskManagers
> > 2018-09-27 12:38:11,338 INFO org.apache.flink.yarn.YarnResourceManager - Starting TaskManagers
> > 2018-09-27 12:38:11,340 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : gdc-dn254:8041
> > 2018-09-27 12:38:11,346 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1536852167599_809133_02_000833 - Remaining pending container requests: 0
> > 2018-09-27 12:38:11,346 INFO org.apache.flink.yarn.YarnResourceManager - Returning excess container container_1536852167599_809133_02_000833.
> > 2018-09-27 12:38:11,346 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1536852167599_809133_02_000834 - Remaining pending container requests: 0
> > 2018-09-27 12:38:11,346 INFO org.apache.flink.yarn.YarnResourceManager - Returning excess container container_1536852167599_809133_02_000834.
> > 2018-09-27 12:38:11,346 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1536852167599_809133_02_000835 - Remaining pending container requests: 0
> > 2018-09-27 12:38:11,346 INFO org.apache.flink.yarn.YarnResourceManager - Returning excess container container_1536852167599_809133_02_000835.
> > ... (some similar log lines skipped) ...
> > 2018-09-27 12:43:15,449 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1536852167599_809133_02_002284 - Remaining pending container requests: 1
> > 2018-09-27 12:43:15,449 INFO org.apache.flink.yarn.YarnResourceManager - Adding keytab hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab to the AM container local resource bucket
> > 2018-09-27 12:43:15,725 INFO org.apache.flink.yarn.YarnResourceManager - Creating container launch context for TaskManagers
> > 2018-09-27 12:43:15,725 INFO org.apache.flink.yarn.YarnResourceManager - Starting TaskManagers
> > 2018-09-27 12:43:15,727 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : gdc-dn521:8041
> > 2018-09-27 12:43:15,733 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1536852167599_809133_02_002285 - Remaining pending container requests: 0
> > 2018-09-27 12:43:15,733 INFO org.apache.flink.yarn.YarnResourceManager - Returning excess container container_1536852167599_809133_02_002285.
> > 2018-09-27 12:43:15,733 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1536852167599_809133_02_002286 - Remaining pending container requests: 0
> > 2018-09-27 12:43:15,734 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1536852167599_809133_02_002351 - Remaining pending container requests: 0
> > 2018-09-27 12:43:15,734 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1536852167599_809133_02_002352 - Remaining pending container requests: 0
> > 2018-09-27 12:43:15,734 INFO org.apache.flink.yarn.YarnResourceManager - Returning excess container container_1536852167599_809133_02_002352.
> > 2018-09-27 12:43:15,734 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1536852167599_809133_02_002353 - Remaining pending container requests: 0
> > 2018-09-27 12:43:15,734 INFO org.apache.flink.yarn.YarnResourceManager - Returning excess container container_1536852167599_809133_02_002353.
> > 2018-09-27 12:43:16,831 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source (1/24) of job 4f6c0c7a0c09254ff6c1de9bc92dc9e7 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
> > 2018-09-27 12:43:21,831 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source (1/24) of job 4f6c0c7a0c09254ff6c1de9bc92dc9e7 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
> > 2018-09-27 12:43:21,833 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Pending slot request [SlotRequestId{7c611bb7a1cb28c061ada9c485b116ac}] timed out.
> > 2018-09-27 12:43:21,837 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job LogStream (4f6c0c7a0c09254ff6c1de9bc92dc9e7) switched from state RUNNING to FAILING.
> > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 300000 ms. Slots required: 72, slots allocated: 69
> >     at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:984)
> >     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> >     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> >     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >     at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:534)
> >     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> >     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> >     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
> >     at akka.dispatch.OnComplete.internal(Future.scala:258)
> >     at akka.dispatch.OnComplete.internal(Future.scala:256)
> >     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> >     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> >     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> >     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> >     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> >     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> >     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
> >     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
> >     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
> >     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> >     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> >     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> >     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> >     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> >     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> >     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> >     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> >     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> >     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > ```
> >
> > Best,
> > Paul Lam