Hi Diwakar,

The issue is that you return FIRE_AND_PURGE on the first element, which purges the state. You should just FIRE on the first element; otherwise you lose the information that you already received that element. You would use FIRE_AND_PURGE on the last element, though.
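
To make the difference concrete, here is a minimal plain-Java model of a single window. It is only a sketch and does not use Flink's API: `TriggerModel`, `Result`, and `run` are hypothetical names, and it assumes the window is fired and purged exactly once at its end. It tracks window contents only, not the trigger's own partitioned state.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of one keyed window; NOT Flink code. All names are hypothetical.
class TriggerModel {
    // Mirrors the names of the three TriggerResult values discussed in this thread.
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    // Feeds records into one modelled window and returns what each firing emits.
    // onFirst is the decision taken for the first element; the end of the window
    // is assumed to always fire and purge.
    static List<List<String>> run(List<String> records, Result onFirst) {
        List<String> contents = new ArrayList<>();      // modelled window contents
        List<List<String>> emitted = new ArrayList<>(); // one entry per firing
        boolean firstSeen = false;                      // stands in for ValueState<Boolean>

        for (String record : records) {
            contents.add(record);
            Result result = Result.CONTINUE;
            if (!firstSeen) {
                firstSeen = true;
                result = onFirst;
            }
            apply(result, contents, emitted);
        }
        apply(Result.FIRE_AND_PURGE, contents, emitted); // modelled end of window
        return emitted;
    }

    private static void apply(Result result, List<String> contents,
                              List<List<String>> emitted) {
        if (result == Result.CONTINUE || contents.isEmpty()) {
            return; // nothing to emit
        }
        emitted.add(new ArrayList<>(contents)); // firing emits a snapshot of the contents
        if (result == Result.FIRE_AND_PURGE) {
            contents.clear(); // purging drops the elements collected so far
        }
    }

    public static void main(String[] args) {
        List<String> records = List.of("r1", "r2", "r3");
        System.out.println(run(records, Result.FIRE_AND_PURGE)); // [[r1], [r2, r3]]
        System.out.println(run(records, Result.FIRE));           // [[r1], [r1, r2, r3]]
    }
}
```

In this model, purging on the first element means the final firing no longer contains `r1`, while a plain FIRE keeps it in the window until the last firing.
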
On Wed, Feb 24, 2021 at 7:16 AM Khachatryan Roman <[email protected]> wrote:

> Hi Diwakar,
>
> I'm not sure I fully understand your question.
> If event handling in one window depends on some other windows than
> TriggerContext.getPartitionedState can not be used. Triggers don't have
> access to the global state (only to key-window scoped state).
> If that's what you want then please consider ProcessWindowFunction [1]
> where you can use context.globalState() in your process function.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha <[email protected]> wrote:
>
>> Hello,
>>
>> I'm trying to use a custom trigger for one of my use case. I have a basic
>> logic (as shown below) of using keyBy on the input stream and using a
>> window of 1 min.
>>
>> .keyBy(<key selector>)
>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>> .trigger(new CustomTrigger())
>> .aggregate(Input.getAggregationFunction(), new AggregationProcessingWindow());
>>
>> My custom trigger is expected to fire the first event of the keyBy
>> instantly and any subsequent events should be aggregated in the window.
>>
>>> .trigger(new Trigger<Record, TimeWindow>() {
>>>     @Override
>>>     public TriggerResult onElement(Record record, long l, TimeWindow timeWindow,
>>>             TriggerContext triggerContext) throws Exception {
>>>         ValueState<Boolean> firstSeen =
>>>                 triggerContext.getPartitionedState(firstSceenDescriptor);
>>>         if(firstSeen.value() == null) {
>>>             firstSeen.update(true);
>>>             // fire trigger to early evaluate window and purge that event.
>>>             return TriggerResult.FIRE_AND_PURGE;
>>>         }
>>>         // Continue. Do not evaluate window per element
>>>         return TriggerResult.CONTINUE;
>>>     }
>>>     @Override
>>>     public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>>>             TriggerContext triggerContext) throws Exception {
>>>         // final evaluation and purge window state
>>>         return TriggerResult.FIRE_AND_PURGE;
>>>     }
>>>     @Override
>>>     public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>>>             TriggerContext triggerContext) throws Exception {
>>>         return TriggerResult.CONTINUE;
>>>     }
>>>     @Override
>>>     public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
>>>             throws Exception {
>>>
>>>     }
>>> })
>>
>> Currently, I see (for each window and same key) the first event of the
>> window is always fired. But I want to see this happening for only the first
>> window and for the subsequent window it should aggregate all the events and
>> then fire.
>>
>> Example : all the records have the same key.
>>
>> current output.
>> record 1 : first event in the window-1 : fired
>> record 2 : last event in the window-1 : fired
>> record 3 : first event in the window-2 : fired
>> record 4, record 5 : - 2 events in the window-2 : fired.
>>
>> expected output.
>> record 1 : first event in the window-1 : fired
>> record 2 : last event in the window-1 : fired
>> record 3,4,5 : all event in the window-2 : fired
>> window-2 should not fire the first event of the same key.
>>
>> I'm reading it here
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
>> but not able to solve it. Any pointers would be helpful.
>>
>> Thanks.
