Great! Thanks for the feedback.

Cheers,
Fabian

On Mon, 19 Aug 2019 at 17:12, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>
> Thank you Fabian. This works really well.
>
> Best Regards,
>
> On Fri, 16 Aug 2019 at 09:22, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Ahmad,
>>
>> The ProcessFunction should not rely on new records coming in (i.e., do
>> the processing in the processElement() method) but rather register a
>> timer every 5 minutes and perform the processing when the timer fires
>> in onTimer(). Essentially, you'd only collect the data in
>> `processElement()` and process it in `onTimer()`.
>> You need to make sure that you have timers registered as long as there
>> is data in the ring buffer.
>>
>> Best, Fabian
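
For reference, a minimal sketch of the collect-in-processElement() / process-in-onTimer() pattern described above. This is an illustration only, assuming a tenant-keyed stream of pre-aggregated Long counts; the class name, state names, and the plain sum are placeholders rather than the actual implementation discussed in this thread:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Collects data in processElement() and processes/emits only when the
    // 5 minute processing-time timer fires in onTimer().
    public class TimerDrivenCollector extends KeyedProcessFunction<String, Long, Long> {

        private static final long SLIDE_MS = 5 * 60 * 1000L; // 5 minutes

        private transient ValueState<Long> pending;   // data collected since the last timer
        private transient ValueState<Long> nextTimer; // timestamp of the registered timer, if any

        @Override
        public void open(Configuration parameters) {
            pending = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("pending", Long.class));
            nextTimer = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("nextTimer", Long.class));
        }

        @Override
        public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
            // Only collect here; all processing happens in onTimer().
            Long sum = pending.value();
            pending.update(sum == null ? value : sum + value);

            // Ensure a timer is registered while there is buffered data.
            if (nextTimer.value() == null) {
                long timer = ctx.timerService().currentProcessingTime() + SLIDE_MS;
                ctx.timerService().registerProcessingTimeTimer(timer);
                nextTimer.update(timer);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            // Process and emit every 5 minutes, independent of new records arriving.
            Long sum = pending.value();
            out.collect(sum == null ? 0L : sum);
            pending.clear();

            // Re-register the timer; in the full ring-buffer version you would only
            // do this while the ring buffer still holds data, as noted above.
            long next = timestamp + SLIDE_MS;
            ctx.timerService().registerProcessingTimeTimer(next);
            nextTimer.update(next);
        }
    }
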
>> On Thu, 15 Aug 2019 at 19:20, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> In this case, how do we emit the tumbling window when there are no
>>> events? Otherwise it's not possible to emulate a sliding window in a
>>> process function and move the ring buffer every 5 minutes when there
>>> are no events.
>>>
>>> Yes, I can create a periodic source function, but how can it be
>>> associated with all the keyed windows?
>>>
>>> Thanks.
>>>
>>> Best,
>>>
>>> On 2 Aug 2019, at 12:49, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Ok, I won't go into the implementation detail.
>>>
>>> The idea is to track all products that were observed in the last five
>>> minutes (i.e., unique product ids) in a five minute tumbling window.
>>> Every five minutes, the observed products are sent to a process
>>> function that collects the data of the last 24 hours and updates the
>>> current result by adding the data of the latest 5 minutes and removing
>>> the data of the 5 minutes that fell out of the 24 hour window.
>>>
>>> I don't know your exact business logic, but this is the rough scheme
>>> that I would follow.
>>>
>>> Cheers, Fabian
>>>
>>> On Fri, 2 Aug 2019 at 12:25, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks for this detail. However, our pipeline is keeping track of the
>>>> list of products seen in the last 24 hours with a 5 minute slide
>>>> (288 windows).
>>>>
>>>> inStream
>>>>     .filter(Objects::nonNull)
>>>>     .keyBy(TENANT)
>>>>     .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
>>>>     .trigger(TimeTrigger.create())
>>>>     .evictor(CountEvictor.of(1))
>>>>     .process(new MetricProcessWindowFunction());
>>>>
>>>> The trigger just fires on each element (onElement), and
>>>> MetricProcessWindowFunction just stores stats for each product within
>>>> MapState, emitting them only when they reach expiry. The evictor just
>>>> empties the window, as all product state is within MapState. Flink
>>>> 1.7.0 checkpointing just hangs and expires while processing our
>>>> pipeline.
>>>>
>>>> However, with your proposed solution, how would we be able to achieve
>>>> this sliding window mechanism of emitting a 24 hour window every 5
>>>> minutes using a process function?
>>>>
>>>> Best,
>>>>
>>>> On Fri, 2 Aug 2019 at 09:48, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi Ahmad,
>>>>>
>>>>> First of all, you need to pre-aggregate the data in a 5 minute
>>>>> tumbling window. For example, if your aggregation function is count
>>>>> or sum, this is simple. You have a 5 min tumbling window that just
>>>>> emits a count or sum every 5 minutes.
>>>>>
>>>>> The ProcessFunction then has a MapState<Integer, IntermediateAgg>
>>>>> (called buffer). IntermediateAgg is the result type of the tumbling
>>>>> window, and the MapState is used like an array, with the Integer key
>>>>> being the position pointer to the value. You will only use the
>>>>> pointers 0 to 287 to store the 288 intermediate aggregation values
>>>>> and use the MapState as a ring buffer. For that you need a
>>>>> ValueState<Integer> (called pointer) that points to the position that
>>>>> is overwritten next. Finally, you have a ValueState<Result> (called
>>>>> result) that stores the result of the last window.
>>>>>
>>>>> When the ProcessFunction receives a new intermediate result, it will
>>>>> perform the following steps:
>>>>>
>>>>> 1) get the oldest intermediate result: buffer.get(pointer)
>>>>> 2) override the oldest intermediate result with the newly received
>>>>>    intermediate result: buffer.put(pointer, new-intermediate-result)
>>>>> 3) increment the pointer by 1 and reset it to 0 when it reaches 288
>>>>> 4) subtract the oldest intermediate result from the result
>>>>> 5) add the newly received intermediate result to the result, update
>>>>>    the result state, and emit the result
>>>>>
>>>>> Note, this only works for certain aggregation functions. Depending on
>>>>> the function, you cannot pre-aggregate, which is a hard requirement
>>>>> for this approach.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> On Thu, 1 Aug 2019 at 20:00, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> > On 4 Jul 2018, at 11:39, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>> >
>>>>>> > - Pre-aggregate records in a 5 minute Tumbling window. However,
>>>>>> >   pre-aggregation does not work for FoldFunctions.
>>>>>> > - Implement the window as a custom ProcessFunction that maintains
>>>>>> >   a state of 288 events and aggregates and retracts the
>>>>>> >   pre-aggregated records.
>>>>>> >
>>>>>> > Best, Fabian
>>>>>>
>>>>>> We are finally implementing a ProcessFunction to replace the Flink
>>>>>> sliding window. Please can you elaborate how we can implement the
>>>>>> sliding window as a ProcessFunction like you explained above? I am
>>>>>> struggling to understand how I will keep track of which events
>>>>>> belong to which window. We have a 24 hour running sliding window
>>>>>> with a 5 minute slide (288 windows). How do I emulate 288 windows in
>>>>>> a ProcessFunction with a 5 minute slide?
>>>>>>
>>>>>> 288 sliding windows cause Flink checkpoints to hang and never
>>>>>> finish, even after an hour, even with MapState on RocksDB. So we
>>>>>> decided to get rid of the sliding window and use a process function
>>>>>> to implement the sliding window logic.
>>>>>>
>>>>>> Best,
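
For reference, a minimal sketch of the ring-buffer process function outlined in the message of 2 Aug, 09:48 above (steps 1 to 5), in the element-driven form described there; the timer-driven refinement from the 16 Aug message is sketched earlier in the thread. It assumes the intermediate and final aggregates are plain Long sums and the stream is keyed by tenant; the class and state names are illustrative:

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Maintains a 24 hour aggregate over 288 pre-aggregated 5 minute slices,
    // stored in a MapState used as a ring buffer.
    public class RingBufferSlidingAggregate extends KeyedProcessFunction<String, Long, Long> {

        private static final int SLOTS = 288; // 24 hours / 5 minutes

        private transient MapState<Integer, Long> buffer; // ring buffer of intermediate aggregates
        private transient ValueState<Integer> pointer;    // position that is overwritten next
        private transient ValueState<Long> result;        // result of the last window

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("buffer", Integer.class, Long.class));
            pointer = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("pointer", Integer.class));
            result = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("result", Long.class));
        }

        @Override
        public void processElement(Long newIntermediate, Context ctx, Collector<Long> out) throws Exception {
            int pos = pointer.value() == null ? 0 : pointer.value();
            long current = result.value() == null ? 0L : result.value();

            // 1) get the oldest intermediate result
            Long oldest = buffer.get(pos);
            // 2) overwrite it with the newly received intermediate result
            buffer.put(pos, newIntermediate);
            // 3) increment the pointer, wrapping around at 288
            pointer.update((pos + 1) % SLOTS);
            // 4) subtract the oldest intermediate result (null until the buffer has filled once)
            if (oldest != null) {
                current -= oldest;
            }
            // 5) add the new intermediate result, update the result state, and emit
            current += newIntermediate;
            result.update(current);
            out.collect(current);
        }
    }

The Long inputs would come from the 5 minute tumbling pre-aggregation described above (for example a TumblingProcessingTimeWindows.of(Time.minutes(5)) window with an AggregateFunction producing a sum or count per tenant); the pre-aggregated records would need to carry the tenant key so the stream can be keyed again before this function. As noted in the thread, this add-and-retract scheme only works for aggregations that can be pre-aggregated and subtracted, such as count or sum.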