Sorry, I mistyped the code: it should be *timeWindow(Time.minutes(10))* instead of *window(Time.minutes(10))*.
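The following is a plain-Java simulation of what the corrected snippet computes. It does not use Flink, and the class, method and constant names are hypothetical. Each record goes into the 10-minute tumbling window [start, start + 10) that contains its timestamp. The reduce then yields one count per key per window, not one per record. The sketch assumes non-negative timestamps in minutes and windows aligned to time 0. It illustrates the semantics only and is not how Flink implements windowing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Flink dependency) of a keyed tumbling-window count.
// Timestamps are in minutes; WINDOW_SIZE = 10 mirrors Time.minutes(10).
class TumblingWindowCountDemo {

    static final long WINDOW_SIZE = 10;

    // Start of the tumbling window containing ts, assuming ts >= 0 and
    // windows aligned to time 0: [0,10), [10,20), ...
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE);
    }

    // Events are given as parallel arrays of timestamps and keys.
    // Returns one "[start,end) (key,count)" entry per key per window.
    static List<String> windowedCounts(long[] timestamps, String[] keys) {
        // window start -> (key -> count); TreeMaps give a deterministic output order
        Map<Long, Map<String, Long>> windows = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            windows.computeIfAbsent(windowStart(timestamps[i]), start -> new TreeMap<>())
                   .merge(keys[i], 1L, Long::sum);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Long>> window : windows.entrySet()) {
            long start = window.getKey();
            for (Map.Entry<String, Long> e : window.getValue().entrySet()) {
                out.add("[" + start + "," + (start + WINDOW_SIZE) + ") ("
                        + e.getKey() + "," + e.getValue() + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1, 3, 4, 8, 9, 12, 15};
        String[] keys = {"1", "1", "2", "1", "2", "1", "2"};
        // Prints [0,10) (1,3), [0,10) (2,2), [10,20) (1,1), [10,20) (2,1), one per line
        windowedCounts(ts, keys).forEach(System.out::println);
    }
}
```

Unlike the running reduce discussed below, each key appears once per window, and every window starts counting from zero. An unbounded stream still has no single global "final" count.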
On Wed, Aug 24, 2016 at 9:29 PM, Yassine Marzougui <yassmar...@gmail.com> wrote:
> Hi Subash,
>
> A stream is infinite, so it has no notion of a "final" count. To get
> distinct counts you need to define a period (= a window [1]) over which
> you count elements and emit a result, by adding a window operator before
> the reduce.
> For example, the following will emit distinct counts every 10 minutes
> over the last 10-minute period:
>
> stream.keyBy(2)
>       .window(Time.minutes(10))
>       .reduce(new GridPointsCount())
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html
>
>
> On Wed, Aug 24, 2016 at 6:14 PM, subash basnet <yasub...@gmail.com> wrote:
>
>> Hello Kostas,
>>
>> Sorry for the late reply, but I couldn't understand how to split the
>> DataStream below into windows to get distinct output elements with their
>> count after applying group by and reduce.
>>
>> DataStream<Tuple2<String, Long>> gridWithDensity = pointsWithGridCoordinates
>>         .map(new AddCountAppender())
>>         .keyBy(2)
>>         .reduce(new GridPointsCount())
>>         .map(new RetrieveGridWithCount());
>> gridWithDensity.print();
>>
>> Current Output:    Required Output:
>> (33330,1)          (33330,3)
>> (33330,2)          (00000,4)
>> (00000,1)
>> (00000,2)
>> (00000,3)
>> (33330,3)
>> (00000,4)
>>
>> public static final class GridPointsCount
>>         implements ReduceFunction<Tuple4<Point, Grid, String, Long>> {
>>     @Override
>>     public Tuple4<Point, Grid, String, Long> reduce(Tuple4<Point, Grid, String, Long> val1,
>>             Tuple4<Point, Grid, String, Long> val2) {
>>         // Keep the point, grid and grid id (f0 to f2) of val1; add up the counts (f3)
>>         return new Tuple4<Point, Grid, String, Long>(val1.f0, val1.f1, val1.f2,
>>                 val1.f3 + val2.f3);
>>     }
>> }
>>
>>
>> Regards,
>> Subash Basnet
>>
>> On Mon, Aug 22, 2016 at 6:34 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>
>>> Hi Subash,
>>>
>>> You should also split your elements into windows.
>>> Otherwise, Flink emits an element for each incoming record.
>>> That is why you have:
>>>
>>> (1,1)
>>> (1,2)
>>> (1,3)
>>>
>>> …
>>>
>>> Kostas
>>>
>>> > On Aug 22, 2016, at 5:58 PM, subash basnet <yasub...@gmail.com> wrote:
>>> >
>>> > Hello all,
>>> >
>>> > I grouped the input by its id to count the number of elements in each group.
>>> > DataStream<Tuple2<String, Long>> gridWithCount;
>>> > When I print the above DataStream, it shows duplicate rows:
>>> > Output:
>>> > (1,1)
>>> > (1,2)
>>> > (2,1)
>>> > (1,3)
>>> > (2,2).......
>>> >
>>> > Whereas I wanted the distinct rows with the final count:
>>> > Needed Output:
>>> > (1,3)
>>> > (2,2)..
>>> >
>>> > What could be the way to achieve this?
>>> >
>>> >
>>> > Regards,
>>> > Subash Basnet
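Kostas's explanation can be reproduced without Flink. Below is a plain-Java sketch of a keyed rolling reduce; the class and method names are hypothetical. It emits the updated count after every record, the way keyBy(...).reduce(...) does on an unwindowed stream. Alongside it is the "final" count, which only a finite input can have. The input keys are inferred from the output order in the original question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of keyBy(...).reduce(...) on an
// unwindowed stream: the updated count is emitted after every record.
class RollingCountDemo {

    // One "(key,count)" entry per incoming record, like a keyed rolling reduce.
    static List<String> rollingCounts(List<String> keys) {
        Map<String, Long> running = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            // Add this record's count of 1 to the key's running total,
            // as GridPointsCount adds up the f3 fields.
            long count = running.merge(key, 1L, Long::sum);
            emitted.add("(" + key + "," + count + ")");
        }
        return emitted;
    }

    // The last value per key; only well defined once a finite input has ended.
    static List<String> finalCounts(List<String> keys) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String key : keys) {
            totals.merge(key, 1L, Long::sum);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : totals.entrySet()) {
            result.add("(" + e.getKey() + "," + e.getValue() + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        // Keys in arrival order, as implied by the output in the original question
        List<String> input = Arrays.asList("1", "1", "2", "1", "2");
        System.out.println(rollingCounts(input)); // [(1,1), (1,2), (2,1), (1,3), (2,2)]
        System.out.println(finalCounts(input));   // [(1,3), (2,2)]
    }
}
```

Try the keys 33330, 33330, 00000, 00000, 00000, 33330, 00000. The same two methods then reproduce the "Current Output" and "Required Output" columns from Subash's second message. A stream never ends, so the second result cannot be emitted as-is. This is why the replies suggest a window.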