Hi Arvid,

Thanks. I tried FIRE instead of FIRE_AND_PURGE, and it introduced duplicates, while the rest of the result stays the same: record 1 is now fired both at the start and at the end of the window. So for every window, I see the first event of that window twice in the output.
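The duplicate follows from the documented TriggerResult semantics. FIRE evaluates the window and keeps its contents, while FIRE_AND_PURGE evaluates the window and then clears its contents. If only FIRE is returned for record 1, it stays in the window and is evaluated again by the final FIRE_AND_PURGE. The plain-Java model below has no Flink dependency, and all class and method names in it are hypothetical. It replays one window under both choices for the first element. It is a sketch of the semantics, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

public class FireVsPurgeModel {

    /** The subset of Flink's TriggerResult values modeled here. */
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    /** Contents of one window plus every emission produced by the trigger results applied to it. */
    static final class Window {
        final List<String> contents = new ArrayList<>();
        final List<List<String>> emitted = new ArrayList<>();

        void apply(Result result) {
            boolean fire = result == Result.FIRE || result == Result.FIRE_AND_PURGE;
            // Model choice: nothing is emitted for an empty window.
            if (fire && !contents.isEmpty()) {
                emitted.add(new ArrayList<>(contents)); // evaluate the window on its current contents
            }
            if (result == Result.FIRE_AND_PURGE) {
                contents.clear(); // purge: drop the contents after evaluating
            }
        }
    }

    /**
     * Adds the elements to one window. The first element gets firstResult, the others CONTINUE,
     * and the window is closed with FIRE_AND_PURGE, as in the trigger from this thread.
     */
    static List<List<String>> replay(List<String> elements, Result firstResult) {
        Window w = new Window();
        for (int i = 0; i < elements.size(); i++) {
            w.contents.add(elements.get(i)); // the element is added before the trigger decides
            w.apply(i == 0 ? firstResult : Result.CONTINUE);
        }
        w.apply(Result.FIRE_AND_PURGE); // end of the window
        return w.emitted;
    }

    public static void main(String[] args) {
        List<String> window1 = List.of("record1", "record2");
        System.out.println("FIRE first:           " + replay(window1, Result.FIRE));
        System.out.println("FIRE_AND_PURGE first: " + replay(window1, Result.FIRE_AND_PURGE));
    }
}
```

With FIRE on the first element, the model emits [record1] followed by [record1, record2]. With FIRE_AND_PURGE, it emits [record1] followed by [record2].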
I'm trying to explain the desired behaviour again; hopefully it becomes clear. All the records have the same key.

Current output:
> record 1: first event in window-1: fired
> record 2: last event in window-1: fired
> record 3: first event in window-2: fired [this should not have fired, since it has the same key as all other records]
> record 4, record 5: 2 events in window-2: fired

Expected output:
> record 1: first event in window-1: fired
> record 2: last event in window-1: fired
> record 3, 4, 5: all events in window-2: fired

I think my problem is storing keyBy values between windows. For example, I want to retain the keyBy state for 1 day. In that case, record 1 is fired instantly, and all other records (with the same key as record 1) are always grouped into each window (say 1 min) instead of being fired instantly.

Thanks!

On Wed, Feb 24, 2021 at 6:19 AM Arvid Heise <[email protected]> wrote:
> Hi Diwakar,
>
> the issue is that you fire_and_purge the state; you should just FIRE on
> the first element (or else you lose the information that you received the
> element already).
> You'd use FIRE_AND_PURGE on the last element, though.
>
> On Wed, Feb 24, 2021 at 7:16 AM Khachatryan Roman <
> [email protected]> wrote:
>
>> Hi Diwakar,
>>
>> I'm not sure I fully understand your question.
>> If event handling in one window depends on some other windows, then
>> TriggerContext.getPartitionedState cannot be used. Triggers don't have
>> access to the global state (only to key-window scoped state).
>> If that's what you want, then please consider ProcessWindowFunction [1],
>> where you can use context.globalState() in your process function.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>>
>> Regards,
>> Roman
>>
>>
>> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha <[email protected]>
>> wrote:
>>
>>> Hello,
>>>
>>> I'm trying to use a custom trigger for one of my use cases. I have
>>> basic logic (shown below) that applies keyBy to the input stream and
>>> uses a 1-minute window.
>>>
>>> .keyBy(<key selector>)
>>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>>> .trigger(new CustomTrigger())
>>> .aggregate(Input.getAggregationFunction(), new AggregationProcessingWindow());
>>>
>>> My custom trigger is expected to fire the first event of the key
>>> instantly, and any subsequent events should be aggregated in the window.
>>>
>>>> .trigger(new Trigger<Record, TimeWindow>() {
>>>>     @Override
>>>>     public TriggerResult onElement(Record record, long l, TimeWindow timeWindow,
>>>>             TriggerContext triggerContext) throws Exception {
>>>>         ValueState<Boolean> firstSeen =
>>>>                 triggerContext.getPartitionedState(firstSceenDescriptor);
>>>>         if (firstSeen.value() == null) {
>>>>             firstSeen.update(true);
>>>>             // fire trigger to early evaluate window and purge that event.
>>>>             return TriggerResult.FIRE_AND_PURGE;
>>>>         }
>>>>         // Continue. Do not evaluate window per element
>>>>         return TriggerResult.CONTINUE;
>>>>     }
>>>>
>>>>     @Override
>>>>     public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>>>>             TriggerContext triggerContext) throws Exception {
>>>>         // final evaluation and purge window state
>>>>         return TriggerResult.FIRE_AND_PURGE;
>>>>     }
>>>>
>>>>     @Override
>>>>     public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>>>>             TriggerContext triggerContext) throws Exception {
>>>>         return TriggerResult.CONTINUE;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
>>>>             throws Exception {
>>>>     }
>>>> })
>>>
>>> Currently, I see (for each window and the same key) that the first event of
>>> the window is always fired. But I want this to happen only for the first
>>> window; for the subsequent windows, it should aggregate all the events and
>>> then fire.
>>>
>>> Example: all the records have the same key.
>>> Current output:
>>> record 1: first event in window-1: fired
>>> record 2: last event in window-1: fired
>>> record 3: first event in window-2: fired
>>> record 4, record 5: 2 events in window-2: fired
>>>
>>> Expected output:
>>> record 1: first event in window-1: fired
>>> record 2: last event in window-1: fired
>>> record 3, 4, 5: all events in window-2: fired
>>> Window-2 should not fire the first event of the same key.
>>>
>>> I'm reading
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
>>> but I'm not able to solve it. Any pointers would be helpful.
>>>
>>> Thanks.
>>>
>>
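Roman's point about scope explains the current output. State obtained through TriggerContext.getPartitionedState belongs to one key and one window, so the "first seen" flag starts empty again in window-2, and record 3 is treated as a first event. The plain-Java simulation below has no Flink dependency, and the class names, method names, and timestamps in it are made up to match the example in this thread. It replays the five records twice. The first run scopes the flag to (key, window). The second run scopes the flag to the key alone, with an assumed 1-day expiry. It models the logic only and is not a Flink implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FirstEventScopeSimulation {

    /** Input record: a key, a display name, and an event timestamp in milliseconds. */
    static final class Rec {
        final String key;
        final String name;
        final long ts;

        Rec(String key, String name, long ts) {
            this.key = key;
            this.name = name;
            this.ts = ts;
        }
    }

    static final long WINDOW_MS = 60_000L;           // 1-minute tumbling windows
    static final long TTL_MS = 24L * 60 * 60 * 1000; // assumed 1-day retention of the "seen" flag

    /**
     * Replays records (assumed sorted by timestamp) and returns the emissions in order.
     * perKey == false: the flag is scoped to (key, window), like trigger partitioned state.
     * perKey == true:  the flag is scoped to the key alone and expires after TTL_MS.
     */
    static List<List<String>> run(List<Rec> records, boolean perKey) {
        List<List<String>> out = new ArrayList<>();
        Map<String, Long> seenAt = new LinkedHashMap<>();          // flag -> time it was set
        Map<String, Long> windowOf = new LinkedHashMap<>();        // key -> start of its open window
        Map<String, List<String>> pending = new LinkedHashMap<>(); // key -> records buffered in that window

        for (Rec r : records) {
            long start = r.ts - (r.ts % WINDOW_MS);

            // A record from a later window closes the key's previous window: emit its aggregate.
            Long open = windowOf.get(r.key);
            if (open != null && open.longValue() != start) {
                List<String> closed = pending.remove(r.key);
                if (closed != null) {
                    out.add(closed);
                }
            }
            windowOf.put(r.key, start);

            String flag = perKey ? r.key : r.key + "@" + start;
            Long setAt = seenAt.get(flag);
            if (setAt == null || r.ts - setAt >= TTL_MS) {
                // First record for this flag: emit it on its own and keep it out of the window.
                seenAt.put(flag, r.ts);
                out.add(List.of(r.name));
            } else {
                pending.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r.name);
            }
        }
        // End of input: emit whatever is still buffered.
        out.addAll(pending.values());
        return out;
    }

    /** The thread's example: five records with the same key, two in window-1 and three in window-2. */
    static List<Rec> example() {
        return List.of(
                new Rec("k", "record1", 1_000L),
                new Rec("k", "record2", 30_000L),
                new Rec("k", "record3", 61_000L),
                new Rec("k", "record4", 70_000L),
                new Rec("k", "record5", 90_000L));
    }

    public static void main(String[] args) {
        System.out.println("flag per (key, window): " + run(example(), false));
        System.out.println("flag per key with TTL:  " + run(example(), true));
    }
}
```

Scoping the flag per (key, window) reproduces the "current output" pattern: [record1], [record2], [record3], [record4, record5]. Scoping it per key gives the expected one: [record1], [record2], [record3, record4, record5]. In an actual Flink job, one option to explore (not confirmed in this thread) has three parts. First, hold the flag in keyed ValueState inside a KeyedProcessFunction placed before the window. Second, configure StateTtlConfig for the 1-day retention. Third, send the first record to a side output and forward the rest to the windowed aggregation.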
