Have you tried increasing the available heap by shrinking the TM-managed memory?

Reduce the fraction (taskmanager.memory.fraction) or fix the amount of TM
memory (taskmanager.memory.size) in flink-conf.yaml [1].

Cheers, Fabian

[1] http://flink.apache.org/docs/0.8/config.html
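
For example, a minimal flink-conf.yaml tweak might look like this (the values
below are illustrative starting points, not recommendations — check the docs
linked above for the defaults in your version):

```yaml
# Shrink the memory Flink manages itself so that more of the JVM heap is
# left for user code and serialization buffers.
taskmanager.memory.fraction: 0.5

# ...or alternatively pin the managed memory to an absolute amount (in MB)
# instead of using a fraction:
# taskmanager.memory.size: 512
```

Only one of the two keys is needed; taskmanager.memory.size takes precedence
over the fraction if both are set.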


> On 20 Feb 2015, at 11:30, Sebastian <ssc.o...@googlemail.com> wrote:
> 
> Hi,
> 
> I get a strange out of memory error from the serialization code when I try to 
> run the following program:
> 
> def compute(trackingGraphFile: String, domainIndexFile: String,
>     outputPath: String) = {
> 
>   implicit val env = ExecutionEnvironment.getExecutionEnvironment
> 
>   val edges = GraphUtils.readEdges(trackingGraphFile)
>   val domains = GraphUtils.readVertices(domainIndexFile)
> 
>   val domainsByCompany = DomainsByCompany.mapping
>   val companyEdges = edges.filter { edge =>
>       domainsByCompany.contains(edge.src.toInt) }
>     .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
>     .distinct
> 
>   val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>     domainsByCompany: Iterator[(String, Int)] =>
> 
>       var company = ""
>       val seenAt = new util.BitSet(42889800)
> 
>       for ((name, domain) <- domainsByCompany) {
>         company = name
>         seenAt.set(domain)
>       }
> 
>       company -> seenAt
>   }
> 
>   companyBitMaps.print()
> 
>   env.execute()
> }
> 
> 
> The error looks as follows:
> 
> 
> 2015-02-20 11:22:54 INFO  JobClient:345 - java.lang.OutOfMemoryError: Java heap space
>       at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>       at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>       at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>       at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>       at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>       at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>       at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>       at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>       at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>       at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>       at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>       at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>       at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>       at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>       at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>       at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>       at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>       at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>       at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>       at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>       at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>       at java.lang.Thread.run(Thread.java:745)
> 
> I run the job locally, giving 2 GB of RAM to the JVM. The code will produce
> fewer than 10 groups, and the BitSets used internally should not be larger
> than a few megabytes.
> 
> Any tips on how to fix this?
> 
> Best,
> Sebastian
> 
> PS: Still waiting for a reduceGroup that gives me the key ;)
> 
> 
> 
