Hi! If you look into the code of WindowOperator you'll see a cleanup timer is registered for each element. This cleanup timer is used to deal with late records. I suppose that is the timer which calls the onEventTime in your trigger.
Trigger is a class for the user to decide whether to fire a window given the current element or row time. Deciding which window an element belongs to is the work of the window assigner, not the trigger. By using the CountTrigger on a TumbleWindow, the window of the current element will be fired when the counting exceeds the given threshold. For example if a window contains 6 elements and the counting threshold is 5, the same window will be fired 2 times. However if the window contains only 4 elements this window will be lost, unless the CountTrigger responds to the cleanup timer. Aeden Jameson <aeden.jame...@gmail.com> 于2021年9月2日周四 上午12:42写道: > Hi Caizhi, > > Thanks for responding. What i mean specifically is that if I do > something like this > > env > .addSource(events) > .keyBy(....) > .window(TumblingEventTimeWindows.of(Time.seconds(1))) > .trigger(new EmptyTrigger()) > .process(new MyProcessFunction()) > .addSink(...) > > Where EmptyTrigger is defined as, > > public class EmptyTrigger extends Trigger<MyElement, TimeWindow> { > @Override > public TriggerResult onElement(MyElement element, long timestamp, > TimeWindow window, TriggerContext ctx) throws Exception { > return TriggerResult.CONTINUE; > } > > @Override > public TriggerResult onProcessingTime(long time, TimeWindow window, > TriggerContext ctx) throws Exception { > System.out.println("Processing Time: " + time); > return TriggerResult.FIRE; > } > > @Override > public TriggerResult onEventTime(long time, TimeWindow window, > TriggerContext ctx) throws Exception { > System.out.println("Event Time: " + time); > return TriggerResult.FIRE; > } > > @Override > public void clear(TimeWindow window, TriggerContext ctx) throws > Exception { > > } > } > > I will see the onEventTime method above fire every second. However I have > not registered any timers in the above EmptyTrigger. My question becomes > where are those timers coming from? The documentation states, > > "By specifying a trigger using trigger() you are overwriting the default > trigger of a WindowAssigner." > > > This is the puzzling part to me about the above statement. > > Thanks, > Aeden > > > > > > > On Tue, Aug 31, 2021 at 7:58 PM Caizhi Weng <tsreape...@gmail.com> wrote: > >> Hi! >> >> I don't quite understand this problem. But if you look into >> WindowedStream#trigger you'll find that the trigger >> of WindowOperatorBuilder will change when you call that method, and thus >> the default trigger will be overwritten by calling WindowedStream#trigger. >> >> Aeden Jameson <aeden.jame...@gmail.com> 于2021年9月1日周三 上午12:32写道: >> >>> Flink Version: 1.13.2 >>> >>> In the section on Default Triggers of Window Assigners the documentation >>> States >>> >>> By specifying a trigger using trigger() you are overwriting the default >>> trigger of a WindowAssigner. For example, if you specify a CountTrigger >>> for TumblingEventTimeWindows you will no longer get window firings >>> based on the progress of time but only by count. >>> >>> While I find the example given to be true because the CountTrigger >>> doesn't FIRE in OnEventTime the timers of the default trigger are still >>> being registered. Is that accurate? So when writing a custom trigger >>> against the default trigger of a window one still needs to handle those >>> timers and decide to fire them or not. It may be just me, but the >>> documentation gives the impression that nothing would happen if one >>> specifies a custom trigger with .trigger(...). I'm I understanding what's >>> going on in the code around timers, default and custom triggers correctly? >>> If so it seems one solution to this would be to inherit from said >>> WindowAssign and override the getDefaultTrigger method. >>> >>> >>> -- >>> Thanks, >>> Aeden >>> >>> >>> >