Just to add a drawback to solution 2): you may run into issues because window
boundaries may not be aligned. For example, the elements of a day window may be
split between the last day of a month and the first day of the next month.
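
A plain-Java sketch with no Flink dependency shows the alignment problem. It assumes epoch-aligned tumbling windows with zero offset (start = timestamp - timestamp % size). Calendar months vary in length, so it approximates a "month" with a fixed 30-day window. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java model of epoch-aligned tumbling windows (not Flink code).
final class WindowAlignment {
    static final long DAY_MS = TimeUnit.DAYS.toMillis(1);
    // Assumption: a "month" window needs a fixed size; 30 days is used here.
    static final long MONTH_MS = 30 * DAY_MS;

    // Start of the tumbling window of size sizeMs that contains timestampMs,
    // assuming zero offset and a non-negative timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Start (UTC midnight) of the calendar month that contains timestampMs.
    static long calendarMonthStart(long timestampMs) {
        return Instant.ofEpochMilli(timestampMs)
                .atZone(ZoneOffset.UTC)
                .toLocalDate()
                .withDayOfMonth(1)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2016-08-11T14:21:00Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(ts, MONTH_MS))); // prints 2016-07-28T00:00:00Z
        System.out.println(Instant.ofEpochMilli(calendarMonthStart(ts)));    // prints 2016-08-01T00:00:00Z
    }
}
```

Under these assumptions, the 30-day window that contains 2016-08-11 runs from 2016-07-28 to 2016-08-27 (end exclusive). Its boundaries therefore do not match the calendar month.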

Kostas

> On Aug 11, 2016, at 2:21 PM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi Shannon,
> 
> From what I understand, you want to have your results windowed by different
> durations, e.g. by minute, by day, by month, and you use the evictor to decide
> which elements should go into each window. If I am correct, then I do not
> think you need the evictor, which forces you to keep all the elements that
> the operator has seen (because it uses a ListState).
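
A plain-Java model can illustrate the state cost of the evictor. The classes are hypothetical and do not use Flink's state API. A window that must support eviction buffers every element, much like a ListState. A window that only reduces keeps a single running value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical model: an evictor needs every element, so the window buffers them all.
final class BufferingWindow<T> {
    private final List<T> elements = new ArrayList<>();
    void add(T element) { elements.add(element); }
    int storedElements() { return elements.size(); }
}

// Hypothetical model: a reducing window keeps only the running aggregate.
final class ReducingWindow<T> {
    private final BinaryOperator<T> reducer;
    private T aggregate; // null until the first element arrives

    ReducingWindow(BinaryOperator<T> reducer) { this.reducer = reducer; }

    void add(T element) {
        aggregate = (aggregate == null) ? element : reducer.apply(aggregate, element);
    }

    T result() { return aggregate; }
    int storedElements() { return aggregate == null ? 0 : 1; }
}
```

After 1000 elements, the buffering model stores 1000 entries. The reducing model (with `Integer::sum`) stores one value, 500500.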
> 
> In this case you can do one of the following:
> 
> 1) If you just want to have the big window (by month) and all the smaller
> ones to appear as early firings of the big one, then I would suggest going
> with a custom trigger. The trigger has access to watermarks, can register
> both event-time and processing-time timers (so you can have firings whenever
> you want, e.g. per minute or per day), can have state (e.g. an element
> counter), and can decide to FIRE or FIRE_AND_PURGE.
> 
> The only downside is that all intermediate firings will appear to belong to
> the big window. This means that the start and end of the by-minute and daily
> firings will be those of the month they belong to. If this is not a problem,
> I would go for that.
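
The semantics of option 1 can be modeled in plain Java. This is not Flink's `Trigger` API. The class, the timer methods and the counter are hypothetical stand-ins. Early firings emit and keep state (FIRE), and the final firing emits and clears it (FIRE_AND_PURGE). Every firing carries the month window's bounds, which is the downside described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (not Flink's Trigger API) of a single month window
// whose trigger fires early on timers and fires-and-purges at the end of the window.
final class EarlyFiringWindow {
    enum Result { FIRE, FIRE_AND_PURGE }

    // What downstream code sees for each firing: always the month's bounds.
    static final class Firing {
        final long windowStart, windowEnd, count;
        final Result result;

        Firing(long windowStart, long windowEnd, long count, Result result) {
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.count = count;
            this.result = result;
        }
    }

    private final long start, end;
    private long count; // window/trigger state, e.g. an element counter
    final List<Firing> emitted = new ArrayList<>();

    EarlyFiringWindow(long start, long end) { this.start = start; this.end = end; }

    void onElement() { count++; }

    // Early firing, e.g. from a per-minute or per-day timer: emit and keep state (FIRE).
    void onEarlyTimer() { emitted.add(new Firing(start, end, count, Result.FIRE)); }

    // End of the month window: emit, then clear state (FIRE_AND_PURGE).
    void onEndOfWindow() {
        emitted.add(new Firing(start, end, count, Result.FIRE_AND_PURGE));
        count = 0;
    }
}
```

For example, take two elements, each followed by an early firing, and then the end of the window. The firings report counts 1, 2 and 2, all against the same window start and end.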
> 
> 2) If the above is a problem, then what you can do is key your input stream
> and then apply 3 different windowing strategies, e.g. by minute, by day and by
> month. This way you will also have the desired window boundaries. This would
> look like:
> 
> keyedStream.timeWindow(byMonth).reduce(…).addSink(…);
> keyedStream.timeWindow(byDay).reduce(…).addSink(…);
> keyedStream.timeWindow(byMinute).reduce(…).addSink(…);
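
Option 2 can be modeled in plain Java as one keyed stream feeding three independent tumbling windowings. The class is hypothetical. It assumes epoch-aligned windows and a fixed 30-day "month", since month lengths vary. Each windowing keeps its own state, keyed here by window start.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

// Hypothetical model of option 2: the same events are counted by three independent
// tumbling windowings, each with its own state and its own window bounds.
final class ParallelWindowings {
    static final long MINUTE = TimeUnit.MINUTES.toMillis(1);
    static final long DAY = TimeUnit.DAYS.toMillis(1);
    static final long MONTH = 30 * DAY; // assumption: fixed 30-day "month"

    final Map<Long, Long> perMinute = new TreeMap<>(); // window start -> count
    final Map<Long, Long> perDay = new TreeMap<>();
    final Map<Long, Long> perMonth = new TreeMap<>();

    void onEvent(long timestampMs) {
        count(perMinute, timestampMs, MINUTE);
        count(perDay, timestampMs, DAY);
        count(perMonth, timestampMs, MONTH);
    }

    // Epoch-aligned tumbling window (zero offset): count under the window start.
    private static void count(Map<Long, Long> windows, long ts, long size) {
        windows.merge(ts - (ts % size), 1L, Long::sum);
    }
}
```

Events at 0 s, 30 s and 90 s fall into two minute windows but into a single day window and a single "month" window.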
> 
> Please let us know if this answers your question and if you need any more 
> help.
> 
> Kostas
>  
>> On Aug 10, 2016, at 10:15 PM, Shannon Carey <sca...@expedia.com> wrote:
>> 
>> Hi Aljoscha,
>> 
>> Yes, I am using an Evictor, and I think I have seen the problem you are 
>> referring to. However, that's not what I'm talking about.
>> 
>> If you re-read my first email, the main point is the following: if users 
>> desire updates more frequently than window watermarks are reached, then 
>> window state behaves suboptimally. It doesn't matter if there's an evictor 
>> or not. Specifically:
>> 
>> - If I have a window "A" that I fire multiple times in order to provide
>>   incremental results as data comes in, instead of waiting for the
>>   watermark to purge the window,
>> - and that window's events are gathered into another, bigger window "B",
>> - and I want to keep only the latest event from each upstream window "A"
>>   (by timestamp, where each window pane has its own timestamp),
>> - and even if I have a fold/reduce method on the bigger window "B" to make
>>   sure that each updated event from "A" overwrites the previous event (by
>>   timestamp),
>> 
>> then window "B" will hold in state all events from the "A" windows,
>> including all the incremental events that were fired by processing-time
>> triggers, even though I don't actually need those events because the
>> reducer gets rid of them.
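
The fold described here, where the latest update for each upstream pane wins, can be sketched in plain Java. The names are hypothetical. The sketch assumes each update carries its pane timestamp and that a later arrival supersedes an earlier one. The fold's own accumulator holds one entry per pane. The concern raised in this email is about the window state around it.

```java
import java.util.Map;

// Hypothetical fold for window "B": keep only the latest update per upstream
// pane of window "A", keyed by the pane timestamp (a later update overwrites).
final class LatestPerPane {
    static final class Update {
        final long paneTime;
        final long count;

        Update(long paneTime, long count) { this.paneTime = paneTime; this.count = count; }
    }

    // Fold step: applies the update to the accumulator and returns it.
    static Map<Long, Long> fold(Map<Long, Long> acc, Update u) {
        acc.put(u.paneTime, u.count); // overwrite the previous result for this pane
        return acc;
    }
}
```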
>> 
>> An example description of the execution flow:
>> Event x
>> - Window A receives the event; its trigger waits for the processing-time
>>   delay, then emits event x(time=1, count=1).
>> - Window B receives the event; its trigger waits for the processing-time
>>   delay, then executes fold() and emits event(time=1 => count=1), but the
>>   internal window state looks like [x(time=1, count=1)].
>> Event y
>> - Window A receives the event; the trigger behaves as above, then emits
>>   event y(time=1, count=2).
>> - Window B receives the event; the trigger behaves as above, then executes
>>   fold() and emits event(time=1 => count=2), but the internal window state
>>   looks like [x(time=1, count=1), y(time=1, count=2)].
>> Watermark z
>> - Window A receives the watermark; its trigger's event-time timer is
>>   reached, so it fires and purges, emitting its current state as event
>>   z(time=1, count=2).
>> - Window B receives the event; its trigger waits for the processing-time
>>   delay, then executes fold() and emits event(time=1 => count=2), but the
>>   internal window state looks like [x(time=1, count=1), y(time=1, count=2),
>>   z(time=1, count=2)].
>> 
>> As you can see, the internal window state continues to grow despite what
>> fold() is doing.
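
A small plain-Java model of window "B" can replay the trace. The model is hypothetical. As the trace describes, it assumes that every incoming update is appended to the window's buffered state and that the fold is re-run over that buffer on each firing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the trace for window "B": each update is appended to the
// window buffer, and the fold is re-run over the whole buffer on every firing.
final class WindowBTrace {
    static final class Update {
        final String name;
        final long time;
        final long count;

        Update(String name, long time, long count) { this.name = name; this.time = time; this.count = count; }

        @Override public String toString() { return name + "(time=" + time + ", count=" + count + ")"; }
    }

    final List<Update> buffer = new ArrayList<>();

    // Buffers the update, then folds: the latest entry for the pane time wins.
    long onUpdate(Update u) {
        buffer.add(u);
        long latest = 0;
        for (Update e : buffer) {
            if (e.time == u.time) latest = e.count; // later entries overwrite earlier ones
        }
        return latest;
    }

    public static void main(String[] args) {
        WindowBTrace b = new WindowBTrace();
        long r1 = b.onUpdate(new Update("x", 1, 1));
        long r2 = b.onUpdate(new Update("y", 1, 2));
        long r3 = b.onUpdate(new Update("z", 1, 2));
        // prints: 1 2 2 [x(time=1, count=1), y(time=1, count=2), z(time=1, count=2)]
        System.out.println(r1 + " " + r2 + " " + r3 + " " + b.buffer);
    }
}
```

The folded result never exceeds count=2, while the buffer ends up holding all three updates.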
>> 
>> Does that explanation help interpret my original email?
>> 
>> -Shannon
>> 
>> 
>> From: Aljoscha Krettek <aljos...@apache.org>
>> Date: Wednesday, August 10, 2016 at 12:18 PM
>> To: "user@flink.apache.org" <user@flink.apache.org>
>> Subject: Re: Firing windows multiple times
>> 
>> Hi,
>> from your mail I'm gathering that you are in fact using an Evictor, is that 
>> correct? If not, then the window operator should not keep all the elements 
>> ever received for a window but only the aggregated result.
>> 
>> Side note, there seems to be a bug in EvictingWindowOperator that causes 
>> evicted elements to not actually be removed from the state. They are only 
>> filtered from the Iterable that is given to the WindowFunction. I opened a 
>> Jira issue for that: https://issues.apache.org/jira/browse/FLINK-4369
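
The reported behaviour can be modeled in plain Java. This is a hypothetical model, not the actual EvictingWindowOperator code. Filtering only the view handed to the window function leaves evicted elements in state. The intended behaviour removes them from state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the behaviour reported in FLINK-4369, not Flink code.
final class EvictionModel {
    final List<Long> state = new ArrayList<>();

    // Reported behaviour: evicted elements are only filtered out of the view
    // handed to the window function; the stored state still contains them.
    List<Long> fireFilteringOnly(Predicate<Long> evict) {
        List<Long> view = new ArrayList<>();
        for (Long e : state) {
            if (!evict.test(e)) view.add(e);
        }
        return view;
    }

    // Intended behaviour: evicted elements are removed from the state itself.
    List<Long> fireRemovingFromState(Predicate<Long> evict) {
        state.removeIf(evict);
        return new ArrayList<>(state);
    }
}
```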
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Wed, 10 Aug 2016 at 18:19 Shannon Carey <sca...@expedia.com> wrote:
>> One unfortunate aspect of using a fold() instead of a window is that the 
>> fold function has no knowledge of the watermarks. As a result, it is 
>> difficult to ensure that only items before the current watermark are 
>> included in the aggregation, and that old items are evicted correctly. This 
>> fact lends more support to the idea of using a custom operator (though that 
>> is more complex) or adding support for this use case within Flink.
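
A plain-Java sketch shows the kind of watermark-aware aggregation a custom operator could perform. The class is hypothetical. It assumes the operator sees each element's timestamp and is notified of every new watermark. Only elements at or before the watermark are folded into the result, and they are evicted once folded.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of a watermark-aware sum, as a custom operator could do it:
// elements are buffered per timestamp, and only those at or before the current
// watermark are folded into the result and then evicted.
final class WatermarkAwareSum {
    private final TreeMap<Long, Long> pending = new TreeMap<>(); // timestamp -> sum
    private long result;

    void onElement(long timestamp, long value) {
        pending.merge(timestamp, value, Long::sum);
    }

    // Folds every entry with timestamp <= watermark, evicts it, returns the running result.
    long onWatermark(long watermark) {
        NavigableMap<Long, Long> ready = pending.headMap(watermark, true);
        for (long v : ready.values()) result += v;
        ready.clear(); // clearing the view removes these entries from 'pending'
        return result;
    }

    int pendingTimestamps() { return pending.size(); }
}
```

With a TreeMap keyed by timestamp, "everything up to the watermark" is a single `headMap` view. Clearing that view evicts exactly those entries.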
>> 
>> -Shannon
> 
