Hi Soheil, I don't think just overriding the window trigger function is sufficient, since your logic effectively changes the how elements are assigned to a window. Based on a quick scan I think your use case might be able to reuse the DynamicGapSessionWIndow [1], where you will have to create a customized session timeout extractor based on how many messages is currently in a window. and you should be able to reuse the trigger.
Thanks. Rong [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.html On Sat, Jul 14, 2018 at 10:08 PM Soheil Pourbafrani <soheil.i...@gmail.com> wrote: > I want to have a time window to trigger data processing in two following > condition: > 1 - The window has 3 messages > 2- Or any number of message (less than 3) is in the window and it reaches > a timeout > > I know someone should extend Trigger class: > > public static class MyWindowTrigger <W extends Window> extends > Trigger<Object, W> { > > @Override > public TriggerResult onElement(Object o, long l, W w, TriggerContext > triggerContext) throws Exception { > > } > > @Override > public TriggerResult onProcessingTime(long l, W w, TriggerContext > triggerContext) throws Exception { > return TriggerResult.CONTINUE ; > } > > @Override > public TriggerResult onEventTime(long l, W w, TriggerContext > triggerContext) throws Exception { > return TriggerResult.CONTINUE ; > } > > @Override > public void clear(W w, TriggerContext triggerContext) throws Exception { > > } > > But I don't know how should I check the number of messages in the window > and set a timeout. Can someone help? >