Hi,

I have written ETL jobs in Flink (DataSet API). When I execute them in the IDE,
they run and finish fine. When I try to run them on my cluster, I get an
"Insufficient number of network buffers" error.

I have 5 machines in my cluster with 4 cores each, and each TaskManager is
given 3 GB. I increased the number of network buffers to 5000 but got the same
error. When I increased it further (to 7500), I got the exception listed below.
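For sizing, the Flink configuration docs of this era give a rule of thumb for `taskmanager.network.numberOfBuffers` (a per-TaskManager setting): slots-per-TaskManager² × number-of-TaskManagers × 4. The Java sketch below applies it to the cluster above (5 TaskManagers, 4 slots each, as the trace also reports) and converts buffer counts to memory. It assumes the default 32 KiB buffer (memory segment) size. The `withConcurrentShuffles` multiplier is a hypothetical illustration, not a Flink formula: it only shows how a big plan with many data exchanges active at once could need more than the rule of thumb.

```java
/**
 * Rough sizing of the network buffer pool set via
 * taskmanager.network.numberOfBuffers (per TaskManager, Flink 1.0-era setting).
 * Sketch only: the concurrentShuffles multiplier is a hypothetical assumption.
 */
final class NetworkBufferEstimate {

    // Assumed default memory segment (buffer) size: 32 KiB.
    static final long SEGMENT_SIZE_BYTES = 32L * 1024L;

    private NetworkBufferEstimate() {}

    /** Rule of thumb from the docs: slotsPerTM^2 * numTMs * 4. */
    static long ruleOfThumb(int slotsPerTaskManager, int numTaskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * numTaskManagers * 4L;
    }

    /**
     * Hypothetical scaling, NOT a Flink formula: assume each of roughly k
     * concurrently active data exchanges needs its own rule-of-thumb share.
     */
    static long withConcurrentShuffles(int slotsPerTaskManager, int numTaskManagers,
                                       int concurrentShuffles) {
        return ruleOfThumb(slotsPerTaskManager, numTaskManagers) * concurrentShuffles;
    }

    /** Bytes each TaskManager sets aside for the given number of buffers. */
    static long bufferMemoryBytes(long numberOfBuffers) {
        return numberOfBuffers * SEGMENT_SIZE_BYTES;
    }

    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        int slots = 4;        // 4 cores per machine, reported as "4 slots" in the trace
        int taskManagers = 5; // 5 machines, one TaskManager each

        System.out.println("rule of thumb: " + ruleOfThumb(slots, taskManagers) + " buffers");
        for (long buffers : new long[] {5000, 7500}) {
            System.out.printf("%d buffers -> %.1f MiB per TaskManager%n",
                    buffers, toMiB(bufferMemoryBytes(buffers)));
        }
    }
}
```

Under these assumptions the rule of thumb gives 320 buffers, and 7500 buffers correspond to roughly 234 MiB of each 3 GB TaskManager.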

The DAG (execution plan) is pretty big. What is the recommended way to run
jobs when the DAG becomes huge? Should I break it into parts by calling
execute() on the execution environment between jobs?

Thanks,
Tarandeep

The exception I got when running with 7500 buffers:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Update task on instance d4f3f517b33e5fa8a9932fc06a0aef3b @ dev-cluster-slave1 - 4 slots - URL: akka.tcp://flink@172.22.13.39:52046/user/taskmanager failed due to:
    at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
    at akka.dispatch.OnFailure.internal(Future.scala:228)
    at akka.dispatch.OnFailure.internal(Future.scala:227)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    ... 2 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@172.22.13.39:52046/user/taskmanager#-1857397999]] after [10000 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
    at java.lang.Thread.run(Thread.java:745)
