Have you tried to increase the heap size by shrinking the TM-managed memory?
Reduce the fraction (taskmanager.memory.fraction) or fix the amount of TM memory (taskmanager.memory.size) in the flink-conf.yaml [1]. A rough sketch of the relevant entries is included below the quoted mail.

Cheers,
Fabian

[1] http://flink.apache.org/docs/0.8/config.html

> On 20 Feb 2015, at 11:30, Sebastian <ssc.o...@googlemail.com> wrote:
>
> Hi,
>
> I get a strange out of memory error from the serialization code when I try to
> run the following program:
>
>   def compute(trackingGraphFile: String, domainIndexFile: String,
>               outputPath: String) = {
>
>     implicit val env = ExecutionEnvironment.getExecutionEnvironment
>
>     val edges = GraphUtils.readEdges(trackingGraphFile)
>     val domains = GraphUtils.readVertices(domainIndexFile)
>
>     val domainsByCompany = DomainsByCompany.mapping
>     val companyEdges = edges.filter { edge =>
>         domainsByCompany.contains(edge.src.toInt) }
>       .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
>       .distinct
>
>     val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>       domainsByCompany: Iterator[(String, Int)] =>
>
>         var company = ""
>         val seenAt = new util.BitSet(42889800)
>
>         for ((name, domain) <- domainsByCompany) {
>           company = name
>           seenAt.set(domain)
>         }
>
>         company -> seenAt
>     }
>
>     companyBitMaps.print()
>
>     env.execute()
>   }
>
> The error looks as follows:
>
>   2015-02-20 11:22:54 INFO JobClient:345 - java.lang.OutOfMemoryError: Java heap space
>     at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>     at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>     at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>     at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>     at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>     at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>     at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>     at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>     at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>     at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>     at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>     at java.lang.Thread.run(Thread.java:745)
>
> I run the job locally, giving 2 GB of RAM to the VM. The code will produce
> fewer than 10 groups, and the bitsets used internally should not be larger than
> a few megabytes.
>
> Any tips on how to fix this?
>
> Best,
> Sebastian
>
> PS: Still waiting for a reduceGroup that gives me the key ;)
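For what it's worth, a minimal sketch of what the suggested change to flink-conf.yaml could look like (the concrete numbers are placeholders for illustration, not tuned recommendations):

    # Give Flink's self-managed memory a smaller share of the heap,
    # leaving more room on the regular JVM heap for user objects and
    # serialization buffers (placeholder value, default is larger).
    taskmanager.memory.fraction: 0.5

    # Alternatively, pin the managed memory to a fixed amount in MB
    # instead of a fraction of the heap (placeholder value).
    # taskmanager.memory.size: 512

Either way, whatever is not reserved as managed memory stays available to the regular JVM heap, which is where the Kryo/DataOutputSerializer buffers in the stack trace above are allocated.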