Flink's sliding window didn't work well for our use case at SAP because
checkpointing froze with 288 sliding windows per tenant. Implementing the
sliding window through a tumbling window plus a process function reduces the
checkpointing time to a few seconds. We will see how that scales with 1000 or
more tenants to get a better idea of scalability.

Best Regards,

On Mon, 19 Aug 2019 at 16:16, Fabian Hueske <fhue...@gmail.com> wrote:

> Great!
>
> Thanks for the feedback.
>
> Cheers, Fabian
>
> Am Mo., 19. Aug. 2019 um 17:12 Uhr schrieb Ahmad Hassan <
> ahmad.has...@gmail.com>:
>
>>
>> Thank you, Fabian. This works really well.
>>
>> Best Regards,
>>
>> On Fri, 16 Aug 2019 at 09:22, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Ahmad,
>>>
>>> The ProcessFunction should not rely on new records to come (i.e., do the
>>> processing in the processElement() method) but rather register a timer every 5
>>> minutes and perform the processing when the timer fires in onTimer().
>>> Essentially, you'd only collect the data in `processElement()` and
>>> process it in `onTimer()`.
>>> You need to make sure that you have timers registered as long as
>>> there's data in the ring buffer.
>>>
>>> Best, Fabian
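The timer-driven approach described above can be illustrated with a small plain-Java simulation of one key's state. It has no Flink dependency so it runs standalone; the class, method, and field names are hypothetical. In a real KeyedProcessFunction the timer would be registered with ctx.timerService().registerProcessingTimeTimer(...), `buffer` would be a MapState, and the other fields ValueState. The sketch slides the window on every timer, even when no element arrived, and stops re-registering once the ring buffer is empty.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java simulation of one key's state in a timer-driven sliding window.
class TimerDrivenWindow {
    static final long SLIDE_MS = 5 * 60 * 1000L; // 5-minute slide
    static final int SLOTS = 288;                 // 24 h / 5 min

    final Map<Integer, Long> buffer = new HashMap<>(); // ring buffer (stand-in for MapState)
    long pending = 0;      // data collected since the last timer (processElement side)
    int pointer = 0;       // next slot to overwrite
    long result = 0;       // running 24 h sum
    Long nextTimer = null; // currently registered timer, null if none

    // Only collect data; register a timer at the next 5-minute boundary if none is pending.
    void onElement(long value, long now) {
        pending += value;
        if (nextTimer == null) {
            nextTimer = now - (now % SLIDE_MS) + SLIDE_MS;
        }
    }

    // Slide the window by one slot, whether or not an element arrived in the last 5 minutes.
    long onTimer(long timestamp) {
        Long oldest = buffer.get(pointer);
        long old = oldest == null ? 0 : oldest;
        if (pending == 0) buffer.remove(pointer); else buffer.put(pointer, pending);
        pointer = (pointer + 1) % SLOTS;
        result = result - old + pending;
        pending = 0;
        // Keep a timer registered only while the ring buffer still holds data.
        nextTimer = buffer.isEmpty() ? null : timestamp + SLIDE_MS;
        return result;
    }

    public static void main(String[] args) {
        TimerDrivenWindow w = new TimerDrivenWindow();
        w.onElement(7, 1_000L);       // one element; timer registered for t = 300000
        int slides = 0;
        while (w.nextTimer != null) { // fire timers until the value has left the 24 h window
            w.onTimer(w.nextTimer);
            slides++;
        }
        System.out.println(slides);   // 289: 288 slides with the value inside, one that evicts it
    }
}
```

Once the buffer is empty the pointer position no longer matters, so the next element can simply register a fresh timer.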
>>>
>>> Am Do., 15. Aug. 2019 um 19:20 Uhr schrieb Ahmad Hassan <
>>> ahmad.has...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> In this case, how do we emit the tumbling window result when there are no events?
>>>> Otherwise it's not possible to emulate a sliding window in a process function
>>>> and advance the ring buffer every 5 mins when there are no events.
>>>>
>>>> Yes, I can create a periodic source function, but how can it be
>>>> associated with all the keyed windows?
>>>>
>>>> Thanks.
>>>>
>>>> Best,
>>>>
>>>> On 2 Aug 2019, at 12:49, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> OK, I won't go into the implementation details.
>>>>
>>>> The idea is to track all products that were observed in the last five
>>>> minutes (i.e., unique product ids) in a five-minute tumbling window.
>>>> Every five minutes, the observed products are sent to a process
>>>> function that collects the data of the last 24 hours and updates the
>>>> current result by adding the data of the latest 5 minutes and removing the
>>>> data of the 5 minutes that fell out of the 24-hour window.
>>>>
>>>> I don't know your exact business logic, but this is the rough scheme
>>>> that I would follow.
>>>>
>>>> Cheers, Fabian
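For a result that is a set of distinct product ids, "removing the data of the 5 minutes that fell out" needs one extra detail: a product in the oldest slice may also appear in a newer slice, so it cannot simply be deleted. A minimal sketch, assuming the 24-hour result is the set of distinct ids, keeps a per-product count of how many buffered slices contain it. The class and method names are hypothetical and the code is plain Java rather than Flink state.

```java
import java.util.*;

// Hypothetical sketch: the ring holds the distinct ids of each 5-minute slice, and
// sliceCounts records in how many buffered slices each id appears.
class DistinctProductsWindow {
    private final int slots;
    private final List<Set<String>> ring;
    private final Map<String, Integer> sliceCounts = new HashMap<>();
    private int pointer = 0;

    DistinctProductsWindow(int slots) {
        this.slots = slots;
        this.ring = new ArrayList<>(Collections.nCopies(slots, Collections.<String>emptySet()));
    }

    // Called once per 5-minute slice with the ids seen in it; returns the current 24 h result.
    Set<String> slide(Set<String> latest) {
        Set<String> oldest = ring.get(pointer);
        ring.set(pointer, new HashSet<>(latest));
        pointer = (pointer + 1) % slots;
        for (String id : oldest) {
            // returning null from the remapping function removes the id when its count hits 0
            sliceCounts.merge(id, -1, (a, b) -> a + b == 0 ? null : a + b);
        }
        for (String id : latest) {
            sliceCounts.merge(id, 1, Integer::sum);
        }
        return new TreeSet<>(sliceCounts.keySet());
    }

    public static void main(String[] args) {
        DistinctProductsWindow w = new DistinctProductsWindow(288);
        System.out.println(w.slide(Set.of("p1", "p2"))); // [p1, p2]
        System.out.println(w.slide(Set.of("p2")));       // [p1, p2]
    }
}
```

With 288 slots, an id stays in the result until 288 consecutive slices have passed without it.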
>>>>
>>>> Am Fr., 2. Aug. 2019 um 12:25 Uhr schrieb Ahmad Hassan <
>>>> ahmad.has...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for this detail. However, our pipeline keeps track of the list
>>>>> of products seen in the last 24 hours with a 5 min slide (288 windows).
>>>>>
>>>>> inStream
>>>>>     .filter(Objects::nonNull)
>>>>>     .keyBy(TENANT)
>>>>>     .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
>>>>>     .trigger(TimeTrigger.create())
>>>>>     .evictor(CountEvictor.of(1))
>>>>>     .process(new MetricProcessWindowFunction());
>>>>>
>>>>>
>>>>> The trigger just fires in onElement(), and MetricProcessWindowFunction just
>>>>> stores stats for each product in a MapState and emits only when it reaches
>>>>> expiry. The evictor just empties the window, as all product state is in the
>>>>> MapState. With Flink 1.7.0, checkpointing just hangs and expires while
>>>>> processing our pipeline.
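To see where the 288 comes from, the following plain-Java sketch restates the assignment loop used by Flink's sliding window assigners (offset 0) for a 24-hour window with a 5-minute slide. Every element lands in 288 overlapping windows, and the window operator adds it to the state of each one, which is one plausible reason the state and checkpoints grew so large. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java restatement of how a sliding window assigner maps one timestamp to windows.
class SlidingAssignment {
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // start of the latest window containing timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size}); // window [start, start + size)
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 24 * 60 * 60 * 1000L; // 24 hours
        long slide = 5 * 60 * 1000L;      // 5 minutes
        System.out.println(windowsFor(1_565_000_000_000L, size, slide).size()); // 288
    }
}
```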
>>>>>
>>>>>
>>>>> However, with your proposed solution, how would we be able to achieve
>>>>> this sliding window mechanism of emitting a 24-hour window every 5 minutes
>>>>> using a ProcessFunction?
>>>>>
>>>>>
>>>>> Best,
>>>>>
>>>>>
>>>>> On Fri, 2 Aug 2019 at 09:48, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi Ahmad,
>>>>>>
>>>>>> First of all, you need to pre-aggregate the data in a 5-minute
>>>>>> tumbling window. For example, if your aggregation function is count or sum,
>>>>>> this is simple.
>>>>>> You have a 5 min tumbling window that just emits a count or sum every
>>>>>> 5 minutes.
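As a sketch of the pre-aggregation step, the plain-Java code below computes what a TumblingProcessingTimeWindows.of(Time.minutes(5)) window with a counting aggregate would emit per key: one count per 5-minute window. Unlike the sliding case, each event belongs to exactly one window. The class name is hypothetical and timestamps are plain milliseconds.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a 5-minute tumbling pre-aggregation (count per window).
class TumblingCount {
    static final long SIZE_MS = 5 * 60 * 1000L;

    // Maps each window start to the number of events with timestamp in [start, start + SIZE_MS).
    static Map<Long, Long> countPerWindow(long[] timestamps) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            long windowStart = ts - (ts % SIZE_MS); // exactly one tumbling window per event
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 1_000L, 299_999L, 300_000L, 900_000L};
        System.out.println(countPerWindow(ts)); // {0=3, 300000=1, 900000=1}
    }
}
```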
>>>>>>
>>>>>> The ProcessFunction then has a MapState<Integer, IntermediateAgg>
>>>>>> (called buffer). IntermediateAgg is the result type of the tumbling window
>>>>>> and the MapState is used like an array with the Integer key being the
>>>>>> position pointer to the value. You will only use the pointers 0 to 287 to
>>>>>> store the 288 intermediate aggregation values and use the MapState as a
>>>>>> ring buffer. For that you need a ValueState<Integer> (called pointer) that
>>>>>> is a pointer to the position that is overwritten next. Finally, you have a
>>>>>> ValueState<Result> (called result) that stores the result of the last
>>>>>> window.
>>>>>>
>>>>>> When the ProcessFunction receives a new intermediate result, it will
>>>>>> perform the following steps:
>>>>>>
>>>>>> 1) get the oldest intermediate result: buffer.get(pointer)
>>>>>> 2) overwrite the oldest intermediate result with the newly received
>>>>>> intermediate result: buffer.put(pointer, new-intermediate-result)
>>>>>> 3) increment the pointer by 1 and reset it to 0 if it became 288
>>>>>> 4) subtract the oldest intermediate result from the result
>>>>>> 5) add the newly received intermediate result to the result. Update
>>>>>> the result state and emit the result
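Steps 1 to 5 can be sketched in plain Java for a sum. This is a minimal sketch, not Flink code: the hypothetical fields `buffer`, `pointer` and `result` stand in for the MapState<Integer, IntermediateAgg>, ValueState<Integer> and ValueState<Result> described above, with both IntermediateAgg and Result assumed to be Long. One detail the steps leave implicit: during the first 288 slides the slot is still empty (MapState.get returns null), so it is treated as 0.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java sketch of the ring-buffer steps for a sum aggregate.
class RingBufferSum {
    private final int slots;                                   // 288 for 24 h / 5 min
    private final Map<Integer, Long> buffer = new HashMap<>(); // stand-in for MapState
    private Integer pointer = null;                            // null = state not yet set
    private Long result = null;

    RingBufferSum(int slots) { this.slots = slots; }

    long onIntermediateResult(long newAgg) {
        int p = pointer == null ? 0 : pointer;
        long current = result == null ? 0L : result;
        // 1) get the oldest intermediate result; an empty slot counts as 0
        Long oldest = buffer.get(p);
        long old = oldest == null ? 0L : oldest;
        // 2) overwrite it with the newly received intermediate result
        buffer.put(p, newAgg);
        // 3) advance the pointer, wrapping from slots - 1 back to 0
        pointer = (p + 1) % slots;
        // 4) + 5) subtract the oldest, add the newest, update the state and emit
        result = current - old + newAgg;
        return result;
    }

    public static void main(String[] args) {
        RingBufferSum w = new RingBufferSum(288);
        long r = 0;
        for (int i = 0; i < 289; i++) r = w.onIntermediateResult(1); // one count per 5 minutes
        System.out.println(r); // 288: the 289th slide evicts the first slice
    }
}
```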
>>>>>>
>>>>>> Note, this only works for certain aggregation functions. Depending on
>>>>>> the function, you cannot pre-aggregate, which is a hard requirement for
>>>>>> this approach.
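To illustrate the note: step 4 also needs an inverse. Sum and count can be retracted by subtraction, but a running max cannot, because once the largest value leaves the window the next-largest one is unknown. A minimal sketch, assuming it is acceptable to recompute over the buffered slots on every slide (288 comparisons instead of one subtraction), could look like this; the class name is hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch: max has no inverse, so instead of subtracting the oldest
// slice, the result is recomputed over all buffered slices.
class RingBufferMax {
    private final long[] slots;
    private int pointer = 0;

    RingBufferMax(int size) {
        slots = new long[size];
        Arrays.fill(slots, Long.MIN_VALUE); // empty slot = identity element of max
    }

    long onIntermediateResult(long newMax) {
        slots[pointer] = newMax;            // overwrite the oldest slice
        pointer = (pointer + 1) % slots.length;
        long max = Long.MIN_VALUE;
        for (long v : slots) max = Math.max(max, v); // O(slots) per slide
        return max;
    }

    public static void main(String[] args) {
        RingBufferMax w = new RingBufferMax(2);
        System.out.println(w.onIntermediateResult(5)); // 5
        System.out.println(w.onIntermediateResult(1)); // 5
        System.out.println(w.onIntermediateResult(2)); // 2: the 5 has left the window
    }
}
```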
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> Am Do., 1. Aug. 2019 um 20:00 Uhr schrieb Ahmad Hassan <
>>>>>> ahmad.has...@gmail.com>:
>>>>>>
>>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> > On 4 Jul 2018, at 11:39, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>> >
>>>>>>> > - Pre-aggregate records in a 5 minute Tumbling window. However,
>>>>>>> pre-aggregation does not work for FoldFunctions.
>>>>>>> > - Implement the window as a custom ProcessFunction that maintains
>>>>>>> a state of 288 events and aggregates and retracts the pre-aggregated
>>>>>>> records.
>>>>>>> >
>>>>>>> > Best, Fabian
>>>>>>>
>>>>>>> We are finally implementing a ProcessFunction to replace the Flink sliding
>>>>>>> window. Could you please elaborate on how we can implement the sliding window
>>>>>>> as a ProcessFunction, as you explained above? I am struggling to understand how
>>>>>>> I will keep track of which events belong to which window. We have a 24-hour
>>>>>>> sliding window with a 5 min slide (288 windows). How do I emulate 288
>>>>>>> windows in a ProcessFunction with a 5 min slide?
>>>>>>>
>>>>>>> 288 sliding windows cause Flink checkpoints to hang and never finish,
>>>>>>> even after an hour, even with MapState on RocksDB. So we decided to get rid of
>>>>>>> the sliding window and use a process function to implement the sliding window
>>>>>>> logic.
>>>>>>>
>>>>>>> Best,
>>>>>>
>>>>>>