Re: Best strategy for calculating percentage

2015-02-20 Thread Yiannis Gkoufas
Hi Fabian, thanks a lot for the clarification, will give it a shot and will let you know how it goes! BR On 20 February 2015 at 22:18, Fabian Hueske wrote: > Hi Yiannis, > > I think your program is not working correctly. The problem is with the > sum() aggregation function. > Right now, Flink'

Re: Best strategy for calculating percentage

2015-02-20 Thread Fabian Hueske
Hi Yiannis, I think your program is not working correctly. The problem is with the sum() aggregation function. Right now, Flink's aggregation functions update values in place. That means that all non-key and non-aggregation fields have nondeterministic values. For example doing a groupBy(0,1).sum
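
A minimal sketch of the pitfall Fabian describes, on hypothetical data (Scala API of that era; on 0.8, print() registers a sink and env.execute() runs the job):

    import org.apache.flink.api.scala._

    object SumPitfall {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val data = env.fromElements(("a", "x", 1.0), ("a", "y", 2.0))
        // sum(2) aggregates field 2 in place; field 1 of the single result
        // tuple may end up as "x" or "y" -- it is not a deterministic value
        val summed = data.groupBy(0).sum(2)
        summed.print()
        env.execute("sum pitfall sketch")
      }
    }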

Best strategy for calculating percentage

2015-02-20 Thread Yiannis Gkoufas
Hi there, I have the following scenario: my files have 2 attributes and 1 numeric value: (attr1,attr2,val). For each attr1, I want to compute its percentage of the sum of val within each attr2 group. Currently I am doing it like this: input.map(e => e._2.toString.split(",")) .map(e=> (e(0),
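
A hedged sketch of one way to do this (not necessarily the approach the thread settles on): sum val per (attr1, attr2) and per attr2 using reduce -- which, unlike sum(), keeps the key fields intact -- then join on attr2 and divide:

    import org.apache.flink.api.scala._

    object Percentages {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // hypothetical inline data standing in for the parsed file
        val input = env.fromElements(("a", "g1", 1.0), ("b", "g1", 3.0), ("a", "g2", 2.0))

        // total val per (attr1, attr2)
        val perPair = input.groupBy(0, 1)
          .reduce((l, r) => (l._1, l._2, l._3 + r._3))

        // total val per attr2
        val perGroup = input.map(e => (e._2, e._3))
          .groupBy(0)
          .reduce((l, r) => (l._1, l._2 + r._2))

        // (attr1, attr2, percentage of the attr2 total)
        val result = perPair.join(perGroup).where(1).equalTo(0) {
          (p, g) => (p._1, p._2, 100.0 * p._3 / g._2)
        }

        result.print()
        env.execute("percentage per attr2")
      }
    }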

Re: OutOfMemory during serialization

2015-02-20 Thread Robert Metzger
Let's create an issue in Flink to fix this. Let's a) see if the new serializer registration in 0.9 allows users to replace the serializers if they had already been set by chill, and b) fix the issue in twitter/chill. I think we can ask them to release a new version with the fix (they se
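
For (a), the hedged idea would look roughly like this against the 0.9-era ExecutionConfig API (CompactBitSetSerializer is a hypothetical stand-in for a fixed serializer, e.g. the one sketched under Ufuk's mail below; whether this actually overrides chill's default registration is exactly the open question):

    import org.apache.flink.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.registerTypeWithKryoSerializer(
      classOf[java.util.BitSet], classOf[CompactBitSetSerializer])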

Re: OutOfMemory during serialization

2015-02-20 Thread Ufuk Celebi
I've just looked into the BitSetSerializer of Chill, and it does seem that each bit is encoded as a boolean (for all bit positions <= the "logical" length). Regarding the DataOutputSerializer: it would help to catch OoM exceptions during resize operations and rethrow them with a more detailed mes
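
For reference, a hedged sketch of a more compact serializer (hypothetical code written against Kryo's Serializer API, not Chill's actual class): it writes the BitSet's backing long words instead of one boolean per bit:

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}

    class CompactBitSetSerializer extends Serializer[java.util.BitSet] {
      override def write(kryo: Kryo, out: Output, bs: java.util.BitSet): Unit = {
        val words = bs.toLongArray  // 64 bits per long instead of 1 byte per bit
        out.writeInt(words.length, true)
        words.foreach(w => out.writeLong(w))
      }

      override def read(kryo: Kryo, in: Input, tpe: Class[java.util.BitSet]): java.util.BitSet = {
        val words = Array.fill(in.readInt(true))(in.readLong())
        java.util.BitSet.valueOf(words)
      }
    }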

Re: OutOfMemory during serialization

2015-02-20 Thread Stephan Ewen
What happens (in the original stack trace) is the following: the serializer starts producing the byte stream data, and we buffer it to determine the length before sending it over the network. While buffering that data, the memory runs out. It may be that you are simply short of memory; it may als
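
A minimal sketch of Ufuk's rethrow suggestion applied to this buffering step (hypothetical code, not Flink's actual DataOutputSerializer): wrap the buffer-growing copy so the error reports how large the record had become:

    // grow a serialization buffer, surfacing the attempted size on failure
    def resize(buffer: Array[Byte], minCapacity: Int): Array[Byte] = {
      val newSize = math.max(buffer.length * 2, minCapacity)
      try {
        java.util.Arrays.copyOf(buffer, newSize)
      } catch {
        case e: OutOfMemoryError =>
          throw new OutOfMemoryError(
            s"Serialization buffer overflow: failed to grow from " +
            s"${buffer.length} to $newSize bytes (${e.getMessage})")
      }
    }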

Re: OutOfMemory during serialization

2015-02-20 Thread Sebastian
I don't have a build unfortunately, I'm using the maven dependency. I'll try to find a workaround. Thx for your help. -s On 20.02.2015 12:44, Robert Metzger wrote: Hey Sebastian, I've fixed the issue in this branch: https://github.com/rmetzger/flink/tree/flink1589: Configuration c = new Config

Re: OutOfMemory during serialization

2015-02-20 Thread Robert Metzger
Hey Sebastian, I've fixed the issue in this branch: https://github.com/rmetzger/flink/tree/flink1589: Configuration c = new Configuration(); c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f); final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c); I'll
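
Robert's snippet, restated in Scala against the Java ExecutionEnvironment (this assumes his flink1589 branch, where the createLocalEnvironment(Configuration) overload is the fix):

    import org.apache.flink.configuration.{ConfigConstants, Configuration}
    import org.apache.flink.api.java.ExecutionEnvironment

    val c = new Configuration()
    // give only half of the TaskManager memory to Flink's managed memory,
    // leaving more heap for the serialization buffers
    c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f)
    val env = ExecutionEnvironment.createLocalEnvironment(c)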

Re: OutOfMemory during serialization

2015-02-20 Thread Robert Metzger
Hi Sebastian, Looks like you've found a limitation of Flink. I've already filed two JIRAs to resolve the issue (https://issues.apache.org/jira/browse/FLINK-1588, https://issues.apache.org/jira/browse/FLINK-1589). I don't know your setup; when you use Flink just as a dependency without a version

Re: OutOfMemory during serialization

2015-02-20 Thread Sebastian
I'm running Flink from my IDE, how do I change this setting in that context? On 20.02.2015 11:41, Fabian Hueske wrote: Have you tried to increase the heap size by shrinking the TM-managed memory? Reduce the fraction (taskmanager.memory.fraction) or fix the amount of TM memory (taskmanager.memory

Re: OutOfMemory during serialization

2015-02-20 Thread Fabian Hueske
Have you tried to increase the heap size by shrinking the TM-managed memory? Reduce the fraction (taskmanager.memory.fraction) or fix the amount of TM memory (taskmanager.memory.size) in the flink-config.yaml [1]. Cheers, Fabian [1] http://flink.apache.org/docs/0.8/config.html > On 20 Feb 201
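
Concretely, in the config file this would be one of the following two keys (per the 0.8 docs linked above; the size unit is assumed to be MB):

    # shrink the fraction of free heap that the TaskManager self-manages...
    taskmanager.memory.fraction: 0.5
    # ...or pin the managed memory to a fixed amount instead
    # taskmanager.memory.size: 512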

OutOfMemory during serialization

2015-02-20 Thread Sebastian
Hi, I get a strange out of memory error from the serialization code when I try to run the following program: def compute(trackingGraphFile: String, domainIndexFile: String, outputPath: String) = { implicit val env = ExecutionEnvironment.getExecutionEnvironment val edges = GraphUtils.readEd
