[ https://issues.apache.org/jira/browse/FLINK-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420868#comment-15420868 ]
Robert Metzger commented on FLINK-4393: --------------------------------------- What's the DataSet size you are trying to collect()? The problem is that the amount of data you can transfer from the cluster to your client is limited by a) the amount of heap space at the client and the amount of data the RPC system can transfer. IIRC, the RPC system's limit is 10MB with the default configuration. > Failed to serialize accumulators for task > ----------------------------------------- > > Key: FLINK-4393 > URL: https://issues.apache.org/jira/browse/FLINK-4393 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.1.0 > Environment: Redhat 6 > Reporter: Sajeev Ramakrishnan > > Dear Team, > I am getting the below exception while trying to use the Table API by > looping through the DataSet using collect() method. > {code} > 2016-08-15 07:18:52,503 WARN > org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to > serialize accumulators for task. > java.lang.OutOfMemoryError > at > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) > at > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301) > at > org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52) > at > org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58) > at > org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75) > at > org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1248) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:446) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:292) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Suppressed: java.lang.OutOfMemoryError > at > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) > at > java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) > at > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at > java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) > at > java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822) > at > java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719) > at > java.io.ObjectOutputStream.close(ObjectOutputStream.java:740) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:303) > ... 28 more > 2016-08-15 07:18:52,508 ERROR > org.apache.flink.runtime.executiongraph.ExecutionGraph - Failed to > deserialize final accumulator results. > java.lang.NullPointerException > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1176) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:615) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:614) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:614) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2016-08-15 07:18:52,509 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink > (collect()) (1/1) (71714abe057281428f5ca2bfe980b750) switched from RUNNING to > FAILED > 2016-08-15 07:18:52,510 INFO org.apache.flink.runtime.jobmanager.JobManager > - Status of job 01e11bf3947869463e00a3f53196384d (Flink Java > Job at Mon Aug 15 07:17:27 BST 2016) changed to FAILING. > java.lang.NullPointerException > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1176) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:615) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:614) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:614) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2016-08-15 07:18:52,510 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Discarding > the results produced by task execution a9646a1cab421fe1f3ce6fd544cfbd84 > 2016-08-15 07:18:52,512 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Discarding > the results produced by task execution 71714abe057281428f5ca2bfe980b750 > 2016-08-15 07:18:52,514 INFO org.apache.flink.runtime.jobmanager.JobManager > - Status of job 01e11bf3947869463e00a3f53196384d (Flink Java > Job at Mon Aug 15 07:17:27 BST 2016) changed to FAILED. > java.lang.NullPointerException > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1176) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:615) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:614) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:614) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} > Could you please throw some light on this? > Regards, > Sajeev -- This message was sent by Atlassian JIRA (v6.3.4#6332)