Xintong,

Thanks for looking into this. I lowered the number of CPUs in the Docker settings and it works now. I was using the same code and the same Flink version. It works on EMR because I'm using a machine with a lot of memory there. According to the doc:

*JVM heap size for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.*
The default value for JVM heap size is 1024m, and I had configured Docker with 6 CPUs, which made the Blink batch jobs fail.

Thanks for your help!
Fanbin

On Tue, Jan 7, 2020 at 7:51 PM Xintong Song <tonysong...@gmail.com> wrote:

> Hi Fanbin,
>
> The Blink planner's batch SQL operators require managed memory, and the
> amount of managed memory needed depends on your job. The failure is because
> the slot, according to your cluster configuration, does not have enough
> managed memory to fulfill the requests.
>
> To fix the problem, you would need to configure more managed memory for
> your task executors. You can set the config option
> "taskmanager.memory.size" to the value of 'managedMemoryPerSlot (138m in
> your case) * numberOfSlots'.
>
> It's not clear to me why the exact same code works on EMR. Were you
> running the same version of Flink?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> Hi,
>>
>> With Flink 1.9 running in Docker mode, I have a batch job and got the
>> following error message.
>>
>> However, it works totally fine with the same code on EMR. I checked the
>> log and here is the only difference:
>> managedMemoryInMB=138 (the working one has a value of 0).
>>
>> Has anybody seen this before?
>> Thanks,
>> Fanbin
>>
>>
>> org.apache.flink.runtime.jobmanager.scheduler.
>> NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>     at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
>>     at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
>>     at org.apache.flink.runtime.concurrent.FutureUtils.whenCompleteAsyncIfNotDone(FutureUtils.java:940)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestSlotFromResourceManager(SlotPoolImpl.java:339)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.requestNewAllocatedSlot(SchedulerImpl.java:262)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateMultiTaskSlot(SchedulerImpl.java:542)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSharedSlot(SchedulerImpl.java:341)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.internalAllocateSlot(SchedulerImpl.java:168)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSlotInternal(SchedulerImpl.java:149)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateBatchSlot(SchedulerImpl.java:129)
>>     at org.apache.flink.runtime.executiongraph.SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot(SlotProviderStrategy.java:109)
>>     at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$2(Execution.java:556)
>>     at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
>>     at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
>>     at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:554)
>>     at org.apache.flink.runtime.executiongraph.Execution.allocateResourcesForExecution(Execution.java:496)
>>     at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:439)
>>     at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:674)
>>     at org.apache.flink.runtime.executiongraph.Execution.scheduleConsumer(Execution.java:850)
>>     at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:887)
>>     at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1064)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1548)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1521)
>>     at org.apache.flink.runtime.scheduler.LegacyScheduler.updateTaskExecutionState(LegacyScheduler.java:289)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:377)
>>     at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request c7de65260c8d428b2e295e5afb205242.
>>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
>>     at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
>>     at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:214)
>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
>>     at com.sun.proxy.$Proxy8.requestSlot(Unknown Source)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestSlotFromResourceManager(SlotPoolImpl.java:334)
>>     ... 48 more
>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request c7de65260c8d428b2e295e5afb205242.
>>     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:315)
>>     at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:443)
>>     at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>>     ... 24 more
>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request c7de65260c8d428b2e295e5afb205242. Requested resource profile (ResourceProfile{cpuCores=0.0, heapMemoryInMB=0, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0, managedMemoryInMB=138}) is unfulfillable.
>>     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:768)
>>     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:310)
>>     ... 26 more
>>
>
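
For anyone finding this thread later, the sizing rule quoted above ('managedMemoryPerSlot * numberOfSlots') can be worked through in a few lines of Python. This is only a sketch of the arithmetic. The 138 MB per slot is taken from the ResourceProfile in the exception, while the slot count of 6 is an assumption for illustration (the thread mentions 6 CPUs but never states the number of slots), and the helper name `managed_memory_setting` is hypothetical.

```python
def managed_memory_setting(per_slot_mb: int, num_slots: int) -> str:
    """Return a value for "taskmanager.memory.size", in megabytes with an 'm' suffix."""
    if per_slot_mb <= 0 or num_slots <= 0:
        raise ValueError("per_slot_mb and num_slots must both be positive")
    # Rule quoted in the thread: managedMemoryPerSlot * numberOfSlots
    return f"{per_slot_mb * num_slots}m"


# 138 comes from managedMemoryInMB=138 in the exception.
# 6 slots is an assumed value, not something stated in the thread.
print(f"taskmanager.memory.size: {managed_memory_setting(138, 6)}")
# prints "taskmanager.memory.size: 828m"
```

The printed line is in the form it would take in flink-conf.yaml. Substitute the slot count your TaskManager is actually configured with.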