Hi Antonio,

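Cascading window aggregations, as done in your example, is a good idea and is
preferable whenever the aggregation function is combinable, i.e., when partial
results of smaller windows can be merged into the result of a larger window.
This is true for sum (count can be computed as a sum of 1s).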
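
To make the combinability argument concrete, here is a minimal, Flink-free sketch in plain Java. The class `CascadingWindows`, the record `Partial` and the methods `aggregateRaw` and `cascade` are hypothetical names for illustration, not Flink API. It assumes a single key, event timestamps in minutes and tumbling windows aligned to 0 (Flink's default tumbling windows are likewise aligned to the epoch). It computes 10-minute sums and counts from raw events, then derives the 20- and 30-minute results from the 10-minute partials alone. This relies on sum being associative and on count being a sum of 1s. It also only works when the coarser window size is a multiple of the finer one: 20- and 30-minute windows can both be built from 10-minute windows, but 30-minute windows cannot be built from 20-minute windows.

```java
import java.util.Map;
import java.util.TreeMap;

public class CascadingWindows {

    /** Partial result of one tumbling window: its start plus combinable aggregates. */
    public record Partial(long windowStart, long sum, long count) {}

    /** Merges two partials of the same window; valid because sum is associative. */
    public static Partial combine(Partial a, Partial b) {
        return new Partial(a.windowStart(), a.sum() + b.sum(), a.count() + b.count());
    }

    /** Start of the 0-aligned tumbling window of the given size that contains ts. */
    static long alignedStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Aggregates raw (minute, value) events into tumbling windows of `size` minutes. */
    public static Map<Long, Partial> aggregateRaw(long[][] events, long size) {
        Map<Long, Partial> out = new TreeMap<>();
        for (long[] e : events) {
            long start = alignedStart(e[0], size);
            // each event contributes its value to the sum and a 1 to the count
            out.merge(start, new Partial(start, e[1], 1), CascadingWindows::combine);
        }
        return out;
    }

    /** Re-aggregates finer window results into coarser tumbling windows. */
    public static Map<Long, Partial> cascade(Map<Long, Partial> finer, long finerSize, long coarserSize) {
        // every finer window must fall entirely inside exactly one coarser window
        if (coarserSize % finerSize != 0) {
            throw new IllegalArgumentException(
                coarserSize + " min is not a multiple of " + finerSize + " min");
        }
        Map<Long, Partial> out = new TreeMap<>();
        for (Partial p : finer.values()) {
            long start = alignedStart(p.windowStart(), coarserSize);
            // counts are summed, not incremented: each partial already holds a count
            out.merge(start, new Partial(start, p.sum(), p.count()), CascadingWindows::combine);
        }
        return out;
    }

    public static void main(String[] args) {
        // hypothetical (minute, value) events for a single key
        long[][] events = {{0, 5}, {7, 3}, {12, 4}, {25, 1}, {31, 2}, {44, 6}, {59, 10}};
        Map<Long, Partial> tenMin = aggregateRaw(events, 10);
        Map<Long, Partial> twentyMin = cascade(tenMin, 10, 20);
        Map<Long, Partial> thirtyMin = cascade(tenMin, 10, 30);
        System.out.println("10 min: " + tenMin.values());
        System.out.println("20 min: " + twentyMin.values());
        System.out.println("30 min: " + thirtyMin.values());
        // cascading should match aggregating the raw events directly
        System.out.println(thirtyMin.equals(aggregateRaw(events, 30)));
    }
}
```

With this sample data, the cascaded 20- and 30-minute results are identical to aggregating the raw events directly (for example, the 30-minute window starting at minute 0 has sum 13 and count 4). An average is not combinable by itself, but it can still be derived at the end as sum / count.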

Best, Fabian

2018-06-09 4:00 GMT+02:00 antonio saldivar <[email protected]>:

> Hello
>
> Has anyone worked this way? I am asking because I have to compute the
> aggregations (sum and count) for multiple window sizes (10, 20 and 30
> minutes). Please let me know whether this works properly or whether there is
> a better solution.
>
>
> import static java.util.concurrent.TimeUnit.MINUTES;
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> DataStream<String> data = ...
> // append a Long 1 to each record to count it
> DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne());
>
> DataStream<Tuple2<String, Long>> oneMinCnts = withOnes
>   // key by the String field
>   .keyBy(0)
>   // define a 1-minute tumbling time window
>   .timeWindow(Time.of(1, MINUTES))
>   // sum the ones in the Long field;
>   // in practice you want to use an incrementally aggregating ReduceFunction and
>   // a WindowFunction to extract the start/end timestamp of the window
>   .sum(1);
>
> // emit 1-min counts to wherever you need them
> oneMinCnts.addSink(new YourSink());
>
> // compute 5-min counts based on the 1-min counts
> DataStream<Tuple2<String, Long>> fiveMinCnts = oneMinCnts
>   // key by the String field
>   .keyBy(0)
>   // define a 5-minute tumbling time window
>   .timeWindow(Time.of(5, MINUTES))
>   // sum the 1-minute counts in the Long field
>   .sum(1);
>
> // emit 5-min counts to wherever you need them
> fiveMinCnts.addSink(new YourSink());
>
> // continue with a 1-day window and a 1-week window
>
> Thank you, regards
