Hello Kostas,

Sorry for the late reply. I couldn't understand how to apply the split into windows to a DataStream such as the one below, so that after the keyBy and reduce the output stream contains each distinct element only once, with its final count.

    DataStream<Tuple2<String, Long>> gridWithDensity = pointsWithGridCoordinates
            .map(new AddCountAppender())
            .keyBy(2)
            .reduce(new GridPointsCount())
            .map(new RetrieveGridWithCount());
    gridWithDensity.print();

Current Output:
(33330,1)
(33330,2)
(00000,1)
(00000,2)
(00000,3)
(33330,3)
(00000,4)

Required Output:
(33330,3)
(00000,4)

    public static final class GridPointsCount
            implements ReduceFunction<Tuple4<Point, Grid, String, Long>> {
        @Override
        public Tuple4<Point, Grid, String, Long> reduce(
                Tuple4<Point, Grid, String, Long> val1,
                Tuple4<Point, Grid, String, Long> val2) {
            // Keep the point, grid and id of the first element; add up the counts.
            return new Tuple4<Point, Grid, String, Long>(
                    val1.f0, val1.f1, val1.f2, val1.f3 + val2.f3);
        }
    }

Regards,
Subash Basnet

On Mon, Aug 22, 2016 at 6:34 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Hi Subash,
>
> You should also split your elements in windows.
> If not, Flink emits an element for each incoming record.
> That is why you have:
>
> (1,1)
> (1,2)
> (1,3)
>
> …
>
> Kostas
>
> > On Aug 22, 2016, at 5:58 PM, subash basnet <yasub...@gmail.com> wrote:
> >
> > Hello all,
> >
> > I grouped the input by its id to count the number of elements in each group.
> > DataStream<Tuple2<String, Long>> gridWithCount;
> >
> > Upon printing the above datastream, it shows duplicate rows:
> >
> > Output:
> > (1,1)
> > (1,2)
> > (2,1)
> > (1,3)
> > (2,2).......
> >
> > Whereas I wanted the distinct rows with the final count:
> >
> > Needed Output:
> > (1,3)
> > (2,2)..
> >
> > What could be the way to achieve this?
> >
> > Regards,
> > Subash Basnet
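
To make the difference Kostas describes concrete, here is a minimal sketch in plain Java. It does not use the Flink API; it only imitates the two emission behaviours. The class name WindowedCountSketch and both method names are hypothetical, and it assumes all seven records fall into a single window. In an actual Flink 1.x job, the equivalent change would presumably be to put a window between keyBy and reduce, for example `.keyBy(2).timeWindow(Time.seconds(5)).reduce(new GridPointsCount())`, where the 5-second size is only an assumed value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Flink code: it mimics how a keyed reduce
// emits results with and without a window.
final class WindowedCountSketch {

    // Rolling reduce (no window): every incoming record updates the count
    // for its key and the updated count is emitted immediately.
    static List<String> rollingCounts(List<String> keys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            long updated = counts.merge(key, 1L, Long::sum);
            emitted.add("(" + key + "," + updated + ")");
        }
        return emitted;
    }

    // Windowed reduce: counts are accumulated for all records of the window
    // and emitted once per key, only when the window closes.
    static List<String> windowedCounts(List<String> keysInWindow) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : keysInWindow) {
            counts.merge(key, 1L, Long::sum);
        }
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            emitted.add("(" + entry.getKey() + "," + entry.getValue() + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Grid ids in the arrival order implied by the output in this thread.
        List<String> keys = Arrays.asList(
                "33330", "33330", "00000", "00000", "00000", "33330", "00000");
        System.out.println(rollingCounts(keys));
        System.out.println(windowedCounts(keys));
    }
}
```

The first method reproduces the duplicate rows seen in the current output, the second the distinct rows with final counts. On an unbounded stream there is no "final" count without some boundary, which is why a window (or another trigger) is needed to decide when to emit.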