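Hi. You cannot call collect on a grouped data set directly, because GroupedDataSet
has no collect method. You can collect grouped data by first applying the
reduceGroup method, which returns a regular DataSet that you can then collect.
The following code is an example: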

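// The scala package object provides the implicit TypeInformation that reduceGroup needs
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

val result: Seq[(Int, Seq[Int])] = grouped_ds.reduceGroup {
  (in, out: Collector[(Int, Seq[Int])]) => {
    // Materialize the group eagerly; the group iterator can only be traversed once
    val seq = in.toVector
    // I assumed the first tuple field is the value and the second is the key,
    // because you grouped by 1 (field index 1 is _2 in a Scala tuple)
    val key = seq(0)._2
    val values = seq.map(_._1)

    // Emit one record per group: (key, all values belonging to that key)
    out.collect((key, values))
  }
}.collect()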

Then result holds one (key, values) pair per group: (key1, (values, …)),
(key2, (values, …)), (key3, (values, …)), ...
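If the data set is small enough to fit in the client's memory, another option is
to call collect() on the ungrouped data set and group it locally with Scala's own
groupBy. The sketch below assumes that; GroupLocally and groupByKey are
hypothetical names, and the sample tuples are made-up data standing in for
cross_ds.collect(). It also illustrates the (key, values) shape described above.

```scala
// Hypothetical helper: groups (value, key) pairs on the client side.
// Assumes the input already came back from collect(), e.g. cross_ds.collect().
object GroupLocally {
  def groupByKey(data: Seq[(Int, Int)]): Seq[(Int, Seq[Int])] =
    data
      .groupBy(_._2)                                         // key = second field, like groupBy(1)
      .map { case (key, pairs) => (key, pairs.map(_._1)) }   // keep only the values per key
      .toSeq
      .sortBy(_._1)                                          // Map order is unspecified; sort by key

  def main(args: Array[String]): Unit = {
    // Made-up sample data standing in for cross_ds.collect()
    val sample = Seq((1, 10), (2, 20), (3, 10), (4, 30))
    groupByKey(sample).foreach(println)
  }
}
```

Unlike reduceGroup, this pulls the whole data set to the client before grouping,
so it only makes sense for small results or for debugging.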

Regards,
Chiwan Park

> On Jun 11, 2015, at 11:01 PM, Maximilian Alber <alber.maximil...@gmail.com> 
> wrote:
> 
> Hi Flinksters,
> 
> I tried to call collect on a grouped data set, somehow it did not work. Is 
> this intended? If yes, why?
> 
> Code snippet:
>     // group a data set according to second field:
>     val grouped_ds = cross_ds.groupBy(1)
>     println("After groupBy: "+grouped_ds.collect())
> 
> Error:
> 
> [ant:scalac] 
> /media/alber/datadisk/tmp/flink/code/test/src/main/scala/test.scala:107: 
> error: value collect is not a member of 
> org.apache.flink.api.scala.GroupedDataSet[(Int, Int)]
> [ant:scalac]     println("After groupBy: "+grouped_ds.collect())
> [ant:scalac]                                          ^
> [ant:scalac] one error found
> 
> 
> Thanks!
> Cheers,
> Max