Hi Fabian, thanks a lot for the clarification. I'll give it a shot and let you know how it goes!
BR

On 20 February 2015 at 22:18, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Yiannis,
>
> I think your program is not working correctly. The problem is with the
> sum() aggregation function.
> Right now, Flink's aggregation functions update values in place. That
> means that all non-key, non-aggregated fields have nondeterministic
> values.
> For example, doing a groupBy(0,1).sum(2) on the two tuples <0,1,2,3> and
> <0,1,4,5> could result in either <0,1,6,3> or <0,1,6,5>.
>
> I would solve your task as follows:
>
> input.map(e => e._2.toString.split(","))
>   .map(e => (e(0), Utils.getMonthFromDate(e(1).toLong), e(3).toDouble)) // we duplicate the value later
>   .groupBy(0,1)
>   .sum(2)
>   .groupBy(1)
>   .groupReduce(new CustomGroupReduceFunction())
>
> The CustomGroupReduceFunction remembers all fields 0 and 2 and computes
> the sum of field 2. Once all values have been read, it computes, for each
> remembered field 2, its percentage of the sum of all fields 2 and emits a
> new tuple.
>
> This only works if the number of records in one group is not too large.
> Otherwise you might face OutOfMemoryExceptions.
> If the optimizer is clever enough, the data is only sorted once.
>
> Let me know if you have any questions.
>
> Cheers, Fabian
>
>
> 2015-02-20 22:01 GMT+01:00 Yiannis Gkoufas <johngou...@gmail.com>:
>
>> Hi there,
>>
>> I have the following scenario:
>> My files have 2 attributes and 1 numeric value:
>> (attr1, attr2, val)
>> I want to compute, for each attr1, the percentage its value contributes
>> to the sum of val grouped by attr2.
>> Currently I am doing it like this:
>>
>> input.map(e => e._2.toString.split(","))
>>   .map(e => (e(0), Utils.getMonthFromDate(e(1).toLong), e(3).toDouble, e(3).toDouble))
>>   .groupBy(0,1)
>>   .sum(2)
>>   .groupBy(1)
>>   .sum(3)
>>   .map(e => (e._1, e._2, scala.math.BigDecimal(e._3*1.0/e._4*1.0).toString()))
>>
>> Is there a more efficient way to calculate this?
>>
>> Thanks a lot!
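
[Editor's note] For reference, here is a minimal sketch of what the CustomGroupReduceFunction described above could look like. It is an illustration under assumptions, not code from the thread: it assumes the tuples reaching the reducer are (String, Int, Double), i.e. (attr1, month, summed value), and that Utils.getMonthFromDate returns an Int. It also uses reduceGroup, which is the grouped-reduce method name in Flink's Scala DataSet API.

import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

// Sketch only: the whole group is buffered in memory, which is exactly
// the OutOfMemoryException caveat Fabian mentions for very large groups.
class CustomGroupReduceFunction
    extends GroupReduceFunction[(String, Int, Double), (String, Int, Double)] {

  override def reduce(
      values: java.lang.Iterable[(String, Int, Double)],
      out: Collector[(String, Int, Double)]): Unit = {

    val buffered = new ArrayBuffer[(String, Int, Double)]()
    var total = 0.0

    // First pass: remember each record and accumulate the sum of field 2.
    for (v <- values.asScala) {
      buffered += v
      total += v._3
    }

    // Second pass: emit each record's share of the group total
    // (assumes total is non-zero).
    for ((attr1, month, value) <- buffered) {
      out.collect((attr1, month, value / total))
    }
  }
}

It would then plug into the proposed pipeline as .groupBy(1).reduceGroup(new CustomGroupReduceFunction()).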