Hi,
that would certainly be possible. What do you think can be gained by having knowledge about the current watermark in the WindowFunction? Do you have a specific case in mind?
Cheers,
Aljoscha

On Wed, 24 Aug 2016 at 23:21 Shannon Carey <sca...@expedia.com> wrote:

> What do you think about adding the current watermark to the window
> function metadata in FLIP-2?
>
> From: Shannon Carey <sca...@expedia.com>
> Date: Friday, August 12, 2016 at 6:24 PM
> To: Aljoscha Krettek <aljos...@apache.org>, "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Firing windows multiple times
>
> Thanks Aljoscha, I didn't know about those. Yes, they look like handy
> changes, especially to enable flexible approaches for eviction. In
> particular, having the current watermark available to the evictor via
> EvictorContext is helpful: it will be able to evict the old data more
> easily without needing to rely on Window#maxTimestamp().
>
> However, I think you might still be missing a piece. Specifically, it
> would still not be possible for the window function to choose which items
> to aggregate based on the current watermark. In particular, it is desirable
> to be able to aggregate only the items below the watermark, omitting items
> which have come in with timestamps larger than the watermark. Does that
> make sense?
>
> -Shannon
>
> From: Aljoscha Krettek <aljos...@apache.org>
> Date: Friday, August 12, 2016 at 4:25 AM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Firing windows multiple times
>
> Hi,
> there is already this FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
> which also links to a mailing list discussion. And this FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata.
> The former proposes to enhance the Evictor API a bit; among other things, we
> propose to give the evictor access to the current watermark. The other FLIP
> proposes to extend the amount of meta-data we give to the window function.
> The first thing we propose to add is a "firing reason" that would tell
> you whether this was an early firing, an on-time firing or a late firing.
> The second thing is a firing counter that would tell you how many times the
> trigger has fired so far for the current window.
>
> Would a combination of these help with your use case?
>
> Cheers,
> Aljoscha
>
> On Thu, 11 Aug 2016 at 19:19 Shannon Carey <sca...@expedia.com> wrote:
>
>> "If Window B is a Folding Window and does not have an evictor then it
>> should not keep the list of all received elements."
>>
>> Agreed! Upon closer inspection, the behavior I'm describing is only
>> present when using EvictingWindowOperator, not when using WindowOperator. I
>> misread line 382 of WindowOperator which calls windowState.add(): in
>> actuality, the windowState is a FoldingState which incorporates the
>> user-provided fold function in order to eagerly fold the data. In contrast,
>> if you use an evictor, EvictingWindowOperator has the behavior I describe.
>>
>> I am already using a custom Trigger which uses a processing timer to FIRE
>> a short time after a new event comes in, and an event timer to
>> FIRE_AND_PURGE.
>>
>> It seems that I can achieve the desired effect by avoiding use of an
>> evictor so that the intermediate events are not retained in an
>> EvictingWindowOperator's state, and perform any necessary eviction within
>> my fold function. This has the aforementioned drawback that the windowed
>> fold function does not know about watermarks, and therefore it is difficult
>> to be precise about choosing which items to evict. However, this seems to
>> be the best choice within the current framework.
>>
>> Interestingly, it appears that TimeEvictor doesn't really know about
>> watermarks either. When a window emits an event, regardless of how it was
>> fired, it is assigned the timestamp given by its window's maxTimestamp(),
>> which might be much greater than the processing time that actually fired
>> the event. Then, TimeEvictor compares the max timestamp of all items in the
>> window against the other ones in order to determine which ones to evict.
>> Basically, it assumes that the events were emitted due to the window
>> terminating with FIRE_AND_PURGE. What if we gave more information
>> (specifically, the current watermark) to the evictor in order to allow it
>> to deal with a mix of intermediate events (fired by processing time) and
>> final events (fired by event time when the watermark reaches the window)?
>> That value is already available in the WindowOperator & could be passed to
>> the Evictor very easily. It would be an API change, of course.
>>
>> Other than that, is it worth considering a change to
>> EvictingWindowOperator to allow user-supplied functions to reduce the size
>> of its state when people fire upstream windows repeatedly? From what I see
>> when I monitor the state with debugger print statements, the
>> EvictingWindowOperator is definitely holding on to all the elements ever
>> received, not just the aggregated result. You can see this clearly because
>> EvictingWindowOperator holds a ListState instead of a FoldingState. The
>> user-provided fold function is only applied upon fire().
>>
>> -Shannon
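
To make the two watermark-based ideas in this thread concrete, here is a minimal plain-Java sketch. It does not use any Flink classes: Event, sumUpToWatermark and evictOlderThan are hypothetical names invented for illustration, and the watermark is simply passed in as a parameter, which is the piece of metadata the thread proposes exposing. The first method aggregates only the elements at or below the watermark; the second evicts relative to the watermark rather than relative to the largest timestamp in the window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types or methods are part of Flink.
class WatermarkAwareAggregation {

    /** A timestamped value, standing in for an element buffered in window state. */
    static final class Event {
        final long timestamp;
        final long value;

        Event(long timestamp, long value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Sums only the elements whose timestamp is at or below the watermark. */
    static long sumUpToWatermark(List<Event> windowContents, long currentWatermark) {
        long sum = 0;
        for (Event e : windowContents) {
            if (e.timestamp <= currentWatermark) {
                sum += e.value;
            }
        }
        return sum;
    }

    /**
     * Keeps only elements newer than (currentWatermark - keepMillis).
     * The cutoff is derived from the watermark, not from the largest
     * element timestamp, so intermediate firings do not skew it.
     */
    static List<Event> evictOlderThan(List<Event> windowContents,
                                      long currentWatermark,
                                      long keepMillis) {
        long cutoff = currentWatermark - keepMillis;
        List<Event> retained = new ArrayList<>();
        for (Event e : windowContents) {
            if (e.timestamp > cutoff) {
                retained.add(e);
            }
        }
        return retained;
    }

    public static void main(String[] args) {
        List<Event> contents = Arrays.asList(
                new Event(1000L, 1L),
                new Event(2000L, 2L),
                new Event(9000L, 4L)); // carries a timestamp ahead of the watermark

        long watermark = 2500L;
        System.out.println(sumUpToWatermark(contents, watermark));            // prints 3
        System.out.println(evictOlderThan(contents, watermark, 1000L).size()); // prints 2
    }
}
```

In the example, the element with timestamp 9000 is left out of the sum because it lies above the watermark of 2500, yet it survives eviction, so a later firing with a higher watermark could still include it.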