Hi Antonio,

Cascading window aggregations, as done in your example, are a good idea. They are preferable to computing every window size from the raw stream if the aggregation function is combinable. This is true for sum, and count can be computed as a sum of 1s.
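To illustrate why combinability matters, here is a small framework-free Java sketch. It is not Flink code, and the `Event` class and the "windowStart|key" map keys are hypothetical. It counts events per key in 1-minute tumbling windows by summing 1s, rolls those counts up into 5-minute windows, and checks that the result matches counting the raw events directly in 5-minute windows.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CascadedCounts {

    // Hypothetical input event: a key and an event timestamp in whole minutes.
    static final class Event {
        final String key;
        final long minute;
        Event(String key, long minute) { this.key = key; this.minute = minute; }
    }

    // Start of the tumbling window of the given size (in minutes) that contains t.
    static long windowStart(long t, long size) {
        return t - Math.floorMod(t, size);
    }

    // Map every event to a 1 and sum the 1s per (window start, key).
    static Map<String, Long> countDirect(List<Event> events, long size) {
        Map<String, Long> counts = new HashMap<>();
        for (Event e : events) {
            counts.merge(windowStart(e.minute, size) + "|" + e.key, 1L, Long::sum);
        }
        return counts;
    }

    // Re-aggregate finer window counts into coarser windows by summing them.
    // Correct only because sum is combinable and `coarse` is a multiple of the fine size.
    static Map<String, Long> rollUp(Map<String, Long> fineCounts, long coarse) {
        Map<String, Long> counts = new HashMap<>();
        for (Map.Entry<String, Long> e : fineCounts.entrySet()) {
            String[] parts = e.getKey().split("\\|", 2); // [windowStart, key]
            long start = windowStart(Long.parseLong(parts[0]), coarse);
            counts.merge(start + "|" + parts[1], e.getValue(), Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("a", 0), new Event("a", 0), new Event("b", 1),
            new Event("a", 3), new Event("a", 5), new Event("b", 9));
        Map<String, Long> oneMin = countDirect(events, 1);
        Map<String, Long> fiveMinCascaded = rollUp(oneMin, 5);
        Map<String, Long> fiveMinDirect = countDirect(events, 5);
        System.out.println(fiveMinCascaded.equals(fiveMinDirect));
    }
}
```

The same check would fail for a non-combinable function such as a median, because the 5-minute median cannot be recovered from 1-minute medians.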
Best,
Fabian

2018-06-09 4:00 GMT+02:00 antonio saldivar <[email protected]>:

> Hello
>
> Has anyone worked this way? I am asking because I have to get the
> aggregation (sum and count) for multiple window sizes (10 mins, 20 mins,
> 30 mins). Please let me know if this works properly or if there is another
> good solution.
>
> DataStream<String> data = ...
> // append a Long 1 to each record to count it.
> DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne());
>
> DataStream<Tuple2<String, Long>> oneMinCnts = withOnes
>     // key by String field
>     .keyBy(0)
>     // define time window
>     .timeWindow(Time.of(1, MINUTES))
>     // sum ones of the Long field
>     // in practice you want to use an incrementally aggregating ReduceFunction and
>     // a WindowFunction to extract the start/end timestamp of the window
>     .sum(1);
>
> // emit 1-min counts to wherever you need it
> oneMinCnts.addSink(new YourSink());
>
> // compute 5-min counts based on 1-min counts
> DataStream<Tuple2<String, Long>> fiveMinCnts = oneMinCnts
>     // key by String field
>     .keyBy(0)
>     // define time window of 5 minutes
>     .timeWindow(Time.of(5, MINUTES))
>     // sum the 1-minute counts in the Long field
>     .sum(1);
>
> // emit 5-min counts to wherever you need it
> fiveMinCnts.addSink(new YourSink());
>
> // continue with 1 day window and 1 week window
>
> Thank you
> Regards
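For the window sizes asked about (10, 20 and 30 minutes), here is a plain-Java sketch in the same spirit. It is again not Flink code. It assumes a single key and some hypothetical 10-minute partial aggregates. Sum and count travel together as one pair that merges by element-wise addition, so the pair is combinable, and every larger window is derived from the 10-minute base.

```java
import java.util.Map;
import java.util.TreeMap;

public class MultiSizeSumCount {

    // Partial aggregate for one window: running sum and count.
    // Merging two partials is element-wise addition, which makes it combinable.
    static final class SumCount {
        final long sum;
        final long count;
        SumCount(long sum, long count) { this.sum = sum; this.count = count; }
        SumCount merge(SumCount other) { return new SumCount(sum + other.sum, count + other.count); }
        @Override public String toString() { return "(sum=" + sum + ", count=" + count + ")"; }
    }

    // Roll up partials keyed by window start (minutes) into tumbling windows of `size` minutes.
    static TreeMap<Long, SumCount> rollUp(Map<Long, SumCount> base, long size) {
        TreeMap<Long, SumCount> out = new TreeMap<>();
        for (Map.Entry<Long, SumCount> e : base.entrySet()) {
            long start = e.getKey() - Math.floorMod(e.getKey(), size);
            out.merge(start, e.getValue(), SumCount::merge);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical 10-minute partials for a single key, keyed by window start.
        Map<Long, SumCount> tenMin = new TreeMap<>();
        tenMin.put(0L, new SumCount(5, 2));
        tenMin.put(10L, new SumCount(7, 3));
        tenMin.put(20L, new SumCount(1, 1));
        tenMin.put(30L, new SumCount(4, 2));
        for (long size : new long[] {10, 20, 30}) {
            System.out.println(size + " min: " + rollUp(tenMin, size));
        }
    }
}
```

The 30-minute windows are rolled up from the 10-minute base rather than from the 20-minute results. An aligned 30-minute tumbling window is not a union of whole 20-minute windows, so each coarser size has to be a multiple of the size it is cascaded from.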
