I have the following use case:
1. Group by a specific field.
2. Get a list of all messages belonging to the group.
3. Count the number of records in the group.
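With DataSets, this is fairly easy:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

val env = ExecutionEnvironment.getExecutionEnvironment
env.fromElements(("a-b", "data1", 1), ("a-c", "data2", 1), ("a-b", "data3", 1))
  .groupBy(0) // 1. group on the key field
  .reduceGroup { (it: Iterator[(String, String, Int)], out: Collector[(String, List[String], Int)]) =>
    val group = it.toList // materialize once: the iterator can only be traversed once
    if (group.nonEmpty) out.collect((group.head._1, group.map(_._2), group.map(_._3).sum)) // 2. messages, 3. count
  }
```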
So now I am moving to DataStreams (since the input really is a
DataStream). From my perspective, a window should provide the same
functionality as a DataSet. This would simplify the process a lot:
1. Window the elements.
2. Apply the same operations as before.
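The two steps above can be sketched with the Scala DataStream API. This is a minimal sketch under stated assumptions: a Flink 1.x release (1.12 or later, where event time is the default) with `flink-streaming-scala` on the classpath; the 5-second tumbling event-time window, the constant timestamps, and the names `WindowedGroupSketch` and `summarize` are hypothetical choices for illustration. `keyBy` takes the role of `groupBy(0)`, and the window function receives all elements of one key in one window as an `Iterable`, so the DataSet logic carries over almost unchanged.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object WindowedGroupSketch {
  // Hypothetical helper: same per-group logic as the DataSet reduceGroup
  // (key, all messages of the group, sum of the 1s = record count).
  def summarize(key: String, group: Iterable[(String, String, Int)]): (String, List[String], Int) =
    (key, group.map(_._2).toList, group.map(_._3).sum)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements(("a-b", "data1", 1), ("a-c", "data2", 1), ("a-b", "data3", 1))
      // Assumption: constant timestamp 0, so all records fall into the same window.
      .assignAscendingTimestamps(_ => 0L)
      .keyBy(_._1) // group by the key field
      .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 1. window the elements
      .apply { (key: String, _: TimeWindow, elems: Iterable[(String, String, Int)],
                out: Collector[(String, List[String], Int)]) =>
        out.collect(summarize(key, elems)) // 2. same operations as before, per key and window
      }
      .print()
    env.execute("windowed group sketch")
  }
}
```

When the bounded source finishes, Flink emits a final watermark that fires the pending event-time window, so the job should print one record per key. Note that `apply` buffers every element of a window in state before calling the function; that is what makes the full message list available, but it grows with the window size.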
Is there a way to do this in Flink? If not, I would like to work on a
solution to this problem.