Hi Fanbin,

> On YARN setups, this value is automatically configured to the size of the
> TaskManager's YARN container, minus a certain tolerance value.
>
If I understand correctly, you are running a Flink standalone cluster both in
Docker and on EMR? If that is the case, then this sentence has nothing to
do with your setup, because it describes YARN deployments.

It should also be irrelevant that you are using a machine with more
memory on EMR, as long as "taskmanager.heap.size" is the same. In your
case, I assume the default 1024m is used in both scenarios?

If "taskmanager.memory.size" is not explicitly specified, Flink will
automatically decide the managed memory size. The derived size of managed
memory depends on the JVM free memory after launching the TM. Flink
triggers a "System.gc()" after the TM is started, and reads the JVM free
heap size after it. I guess the reason decreasing the Docker CPU cores works
might be that fewer CPU cores somehow result in less heap memory
consumption, leaving more free heap memory and thus more managed memory.
AFAIK, there are several places in the TM where Flink reads the number of
system CPU cores and decides thread pool sizes accordingly. But this is just
my guess and I cannot confirm it.
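
To illustrate the reasoning above, here is a rough Python sketch (not Flink
code). It assumes the derived managed memory is a fixed fraction of the free
heap measured after the GC, and that it is split evenly across slots. The
fraction of 0.7, the free heap sizes and the slot count are illustrative
assumptions, and the function names are made up for this example.

```python
import math


def managed_memory_per_slot_mb(free_heap_mb: float, num_slots: int,
                               fraction: float = 0.7) -> int:
    """Model of the managed memory each slot gets, in whole MB.

    Assumption: managed memory = free heap after GC * fraction,
    divided evenly among the slots.
    """
    if num_slots <= 0:
        raise ValueError("num_slots must be positive")
    managed_mb = math.floor(free_heap_mb * fraction)
    return managed_mb // num_slots


def can_fulfil(requested_mb: int, free_heap_mb: float, num_slots: int,
               fraction: float = 0.7) -> bool:
    """True if a slot offers at least the managed memory the job requests."""
    return managed_memory_per_slot_mb(free_heap_mb, num_slots, fraction) >= requested_mb


if __name__ == "__main__":
    # Hypothetical free heap sizes (MB) with 4 slots, against the 138 MB request.
    for free_heap in (700, 850):
        print(free_heap, can_fulfil(138, free_heap, 4))
```

In this model, a modest drop in free heap at startup is enough to turn a
fulfillable request into an unfulfillable one, which would be consistent
with what you observed.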

I would suggest configuring "taskmanager.memory.size" explicitly
anyway, to avoid potential problems caused by the uncertainty of the JVM free
heap memory size. BTW, this randomness is eliminated in Flink 1.10.
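
As a small sketch of how to pick the value, the Python snippet below applies
the 'managedMemoryPerSlot * numberOfSlots' rule and prints the corresponding
flink-conf.yaml line. The 138 MB comes from the requested resource profile in
your error; the slot count of 4 is only a placeholder for your actual
"taskmanager.numberOfTaskSlots", and the helper name is made up.

```python
def managed_memory_size_option(managed_per_slot_mb: int, num_slots: int) -> str:
    """Build the flink-conf.yaml line for the total managed memory.

    Total managed memory = managed memory per slot * number of slots.
    """
    if managed_per_slot_mb <= 0 or num_slots <= 0:
        raise ValueError("both arguments must be positive")
    total_mb = managed_per_slot_mb * num_slots
    return f"taskmanager.memory.size: {total_mb}m"


if __name__ == "__main__":
    # 138 MB per slot (from the error message), 4 slots (assumed).
    print(managed_memory_size_option(138, 4))
```

You may want to round the result up a bit, so that each slot still gets at
least what the job requests.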

Thank you~

Xintong Song



On Thu, Jan 9, 2020 at 3:04 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> Xintong,
>
> Thanks for looking into this. I lowered the number of CPUs in the Docker
> settings and it works now.
> I was using the same code and the same Flink version. The reason that it
> works on EMR is that I'm using a machine with large memory.
> According to the doc:
> *JVM heap size for the TaskManagers, which are the parallel workers of the
> system. On YARN setups, this value is automatically configured to the size
> of the TaskManager's YARN container, minus a certain tolerance value.*
>
> The default value for JVM heap size is 1024m and I was configuring Docker
> to have 6 CPUs, and with that the blink batch jobs failed.
>
> Thanks for your help!
> Fanbin
>
> On Tue, Jan 7, 2020 at 7:51 PM Xintong Song <tonysong...@gmail.com> wrote:
>
>> Hi Fanbin,
>>
>> The blink planner batch SQL operators require managed memory, and the
>> amount of managed memory needed depends on your job. The failure is because
>> the slot, according to your cluster configuration, does not have enough
>> managed memory to fulfill the requests.
>>
>> To fix the problem, you would need to configure more managed memory for
>> your task executors. You can set the config option
>> "taskmanager.memory.size" to the value of 'managedMemoryPerSlot (138m in
>> your case) * numberOfSlots'.
>>
>> It's not clear to me why exactly the same code works on EMR. Were you
>> running the same version of Flink?
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>
>>> Hi,
>>>
>>> with Flink 1.9 running in docker mode, I have a batch job and got the
>>> following error message.
>>>
>>> However, it works totally fine with the same code on EMR. I checked the
>>> log and here is the only difference:
>>> managedMemoryInMB=138 (the working one has a value of 0)
>>>
>>> did anybody see this before?
>>> Thanks,
>>> Fanbin
>>>
>>>
>>> org.apache.flink.runtime.jobmanager.scheduler.
>>> NoResourceAvailableException: No pooled slot available and request to
>>> ResourceManager for new slot failed
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
>>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(
>>> CompletableFuture.java:774)
>>>     at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(
>>> CompletableFuture.java:792)
>>>     at java.util.concurrent.CompletableFuture.whenComplete(
>>> CompletableFuture.java:2153)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>> .whenCompleteAsyncIfNotDone(FutureUtils.java:940)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .requestSlotFromResourceManager(SlotPoolImpl.java:339)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .requestNewAllocatedSlot(SchedulerImpl.java:262)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .allocateMultiTaskSlot(SchedulerImpl.java:542)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .allocateSharedSlot(SchedulerImpl.java:341)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .internalAllocateSlot(SchedulerImpl.java:168)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .allocateSlotInternal(SchedulerImpl.java:149)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .allocateBatchSlot(SchedulerImpl.java:129)
>>>     at org.apache.flink.runtime.executiongraph.
>>> SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot(
>>> SlotProviderStrategy.java:109)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .lambda$allocateAndAssignSlotForExecution$2(Execution.java:556)
>>>     at java.util.concurrent.CompletableFuture.uniComposeStage(
>>> CompletableFuture.java:995)
>>>     at java.util.concurrent.CompletableFuture.thenCompose(
>>> CompletableFuture.java:2137)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .allocateAndAssignSlotForExecution(Execution.java:554)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .allocateResourcesForExecution(Execution.java:496)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .scheduleForExecution(Execution.java:439)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionVertex
>>> .scheduleForExecution(ExecutionVertex.java:674)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .scheduleConsumer(Execution.java:850)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .scheduleOrUpdateConsumers(Execution.java:887)
>>>     at org.apache.flink.runtime.executiongraph.Execution.markFinished(
>>> Execution.java:1064)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph
>>> .updateStateInternal(ExecutionGraph.java:1548)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph
>>> .updateState(ExecutionGraph.java:1521)
>>>     at org.apache.flink.runtime.scheduler.LegacyScheduler
>>> .updateTaskExecutionState(LegacyScheduler.java:289)
>>>     at org.apache.flink.runtime.jobmaster.JobMaster
>>> .updateTaskExecutionState(JobMaster.java:377)
>>>     at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>> DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>> .handleRpcInvocation(AkkaRpcActor.java:279)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
>>> AkkaRpcActor.java:194)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
>>> AkkaRpcActor.java:152)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123
>>> )
>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:
>>> 21)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>> 170)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>> 171)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>> 171)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>> ForkJoinPool.java:1339)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:
>>> 1979)
>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(
>>> ForkJoinWorkerThread.java:107)
>>> Caused by: java.util.concurrent.CompletionException:
>>> org.apache.flink.runtime.resourcemanager.exceptions.
>>> ResourceManagerException: Could not fulfill slot request
>>> c7de65260c8d428b2e295e5afb205242.
>>>     at java.util.concurrent.CompletableFuture.encodeThrowable(
>>> CompletableFuture.java:292)
>>>     at java.util.concurrent.CompletableFuture.completeThrowable(
>>> CompletableFuture.java:308)
>>>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture
>>> .java:607)
>>>     at java.util.concurrent.CompletableFuture.uniApplyStage(
>>> CompletableFuture.java:628)
>>>     at java.util.concurrent.CompletableFuture.thenApply(
>>> CompletableFuture.java:1996)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler
>>> .invokeRpc(AkkaInvocationHandler.java:214)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(
>>> AkkaInvocationHandler.java:129)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler
>>> .invoke(FencedAkkaInvocationHandler.java:78)
>>>     at com.sun.proxy.$Proxy8.requestSlot(Unknown Source)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .requestSlotFromResourceManager(SlotPoolImpl.java:334)
>>>     ... 48 more
>>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.
>>> ResourceManagerException: Could not fulfill slot request
>>> c7de65260c8d428b2e295e5afb205242.
>>>     at org.apache.flink.runtime.resourcemanager.slotmanager.
>>> SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:315)
>>>     at org.apache.flink.runtime.resourcemanager.ResourceManager
>>> .requestSlot(ResourceManager.java:443)
>>>     at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>>>     ... 24 more
>>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.
>>> UnfulfillableSlotRequestException: Could not fulfill slot request
>>> c7de65260c8d428b2e295e5afb205242. Requested resource profile (
>>> ResourceProfile{cpuCores=0.0, heapMemoryInMB=0, directMemoryInMB=0,
>>> nativeMemoryInMB=0, networkMemoryInMB=0, managedMemoryInMB=138}) is
>>> unfulfillable.
>>>     at org.apache.flink.runtime.resourcemanager.slotmanager.
>>> SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:768)
>>>     at org.apache.flink.runtime.resourcemanager.slotmanager.
>>> SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:310)
>>>     ... 26 more
>>>
>>
