Great!

Thanks for the feedback.

Cheers, Fabian

On Mon, 19 Aug 2019 at 17:12, Ahmad Hassan <ahmad.has...@gmail.com> wrote:

>
> Thank you Fabian. This works really well.
>
> Best Regards,
>
> On Fri, 16 Aug 2019 at 09:22, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Ahmad,
>>
>> The ProcessFunction should not rely on new records arriving (i.e., do the
>> processing in the processElement() method) but rather register a timer every 5
>> minutes and perform the processing when the timer fires in onTimer().
>> Essentially, you'd only collect the data in `processElement()` and
>> process it in `onTimer()`.
>> You need to make sure that a timer is registered as long as there's
>> data in the ring buffer.
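The collect-then-process split described above can be sketched in plain Java, without the Flink API, so that it runs standalone. This is only an illustration under stated assumptions: the class `TimerDrivenWindow` and its methods are hypothetical names, the aggregate is a sum of strictly positive values, and the boolean return values stand in for the decision to register a timer (in a Flink `KeyedProcessFunction` that would be a call on `ctx.timerService()`, with the fields held in keyed state).

```java
/** Hypothetical plain-Java model of a timer-driven ring buffer (not Flink code). */
class TimerDrivenWindow {
    private final long[] buffer;        // one partial sum per 5-minute interval
    private int pointer = 0;            // slot that is overwritten next
    private long result = 0L;           // current sum over the whole window
    private long pending = 0L;          // data collected since the last timer fired
    private int nonEmptySlots = 0;      // slots that still hold data
    private boolean timerActive = false;

    TimerDrivenWindow(int slots) {
        if (slots <= 0) throw new IllegalArgumentException("slots must be positive");
        buffer = new long[slots];
    }

    /** Counterpart of processElement(): only collect. Returns true if the caller must register a timer. */
    boolean collect(long value) {
        if (value <= 0) throw new IllegalArgumentException("sketch assumes positive values");
        pending += value;
        boolean mustRegister = !timerActive;
        timerActive = true;
        return mustRegister;
    }

    /** Counterpart of onTimer(): rotate the buffer and update the result. Returns true if another timer is needed. */
    boolean onTimer() {
        long oldest = buffer[pointer];
        if (oldest != 0) nonEmptySlots--;       // this slot's data falls out of the window
        if (pending != 0) nonEmptySlots++;      // this interval contributed data
        buffer[pointer] = pending;              // may be 0 when no events arrived
        pointer = (pointer + 1) % buffer.length;
        result += pending - oldest;
        pending = 0;
        timerActive = nonEmptySlots > 0;        // keep firing while the buffer holds data
        return timerActive;
    }

    long result() {
        return result;
    }
}
```

With this shape, an interval without events still advances the buffer, and the timers stop once the last non-empty slot has been retracted.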
>>
>> Best, Fabian
>>
>> On Thu, 15 Aug 2019 at 19:20, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> In this case, how do we emit a tumbling window when there are no events?
>>> Otherwise it's not possible to emulate a sliding window in a process function
>>> and move the ring buffer every 5 minutes when there are no events.
>>>
>>> Yes, I can create a periodic source function, but how can it be associated
>>> with all the keyed windows?
>>>
>>> Thanks.
>>>
>>> Best,
>>>
>>> On 2 Aug 2019, at 12:49, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Ok, I won't go into the implementation detail.
>>>
>>> The idea is to track all products that were observed in the last five
>>> minutes (i.e., unique product ids) in a five minute tumbling window.
>>> Every five minutes, the observed products are sent to a process function
>>> that collects the data of the last 24 hours and updates the current result
>>> by adding the data of the latest 5 minutes and removing the data of the 5
>>> minutes that fell out of the 24 hour window.
>>>
>>> I don't know your exact business logic, but this is the rough scheme
>>> that I would follow.
>>>
>>> Cheers, Fabian
>>>
>>> On Fri, 2 Aug 2019 at 12:25, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks for this detail. However, our pipeline keeps track of the list
>>>> of products seen in 24 hours with a 5 min slide (288 windows).
>>>>
>>>> inStream
>>>>     .filter(Objects::nonNull)
>>>>     .keyBy(TENANT)
>>>>     .window(SlidingProcessingTimeWindows.of(Time.minutes(24), Time.minutes(5)))
>>>>     .trigger(TimeTrigger.create())
>>>>     .evictor(CountEvictor.of(1))
>>>>     .process(new MetricProcessWindowFunction());
>>>>
>>>>
>>>> The trigger just fires in onElement, and MetricProcessWindowFunction just
>>>> stores stats for each product in MapState and emits only if it reaches
>>>> expiry. The evictor just empties the window, as all product state is in
>>>> MapState. With Flink 1.7.0, checkpointing just hangs and expires while
>>>> processing our pipeline.
>>>>
>>>>
>>>> However, with your proposed solution, how would we achieve this sliding
>>>> window mechanism of emitting a 24 hour window every 5 minutes using a
>>>> ProcessFunction?
>>>>
>>>>
>>>> Best,
>>>>
>>>>
>>>> On Fri, 2 Aug 2019 at 09:48, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi Ahmad,
>>>>>
>>>>> First of all, you need to preaggregate the data in a 5 minute tumbling
>>>>> window. For example, if your aggregation function is count or sum, this is
>>>>> simple.
>>>>> You have a 5 min tumbling window that just emits a count or sum every
>>>>> 5 minutes.
>>>>>
>>>>> The ProcessFunction then has a MapState<Integer, IntermediateAgg>
>>>>> (called buffer). IntermediateAgg is the result type of the tumbling window
>>>>> and the MapState is used like an array with the Integer key being the
>>>>> position pointer to the value. You will only use the pointers 0 to 287 to
>>>>> store the 288 intermediate aggregation values and use the MapState as a
>>>>> ring buffer. For that you need a ValueState<Integer> (called pointer) that
>>>>> is a pointer to the position that is overwritten next. Finally, you have a
>>>>> ValueState<Result> (called result) that stores the result of the last
>>>>> window.
>>>>>
>>>>> When the ProcessFunction receives a new intermediate result, it will
>>>>> perform the following steps:
>>>>>
>>>>> 1) get the oldest intermediate result: buffer.get(pointer)
>>>>> 2) overwrite the oldest intermediate result with the newly received
>>>>> intermediate result: buffer.put(pointer, new-intermediate-result)
>>>>> 3) increment the pointer by 1 and reset it to 0 if it became 288
>>>>> 4) subtract the oldest intermediate result from the result
>>>>> 5) add the newly received intermediate result to the result. Update
>>>>> the result state and emit the result
>>>>>
>>>>> Note, this only works for certain aggregation functions. Depending on
>>>>> the function, you cannot pre-aggregate, which is a hard requirement for
>>>>> this approach.
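Steps 1 to 5 above can be sketched in plain Java for a sum aggregate. This is a minimal illustration, not Flink code: the class name `SlidingSumRingBuffer` is hypothetical, a `HashMap` stands in for the `MapState<Integer, IntermediateAgg>` buffer, plain fields stand in for the two `ValueState` handles, and `long` is assumed as both the intermediate and the result type.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plain-Java stand-in for the ring buffer described above (no Flink dependency). */
class SlidingSumRingBuffer {
    private final int slots;                                    // 288 for a 24 h window with a 5 min slide
    private final Map<Integer, Long> buffer = new HashMap<>();  // stands in for the MapState buffer
    private int pointer = 0;                                    // position that is overwritten next
    private long result = 0L;                                   // result of the last window

    SlidingSumRingBuffer(int slots) {
        if (slots <= 0) throw new IllegalArgumentException("slots must be positive");
        this.slots = slots;
    }

    /** Applies steps 1-5 to one pre-aggregated partial sum and returns the updated window sum. */
    long add(long newPartial) {
        long oldest = buffer.getOrDefault(pointer, 0L); // 1) oldest partial; 0 if the slot was never written
        buffer.put(pointer, newPartial);                // 2) overwrite it with the new partial
        pointer = (pointer + 1) % slots;                // 3) advance the pointer, wrapping to 0
        result -= oldest;                               // 4) retract the oldest partial
        result += newPartial;                           // 5) add the new partial; caller emits the result
        return result;
    }

    public static void main(String[] args) {
        SlidingSumRingBuffer window = new SlidingSumRingBuffer(3); // 3 slots keep the demo short
        for (long partial : new long[] {5, 7, 1, 10}) {
            System.out.println(window.add(partial));
        }
    }
}
```

The retraction in step 4 is what restricts the scheme to aggregates that can be undone, such as count or sum; a max or min cannot be maintained this way without keeping more information per slot.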
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> On Thu, 1 Aug 2019 at 20:00, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> > On 4 Jul 2018, at 11:39, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>> >
>>>>>> > - Pre-aggregate records in a 5 minute Tumbling window. However,
>>>>>> > pre-aggregation does not work for FoldFunctions.
>>>>>> > - Implement the window as a custom ProcessFunction that maintains a
>>>>>> > state of 288 events and aggregates and retracts the pre-aggregated
>>>>>> > records.
>>>>>> >
>>>>>> > Best, Fabian
>>>>>>
>>>>>> We are finally implementing a ProcessFunction to replace the Flink sliding
>>>>>> window. Could you please elaborate on how we can implement the sliding
>>>>>> window as a ProcessFunction, as you explained above? I am struggling to
>>>>>> understand how to keep track of which events belong to which window. We
>>>>>> have a 24 hour sliding window with a 5 min slide (288 windows). How do I
>>>>>> emulate 288 windows in a ProcessFunction with a 5 min slide?
>>>>>>
>>>>>> 288 sliding windows cause Flink checkpoints to hang and not finish
>>>>>> even within an hour, even with MapState on RocksDB. So we decided to get
>>>>>> rid of the sliding window and use a ProcessFunction to implement the
>>>>>> sliding window logic.
>>>>>>
>>>>>> Best,
>>>>>
>>>>>
