Re: groupBy on a Dataset of Maps

2016-04-29 Thread Punit Naik
I forgot to mention that in this code my out.collect call emits a tuple of the Map[key,value] and the count as an Int. On Fri, Apr 29, 2016 at 4:53 PM, Punit Naik wrote: > Anyways, I fixed it. To your groupBy you should attach this: > > .reduceGroup { > (in, out: org.apache.flink.util.Co

Re: groupBy on a Dataset of Maps

2016-04-29 Thread Punit Naik
Anyways, I fixed it. To your groupBy you should attach this:

    .reduceGroup {
      (in, out: org.apache.flink.util.Collector[(Map[key, value], Int)]) =>
        var v: Int = 0                  // number of elements in the current group
        var k: Map[key, value] = Map()  // last element seen; all elements share the grouping key
        for (t <- in) {
          v += 1
          k = t
        }
        out.collect((k, v))
    }
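The counting logic inside that reduceGroup can be checked on plain Scala collections before running it in Flink. This is a minimal local sketch, not the Flink API: it assumes every record carries a "ga_date" entry and uses Scala's own groupBy as a stand-in for DataSet.groupBy(...).reduceGroup(...). The object and method names are hypothetical.

```scala
object GroupCountSketch {
  // Mirrors the reduceGroup body: walk one group, count its elements
  // and keep the last element seen (every element shares the grouping key).
  def countGroup[A](group: Iterable[A]): (A, Int) = {
    var v = 0
    var k: Option[A] = None
    for (t <- group) { v += 1; k = Some(t) }
    (k.get, v) // safe here: groups produced by groupBy are never empty
  }

  // Local stand-in for DataSet.groupBy(...).reduceGroup(...),
  // assuming every record has a "ga_date" entry.
  def groupAndCount(records: Seq[Map[String, String]]): Map[String, Int] =
    records
      .groupBy(m => m("ga_date"))
      .values
      .map(g => countGroup(g))
      .map { case (rep, n) => rep("ga_date") -> n }
      .toMap
}
```

Keeping a whole representative map per group (rather than only the key) matches what the reduceGroup above emits; if only the count per key is needed, emitting the key alone is cheaper.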

Re: groupBy on a Dataset of Maps

2016-04-29 Thread Punit Naik
What if, after grouping, I wanted to count the occurrences of the key "ga_date"? On Fri, Apr 29, 2016 at 2:15 PM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote: > The `get` method on the Scala map returns an Option, which is not > (currently) a valid key type for Flink (but there's ongoing
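One common way to count occurrences per "ga_date" value is to emit a (date, 1) pair for every record and sum the counts per date. The sketch below shows that shape on plain Scala collections (an assumption for illustration, not Flink code); records without a "ga_date" entry are skipped rather than grouped under an Option key. The object name is hypothetical.

```scala
object DateCountSketch {
  // Each record contributes (its "ga_date" value, 1); records without the
  // key produce nothing, so no Option ever ends up as a grouping key.
  def countDates(records: Seq[Map[String, Any]]): Map[String, Int] =
    records
      .flatMap(m => m.get("ga_date").map(d => (d.toString, 1)))
      .groupBy(_._1)
      .map { case (date, pairs) => date -> pairs.map(_._2).sum }
}
```

In the Flink DataSet API the same shape would be a map to (String, Int) tuples followed by groupBy(0).sum(1), which avoids writing the counting loop by hand.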

Re: groupBy on a Dataset of Maps

2016-04-29 Thread Punit Naik
Okay, I'll take all your points into consideration. On Fri, Apr 29, 2016 at 2:15 PM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote: > The `get` method on the Scala map returns an Option, which is not > (currently) a valid key type for Flink (but there's ongoing work on this > [1]). Flink

Re: groupBy on a Dataset of Maps

2016-04-29 Thread Stefano Baghino
The `get` method on the Scala map returns an Option, which is not (currently) a valid key type for Flink (but there's ongoing work on this [1]). Flink must be aware of how to use a particular type as a key if you want to group by a value of said type. See the advanced DataSet concepts in the offici
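Following that advice, the Option returned by `get` can be unwrapped into a plain String before it is used as a key. A minimal sketch, assuming a missing "ga_date" should fall into a catch-all group; the placeholder "missing" and the object name are hypothetical.

```scala
object KeyExtractionSketch {
  // Map.get returns Option[Any]; unwrap it so the grouping key is a
  // plain String (a type Flink knows how to use as a key).
  def gaDateKey(m: Map[String, Any]): String =
    m.get("ga_date") match {
      case Some(v) => v.toString
      case None    => "missing" // hypothetical catch-all for records without the field
    }
}
```

Flink's Scala DataSet API also accepts a key-selector function in groupBy, so something along the lines of `data.groupBy(m => KeyExtractionSketch.gaDateKey(m))` should group on a String instead of an Option; this was not run against Flink here.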

groupBy on a Dataset of Maps

2016-04-29 Thread Punit Naik
Below is my code:

    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.readTextFile("file:///home/punit/test").flatMap( line => JSON.parseFull(line) )
    val j = data.flatMap{ _ match {case map: Map[String, Any] => {List(Map("ga_date" -> map.get("ga_dateHour")
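The snippet is truncated, but its first step already stores `map.get("ga_dateHour")`, i.e. an Option, as the value of "ga_date", which is what later ends up as the grouping key. A minimal sketch of building the record with the unwrapped value instead, assuming `parsed` is one value produced by JSON.parseFull (static type Any) and that records lacking "ga_dateHour" can be dropped; the object and method names are hypothetical, and the rest of the original transformation is not reconstructed.

```scala
object ParseSketch {
  // Matching on Map[_, _] sidesteps the type-erasure warning that
  // `case map: Map[String, Any]` triggers; String keys are kept explicitly.
  def toRecord(parsed: Any): List[Map[String, String]] = parsed match {
    case m: Map[_, _] =>
      val fields: Map[String, Any] =
        m.toSeq.collect { case (k: String, v) => (k, v: Any) }.toMap
      // Unwrap the Option returned by get so the record holds a plain
      // String rather than Some(...); records without the field are dropped.
      fields.get("ga_dateHour").map(d => Map("ga_date" -> d.toString)).toList
    case _ => Nil
  }
}
```

Used inside `data.flatMap(...)`, returning a List keeps the "zero or one record per line" behaviour of the original pattern match.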