Hi Fanbin,

The Blink planner's batch SQL operators require managed memory, and the
amount needed depends on your job. The failure occurs because, under your
cluster configuration, a slot does not have enough managed memory to
fulfill the request.

To fix the problem, you would need to configure more managed memory for
your task executors. You can set the config option
"taskmanager.memory.size" to the value of 'managedMemoryPerSlot (138m in
your case) * numberOfSlots'.
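
As a rough illustration of that arithmetic, here is a minimal sketch. The
138 MB figure comes from the managedMemoryInMB=138 in your exception; the
slot count of 4 is only an assumption for the example, so substitute
whatever "taskmanager.numberOfTaskSlots" is in your setup. The helper name
is made up for this sketch and is not part of Flink.

```python
# Sketch: derive a value for "taskmanager.memory.size" from the per-slot
# managed memory requirement and the number of slots per task executor.

MANAGED_MEMORY_PER_SLOT_MB = 138  # from managedMemoryInMB=138 in the exception
NUMBER_OF_SLOTS = 4               # assumption: replace with your own slot count


def managed_memory_size_mb(per_slot_mb: int, slots: int) -> int:
    """Return the total managed memory (in MB) needed by one task executor."""
    if per_slot_mb < 0 or slots < 1:
        raise ValueError("per_slot_mb must be >= 0 and slots must be >= 1")
    return per_slot_mb * slots


total_mb = managed_memory_size_mb(MANAGED_MEMORY_PER_SLOT_MB, NUMBER_OF_SLOTS)

# Line to put into flink-conf.yaml (Flink 1.9 option name).
config_line = f"taskmanager.memory.size: {total_mb}m"
print(config_line)
```

With the assumed 4 slots this gives 552m. Treat it as a lower bound; you
may want some headroom on top of it.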

It's not clear to me why exactly the same code works on EMR. Were you
running the same version of Flink?

Thank you~

Xintong Song



On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu <[email protected]> wrote:

> Hi,
>
> With Flink 1.9 running in Docker mode, I have a batch job that fails
> with the following error message.
>
> However, the same code works totally fine on EMR. I checked the logs
> and this is the only difference:
> managedMemoryInMB=138 (the working setup has a value of 0).
>
> Has anybody seen this before?
> Thanks,
> Fanbin
>
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
>     at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
>     at org.apache.flink.runtime.concurrent.FutureUtils.whenCompleteAsyncIfNotDone(FutureUtils.java:940)
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestSlotFromResourceManager(SlotPoolImpl.java:339)
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.requestNewAllocatedSlot(SchedulerImpl.java:262)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateMultiTaskSlot(SchedulerImpl.java:542)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSharedSlot(SchedulerImpl.java:341)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.internalAllocateSlot(SchedulerImpl.java:168)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSlotInternal(SchedulerImpl.java:149)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateBatchSlot(SchedulerImpl.java:129)
>     at org.apache.flink.runtime.executiongraph.SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot(SlotProviderStrategy.java:109)
>     at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$2(Execution.java:556)
>     at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
>     at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
>     at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:554)
>     at org.apache.flink.runtime.executiongraph.Execution.allocateResourcesForExecution(Execution.java:496)
>     at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:439)
>     at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:674)
>     at org.apache.flink.runtime.executiongraph.Execution.scheduleConsumer(Execution.java:850)
>     at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:887)
>     at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1064)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1548)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1521)
>     at org.apache.flink.runtime.scheduler.LegacyScheduler.updateTaskExecutionState(LegacyScheduler.java:289)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:377)
>     at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request c7de65260c8d428b2e295e5afb205242.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
>     at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
>     at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:214)
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
>     at com.sun.proxy.$Proxy8.requestSlot(Unknown Source)
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestSlotFromResourceManager(SlotPoolImpl.java:334)
>     ... 48 more
> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request c7de65260c8d428b2e295e5afb205242.
>     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:315)
>     at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:443)
>     at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>     ... 24 more
> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request c7de65260c8d428b2e295e5afb205242. Requested resource profile (ResourceProfile{cpuCores=0.0, heapMemoryInMB=0, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0, managedMemoryInMB=138}) is unfulfillable.
>     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:768)
>     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:310)
>     ... 26 more
>
