Thanks, Fabian. Just for completeness: in that 1-minute window, is my modified count trigger still valid? Say, if in that one-minute window I have 100 events after 30 s, will it still fire at the 30th second?
Cheers,
Anwar

On Fri, Nov 27, 2015 at 3:31 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Anwar,
>
> Your trigger looks good!
>
> I just want to make sure you know exactly what happens after a window has been evaluated and purged.
> Once a window is purged, the whole window is cleared and removed. If a new element arrives that would have fit into the purged window, a new window with exactly the same time boundaries is created. For example, if you have a 5 min time window that is fired and purged in minute 4, and a new element arrives immediately after the purging, that element is put into a window that will only "exist" for 1 more minute (rather than starting a new 5 minute window).
>
> Cheers, Fabian
>
>
> 2015-11-27 14:59 GMT+01:00 Anwar Rizal <anriza...@gmail.com>:
>
>> Thanks Fabian and Aljoscha,
>>
>> I tried to implement the trigger as you described, as follows:
>>
>> https://gist.github.com/anonymous/d0578a4d27768a75bea4
>>
>> It works fine, indeed.
>>
>> Thanks,
>> Anwar
>>
>>
>> On Fri, Nov 27, 2015 at 11:49 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi Anwar,
>>> what Fabian wrote is completely right. I just want to give the reasoning for why the CountTrigger behaves as it does. The idea was to have Triggers that clearly focus on one thing and then, at some point, add combination triggers. For example, an OrTrigger that fires if either of its sub-triggers fires, or an AndTrigger that fires after both of its sub-triggers fire. (More complex combinations could also be thought of here.)
>>>
>>> Cheers,
>>> Aljoscha
>>> > On 27 Nov 2015, at 09:59, fhue...@gmail.com wrote:
>>> >
>>> >
>>> > Hi,
>>> >
>>> > a regular tumbling time window of 5 seconds gets all elements within that period of time (the semantics of time vary between the processing, ingestion, and event time modes) and triggers the execution after 5 seconds.
>>> >
>>> > If you define a custom trigger, the assignment policy remains the same, but the trigger condition is overwritten (it is NOT additional but replaces the default condition), i.e., your implementation will only trigger when 100 elements have arrived. In order to also trigger when the window time expires, you have to register a timer (a processing time or event time timer) via the trigger context.
>>> > NOTE: The window assigner will continue to assign elements to the window, even if the window has already been evaluated. If you PURGE the window and an element arrives after that, a new window is created.
>>> >
>>> > To implement your trigger, you have to register a timer in the onElement() method with:
>>> > ctx.registerEventTimeTimer(window.getEnd)
>>> > You can do that in every onElement() call, because the timer is always overwritten.
>>> >
>>> > NOTE: you should use Flink’s keyed state (accessed via the trigger context) if you want to keep state such as the current count.
>>> >
>>> > Hope this helps. Please let me know if you have further questions.
>>> > Fabian
>>> >
>>> >
>>> > From: Matthias J. Sax
>>> > Sent: Friday, November 27, 2015 08:44
>>> > To: user@flink.apache.org
>>> > Subject: Re: Doubt about window and count trigger
>>> >
>>> >
>>> > Hi,
>>> >
>>> > a Trigger is an *additional* condition for intermediate (early) evaluation of the window. Thus, it is not "or-ed" to the basic window definition.
>>> >
>>> > If you want to have an or-ed window condition, you can customize it by specifying your own window definition.
>>> >
>>> > > dataStream.window(new MyOwnWindow()); // MyOwnWindow extends WindowAssigner { /* put your code here */ }
>>> >
>>> > -Matthias
>>> >
>>> >
>>> > On 11/26/2015 11:40 PM, Anwar Rizal wrote:
>>> > > Hi all,
>>> > >
>>> > > From the documentation:
>>> > > "The Trigger specifies when the function that comes after the window clause (e.g., sum, count) is evaluated (“fires”) for each window."
>>> > >
>>> > > So, basically, if I specify:
>>> > >
>>> > > keyedStream
>>> > >   .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
>>> > >   .trigger(CountTrigger.of(100))
>>> > >
>>> > > the execution of the window function is triggered when the count reaches 100 within the 5-second time window. If you have a system that never reaches 100 events in 5 seconds, the window will basically never fire.
>>> > >
>>> > > My question is, what would be the best option to get the following behavior:
>>> > >
>>> > > The execution of the window function is triggered when 5 seconds have elapsed, or when 100 events are received before 5 seconds have elapsed.
>>> > >
>>> > > I am thinking of implementing my own trigger that looks like CountTrigger, but that also fires when the end of the time window is reached (at the moment, it just returns Continue instead of Fired). But maybe there's a better way?
>>> > >
>>> > > Is there a reason why CountTrigger is implemented as it is today, and not as I described above (5 seconds or 100 events reached, whichever comes first)?
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Anwar.
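The "fire after 100 elements or at the end of the window, whichever comes first" behavior discussed in this thread, together with Fabian's point that a purged window is re-created with the same boundaries, can be modeled without Flink. The following is a minimal sketch and not the Flink Trigger API: the class `CountOrTimeSimulation`, its `Firing` type and its methods are hypothetical. It assumes non-negative event-time timestamps in milliseconds arriving in non-decreasing order, tumbling windows assigned as `start = ts - ts % size`, and that both the count condition and the end-of-window timer fire and purge the window. The `main` method replays the 1-minute scenario from Anwar's last question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, Flink-independent model of a trigger that fires a tumbling
 * event-time window after maxCount elements OR when the window ends,
 * whichever comes first. This is not the Flink Trigger API.
 */
public class CountOrTimeSimulation {

    /** One evaluation of a window: its boundaries, when and why it fired, and its size. */
    public static final class Firing {
        public final long windowStart;
        public final long windowEnd;
        public final long firedAt;
        public final String reason; // "count" or "time"
        public final int count;

        Firing(long windowStart, long windowEnd, long firedAt, String reason, int count) {
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.firedAt = firedAt;
            this.reason = reason;
            this.count = count;
        }

        @Override
        public String toString() {
            return "window [" + windowStart + ", " + windowEnd + ") fired at " + firedAt
                    + " by " + reason + " with " + count + " elements";
        }
    }

    private final long windowSize;
    private final int maxCount;
    // Element count per live window, keyed by window start (stand-in for keyed trigger state).
    private final TreeMap<Long, Integer> counts = new TreeMap<>();
    private final List<Firing> firings = new ArrayList<>();

    public CountOrTimeSimulation(long windowSize, int maxCount) {
        this.windowSize = windowSize;
        this.maxCount = maxCount;
    }

    /** Processes one element; assumes non-negative timestamps in non-decreasing order. */
    public void onElement(long ts) {
        advanceTime(ts); // let the "timers" of windows that already ended fire first
        long start = ts - (ts % windowSize); // tumbling window assignment
        int c = counts.merge(start, 1, Integer::sum);
        if (c >= maxCount) {
            // Count condition: fire and purge. A later element with the same start
            // re-creates a window with exactly the same boundaries.
            firings.add(new Firing(start, start + windowSize, ts, "count", c));
            counts.remove(start);
        }
    }

    /** Timer path: fires every remaining (non-empty) window whose end is <= time, then purges it. */
    public void advanceTime(long time) {
        while (!counts.isEmpty() && counts.firstKey() + windowSize <= time) {
            Map.Entry<Long, Integer> e = counts.pollFirstEntry();
            long end = e.getKey() + windowSize;
            firings.add(new Firing(e.getKey(), end, end, "time", e.getValue()));
        }
    }

    public List<Firing> firings() {
        return firings;
    }

    public static void main(String[] args) {
        // 1-minute windows; fire after 100 elements or at the window end.
        CountOrTimeSimulation sim = new CountOrTimeSimulation(60_000, 100);
        for (int i = 0; i < 100; i++) sim.onElement(30_000); // 100 elements at t = 30 s
        for (int i = 0; i < 5; i++) sim.onElement(45_000);   // 5 more in the same minute
        sim.advanceTime(Long.MAX_VALUE);                     // end of input
        for (Firing f : sim.firings()) System.out.println(f);
    }
}
```

Under these assumptions, running `main` prints two firings for the same window `[0, 60000)`: one at 30000 caused by the count, and one at 60000 caused by the timer for the 5 elements that arrived after the purge. In a real Flink trigger, the count would live in keyed state and the timer would be registered from onElement(), as Fabian describes above.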