I've just looked into Chill's BitSetSerializer, and it does indeed encode
each bit as a boolean (for all bit positions up to the "logical" length of
the bitset).
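
To put numbers on this: a bitset whose highest set bit is near position 42889800 costs roughly 42.9 MB at one byte per bit, versus roughly 5.4 MB if the backing 64-bit words are written instead. Below is a rough sketch of the two encodings using only the JDK. It is not Chill's or Kryo's code; the class and method names (BitSetEncodingSketch, encodeBytePerBit, encodeWords, decodeWords) are made up for illustration, and the length prefix is an assumption of mine.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.BitSet;

// Illustrative only: compares a byte-per-bit encoding with a word-based one.
final class BitSetEncodingSketch {

    // One boolean (one byte) per bit position below the logical length,
    // preceded by the logical length as a 4-byte int.
    static byte[] encodeBytePerBit(BitSet bits) throws IOException {
        int len = bits.length();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(len + 4);
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(len);
        for (int i = 0; i < len; i++) {
            out.writeBoolean(bits.get(i));
        }
        out.flush();
        return bytes.toByteArray();
    }

    // The backing 64-bit words, preceded by the word count as a 4-byte int.
    static byte[] encodeWords(BitSet bits) throws IOException {
        long[] words = bits.toLongArray();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(words.length * 8 + 4);
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(words.length);
        for (long word : words) {
            out.writeLong(word);
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Inverse of encodeWords.
    static BitSet decodeWords(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        long[] words = new long[in.readInt()];
        for (int i = 0; i < words.length; i++) {
            words[i] = in.readLong();
        }
        return BitSet.valueOf(words);
    }

    public static void main(String[] args) throws IOException {
        // A sparse bitset with a high logical length (kept small for the demo).
        BitSet seenAt = new BitSet();
        seenAt.set(999_999);
        System.out.println("byte per bit: " + encodeBytePerBit(seenAt).length + " bytes");
        System.out.println("64-bit words: " + encodeWords(seenAt).length + " bytes");
    }
}
```

A custom Kryo serializer registered for BitSet could follow the word-based idea, but I have not written or tested one against Flink's Kryo setup.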

Regarding the DataOutputSerializer: it would help to catch OoM errors during
resize operations and rethrow them with a more detailed message (the current
buffer size and the new size after the resize).
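
Something along these lines is what I have in mind. This is only a sketch with a made-up class (GrowableBufferSketch), not the actual DataOutputSerializer code; the doubling growth policy and the choice of IOException are assumptions.

```java
import java.io.IOException;
import java.util.Arrays;

// Hypothetical stand-in for a growable output buffer; NOT Flink's DataOutputSerializer.
final class GrowableBufferSketch {
    private byte[] buffer;
    private int position;

    GrowableBufferSketch(int initialSize) {
        this.buffer = new byte[initialSize];
    }

    int capacity() {
        return buffer.length;
    }

    int position() {
        return position;
    }

    void write(byte[] src) throws IOException {
        // Overflow-safe check for "not enough room left".
        if (position > buffer.length - src.length) {
            resize(src.length);
        }
        System.arraycopy(src, 0, buffer, position, src.length);
        position += src.length;
    }

    private void resize(int minAdditional) throws IOException {
        // Assumed policy: double the buffer, or grow to exactly what is needed.
        long required = (long) position + minAdditional;
        long newSize = Math.max((long) buffer.length * 2, required);
        if (newSize > Integer.MAX_VALUE) {
            throw new IOException("Serialization buffer cannot grow beyond "
                    + Integer.MAX_VALUE + " bytes (requested " + newSize + ")");
        }
        try {
            buffer = Arrays.copyOf(buffer, (int) newSize);
        } catch (OutOfMemoryError e) {
            // 'buffer' still refers to the old array here, so its length is the current size.
            throw new IOException("Failed to grow serialization buffer from "
                    + buffer.length + " to " + newSize + " bytes ("
                    + position + " bytes written so far)", e);
        }
    }

    public static void main(String[] args) throws IOException {
        GrowableBufferSketch out = new GrowableBufferSketch(16);
        out.write(new byte[100]);
        System.out.println("capacity=" + out.capacity() + ", position=" + out.position());
    }
}
```

Catching OutOfMemoryError is normally discouraged, and the JVM may be in a bad state afterwards. Here the handler only attaches the sizes to the failure so that the stack trace points at the oversized record.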

On 20 Feb 2015, at 13:22, Stephan Ewen <se...@apache.org> wrote:

> What happens (in the original stack trace) is the following: The serializer 
> starts producing the byte stream data and we buffer it, to determine the 
> length, before sending it over the network. While buffering that data, the 
> memory runs out.
> 
> It may be that you are simply short of memory. It may also be that the 
> serializer (here the Kryo Chill BitSetSerializer) is extremely inefficient 
> in terms of space. It seems that it tries to write a boolean (coded as one 
> byte) per bit. That is blowing up your bitset quite a bit.
> 
> A solution may also be to register a better bitset serializer. Chill's 
> default one seems to be sort of inefficient...
> 
> 
> 
> 
> 
> On Fri, Feb 20, 2015 at 1:03 PM, Sebastian <ssc.o...@googlemail.com> wrote:
> I don't have a build unfortunately, I'm using the maven dependency. I'll try 
> to find a workaround. Thx for your help.
> 
> -s
> 
> On 20.02.2015 12:44, Robert Metzger wrote:
> Hey Sebastian,
> 
> I've fixed the issue in this branch:
> https://github.com/rmetzger/flink/tree/flink1589
> With it, you can pass the configuration to the local environment:
> 
> Configuration c = new Configuration();
> c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
> final ExecutionEnvironment env =
>     ExecutionEnvironment.createLocalEnvironment(c);
> 
> 
> I'll also backport the fix to the release-0.8 branch to make it
> available in the 0.8.2 release.
> 
> Maybe you can easily cherry-pick the commit to your 0.8.1 Flink build.
> 
> 
> Best,
> Robert
> 
> On Fri, Feb 20, 2015 at 12:11 PM, Robert Metzger <rmetz...@apache.org
> <mailto:rmetz...@apache.org>> wrote:
> 
>     Hi Sebastian,
> 
>     Looks like you've found a limitation of Flink.
>     I've already filed two JIRAs to resolve the issue
>     (https://issues.apache.org/jira/browse/FLINK-1588,
>     https://issues.apache.org/jira/browse/FLINK-1589).
> 
>     I don't know your setup, but when you use Flink just as a dependency
>     without having the source checked out, there is probably no way right
>     now to change the configuration settings.
>     In that case, you have to start a local cluster yourself
>     (./bin/start-local.sh (+ your settings in conf/flink-conf.yaml)).
>     You can then either submit your job with ./bin/flink or using the
>     RemoteExecutionEnvironment
>     (ExecutionEnvironment.createRemoteEnvironment()).
> 
>     If you have the Flink source checked out, you can also hard-code the
>     configuration values into org.apache.flink.client.LocalExecutor.
> 
> 
>     By the way, Flink 0.8.1 is now available on maven central (I suspect
>     you had to build it yourself yesterday evening).
>     But given these issues here, it doesn't matter for you anymore ;)
> 
> 
>     Best,
>     Robert
> 
> 
> 
>     On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ssc.o...@googlemail.com
>     <mailto:ssc.o...@googlemail.com>> wrote:
> 
>         I'm running Flink from my IDE, how do I change this setting in
>         that context?
> 
> 
>         On 20.02.2015 11:41, Fabian Hueske wrote:
> 
>             Have you tried to increase the heap size by shrinking the
>             TM-managed memory?
> 
>             Reduce the fraction (taskmanager.memory.fraction) or fix the
>             amount of TM memory (taskmanager.memory.size) in the
>             flink-conf.yaml [1].
> 
>             Cheers, Fabian
> 
>             [1] http://flink.apache.org/docs/0.8/config.html
> 
> 
>                 On 20 Feb 2015, at 11:30, Sebastian
>                 <ssc.o...@googlemail.com
>                 <mailto:ssc.o...@googlemail.com>> wrote:
> 
>                 Hi,
> 
>                 I get a strange out of memory error from the
>                 serialization code when I try to run the following program:
> 
>                 def compute(trackingGraphFile: String, domainIndexFile: String,
>                     outputPath: String) = {
> 
>                   implicit val env =
>                     ExecutionEnvironment.getExecutionEnvironment
> 
>                   val edges = GraphUtils.readEdges(trackingGraphFile)
>                   val domains = GraphUtils.readVertices(domainIndexFile)
> 
>                   val domainsByCompany = DomainsByCompany.mapping
>                   val companyEdges = edges.filter { edge =>
>                       domainsByCompany.contains(edge.src.toInt) }
>                     .map { edge => domainsByCompany(edge.src.toInt) ->
>                       edge.target.toInt }
>                     .distinct
> 
>                   val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>                     domainsByCompany: Iterator[(String,Int)] =>
> 
>                       var company = ""
>                       val seenAt = new util.BitSet(42889800)
> 
>                       for ((name, domain) <- domainsByCompany) {
>                         company = name
>                         seenAt.set(domain)
>                       }
> 
>                       company -> seenAt
>                   }
> 
>                   companyBitMaps.print()
> 
>                   env.execute()
> 
>                 }
> 
> 
>                 The error looks as follows:
> 
> 
>                 2015-02-20 11:22:54 INFO  JobClient:345 -
>                 java.lang.OutOfMemoryError: Java heap space
>                   at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>                   at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>                   at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>                   at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>                   at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>                   at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>                   at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>                   at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>                   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>                   at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>                   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>                   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>                   at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>                   at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>                   at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>                   at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>                   at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>                   at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>                   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>                   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>                   at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>                   at java.lang.Thread.run(Thread.java:745)
> 
>                 I run the job locally, giving 2GB of Ram to the VM. The
>                 code will produce less than 10 groups and the bitsets
>                 used internally should not be larger than a few megabytes.
> 
>                 Any tips on how to fix this?
> 
>                 Best,
>                 Sebastian
> 
>                 PS: Still waiting for a reduceGroup that gives me the key ;)
> 
> 
> 
> 
> 
> 
> 
