Just to add a drawback of solution 2): you may run into issues because the boundaries of the different windows may not be aligned. For example, the elements of a single day window may be split between the last day of one month and the first day of the next.
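One way such a split could arise is sketched below in plain Java, without any Flink dependency. It assumes day windows that are tumbling, aligned to the epoch, and shifted by a hypothetical 8-hour offset, while months are taken in UTC. The class name, the helper names, and the offset are illustrative assumptions, not part of the original discussion.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

public class WindowAlignment {
    // Start of the tumbling window of the given size (shifted by offset)
    // that contains the timestamp. All values are epoch milliseconds.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // Calendar month (1-12) of an epoch-millisecond timestamp, in UTC.
    static int utcMonth(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC).getMonthValue();
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        long offset = Duration.ofHours(8).toMillis(); // hypothetical offset

        // An element shortly after midnight UTC on the first of February.
        long ts = Instant.parse("2016-02-01T03:00:00Z").toEpochMilli();

        long start = windowStart(ts, offset, day);
        long last = start + day - 1; // last millisecond inside the window

        System.out.println("day window: " + Instant.ofEpochMilli(start)
                + " .. " + Instant.ofEpochMilli(last));
        // The same day window covers the end of January and the start of February.
        System.out.println("month at start: " + utcMonth(start)
                + ", month at end: " + utcMonth(last));
    }
}
```

With these assumed values the day window starts on January 31 and ends on February 1, so its elements belong to two different months.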
Kostas

> On Aug 11, 2016, at 2:21 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> Hi Shannon,
>
> From what I understand, you want to have your results windowed by different
> durations, e.g. by minute, by day, and by month, and you use the evictor to
> decide which elements should go into each window. If I am correct, then I do
> not think that you need the evictor, which forces you to keep all the elements
> that the operator has seen (because it uses a listState).
>
> In this case you can do one of the following:
>
> 1) If you just want to have the big window (by month), with all the smaller
> ones appearing as early firings of the big one, then I would suggest going
> with a custom trigger. The trigger has access to watermarks, can register both
> event-time and processing-time timers (so you can have firings whenever you
> want: per minute, per day, etc.), can have state (e.g. an element counter), and
> can decide to FIRE or FIRE_AND_PURGE.
>
> The only downside is that all intermediate firings will appear to belong to
> the big window. This means that the beginning and the end of the by-minute and
> daily firings will be those of the month that they belong to. If this is not
> a problem, I would go for that.
>
> 2) If the above is a problem, then what you can do is key your input stream
> and then have 3 different windowing strategies, e.g. by minute, by day, and by
> month. This way you will also have the desired window boundaries. This would
> look like:
>
>     keyedStream.timeWindow(byMonth).addSink …
>     keyedStream.timeWindow(byDay).addSink …
>     keyedStream.timeWindow(byMinute).addSink …
>
> Please let us know if this answers your question and if you need any more
> help.
>
> Kostas
>
>> On Aug 10, 2016, at 10:15 PM, Shannon Carey <sca...@expedia.com> wrote:
>>
>> Hi Aljoscha,
>>
>> Yes, I am using an Evictor, and I think I have seen the problem you are
>> referring to. However, that's not what I'm talking about.
>>
>> If you re-read my first email, the main point is the following: if users
>> desire updates more frequently than window watermarks are reached, then
>> window state behaves suboptimally. It doesn't matter if there's an evictor
>> or not. Specifically:
>>
>> - If I have a window "A" that I fire multiple times in order to provide
>>   incremental results as data comes in, instead of waiting for the watermark
>>   to purge the window
>> - And that window's events are gathered into another, bigger window "B"
>> - And I want to keep only the latest event from each upstream window "A" (by
>>   timestamp, where each window pane has its own timestamp)
>> - Even if I have a fold/reduce method on the bigger window "B" to make sure
>>   that each updated event from "A" overwrites the previous event (by
>>   timestamp)
>> - Window "B" will hold in state all events from windows "A", including all
>>   the incremental events that were fired by processing-time triggers, even
>>   though I don't actually need those events because the reducer gets rid of
>>   them
>>
>> An example description of execution flow:
>>
>> Event x
>>   - Window A receives event, trigger waits for processing time delay, then
>>     emits event x(time=1, count=1)
>>   - Window B receives event, trigger waits for processing time delay, then
>>     executes fold() and emits event(time=1 => count=1), but internal Window
>>     state looks like [x(time=1, count=1)]
>> Event y
>>   - Window A receives event, trigger waits for processing time delay (as
>>     above), then emits event y(time=1, count=2)
>>   - Window B receives event, trigger waits for processing time delay (as
>>     above), then executes fold() and emits event(time=1 => count=2), but
>>     internal Window state looks like [x(time=1, count=1), y(time=1, count=2)]
>> Watermark z
>>   - Window A receives watermark, trigger's event timer is reached, fires and
>>     purges and emits current state as event z(time=1, count=2)
>>   - Window B receives event, trigger waits for processing time delay, then
>>     executes fold() and emits event(time=1 => count=2), but internal Window
>>     state looks like [x(time=1, count=1), y(time=1, count=2), z(time=1,
>>     count=2)]
>>
>> As you can see, the internal window state continues to grow despite what
>> fold() is doing.
>>
>> Does that explanation help interpret my original email?
>>
>> -Shannon
>>
>>
>> From: Aljoscha Krettek <aljos...@apache.org>
>> Date: Wednesday, August 10, 2016 at 12:18 PM
>> To: "user@flink.apache.org" <user@flink.apache.org>
>> Subject: Re: Firing windows multiple times
>>
>> Hi,
>> from your mail I'm gathering that you are in fact using an Evictor, is that
>> correct? If not, then the window operator should not keep all the elements
>> ever received for a window but only the aggregated result.
>>
>> Side note, there seems to be a bug in EvictingWindowOperator that causes
>> evicted elements to not actually be removed from the state. They are only
>> filtered from the Iterable that is given to the WindowFunction. I opened a
>> Jira issue for that: https://issues.apache.org/jira/browse/FLINK-4369
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 10 Aug 2016 at 18:19 Shannon Carey <sca...@expedia.com> wrote:
>> One unfortunate aspect of using a fold() instead of a window is that the
>> fold function has no knowledge of the watermarks. As a result, it is
>> difficult to ensure that only items before the current watermark are
>> included in the aggregation, and that old items are evicted correctly. This
>> fact lends more support to the idea of using a custom operator (though that
>> is more complex) or adding support for this use case within Flink.
>>
>> -Shannon
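
The state growth described in the quoted execution flow can be reproduced with a small plain-Java simulation. This is only a sketch and does not use Flink's window operator. The `Update` class and both helper methods are hypothetical names introduced for illustration. The simulation contrasts state that buffers every incoming element with state that is folded eagerly, so that a newer update for the same timestamp overwrites the older one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowStateGrowth {
    // Hypothetical pane update emitted by the upstream window "A".
    static final class Update {
        final long time;
        final long count;

        Update(long time, long count) {
            this.time = time;
            this.count = count;
        }
    }

    // State that retains every received element: it grows with each firing of "A".
    static List<Update> buffer(List<Update> updates) {
        return new ArrayList<>(updates);
    }

    // State folded eagerly: only the latest count per timestamp is retained.
    static Map<Long, Long> foldLatest(List<Update> updates) {
        Map<Long, Long> latest = new HashMap<>();
        for (Update u : updates) {
            latest.put(u.time, u.count);
        }
        return latest;
    }

    public static void main(String[] args) {
        // x, y and z from the execution flow: three firings for the same pane.
        List<Update> fromA = Arrays.asList(
                new Update(1, 1), new Update(1, 2), new Update(1, 2));

        System.out.println("buffered elements: " + buffer(fromA).size());
        System.out.println("folded entries:    " + foldLatest(fromA).size());
        System.out.println("latest count for time=1: " + foldLatest(fromA).get(1L));
    }
}
```

In this simulation the buffered state holds three elements after x, y, and z, while the folded state holds a single entry for time=1 with count=2.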