Thank you very much, Fabian.

I found the solution at the link below, and it is the best fit for my use case:
https://stackoverflow.com/questions/47458497/apache-flink-how-to-apply-multiple-counting-window-functions?rq=1
I am still testing how to count (for example, numTransactions >= 3) and then sum. I am also using tumbling windows.

Best regards

2018-06-10 6:04 GMT-04:00 Fabian Hueske <[email protected]>:

> Hi Antonio,
>
> Cascading window aggregations, as done in your example, are a good idea and
> are preferable if the aggregation function is combinable, which is true for
> sum (count can be done as a sum of 1s).
>
> Best, Fabian
>
> 2018-06-09 4:00 GMT+02:00 antonio saldivar <[email protected]>:
>
>> Hello,
>>
>> Has anyone worked this way? I am asking because I have to get the
>> aggregations (sum and count) for multiple window sizes (10 mins, 20 mins,
>> 30 mins). Please let me know if this works properly or if there is another
>> good solution.
>>
>> DataStream<String> data = ...
>> // append a Long 1 to each record to count it.
>> DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne());
>>
>> DataStream<Tuple2<String, Long>> oneMinCnts = withOnes
>>   // key by String field
>>   .keyBy(0)
>>   // define time window
>>   .timeWindow(Time.of(1, MINUTES))
>>   // sum ones of the Long field
>>   // in practice you want to use an incrementally aggregating ReduceFunction and
>>   // a WindowFunction to extract the start/end timestamp of the window
>>   .sum(1);
>>
>> // emit 1-min counts to wherever you need it
>> oneMinCnts.addSink(new YourSink());
>>
>> // compute 5-min counts based on 1-min counts
>> DataStream<Tuple2<String, Long>> fiveMinCnts = oneMinCnts
>>   // key by String field
>>   .keyBy(0)
>>   // define time window of 5 minutes
>>   .timeWindow(Time.of(5, MINUTES))
>>   // sum the 1-minute counts in the Long field
>>   .sum(1);
>>
>> // emit 5-min counts to wherever you need it
>> fiveMinCnts.addSink(new YourSink());
>>
>> // continue with 1 day window and 1 week window
>>
>> Thank you
>> Regards
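
For reference, the cascading idea in this thread can be sketched in plain Java with no Flink dependency. This is only an illustration under stated assumptions: the class CascadingAggregates, the Agg type, and the helpers aggregate, rollUp, and sumIfCountAtLeast are hypothetical names that do not appear in the thread, and the amounts are made-up sample values. It shows why sum and count are combinable: each 10-minute partial keeps a count (a sum of 1s) and a sum, so the 20- and 30-minute results can be built by merging partials instead of re-reading the raw records. Note that 30 is not a multiple of 20, so in this sketch both larger windows are built from the 10-minute partials.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CascadingAggregates {

    /** Partial aggregate for one key in one window: record count and sum of amounts. */
    static final class Agg {
        final long count;
        final long sum;

        Agg(long count, long sum) {
            this.count = count;
            this.sum = sum;
        }

        /** Both fields are plain sums, so partials combine in any order. */
        Agg merge(Agg other) {
            return new Agg(count + other.count, sum + other.sum);
        }
    }

    /** Aggregate the raw amounts of one small window; each record counts as 1. */
    static Agg aggregate(long[] amounts) {
        Agg acc = new Agg(0, 0);
        for (long amount : amounts) {
            acc = acc.merge(new Agg(1, amount));
        }
        return acc;
    }

    /** Roll consecutive partials up into tumbling windows of `factor` partials each. */
    static List<Agg> rollUp(List<Agg> partials, int factor) {
        List<Agg> out = new ArrayList<>();
        for (int i = 0; i < partials.size(); i += factor) {
            Agg acc = new Agg(0, 0);
            int end = Math.min(i + factor, partials.size());
            for (int j = i; j < end; j++) {
                acc = acc.merge(partials.get(j));
            }
            out.add(acc);
        }
        return out;
    }

    /** Return the sum only when the window saw at least minCount records, else 0. */
    static long sumIfCountAtLeast(Agg agg, long minCount) {
        return agg.count >= minCount ? agg.sum : 0;
    }

    public static void main(String[] args) {
        // Six consecutive 10-minute windows of made-up transaction amounts for one key.
        List<Agg> tenMin = Arrays.asList(
                aggregate(new long[] {10, 20}),
                aggregate(new long[] {5}),
                aggregate(new long[] {}),
                aggregate(new long[] {7, 3, 1}),
                aggregate(new long[] {4}),
                aggregate(new long[] {6, 6}));

        List<Agg> twentyMin = rollUp(tenMin, 2); // 2 x 10 min
        List<Agg> thirtyMin = rollUp(tenMin, 3); // 3 x 10 min

        for (Agg a : twentyMin) {
            System.out.println("20 min: count=" + a.count
                    + " sum(if count>=3)=" + sumIfCountAtLeast(a, 3));
        }
        for (Agg a : thirtyMin) {
            System.out.println("30 min: count=" + a.count
                    + " sum(if count>=3)=" + sumIfCountAtLeast(a, 3));
        }
    }
}
```

The numTransactions >= 3 condition is applied only when a window result is read, so the stored partials stay combinable and can still feed larger windows.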
