Hi!

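If you look into the code of WindowOperator, you'll see that a cleanup
timer is registered for every element, for the window that element is
assigned to. This timer fires at the window's max timestamp plus the
allowed lateness, and it is used to clean up the window state once late
records can no longer arrive. I suppose that is the timer which calls
onEventTime in your trigger.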
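
To make the mechanism concrete, here is a minimal plain-Java sketch. It is
a toy model, not Flink's WindowOperator: the names CleanupTimerModel,
ToyTrigger and run are made up for illustration, keys are ignored, and I'm
assuming the cleanup time is the window's last timestamp plus the allowed
lateness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model (not Flink code) of an operator that registers a cleanup timer per element. */
final class CleanupTimerModel {

    /** Hypothetical stand-in for a trigger that registers no timers of its own. */
    interface ToyTrigger {
        boolean onEventTime(long time); // true stands for FIRE
    }

    /** Assumption: cleanup time = last timestamp covered by the window + allowed lateness. */
    static long cleanupTime(long windowStart, long windowSize, long allowedLateness) {
        long maxTimestamp = windowStart + windowSize - 1;
        return maxTimestamp + allowedLateness;
    }

    /** Feeds the elements, advances the watermark, and returns the times passed to onEventTime. */
    static List<Long> run(long[] timestamps, long windowSize, long allowedLateness,
                          long watermark, ToyTrigger trigger) {
        TreeSet<Long> timers = new TreeSet<>(); // a set, so duplicate timers collapse into one
        for (long ts : timestamps) {
            long windowStart = ts - Math.floorMod(ts, windowSize);
            // The operator, not the trigger, registers this timer for every element.
            timers.add(cleanupTime(windowStart, windowSize, allowedLateness));
        }
        List<Long> calls = new ArrayList<>();
        for (long timer : timers) {
            if (timer <= watermark) {
                trigger.onEventTime(timer); // called although the trigger registered nothing
                calls.add(timer);
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        ToyTrigger empty = time -> true;
        // Two elements fall into window [0, 1000), one into [1000, 2000).
        System.out.println(run(new long[] {100, 900, 1500}, 1000, 0, 5000, empty));
    }
}
```

With one-second windows and zero allowed lateness, the model calls
onEventTime at 999 and 1999 although the toy trigger never registered a
timer, which is consistent with seeing onEventTime once per window.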

A Trigger lets the user decide whether to fire a window, given the current
element or the current event or processing time. Deciding which window an
element belongs to is the work of the window assigner, not the trigger. If
you use a CountTrigger on a tumbling window, the window of the current
element fires each time the count reaches the given threshold, and the
count then starts again from zero. For example, with a threshold of 5, a
window that receives 10 elements fires twice, while a window that receives
6 elements fires once and never fires again for the sixth element. If the
window receives only 4 elements it never fires at all and its contents are
lost, unless the trigger also responds to the cleanup timer.
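
The counting behaviour can be sketched in plain Java as follows. This is a
toy model and not Flink's CountTrigger: the class CountTriggerModel and the
method firings are hypothetical, and I'm assuming the counter is reset to
zero after every firing.

```java
/** Toy model (not Flink's CountTrigger) of count-based firing within a single window. */
final class CountTriggerModel {

    /** Returns how many times one window fires when it receives the given number of elements. */
    static int firings(int threshold, int elements) {
        int count = 0;
        int fired = 0;
        for (int i = 0; i < elements; i++) {
            count++;
            if (count >= threshold) {
                count = 0; // assumption: the counter starts over after each firing
                fired++;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(firings(5, 4));  // below the threshold: never fires
        System.out.println(firings(5, 6));  // fires at the 5th element only
        System.out.println(firings(5, 10)); // fires at the 5th and 10th elements
    }
}
```

Under these assumptions, elements that arrive after the last firing are
only emitted if the trigger also fires on a timer.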

On Thu, Sep 2, 2021 at 12:42 AM Aeden Jameson <aeden.jame...@gmail.com> wrote:

> Hi Caizhi,
>
>    Thanks for responding. What I mean specifically is that if I do
> something like this:
>
>         env
>             .addSource(events)
>             .keyBy(....)
>             .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>             .trigger(new EmptyTrigger())
>             .process(new MyProcessFunction())
>             .addSink(...)
>
> Where EmptyTrigger is defined as,
>
> public class EmptyTrigger extends Trigger<MyElement, TimeWindow> {
>     // Never fires on elements and registers no timers.
>     @Override
>     public TriggerResult onElement(MyElement element, long timestamp,
>             TimeWindow window, TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window,
>             TriggerContext ctx) throws Exception {
>         System.out.println("Processing Time: " + time);
>         return TriggerResult.FIRE;
>     }
>
>     // Fires whenever an event-time timer for this window goes off.
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window,
>             TriggerContext ctx) throws Exception {
>         System.out.println("Event Time: " + time);
>         return TriggerResult.FIRE;
>     }
>
>     // No trigger state to clean up.
>     @Override
>     public void clear(TimeWindow window, TriggerContext ctx)
>             throws Exception {
>     }
> }
>
> I will see the onEventTime method above fire every second. However, I
> have not registered any timers in the above EmptyTrigger. So my question
> is: where are those timers coming from? The documentation states,
>
> "By specifying a trigger using trigger() you are overwriting the default
> trigger of a WindowAssigner."
>
>
> This is the puzzling part to me about the above statement.
>
> Thanks,
> Aeden
>
> On Tue, Aug 31, 2021 at 7:58 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> I don't quite understand this problem. But if you look into
>> WindowedStream#trigger you'll find that the trigger
>> of WindowOperatorBuilder will change when you call that method, and thus
>> the default trigger will be overwritten by calling WindowedStream#trigger.
>>
>> On Wed, Sep 1, 2021 at 12:32 AM Aeden Jameson <aeden.jame...@gmail.com> wrote:
>>
>>> Flink Version: 1.13.2
>>>
>>> In the section on Default Triggers of Window Assigners the documentation
>>> states:
>>>
>>> By specifying a trigger using trigger() you are overwriting the default
>>> trigger of a WindowAssigner. For example, if you specify a CountTrigger
>>>  for TumblingEventTimeWindows you will no longer get window firings
>>> based on the progress of time but only by count.
>>>
>>> While I find the example given to be true, because the CountTrigger
>>> doesn't FIRE in onEventTime, the timers of the default trigger are still
>>> being registered. Is that accurate? If so, when writing a custom trigger
>>> to replace the default trigger of a window, one still needs to handle
>>> those timers and decide whether to fire on them. It may be just me, but
>>> the documentation gives the impression that nothing would happen if one
>>> specifies a custom trigger with .trigger(...). Am I understanding what's
>>> going on in the code around timers, default triggers and custom triggers
>>> correctly? If so, it seems one solution would be to inherit from said
>>> WindowAssigner and override the getDefaultTrigger method.
>>>
>>>
>>> --
>>> Thanks,
>>> Aeden
>
