Hi,
I think this can be neatly expressed by using something like a tree of
windowed aggregations, i.e. you specify your smallest window computation
first and then specify larger window computations based smaller windows.
I've written an example that showcases this approach:
https://gist.github.com/aljoscha/728ac69361f75c3ca87053b1a6f91fcd

The basic idea in pseudo code is this:

DataStream input = ...
dailyAggregate = input.keyBy(...).window(Time.days(1)).reduce(new Sum())
weeklyAggregate = dailyAggregate.keyBy(...).window(Time.days(7)).reduce(new
Sum())
monthlyAggregate = weeklyAggregate(...).window(Time.days(30)).reduce(new
Sum())

the benefit of this approach is that you don't duplicate computation and
that you can have incremental aggregation using a reduce function. When
manually keeping elements and evicting them based on time the amount of
state that would have to be kept would be much larger.

Does that make sense and would it help your use case?

Cheers,
Aljoscha

On Mon, 29 Aug 2016 at 23:18 Shannon Carey <sca...@expedia.com> wrote:

> Yes, let me describe an example use-case that I'm trying to implement
> efficiently within Flink.
>
> We've been asked to aggregate per-user data on a daily level, and from
> there produce aggregates on a variety of time frames. For example, 7 days,
> 30 days, 180 days, and 365 days.
>
> We can talk about the hardest one, the 365 day window, with the knowledge
> that adding the other time windows magnifies the problem.
>
> I can easily use tumbling time windows of 1-day size for the first
> aggregation. However, for the longer aggregation, if I take the naive
> approach and use a sliding window, the window size would be 365 days and
> the slide would be one day. If a user comes back every day, I run the risk
> of magnifying the size of the data by up to 365 because each day of data
> will be included in up to 365 year-long window panes. Also, if I want to
> fire the aggregate information more rapidly than once a day, then I have to
> worry about getting 365 different windows fired at the same time & trying
> to figure out which one to pay attention to, or coming up with a
> hare-brained custom firing trigger. We tried emitting each day-aggregate
> into a time series database and doing the final 365 day aggregation as a
> query, but that was more complicated than we wanted: in particular we'd
> like to have all the logic in the Flink job not split across different
> technology & infrastructure.
>
> The work-around I'm thinking of is to use a single window that contains
> 365 days of data (relative to the current watermark) on an ongoing basis.
> The windowing function would be responsible for evicting old data based on
> the current watermark.
>
> Does that make sense? Does it seem logical, or am I misunderstanding
> something about how Flink works?
>
> -Shannon
>
>
> From: Aljoscha Krettek <aljos...@apache.org>
> Date: Monday, August 29, 2016 at 3:56 AM
>
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Firing windows multiple times
>
> Hi,
> that would certainly be possible? What do you think can be gained by
> having knowledge about the current watermark in the WindowFunction, in a
> specific case, possibly?
>
> Cheers,
> Aljoscha
>
> On Wed, 24 Aug 2016 at 23:21 Shannon Carey <sca...@expedia.com> wrote:
>
>> What do you think about adding the current watermark to the window
>> function metadata in FLIP-2?
>>
>> From: Shannon Carey <sca...@expedia.com>
>> Date: Friday, August 12, 2016 at 6:24 PM
>> To: Aljoscha Krettek <aljos...@apache.org>, "user@flink.apache.org" <
>> user@flink.apache.org>
>>
>> Subject: Re: Firing windows multiple times
>>
>> Thanks Aljoscha, I didn't know about those. Yes, they look like handy
>> changes, especially to enable flexible approaches for eviction. In
>> particular, having the current watermark available to the evictor via
>> EvictorContext is helpful: it will be able to evict the old data more
>> easily without needing to rely on Window#maxTimestamp().
>>
>> However, I think you might still be missing a piece. Specifically, it
>> would still not be possible for the window function to choose which items
>> to aggregate based on the current watermark. In particular, it is desirable
>> to be able to aggregate only the items below the watermark, omitting items
>> which have come in with timestamps larger than the watermark. Does that
>> make sense?
>>
>> -Shannon
>>
>> From: Aljoscha Krettek <aljos...@apache.org>
>> Date: Friday, August 12, 2016 at 4:25 AM
>> To: "user@flink.apache.org" <user@flink.apache.org>
>> Subject: Re: Firing windows multiple times
>>
>> Hi,
>> there is already this FLIP:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
>>  which
>> also links to a mailing list discussion. And this FLIP:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata.
>> The former proposes to enhance the Evictor API a bit, among other things we
>> propose to give the evictor access to the current watermark. The other FLIP
>> proposes to extend the amount of meta-data we give to the window function.
>> The first to things we propose to add is a "firing reason" that would tell
>> you whether this was an early firing, an on time firing or a late firing.
>> The second thing is a firing counter that would tell you how many times the
>> trigger has fired so far for the current window.
>>
>> Would a combination of these help with your use case?
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 11 Aug 2016 at 19:19 Shannon Carey <sca...@expedia.com> wrote:
>>
>>> "If Window B is a Folding Window and does not have an evictor then it
>>> should not keep the list of all received elements."
>>>
>>> Agreed! Upon closer inspection, the behavior I'm describing is only
>>> present when using EvictingWindowOperator, not when using WindowOperator. I
>>> misread line 382 of WindowOperator which calls windowState.add(): in
>>> actuality, the windowState is a FoldingState which incorporates the
>>> user-provided fold function in order to eagerly fold the data. In contrast,
>>> if you use an evictor, EvictingWindowOperator has the behavior I describe.
>>>
>>> I am already using a custom Trigger which uses a processing timer to
>>> FIRE a short time after a new event comes in, and an event timer to
>>> FIRE_AND_PURGE.
>>>
>>> It seems that I can achieve the desired effect by avoiding use of an
>>> evictor so that the intermediate events are not retained in an
>>> EvictingWindowOperator's state, and perform any necessary eviction within
>>> my fold function. This has the aforementioned drawbacks of the windowed
>>> fold function not knowing about watermarks, and therefore it is difficult
>>> to be precise about choosing which items to evict. However, this seems to
>>> be the best choice within the current framework.
>>>
>>> Interestingly, it appears that TimeEvictor doesn't really know about
>>> watermarks either. When a window emits an event, regardless of how it was
>>> fired, it is assigned the timestamp given by its window's maxTimestamp(),
>>> which might be much greater than the processing time that actually fired
>>> the event. Then, TimeEvictor compares the max timestamp of all items in the
>>> window against the other ones in order to determine which ones to evict.
>>> Basically, it assumes that the events were emitted due to the window
>>> terminating with FIRE_AND_PURGE. What if we gave more information
>>> (specifically, the current watermark) to the evictor in order to allow it
>>> to deal with a mix of intermediate events (fired by processing time) and
>>> final events (fired by event time when the watermark reaches the window)?
>>> That value is already available in the WindowOperator & could be passed to
>>> the Evictor very easily. It would be an API change, of course.
>>>
>>> Other than that, is it worth considering a change to
>>> EvictingWindowOperator to allow user-supplied functions to reduce the size
>>> of its state when people fire upstream windows repeatedly? From what I see
>>> when I monitor the state with debugger print statements, the
>>> EvictingWindowOperator is definitely holding on to all the elements ever
>>> received, not just the aggregated result. You can see this clearly because
>>> EvictingWindowOperator holds a ListState instead of a FoldingState. The
>>> user-provided fold function is only applied upon fire().
>>>
>>> -Shannon
>>>
>>>
>>>

Reply via email to