I've just looked into Chill's BitSetSerializer, and it is indeed the case that each bit is encoded as a boolean (for all bit positions up to the "logical" length).
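As a rough standalone illustration (not code from the thread): for the roughly 42.9-million-bit set used in the job further down the thread, a per-bit boolean encoding needs one byte per bit position, while shipping the packed 64-bit words from java.util.BitSet.toLongArray() is about 8x smaller. Class and method names here are made up for the sketch.

```java
import java.util.BitSet;

public class BitSetOverhead {

    // Bytes needed if every bit position up to the logical length is
    // written as a one-byte boolean (what Chill's BitSetSerializer does).
    static long perBitBytes(BitSet bits) {
        return bits.length(); // logical length = highest set bit + 1
    }

    // Bytes needed if the underlying 64-bit words are shipped instead.
    static long packedBytes(BitSet bits) {
        return (long) bits.toLongArray().length * Long.BYTES;
    }

    public static void main(String[] args) {
        BitSet seenAt = new BitSet(42_889_800);
        seenAt.set(42_889_799); // force the full logical length
        System.out.println(perBitBytes(seenAt)); // ~42.9 MB of boolean bytes
        System.out.println(packedBytes(seenAt)); // ~5.4 MB of packed longs
    }
}
```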
Regarding the DataOutputSerializer: it would help to catch OutOfMemoryErrors during resize operations and rethrow them with a more detailed message (how large the buffer currently is, and what the new size after the resize would be).

On 20 Feb 2015, at 13:22, Stephan Ewen <se...@apache.org> wrote:

> What happens (in the original stack trace) is the following: the serializer
> starts producing the byte stream data and we buffer it, to determine the
> length, before sending it over the network. While buffering that data, the
> memory runs out.
>
> It may be that you are simply short of memory; it may also be that the
> serializer (here the Kryo Chill BitSetSerializer) is simply extremely
> inefficient in terms of space. It seems that it tries to write a boolean
> (coded as one byte) per bit. That is blowing up your bitset quite a bit.
>
> A solution may also be to register a better bitset serializer. Chill's
> default one seems to be rather inefficient.

On Fri, Feb 20, 2015 at 1:03 PM, Sebastian <ssc.o...@googlemail.com> wrote:

> I don't have a build unfortunately, I'm using the Maven dependency. I'll
> try to find a workaround. Thanks for your help.
>
> -s

On 20.02.2015 12:44, Robert Metzger wrote:

> Hey Sebastian,
>
> I've fixed the issue in this branch:
> https://github.com/rmetzger/flink/tree/flink1589
>
>     Configuration c = new Configuration();
>     c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
>     final ExecutionEnvironment env =
>         ExecutionEnvironment.createLocalEnvironment(c);
>
> I'll also backport the fix to the release-0.8 branch to make it
> available in the 0.8.2 release. Maybe you can easily cherry-pick the
> commit to your 0.8.1 Flink build.
>
> Best,
> Robert

On Fri, Feb 20, 2015 at 12:11 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Sebastian,
>
> Looks like you've found a limitation of Flink. I've already filed two
> JIRAs to resolve the issue
> (https://issues.apache.org/jira/browse/FLINK-1588,
> https://issues.apache.org/jira/browse/FLINK-1589).
>
> I don't know your setup; if you use Flink just as a dependency without
> the sources checked out, there is probably no way right now to change the
> configuration settings. In that case you have to start a local cluster
> yourself (./bin/start-local.sh, plus your settings in
> conf/flink-conf.yaml). You can then submit your job either with
> ./bin/flink or using the RemoteExecutionEnvironment
> (ExecutionEnvironment.createRemoteEnvironment()).
>
> If you have the Flink source checked out, you can also hard-code the
> configuration values into org.apache.flink.client.LocalExecutor.
>
> By the way, Flink 0.8.1 is now available on Maven Central (I suspect you
> had to build it yourself yesterday evening). But given these issues here,
> it doesn't matter for you anymore ;)
>
> Best,
> Robert

On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ssc.o...@googlemail.com> wrote:

> I'm running Flink from my IDE; how do I change this setting in that
> context?

On 20.02.2015 11:41, Fabian Hueske wrote:

> Have you tried to increase the heap size by shrinking the TM-managed
> memory?
>
> Reduce the fraction (taskmanager.memory.fraction) or fix the amount of
> TM memory (taskmanager.memory.size) in the flink-conf.yaml [1].
> Cheers, Fabian
>
> [1] http://flink.apache.org/docs/0.8/config.html

On 20 Feb 2015, at 11:30, Sebastian <ssc.o...@googlemail.com> wrote:

> Hi,
>
> I get a strange out-of-memory error from the serialization code when I
> try to run the following program:
>
>     def compute(trackingGraphFile: String, domainIndexFile: String,
>                 outputPath: String) = {
>
>       implicit val env = ExecutionEnvironment.getExecutionEnvironment
>
>       val edges = GraphUtils.readEdges(trackingGraphFile)
>       val domains = GraphUtils.readVertices(domainIndexFile)
>
>       val domainsByCompany = DomainsByCompany.mapping
>       val companyEdges = edges.filter { edge =>
>           domainsByCompany.contains(edge.src.toInt) }
>         .map { edge =>
>           domainsByCompany(edge.src.toInt) -> edge.target.toInt }
>         .distinct
>
>       val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>         domainsByCompany: Iterator[(String, Int)] =>
>
>           var company = ""
>           val seenAt = new util.BitSet(42889800)
>
>           for ((name, domain) <- domainsByCompany) {
>             company = name
>             seenAt.set(domain)
>           }
>
>           company -> seenAt
>       }
>
>       companyBitMaps.print()
>
>       env.execute()
>     }
>
> The error looks as follows:
>
>     2015-02-20 11:22:54 INFO JobClient:345 -
>     java.lang.OutOfMemoryError: Java heap space
>       at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>       at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>       at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>       at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>       at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>       at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>       at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>       at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>       at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>       at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>       at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>       at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>       at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>       at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>       at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>       at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>       at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>       at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>       at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>       at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>       at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>       at java.lang.Thread.run(Thread.java:745)
>
> I run the job locally, giving 2 GB of RAM to the VM. The code will
> produce fewer than 10 groups, and the bitsets used internally should not
> be larger than a few megabytes each.
>
> Any tips on how to fix this?
> Best,
> Sebastian
>
> PS: Still waiting for a reduceGroup that gives me the key ;)
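To make the resize suggestion at the top of the thread concrete, here is a minimal standalone sketch (hypothetical names, not the actual DataOutputSerializer code) of catching an OutOfMemoryError during a buffer resize and rethrowing it with the current and requested sizes included:

```java
// Hypothetical sketch of the suggested behaviour: if growing the
// serialization buffer fails, report how big it was and how big the
// failed allocation would have been.
public class GrowableBuffer {
    private byte[] buffer;

    public GrowableBuffer(int initialSize) {
        this.buffer = new byte[initialSize];
    }

    // Grow the buffer to at least minCapacity bytes (doubling otherwise).
    public void resize(int minCapacity) {
        int newSize = Math.max(buffer.length * 2, minCapacity);
        try {
            byte[] larger = new byte[newSize];
            System.arraycopy(buffer, 0, larger, 0, buffer.length);
            buffer = larger;
        } catch (OutOfMemoryError e) {
            // Rethrow with the sizes, so the user sees that the
            // serialized record, not the framework, exhausted the heap.
            throw new OutOfMemoryError(
                "Failed to grow serialization buffer from " + buffer.length
                    + " to " + newSize + " bytes: " + e.getMessage());
        }
    }

    public int capacity() {
        return buffer.length;
    }
}
```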