Hi,
that would certainly be possible. What do you think could be gained by
knowing the current watermark in the WindowFunction? Do you have a specific
case in mind?

Cheers,
Aljoscha
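
For reference, the use case from the quoted message below (aggregating only
the items at or below the watermark) could look roughly like the following.
This is a minimal sketch in plain Java, not Flink API: the Event type, the
sumUpToWatermark helper and the currentWatermark parameter are all
hypothetical, since the window function does not receive the watermark today.

```java
import java.util.Arrays;
import java.util.List;

final class WatermarkBoundedSum {
    /** Hypothetical element type: a value paired with its event timestamp. */
    static final class Event {
        final long timestamp;
        final long value;

        Event(long timestamp, long value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Sums only the elements whose timestamp is at or below the watermark.
     * Elements with larger timestamps are left out of this firing.
     * The currentWatermark argument is an assumption: it stands in for
     * metadata the window function would have to be given.
     */
    static long sumUpToWatermark(Iterable<Event> elements, long currentWatermark) {
        long sum = 0;
        for (Event e : elements) {
            if (e.timestamp <= currentWatermark) {
                sum += e.value;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Event> window = Arrays.asList(
                new Event(10, 1), new Event(20, 2), new Event(30, 4));
        // With the watermark at 20, the element stamped 30 is skipped.
        System.out.println(sumUpToWatermark(window, 20)); // prints 3
    }
}
```

An early firing would then report only the part of the window that the
watermark has already passed, and a later firing would pick up the rest.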

On Wed, 24 Aug 2016 at 23:21 Shannon Carey <sca...@expedia.com> wrote:

> What do you think about adding the current watermark to the window
> function metadata in FLIP-2?
>
> From: Shannon Carey <sca...@expedia.com>
> Date: Friday, August 12, 2016 at 6:24 PM
> To: Aljoscha Krettek <aljos...@apache.org>, "user@flink.apache.org"
> <user@flink.apache.org>
>
> Subject: Re: Firing windows multiple times
>
> Thanks Aljoscha, I didn't know about those. Yes, they look like handy
> changes, especially to enable flexible approaches for eviction. In
> particular, having the current watermark available to the evictor via
> EvictorContext is helpful: it will be able to evict the old data more
> easily without needing to rely on Window#maxTimestamp().
>
> However, I think you might still be missing a piece. Specifically, it
> would still not be possible for the window function to choose which items
> to aggregate based on the current watermark. In particular, it is desirable
> to be able to aggregate only the items below the watermark, omitting items
> which have come in with timestamps larger than the watermark. Does that
> make sense?
>
> -Shannon
>
> From: Aljoscha Krettek <aljos...@apache.org>
> Date: Friday, August 12, 2016 at 4:25 AM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Firing windows multiple times
>
> Hi,
> there is already this FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
> which also links to a mailing list discussion. And this FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata.
> The former proposes to enhance the Evictor API a bit; among other things, we
> propose to give the evictor access to the current watermark. The other FLIP
> proposes to extend the amount of metadata we give to the window function.
> The first thing we propose to add is a "firing reason" that would tell
> you whether this was an early firing, an on-time firing or a late firing.
> The second thing is a firing counter that would tell you how many times the
> trigger has fired so far for the current window.
>
> Would a combination of these help with your use case?
>
> Cheers,
> Aljoscha
>
> On Thu, 11 Aug 2016 at 19:19 Shannon Carey <sca...@expedia.com> wrote:
>
>> "If Window B is a Folding Window and does not have an evictor then it
>> should not keep the list of all received elements."
>>
>> Agreed! Upon closer inspection, the behavior I'm describing is only
>> present when using EvictingWindowOperator, not when using WindowOperator. I
>> misread line 382 of WindowOperator which calls windowState.add(): in
>> actuality, the windowState is a FoldingState which incorporates the
>> user-provided fold function in order to eagerly fold the data. In contrast,
>> if you use an evictor, EvictingWindowOperator has the behavior I describe.
>>
>> I am already using a custom Trigger which uses a processing timer to FIRE
>> a short time after a new event comes in, and an event timer to
>> FIRE_AND_PURGE.
>>
>> It seems that I can achieve the desired effect by avoiding use of an
>> evictor so that the intermediate events are not retained in an
>> EvictingWindowOperator's state, and perform any necessary eviction within
>> my fold function. This has the aforementioned drawback that the windowed
>> fold function does not know about watermarks, which makes it difficult
>> to be precise about choosing which items to evict. However, this seems to
>> be the best choice within the current framework.
>>
>> Interestingly, it appears that TimeEvictor doesn't really know about
>> watermarks either. When a window emits an event, regardless of how it was
>> fired, it is assigned the timestamp given by its window's maxTimestamp(),
>> which might be much greater than the processing time that actually fired
>> the event. Then, TimeEvictor compares the max timestamp of all items in the
>> window against the other ones in order to determine which ones to evict.
>> Basically, it assumes that the events were emitted due to the window
>> terminating with FIRE_AND_PURGE. What if we gave more information
>> (specifically, the current watermark) to the evictor in order to allow it
>> to deal with a mix of intermediate events (fired by processing time) and
>> final events (fired by event time when the watermark reaches the window)?
>> That value is already available in the WindowOperator & could be passed to
>> the Evictor very easily. It would be an API change, of course.
>>
>> Other than that, is it worth considering a change to
>> EvictingWindowOperator to allow user-supplied functions to reduce the size
>> of its state when people fire upstream windows repeatedly? From what I see
>> when I monitor the state with debugger print statements, the
>> EvictingWindowOperator is definitely holding on to all the elements ever
>> received, not just the aggregated result. You can see this clearly because
>> EvictingWindowOperator holds a ListState instead of a FoldingState. The
>> user-provided fold function is only applied upon fire().
>>
>> -Shannon
>>
>>
>>
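
To illustrate the evictor point raised in the oldest message above, here is
a minimal sketch in plain Java of how the choice of reference time changes
what gets evicted. It is not Flink's TimeEvictor code: countToEvict,
keepMillis and the sample timestamps are hypothetical, and the watermark is
simply passed in as a number.

```java
import java.util.Arrays;
import java.util.List;

final class WatermarkAwareEviction {
    /**
     * Counts how many leading elements are older than the retention span,
     * measured back from referenceTime. Elements are assumed to be in
     * arrival order with non-decreasing timestamps.
     */
    static int countToEvict(List<Long> timestamps, long referenceTime, long keepMillis) {
        long cutoff = referenceTime - keepMillis;
        int toEvict = 0;
        for (long ts : timestamps) {
            if (ts > cutoff) {
                break;
            }
            toEvict++;
        }
        return toEvict;
    }

    public static void main(String[] args) {
        // Results from upstream one-second windows, each stamped with its
        // window's max timestamp even if it was fired early.
        List<Long> timestamps = Arrays.asList(999L, 1999L, 2999L, 3999L);
        long keepMillis = 2000L;

        // Reference = newest element's timestamp: treats every element as final.
        long newest = timestamps.get(timestamps.size() - 1);
        System.out.println(countToEvict(timestamps, newest, keepMillis)); // prints 2

        // Reference = current watermark (assumed to be 2500 here): nothing is
        // evicted yet, because event time has not actually advanced that far.
        long watermark = 2500L;
        System.out.println(countToEvict(timestamps, watermark, keepMillis)); // prints 0
    }
}
```

With the watermark as the reference, intermediate results fired by
processing time no longer push older elements out before event time has
caught up with them.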
