Hi Ahmad,

First of all, you need to pre-aggregate the data in a 5-minute tumbling
window. If your aggregation function is a count or a sum, this is simple:
the tumbling window just emits one count or sum every 5 minutes.
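
To make the pre-aggregation concrete, here is a minimal sketch in plain Java
(no Flink dependency) that assigns timestamps to 5-minute tumbling buckets and
counts them. The class and method names (TumblingPreAgg, countPerWindow) are
made up for illustration; in a real job the tumbling window operator would do
this bucketing for you.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: counts events per 5-minute bucket.
final class TumblingPreAgg {
    static final long WINDOW_MS = 5 * 60 * 1000L;

    /** Returns a map from window start (epoch millis) to the event count in that window. */
    static Map<Long, Long> countPerWindow(long[] timestampsMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            // Align the timestamp to the start of its 5-minute window.
            long windowStart = ts - Math.floorMod(ts, WINDOW_MS);
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 10_000L, 299_999L, 300_000L, 650_000L};
        // prints {0=3, 300000=1, 600000=1}
        System.out.println(countPerWindow(ts));
    }
}
```

Each map entry corresponds to one intermediate result that the tumbling
window would emit downstream.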

The ProcessFunction then has a MapState<Integer, IntermediateAgg> (called
buffer). IntermediateAgg is the result type of the tumbling window and the
MapState is used like an array with the Integer key being the position
pointer to the value. You will only use the pointers 0 to 287 to store the
288 intermediate aggregation values and use the MapState as a ring buffer.
For that you need a ValueState<Integer> (called pointer) that is a pointer
to the position that is overwritten next. Finally, you have a
ValueState<Result> (called result) that stores the result of the last
window.

When the ProcessFunction receives a new intermediate result, it will
perform the following steps:

1) get the oldest intermediate result: buffer.get(pointer)
2) overwrite the oldest intermediate result with the newly received
intermediate result: buffer.put(pointer, new-intermediate-result)
3) increment the pointer by 1 and reset it to 0 if it became 288
4) subtract the oldest intermediate result from the result (skip this while
the buffer is still filling up and step 1 returned null)
5) add the newly received intermediate result to the result. Update the
result state and emit the result
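
The steps above can be sketched in plain Java for a sum aggregation. This is
only an illustration under assumptions: a HashMap and two fields stand in for
the MapState and ValueState handles, the class name SlidingRingBuffer is
hypothetical, and the buffer size is a constructor parameter so the example
can use 3 slots instead of 288.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the state kept by the ProcessFunction.
final class SlidingRingBuffer {
    private final int size;                                     // 288 in the setup described above
    private final Map<Integer, Long> buffer = new HashMap<>();  // plays the role of MapState<Integer, IntermediateAgg>
    private int pointer = 0;                                    // plays the role of ValueState<Integer>
    private long result = 0L;                                   // plays the role of ValueState<Result>

    SlidingRingBuffer(int size) {
        this.size = size;
    }

    /** Feeds one pre-aggregated value and returns the updated sliding sum. */
    long add(long newAgg) {
        Long oldest = buffer.get(pointer);   // 1) oldest value, null until the buffer has wrapped once
        buffer.put(pointer, newAgg);         // 2) overwrite it with the new value
        pointer = (pointer + 1) % size;      // 3) advance the pointer and wrap around
        if (oldest != null) {
            result -= oldest;                // 4) retract the value that left the window
        }
        result += newAgg;                    // 5) add the new value
        return result;
    }

    public static void main(String[] args) {
        SlidingRingBuffer rb = new SlidingRingBuffer(3);
        for (long v : new long[]{1, 2, 3, 4, 5}) {
            System.out.println(rb.add(v));   // prints 1, 3, 6, 9, 12 on separate lines
        }
    }
}
```

In an actual ProcessFunction the three fields would be keyed state, and the
value returned by add would be emitted through the collector.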

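Note that this only works for certain aggregation functions. The function
must support pre-aggregation, which is a hard requirement for this approach,
and it must be possible to subtract an old intermediate result from the
current result (step 4). Count and sum satisfy both conditions.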

Best, Fabian

On Thu, 1 Aug 2019 at 20:00, Ahmad Hassan <ahmad.has...@gmail.com> wrote:

>
> Hi Fabian,
>
> > On 4 Jul 2018, at 11:39, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > - Pre-aggregate records in a 5 minute Tumbling window. However,
> pre-aggregation does not work for FoldFunctions.
> > - Implement the window as a custom ProcessFunction that maintains a
> state of 288 events and aggregates and retracts the pre-aggregated records.
> >
> > Best, Fabian
>
> We are finally implementing a ProcessFunction to replace the Flink sliding
> window. Could you please elaborate on how we can implement the sliding
> window as a ProcessFunction, as you explained above? I am struggling to
> understand how to keep track of which events belong to which window. We
> have a 24-hour sliding window with a 5-minute slide (288 windows). How do I
> emulate 288 windows in a ProcessFunction with a 5-minute slide?
>
> The 288 sliding windows cause Flink checkpoints to hang and never finish,
> even after an hour and even with MapState on RocksDB. So we decided to get
> rid of the sliding window and implement the sliding-window logic in a
> ProcessFunction.
>
> Best,
