Hello, I'm trying to use a custom trigger for one of my use cases. I have some basic logic (shown below) that uses keyBy on the input stream with a 1-minute window:

    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .trigger(new CustomTrigger())
    .aggregate(Input.getAggregationFunction(), new AggregationProcessingWindow());

My custom trigger is expected to fire the first event for a key immediately; any subsequent events should be aggregated in the window.

    .trigger(new Trigger<Record, TimeWindow>() {
        @Override
        public TriggerResult onElement(Record record, long l, TimeWindow timeWindow,
                TriggerContext triggerContext) throws Exception {
            ValueState<Boolean> firstSeen =
                    triggerContext.getPartitionedState(firstSceenDescriptor);
            if (firstSeen.value() == null) {
                firstSeen.update(true);
                // fire trigger to early evaluate window and purge that event.
                return TriggerResult.FIRE_AND_PURGE;
            }
            // Continue. Do not evaluate window per element
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
                TriggerContext triggerContext) throws Exception {
            // final evaluation and purge window state
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onEventTime(long l, TimeWindow timeWindow,
                TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
                throws Exception {
        }
    })

Currently, for every window (and the same key), the first event of the window is always fired on its own. I want this to happen only in the first window; in subsequent windows, all the events should be aggregated and then fired together.

Example: all the records have the same key.

Current output:

    record 1: first event in window-1: fired
    record 2: last event in window-1: fired
    record 3: first event in window-2: fired
    record 4, record 5: 2 events in window-2: fired

Expected output:

    record 1: first event in window-1: fired
    record 2: last event in window-1: fired
    record 3, 4, 5: all events in window-2: fired

Window-2 should not fire the first event of the same key on its own.

I have been reading https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge but have not been able to solve it. Any pointers would be helpful.

Thanks.
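
P.S. To make the expected behaviour concrete, here is a minimal plain-Java sketch that simulates it without any Flink dependency. It is not Flink code: the names `FirstEventOnceSketch`, `Event`, and `simulate` are hypothetical, events are assumed to arrive in timestamp order, and a window is assumed to fire once an event for a later window shows up (or the input ends). The only point is that the "first seen" flag is kept per key across all windows. As far as I can tell from the docs, state obtained through `getPartitionedState` in a trigger is scoped to the key and the window, which may be why my flag appears to reset in every window.

```java
import java.util.*;

// Plain-Java model of the expected output; illustrative names, no Flink APIs.
final class FirstEventOnceSketch {

    static final class Event {
        final String key;
        final long timestampMs;
        final String name;

        Event(String key, long timestampMs, String name) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.name = name;
        }
    }

    // Returns the firings in order; each firing is the list of record names it contains.
    static List<List<String>> simulate(List<Event> events, long windowSizeMs) {
        Set<String> keysSeen = new HashSet<>();   // per key, shared by ALL windows
        TreeMap<Long, Map<String, List<String>>> buffers = new TreeMap<>(); // window start -> key -> records
        List<List<String>> firings = new ArrayList<>();

        for (Event e : events) {
            long windowStart = e.timestampMs - Math.floorMod(e.timestampMs, windowSizeMs);
            // Assumption: an event for a later window closes all earlier windows.
            flushBefore(buffers, windowStart, firings);

            if (keysSeen.add(e.key)) {
                // Very first event of this key: fire it alone and do not buffer it.
                firings.add(List.of(e.name));
                continue;
            }
            buffers.computeIfAbsent(windowStart, w -> new LinkedHashMap<>())
                   .computeIfAbsent(e.key, k -> new ArrayList<>())
                   .add(e.name);
        }
        flushBefore(buffers, Long.MAX_VALUE, firings); // end of input closes the rest
        return firings;
    }

    // Fires and removes every buffered window whose start is before the given bound.
    private static void flushBefore(TreeMap<Long, Map<String, List<String>>> buffers,
                                    long bound, List<List<String>> firings) {
        NavigableMap<Long, Map<String, List<String>>> closed = buffers.headMap(bound, false);
        for (Map<String, List<String>> perKey : closed.values()) {
            firings.addAll(perKey.values());
        }
        closed.clear();
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("k", 1_000L, "record 1"),    // window-1
                new Event("k", 50_000L, "record 2"),   // window-1
                new Event("k", 61_000L, "record 3"),   // window-2
                new Event("k", 70_000L, "record 4"),   // window-2
                new Event("k", 110_000L, "record 5")); // window-2
        // prints [[record 1], [record 2], [record 3, record 4, record 5]]
        System.out.println(simulate(events, 60_000L));
    }
}
```

With a flag scoped per key and per window instead, record 3 would fire alone again in window-2, which matches the current output I described above.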