Hi Fanbin, The Blink planner's batch SQL operators require managed memory, and the amount of managed memory needed depends on your job. The failure is because the slot, according to your cluster configuration, does not have enough managed memory to fulfill the requests.
To fix the problem, you would need to configure more managed memory for your task executors. You can set the config option "taskmanager.memory.size" to the value of 'managedMemoryPerSlot (138m in your case) * numberOfSlots'. It's not clear to me why the exact same code works on EMR. Were you running the same version of Flink? Thank you~ Xintong Song On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu <[email protected]> wrote: > Hi, > > with Flink 1.9 running in docker mode, I have a batch job and got the > following error message. > > However, it works totally fine with the same code on EMR. I checked the > log and here is the only difference: > managedMemoryInMB=138 . (the working ones has 0 value) > > did anybody see this before? > Thanks, > Fanbin > > > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > No pooled slot available and request to ResourceManager for new slot > failed > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl > .slotRequestToResourceManagerFailed(SlotPoolImpl.java:357) > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl > .lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345) > at java.util.concurrent.CompletableFuture.uniWhenComplete( > CompletableFuture.java:774) > at java.util.concurrent.CompletableFuture.uniWhenCompleteStage( > CompletableFuture.java:792) > at java.util.concurrent.CompletableFuture.whenComplete( > CompletableFuture.java:2153) > at org.apache.flink.runtime.concurrent.FutureUtils > .whenCompleteAsyncIfNotDone(FutureUtils.java:940) > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl > .requestSlotFromResourceManager(SlotPoolImpl.java:339) > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl > .requestNewAllocatedSlotInternal(SlotPoolImpl.java:306) > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl > .requestNewAllocatedBatchSlot(SlotPoolImpl.java:448) > at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl > 
.requestNewAllocatedSlot(SchedulerImpl.java:262) > at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl > .allocateMultiTaskSlot(SchedulerImpl.java:542) > at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl > .allocateSharedSlot(SchedulerImpl.java:341) > at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl > .internalAllocateSlot(SchedulerImpl.java:168) > at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl > .allocateSlotInternal(SchedulerImpl.java:149) > at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl > .allocateBatchSlot(SchedulerImpl.java:129) > at org.apache.flink.runtime.executiongraph. > SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot( > SlotProviderStrategy.java:109) > at org.apache.flink.runtime.executiongraph.Execution > .lambda$allocateAndAssignSlotForExecution$2(Execution.java:556) > at java.util.concurrent.CompletableFuture.uniComposeStage( > CompletableFuture.java:995) > at java.util.concurrent.CompletableFuture.thenCompose( > CompletableFuture.java:2137) > at org.apache.flink.runtime.executiongraph.Execution > .allocateAndAssignSlotForExecution(Execution.java:554) > at org.apache.flink.runtime.executiongraph.Execution > .allocateResourcesForExecution(Execution.java:496) > at org.apache.flink.runtime.executiongraph.Execution > .scheduleForExecution(Execution.java:439) > at org.apache.flink.runtime.executiongraph.ExecutionVertex > .scheduleForExecution(ExecutionVertex.java:674) > at org.apache.flink.runtime.executiongraph.Execution.scheduleConsumer( > Execution.java:850) > at org.apache.flink.runtime.executiongraph.Execution > .scheduleOrUpdateConsumers(Execution.java:887) > at org.apache.flink.runtime.executiongraph.Execution.markFinished( > Execution.java:1064) > at org.apache.flink.runtime.executiongraph.ExecutionGraph > .updateStateInternal(ExecutionGraph.java:1548) > at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState( > ExecutionGraph.java:1521) > at 
org.apache.flink.runtime.scheduler.LegacyScheduler > .updateTaskExecutionState(LegacyScheduler.java:289) > at org.apache.flink.runtime.jobmaster.JobMaster > .updateTaskExecutionState(JobMaster.java:377) > at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation( > AkkaRpcActor.java:279) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage( > AkkaRpcActor.java:194) > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor > .handleRpcMessage(FencedAkkaRpcActor.java:74) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage( > AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool > .java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java: > 1979) > at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run( > ForkJoinWorkerThread.java:107) > Caused by: java.util.concurrent.CompletionException: > org.apache.flink.runtime.resourcemanager.exceptions. > ResourceManagerException: Could not fulfill slot request > c7de65260c8d428b2e295e5afb205242. > at java.util.concurrent.CompletableFuture.encodeThrowable( > CompletableFuture.java:292) > at java.util.concurrent.CompletableFuture.completeThrowable( > CompletableFuture.java:308) > at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture > .java:607) > at java.util.concurrent.CompletableFuture.uniApplyStage( > CompletableFuture.java:628) > at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture > .java:1996) > at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc( > AkkaInvocationHandler.java:214) > at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke( > AkkaInvocationHandler.java:129) > at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler > .invoke(FencedAkkaInvocationHandler.java:78) > at com.sun.proxy.$Proxy8.requestSlot(Unknown Source) > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl > .requestSlotFromResourceManager(SlotPoolImpl.java:334) > ... 48 more > Caused by: org.apache.flink.runtime.resourcemanager.exceptions. > ResourceManagerException: Could not fulfill slot request > c7de65260c8d428b2e295e5afb205242. > at org.apache.flink.runtime.resourcemanager.slotmanager. > SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:315) > at org.apache.flink.runtime.resourcemanager.ResourceManager > .requestSlot(ResourceManager.java:443) > at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source) > ... 24 more > Caused by: org.apache.flink.runtime.resourcemanager.exceptions. > UnfulfillableSlotRequestException: Could not fulfill slot request > c7de65260c8d428b2e295e5afb205242. 
Requested resource profile ( > ResourceProfile{cpuCores=0.0, heapMemoryInMB=0, directMemoryInMB=0, > nativeMemoryInMB=0, networkMemoryInMB=0, managedMemoryInMB=138}) is > unfulfillable. > at org.apache.flink.runtime.resourcemanager.slotmanager. > SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:768) > at org.apache.flink.runtime.resourcemanager.slotmanager. > SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:310) > ... 26 more >
