Hello,

I'm trying to use a custom trigger for one of my use case. I have a basic
logic (as shown below) of using keyBy on the input stream and using a
window of 1 min.

.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.trigger(new CustomTrigger())
.aggregate(Input.getAggregationFunction(), new
AggregationProcessingWindow());


My custom trigger is expected to fire the first event of the keyBy
instantly and any subsequent events should be aggregated in the window.

.trigger(new Trigger<Record, TimeWindow>() {
> @Override
> public TriggerResult onElement(Record record, long l, TimeWindow
> timeWindow, TriggerContext triggerContext) throws Exception {
> ValueState<Boolean> firstSeen =
> triggerContext.getPartitionedState(firstSceenDescriptor);
> if(firstSeen.value() == null) {
> firstSeen.update(true);
> // fire trigger to early evaluate window and purge that event.
> return TriggerResult.FIRE_AND_PURGE;
> }
> // Continue. Do not evaluate window per element
> return TriggerResult.CONTINUE;
> }
> @Override
> public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
> TriggerContext triggerContext) throws Exception {
> // final evaluation and purge window state
> return TriggerResult.FIRE_AND_PURGE;
> }
> @Override
> public TriggerResult onEventTime(long l, TimeWindow timeWindow,
> TriggerContext triggerContext) throws Exception {
> return TriggerResult.CONTINUE;
> }
> @Override
> public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
> throws Exception {
>
> }
> })




Currently, I see (for each window and same key) the first event of the
window is always fired. But I want to see this happening for only the first
window and for the subsequent window it should aggregate all the events and
then fire.

Example : all the records have the same key.
current output.
record 1 : first event in the window-1 : fired record 2 : last event in the
window-1 : fired record 3 : first event in the window-2 : fired record 4,
record 5 : - 2 events in the window-2 : fired.

expected output.
record 1 : first event in the window-1 : fired record 2 : last event in the
window-1 : fired record 3,4,5 : all event in the window-2 : fired
window-2 should not fire the first event of the same key.

I'm reading it here
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
but not able to solve it. Any pointers would be helpful.

Thanks.

Reply via email to