Hi Fabian,

In this case, how do we emit a tumbling window result when there are no events? Otherwise, it's not possible to emulate a sliding window in a process function and advance the ring buffer every 5 minutes when no events arrive.
Yes, I can create a periodic source function, but how can it be associated with all the keyed windows?

Thanks.

Best,

> On 2 Aug 2019, at 12:49, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Ok, I won't go into the implementation details.
>
> The idea is to track all products that were observed in the last five minutes
> (i.e., unique product ids) in a five-minute tumbling window.
> Every five minutes, the observed products are sent to a process function that
> collects the data of the last 24 hours and updates the current result by
> adding the data of the latest 5 minutes and removing the data of the 5
> minutes that fell out of the 24-hour window.
>
> I don't know your exact business logic, but this is the rough scheme that I
> would follow.
>
> Cheers, Fabian
>
>> On Fri, 2 Aug 2019 at 12:25, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>
>> Hi Fabian,
>>
>> Thanks for the details. However, our pipeline keeps track of the list of
>> products seen in 24 hours with a 5-minute slide (288 windows).
>>
>> inStream
>>     .filter(Objects::nonNull)
>>     .keyBy(TENANT)
>>     .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
>>     .trigger(TimeTrigger.create())
>>     .evictor(CountEvictor.of(1))
>>     .process(new MetricProcessWindowFunction());
>>
>> The trigger just fires on onElement, and MetricProcessWindowFunction just stores
>> stats for each product in MapState
>> and emits them only when they reach expiry. The evictor just empties the window,
>> since all product state is kept in MapState. Flink 1.7.0 checkpointing just hangs
>> and expires while processing our pipeline.
>>
>> However, with your proposed solution, how would we achieve this
>> sliding window mechanism of emitting a 24-hour window every 5 minutes using
>> a ProcessFunction?
>>
>> Best,
>>
>>
>>> On Fri, 2 Aug 2019 at 09:48, Fabian Hueske <fhue...@gmail.com> wrote:
>>> Hi Ahmad,
>>>
>>> First of all, you need to pre-aggregate the data in a 5-minute tumbling
>>> window.
>>> For example, if your aggregation function is count or sum, this is
>>> simple.
>>> You have a 5-minute tumbling window that just emits a count or sum every 5
>>> minutes.
>>>
>>> The ProcessFunction then has a MapState<Integer, IntermediateAgg> (called
>>> buffer). IntermediateAgg is the result type of the tumbling window, and the
>>> MapState is used like an array, with the Integer key being the position
>>> pointer to the value. You will only use the pointers 0 to 287 to store the
>>> 288 intermediate aggregation values and use the MapState as a ring buffer.
>>> For that you need a ValueState<Integer> (called pointer) that points
>>> to the position that is overwritten next. Finally, you have a
>>> ValueState<Result> (called result) that stores the result of the last
>>> window.
>>>
>>> When the ProcessFunction receives a new intermediate result, it
>>> performs the following steps:
>>>
>>> 1) get the oldest intermediate result: buffer.get(pointer)
>>> 2) overwrite the oldest intermediate result with the newly received
>>>    intermediate result: buffer.put(pointer, new-intermediate-result)
>>> 3) increment the pointer by 1 and reset it to 0 if it becomes 288
>>> 4) subtract the oldest intermediate result from the result
>>> 5) add the newly received intermediate result to the result, update the
>>>    result state, and emit the result
>>>
>>> Note that this only works for certain aggregation functions. Some
>>> functions cannot be pre-aggregated, and pre-aggregation is a hard
>>> requirement for this approach.
>>>
>>> Best, Fabian
>>>
>>>> On Thu, 1 Aug 2019 at 20:00, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>>
>>>> Hi Fabian,
>>>>
>>>> > On 4 Jul 2018, at 11:39, Fabian Hueske <fhue...@gmail.com> wrote:
>>>> >
>>>> > - Pre-aggregate records in a 5-minute tumbling window. However,
>>>> > pre-aggregation does not work for FoldFunctions.
>>>> > - Implement the window as a custom ProcessFunction that maintains a
>>>> > state of 288 events and aggregates and retracts the pre-aggregated
>>>> > records.
>>>> >
>>>> > Best, Fabian
>>>>
>>>> We are finally implementing a ProcessFunction to replace the Flink sliding
>>>> window. Could you please elaborate on how we can implement the sliding window
>>>> as a ProcessFunction, as you explained above? I am struggling to understand
>>>> how I will keep track of which events belong to which window. We have a 24-hour
>>>> sliding window with a 5-minute slide (288 windows). How do I emulate
>>>> 288 windows with a 5-minute slide in a ProcessFunction?
>>>>
>>>> The 288 sliding windows cause Flink checkpoints to hang; they do not finish
>>>> even within an hour, even with MapState on RocksDB. So we decided to get rid
>>>> of the sliding window and use a process function to implement the sliding
>>>> window logic.
>>>>
>>>> Best,
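Fabian's five ring-buffer steps can be sketched outside Flink as below. This is a minimal, hypothetical illustration, not code from the thread: it assumes the intermediate aggregate is a long count or sum, uses a `HashMap` as a stand-in for `MapState<Integer, IntermediateAgg>`, and uses plain fields as stand-ins for the `pointer` and `result` ValueStates. The class name `RingBufferSlidingSum` and the helper `onEmptyInterval()` are made up for this sketch. The helper shows how the ring could advance by one slot for a 5-minute interval with no events; what would invoke it when no events arrive (for example, a per-key processing-time timer) is left open, as it is in the thread.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, Flink-free sketch of the ring-buffer sliding aggregate.
 * Assumptions: the intermediate aggregate is a long count/sum, a HashMap stands in
 * for MapState<Integer, IntermediateAgg>, and plain fields stand in for the
 * "pointer" and "result" ValueStates.
 */
final class RingBufferSlidingSum {
    private final int slots;                                   // e.g. 288 = 24 h / 5 min
    private final Map<Integer, Long> buffer = new HashMap<>(); // stand-in for MapState "buffer"
    private int pointer = 0;                                   // stand-in for ValueState "pointer"
    private long result = 0L;                                  // stand-in for ValueState "result"

    RingBufferSlidingSum(int slots) {
        if (slots <= 0) throw new IllegalArgumentException("slots must be positive");
        this.slots = slots;
    }

    /** Called once per 5-minute tumbling-window result; returns the updated sliding result. */
    long onIntermediateResult(long newAgg) {
        // 1) get the oldest intermediate result (absent, treated as 0, while the ring fills up)
        long oldest = buffer.getOrDefault(pointer, 0L);
        // 2) overwrite the oldest intermediate result with the new one
        buffer.put(pointer, newAgg);
        // 3) advance the pointer, wrapping back to 0 after the last slot
        pointer = (pointer + 1) % slots;
        // 4) subtract the oldest intermediate result, 5) add the new one
        result = result - oldest + newAgg;
        return result; // in Flink this would update the result state and emit it
    }

    /** Advances the ring by one slot for an interval without events (empty aggregate). */
    long onEmptyInterval() {
        return onIntermediateResult(0L);
    }

    public static void main(String[] args) {
        RingBufferSlidingSum window = new RingBufferSlidingSum(3); // 3 slots keep the demo short
        long[] counts = {5, 2, 7, 1};
        for (long c : counts) {
            System.out.println(window.onIntermediateResult(c));
        }
        System.out.println(window.onEmptyInterval());
    }
}
```

With three slots, `main` prints 5, 7, 14, 10 and 8: once the ring is full, each new value evicts the value written three intervals earlier. Step 4 (subtracting the evicted value) is what restricts this to invertible aggregates such as count or sum; a set of unique product ids, for example, cannot be maintained by plain set subtraction, because an id in the evicted slot may also appear in a newer slot.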