Hello Antonio, the .collect() method should be used with caution, as it collects the DataSet (multiple partitions across multiple TaskManagers) into a single in-memory list on the JobManager. Unless you have a lot of RAM, you cannot use it this way — and you probably should not. I recommend you use a sink to write the data into a formatted file instead (such as a CSV), or, if it is too big, into something splittable.
Regards, Bastien ------------------ Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io Le jeu. 10 févr. 2022 à 20:32, Antonio Si <antonio...@gmail.com> a écrit : > Hi, > > I am using the stateful processing api to read the states from a savepoint > file. > It works fine when the state size is small, but when the state size is > larger, around 11GB, I am getting an OOM. I think it happens when it is > doing a dataSource.collect() to obtain the states. The stackTrace is copied > at the end of the message. > > Any suggestions or hints would be very helpful. > > Thanks in advance. > > Antonio. > > java.lang.OutOfMemoryError: null > at > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) > ~[?:1.8.0_282] > at > java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) > ~[?:1.8.0_282] > at > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > ~[?:1.8.0_282] > at > java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) > ~[?:1.8.0_282] > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) > ~[?:1.8.0_282] > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) > ~[?:1.8.0_282] > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) > ~[?:1.8.0_282] > at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > ~[?:1.8.0_282] > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795) > 
~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) > ~[?:1.8.0_282] > at > java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > ~[?:1.8.0_282] > at > java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699) > ~[?:1.8.0_282] > at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > ~[?:1.8.0_282] > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > ~[?:1.8.0_282] > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > ~[?:1.8.0_282] > at > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > ~[?:1.8.0_282] > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > ~[?:1.8.0_282] > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1085) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > org.apache.flink.runtime.jobmaster.JobMaster.access$2200(JobMaster.java:131) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1356) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > ~[flink-dist_2.11-1.12.2.jar:1.12.2] > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > [flink-dist_2.11-1.12.2.jar:1.12.2] > at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > [flink-dist_2.11-1.12.2.jar:1.12.2] > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > [flink-dist_2.11-1.12.2.jar:1.12.2] > at > 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > [flink-dist_2.11-1.12.2.jar:1.12.2] > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > [flink-dist_2.11-1.12.2.jar:1.12.2] >