Twitter has merged my improved BitSetSerializer for Kryo: https://github.com/twitter/chill/pull/220 Once they've released a new version, I'll update our twitter-chill dependency.
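For context, a minimal sketch (not the actual chill code from the PR) of the idea behind such an improvement: instead of writing one boolean per bit, serialize the BitSet as a length-prefixed array of packed 64-bit words via `BitSet.toLongArray()` / `BitSet.valueOf()`. The class and method names here are illustrative:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.BitSet;

public class PackedBitSetCodec {

    // Write the BitSet as a length-prefixed array of 64-bit words:
    // 8 bytes per 64 bits instead of 1 byte per bit.
    static void write(DataOutputStream out, BitSet bits) throws IOException {
        long[] words = bits.toLongArray();
        out.writeInt(words.length);
        for (long w : words) {
            out.writeLong(w);
        }
    }

    static BitSet read(DataInputStream in) throws IOException {
        long[] words = new long[in.readInt()];
        for (int i = 0; i < words.length; i++) {
            words[i] = in.readLong();
        }
        return BitSet.valueOf(words);
    }

    public static void main(String[] args) throws IOException {
        BitSet bits = new BitSet();
        bits.set(3);
        bits.set(42_889_799);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write(new DataOutputStream(buf), bits);
        BitSet back = read(new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(back.equals(bits)); // prints true
    }
}
```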
On Fri, Feb 20, 2015 at 2:13 PM, Robert Metzger <rmetz...@apache.org> wrote:
> Let's create an issue in Flink to fix this.
>
> Let's a) see whether the new serializer registration in 0.9 allows users
> to replace serializers that chill has already set, and b) fix the issue
> in twitter/chill. I think we can ask them to release a new version with
> the fix (they seem to release quite often). I've also had good
> experiences contributing to twitter/chill.
>
> On Fri, Feb 20, 2015 at 2:02 PM, Ufuk Celebi <u...@apache.org> wrote:
>
>> I've just looked into the BitSetSerializer of Chill, and it is indeed
>> the case that each bit is encoded as a boolean (for all bit positions
>> <= the "logical" length).
>>
>> Regarding the DataOutputSerializer: it would help to catch OoM
>> exceptions during resize operations and rethrow them with a more
>> detailed message (how large the buffer currently is, and the new size
>> after the resize).
>>
>> On 20 Feb 2015, at 13:22, Stephan Ewen <se...@apache.org> wrote:
>>
>> > What happens (in the original stack trace) is the following: the
>> > serializer starts producing the byte stream and we buffer it to
>> > determine its length before sending it over the network. While
>> > buffering that data, the memory runs out.
>> >
>> > It may be that you are simply short of memory; it may also be that
>> > the serializer (here the Kryo Chill BitSetSerializer) is extremely
>> > inefficient in terms of space. It seems to write one boolean (coded
>> > as one byte) per bit, which blows up your bitset quite a bit.
>> >
>> > A solution may also be to register a better bitset serializer.
>> > Chill's default one seems to be rather inefficient...
>> >
>> > On Fri, Feb 20, 2015 at 1:03 PM, Sebastian <ssc.o...@googlemail.com> wrote:
>> > I don't have a build unfortunately, I'm using the Maven dependency.
>> > I'll try to find a workaround. Thanks for your help.
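A minimal sketch of what Ufuk suggests for the DataOutputSerializer, using a hypothetical growable byte buffer (this is not Flink's actual class): catch the OutOfMemoryError during the grow step and rethrow it with the current and requested buffer sizes.

```java
import java.util.Arrays;

// Illustrative stand-in for a serialization buffer like Flink's
// DataOutputSerializer; names and structure are assumptions.
public class GrowableBuffer {
    private byte[] buffer;
    private int position;

    public GrowableBuffer(int initialCapacity) {
        this.buffer = new byte[initialCapacity];
    }

    // Double the buffer; if the allocation fails, rethrow with the
    // current and requested sizes so the user sees how big it got.
    private void resize(int minCapacity) {
        int newCapacity = Math.max(buffer.length * 2, minCapacity);
        try {
            buffer = Arrays.copyOf(buffer, newCapacity);
        } catch (OutOfMemoryError e) {
            throw new OutOfMemoryError(
                "Failed to grow serialization buffer from " + buffer.length
                + " to " + newCapacity + " bytes: " + e.getMessage());
        }
    }

    public void write(byte b) {
        if (position == buffer.length) {
            resize(position + 1);
        }
        buffer[position++] = b;
    }

    public int capacity() {
        return buffer.length;
    }

    public static void main(String[] args) {
        GrowableBuffer buf = new GrowableBuffer(2);
        for (int i = 0; i < 100; i++) {
            buf.write((byte) i);
        }
        System.out.println(buf.capacity() >= 100); // prints true
    }
}
```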
>> >
>> > -s
>> >
>> > On 20.02.2015 12:44, Robert Metzger wrote:
>> > Hey Sebastian,
>> >
>> > I've fixed the issue in this branch:
>> > https://github.com/rmetzger/flink/tree/flink1589:
>> >
>> > Configuration c = new Configuration();
>> > c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
>> > final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
>> >
>> > I'll also backport the fix to the release-0.8 branch to make it
>> > available in the 0.8.2 release.
>> >
>> > Maybe you can easily cherry-pick the commit into your 0.8.1 Flink build.
>> >
>> > Best,
>> > Robert
>> >
>> > On Fri, Feb 20, 2015 at 12:11 PM, Robert Metzger <rmetz...@apache.org> wrote:
>> >
>> > Hi Sebastian,
>> >
>> > Looks like you've found a limitation of Flink.
>> > I've already filed two JIRAs to resolve the issue
>> > (https://issues.apache.org/jira/browse/FLINK-1588,
>> > https://issues.apache.org/jira/browse/FLINK-1589).
>> >
>> > I don't know your setup, but when you use Flink just as a dependency,
>> > without a version checked out, there is currently no way to change
>> > these configuration settings.
>> > In that case you have to start a local cluster yourself
>> > (./bin/start-local.sh, plus your settings in conf/flink-conf.yaml).
>> > You can then either submit your job with ./bin/flink or use the
>> > RemoteExecutionEnvironment (ExecutionEnvironment.createRemoteEnvironment()).
>> >
>> > If you have the Flink source checked out, you can also hard-code the
>> > configuration values into org.apache.flink.client.LocalExecutor.
>> >
>> > By the way, Flink 0.8.1 is now available on Maven Central (I suspect
>> > you had to build it yourself yesterday evening).
>> > But given these issues here, it doesn't matter for you anymore ;)
>> >
>> > Best,
>> > Robert
>> >
>> > On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ssc.o...@googlemail.com> wrote:
>> >
>> > I'm running Flink from my IDE, how do I change this setting in
>> > that context?
>> >
>> > On 20.02.2015 11:41, Fabian Hueske wrote:
>> >
>> > Have you tried to increase the heap size by shrinking the
>> > TM-managed memory?
>> >
>> > Reduce the fraction (taskmanager.memory.fraction) or fix the
>> > amount of TM memory (taskmanager.memory.size) in the
>> > flink-conf.yaml [1].
>> >
>> > Cheers, Fabian
>> >
>> > [1] http://flink.apache.org/docs/0.8/config.html
>> >
>> > On 20 Feb 2015, at 11:30, Sebastian <ssc.o...@googlemail.com> wrote:
>> >
>> > Hi,
>> >
>> > I get a strange out of memory error from the
>> > serialization code when I try to run the following program:
>> >
>> > def compute(trackingGraphFile: String, domainIndexFile: String,
>> >             outputPath: String) = {
>> >
>> >   implicit val env = ExecutionEnvironment.getExecutionEnvironment
>> >
>> >   val edges = GraphUtils.readEdges(trackingGraphFile)
>> >   val domains = GraphUtils.readVertices(domainIndexFile)
>> >
>> >   val domainsByCompany = DomainsByCompany.mapping
>> >   val companyEdges = edges.filter { edge =>
>> >       domainsByCompany.contains(edge.src.toInt) }
>> >     .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
>> >     .distinct
>> >
>> >   val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>> >     domainsByCompany: Iterator[(String, Int)] =>
>> >
>> >       var company = ""
>> >       val seenAt = new util.BitSet(42889800)
>> >
>> >       for ((name, domain) <- domainsByCompany) {
>> >         company = name
>> >         seenAt.set(domain)
>> >       }
>> >
>> >       company -> seenAt
>> >   }
>> >
>> >   companyBitMaps.print()
>> >
>> >   env.execute()
>> >
>> > }
>> >
>> > The error looks as follows:
>> >
>> > 2015-02-20 11:22:54 INFO JobClient:345 -
>> > java.lang.OutOfMemoryError: Java heap space
>> >     at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>> >     at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>> >     at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>> >     at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>> >     at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>> >     at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>> >     at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>> >     at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>> >     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>> >     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>> >     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>> >     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>> >     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>> >     at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>> >     at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>> >     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>> >     at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>> >     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>> >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>> >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>> >     at java.lang.Thread.run(Thread.java:745)
>> >
>> > I run the job locally, giving 2GB of RAM to the VM. The
>> > code will produce less than 10 groups, and the bitsets
>> > used internally should not be larger than a few megabytes.
>> >
>> > Any tips on how to fix this?
>> >
>> > Best,
>> > Sebastian
>> >
>> > PS: Still waiting for a reduceGroup that gives me the key ;)
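To put rough numbers on Stephan's point about the per-bit encoding: a 42,889,800-bit set written as one boolean (one byte) per bit costs about 42.9 MB per record, versus about 5.4 MB when packed into 64-bit words. A back-of-the-envelope check (ignoring small per-record framing overhead):

```java
public class BitSetSizeEstimate {
    public static void main(String[] args) {
        long bits = 42_889_800L;

        // Per-bit boolean encoding: one byte per bit position.
        long perBitBytes = bits;

        // Packed encoding: 8 bytes per 64-bit word, rounded up.
        long packedBytes = ((bits + 63) / 64) * 8;

        System.out.println(perBitBytes); // prints 42889800 (~42.9 MB)
        System.out.println(packedBytes); // prints 5361232  (~5.4 MB)
    }
}
```

With a 2 GB heap and half of it claimed by TaskManager-managed memory, buffering a few 40+ MB records at once during serialization can plausibly exhaust the remaining heap, which matches the stack trace above.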