OK, I won't go into the implementation details.

The idea is to track all products that were observed in the last five
minutes (i.e., the unique product ids) in a five-minute tumbling window.
Every five minutes, the observed products are sent to a process function
that collects the data of the last 24 hours and updates the current result
by adding the data of the latest 5 minutes and removing the data of the 5
minutes that fell out of the 24-hour window.
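A minimal plain-Java sketch of this scheme for distinct product ids, assuming each five-minute tumbling window delivers the set of product ids it observed. Removing a whole five-minute window from a set of distinct ids is not a plain set difference (a product may also appear in a newer window), so the sketch keeps, per product, the number of buffered windows that contain it. `DistinctProductsWindow` and `onTumblingWindow` are hypothetical names; in a Flink job the two collections would live in keyed state (for example MapState) rather than in plain fields.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: distinct product ids over the last `slots`
// tumbling windows (288 x 5 minutes = 24 hours in this thread).
class DistinctProductsWindow {
    private final int slots;
    // Product ids of each buffered 5-minute window, oldest first.
    private final Deque<Set<String>> buckets = new ArrayDeque<>();
    // Number of buffered windows in which each product was observed.
    private final Map<String, Integer> occurrences = new HashMap<>();

    DistinctProductsWindow(int slots) {
        this.slots = slots;
    }

    // Called once per tumbling window with the ids it observed; returns the
    // distinct ids of the whole sliding window.
    Set<String> onTumblingWindow(Set<String> observed) {
        Set<String> latest = new HashSet<>(observed);
        buckets.addLast(latest);
        // Add the data of the latest 5 minutes.
        for (String id : latest) {
            occurrences.merge(id, 1, Integer::sum);
        }
        // Remove the data of the 5 minutes that fell out of the window.
        if (buckets.size() > slots) {
            for (String id : buckets.removeFirst()) {
                // Returning null from the remapping function drops the entry.
                occurrences.computeIfPresent(id, (k, c) -> c == 1 ? null : c - 1);
            }
        }
        return new HashSet<>(occurrences.keySet());
    }
}
```

Counting occurrences instead of storing a single set is what makes the removal step correct: with two slots and the windows `{a, b}`, `{b, c}`, `{c}`, evicting the first window drops `a` but keeps `b`, because `b` is still present in the second window.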

I don't know your exact business logic, but this is the rough scheme that I
would follow.

Cheers, Fabian

On Fri, 2 Aug 2019 at 12:25, Ahmad Hassan <ahmad.has...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for the details. However, our pipeline keeps track of the list of
> products seen in the last 24 hours with a 5-minute slide (288 windows).
>
> inStream
>     .filter(Objects::nonNull)
>     .keyBy(TENANT)
>     .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
>     .trigger(TimeTrigger.create())
>     .evictor(CountEvictor.of(1))
>     .process(new MetricProcessWindowFunction());
>
>
> The trigger just fires in onElement, and MetricProcessWindowFunction just
> stores the stats for each product in MapState and emits them only when they
> reach expiry. The evictor just empties the window, since all product state
> is kept in MapState. With Flink 1.7.0, checkpointing just hangs and expires
> while processing our pipeline.
>
>
> However, with your proposed solution, how would we achieve this
> sliding-window mechanism of emitting a 24-hour window every 5 minutes
> using a ProcessFunction?
>
>
> Best,
>
>
> On Fri, 2 Aug 2019 at 09:48, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Ahmad,
>>
>> First of all, you need to pre-aggregate the data in a 5-minute tumbling
>> window. For example, if your aggregation function is count or sum, this is
>> simple: you have a 5-minute tumbling window that just emits a count or sum
>> every 5 minutes.
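The pre-aggregation step boils down to assigning every record to the five-minute window that contains its timestamp and keeping one running aggregate per window. The plain-Java sketch below does this for a count, without any Flink dependency; `TumblingCount` and its methods are hypothetical names, and timestamps are assumed to be epoch milliseconds. In a Flink job, a `TumblingProcessingTimeWindows.of(Time.minutes(5))` window with an `AggregateFunction` would play this role.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TumblingCount {
    // Window size: five minutes in milliseconds.
    static final long WINDOW_MS = 5 * 60 * 1000L;

    // Start of the tumbling window containing the timestamp: the timestamp
    // rounded down to a multiple of WINDOW_MS (floorMod also handles negatives).
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
    }

    // Pre-aggregates a batch of event timestamps into one count per window,
    // ordered by window start.
    static Map<Long, Long> countPerWindow(List<Long> timestampsMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts), 1L, Long::sum);
        }
        return counts;
    }
}
```

For example, `countPerWindow(List.of(0L, 1000L, 300_000L, 599_999L, 600_000L))` yields `{0=2, 300000=2, 600000=1}`: only one small value per window is passed on, instead of every record.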
>>
>> The ProcessFunction then has a MapState<Integer, IntermediateAgg> (called
>> buffer). IntermediateAgg is the result type of the tumbling window and the
>> MapState is used like an array with the Integer key being the position
>> pointer to the value. You will only use the pointers 0 to 287 to store the
>> 288 intermediate aggregation values and use the MapState as a ring buffer.
>> For that you need a ValueState<Integer> (called pointer) that is a pointer
>> to the position that is overwritten next. Finally, you have a
>> ValueState<Result> (called result) that stores the result of the last
>> window.
>>
>> When the ProcessFunction receives a new intermediate result, it will
>> perform the following steps:
>>
>> 1) get the oldest intermediate result: buffer.get(pointer)
>> 2) overwrite the oldest intermediate result with the newly received
>> intermediate result: buffer.put(pointer, new-intermediate-result)
>> 3) increment the pointer by 1 and reset it to 0 if it became 288
>> 4) subtract the oldest intermediate result from the result
>> 5) add the newly received intermediate result to the result. Update the
>> result state and emit the result
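The five steps can be sketched in plain Java as follows, assuming the aggregate is a sum of longs. A `HashMap` stands in for the `MapState<Integer, IntermediateAgg>` buffer, and two nullable fields stand in for the `pointer` and `result` ValueStates (an unset ValueState returns null, which the sketch mimics). `RingBufferSlidingSum` and `onIntermediate` are hypothetical names; in a real KeyedProcessFunction the same logic would run in `processElement` against keyed state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java stand-in for the keyed state described above.
class RingBufferSlidingSum {
    private final int slots;                              // 288 for 24 h / 5 min
    private final Map<Integer, Long> buffer = new HashMap<>(); // "buffer" MapState
    private Integer pointer;                              // "pointer" ValueState, null until set
    private Long result;                                  // "result" ValueState, null until set

    RingBufferSlidingSum(int slots) {
        this.slots = slots;
    }

    // Called for every pre-aggregated 5-minute value; returns the
    // aggregate over the last `slots` values.
    long onIntermediate(long newValue) {
        int p = (pointer == null) ? 0 : pointer;
        long current = (result == null) ? 0L : result;

        // 1) get the oldest intermediate result (absent while the buffer fills up)
        Long oldest = buffer.get(p);
        // 2) overwrite it with the newly received intermediate result
        buffer.put(p, newValue);
        // 3) advance the pointer, wrapping around to 0 after the last slot
        pointer = (p + 1) % slots;
        // 4) subtract the oldest intermediate result from the result
        if (oldest != null) {
            current -= oldest;
        }
        // 5) add the new intermediate result, update the result state and emit it
        current += newValue;
        result = current;
        return current;
    }
}
```

With three slots and the inputs 1, 2, 3, 4, 5, the emitted results are 1, 3, 6, 9, 12: once the buffer is full, each new value replaces the one added three steps earlier. Step 4 relies on being able to subtract, which holds for sums and counts.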
>>
>> Note that this only works for certain aggregation functions. Some
>> functions cannot be pre-aggregated, and pre-aggregation is a hard
>> requirement for this approach.
>>
>> Best, Fabian
>>
>> On Thu, 1 Aug 2019 at 20:00, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>
>>>
>>> Hi Fabian,
>>>
>>> > On 4 Jul 2018, at 11:39, Fabian Hueske <fhue...@gmail.com> wrote:
>>> >
>>> > - Pre-aggregate records in a 5-minute tumbling window. However,
>>> > pre-aggregation does not work for FoldFunctions.
>>> > - Implement the window as a custom ProcessFunction that maintains a
>>> > state of 288 events and aggregates and retracts the pre-aggregated records.
>>> >
>>> > Best, Fabian
>>>
>>> We are finally implementing a ProcessFunction to replace the Flink sliding
>>> window. Could you please elaborate on how we can implement the sliding
>>> window as a ProcessFunction, as you explained above? I am struggling to
>>> understand how I will keep track of which events belong to which window. We
>>> have a 24-hour sliding window with a 5-minute slide (288 windows). How do I
>>> emulate 288 windows in a ProcessFunction with a 5-minute slide?
>>>
>>> The 288 overlapping sliding windows cause Flink checkpoints to hang; they do
>>> not finish even after an hour, even with MapState and the RocksDB state
>>> backend. So we decided to get rid of the sliding window and use a
>>> ProcessFunction to implement the sliding-window logic.
>>>
>>> Best,
>>
>>
