Hi Diwakar,

the issue is that you fire_and_purge the state, you should just FIRE on the
first element (or else you lose the information that you received the
element already).
You'd use FIRE_AND_PURGE on the last element though.

On Wed, Feb 24, 2021 at 7:16 AM Khachatryan Roman <
[email protected]> wrote:

> Hi Diwakar,
>
> I'm not sure I fully understand your question.
> If event handling in one window depends on some other windows than
> TriggerContext.getPartitionedState can not be used. Triggers don't have
> access to the global state (only to key-window scoped state).
> If that's what you want then please consider ProcessWindowFunction [1]
> where you can use context.globalState() in your process function.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha <[email protected]>
> wrote:
>
>>
>> Hello,
>>
>> I'm trying to use a custom trigger for one of my use case. I have a basic
>> logic (as shown below) of using keyBy on the input stream and using a
>> window of 1 min.
>>
>> .keyBy(<key selector>)
>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>> .trigger(new CustomTrigger())
>> .aggregate(Input.getAggregationFunction(), new
>> AggregationProcessingWindow());
>>
>>
>> My custom trigger is expected to fire the first event of the keyBy
>> instantly and any subsequent events should be aggregated in the window.
>>
>> .trigger(new Trigger<Record, TimeWindow>() {
>>> @Override
>>> public TriggerResult onElement(Record record, long l, TimeWindow
>>> timeWindow, TriggerContext triggerContext) throws Exception {
>>> ValueState<Boolean> firstSeen =
>>> triggerContext.getPartitionedState(firstSceenDescriptor);
>>> if(firstSeen.value() == null) {
>>> firstSeen.update(true);
>>> // fire trigger to early evaluate window and purge that event.
>>> return TriggerResult.FIRE_AND_PURGE;
>>> }
>>> // Continue. Do not evaluate window per element
>>> return TriggerResult.CONTINUE;
>>> }
>>> @Override
>>> public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>>> TriggerContext triggerContext) throws Exception {
>>> // final evaluation and purge window state
>>> return TriggerResult.FIRE_AND_PURGE;
>>> }
>>> @Override
>>> public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>>> TriggerContext triggerContext) throws Exception {
>>> return TriggerResult.CONTINUE;
>>> }
>>> @Override
>>> public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
>>> throws Exception {
>>>
>>> }
>>> })
>>
>>
>>
>>
>> Currently, I see (for each window and same key) the first event of the
>> window is always fired. But I want to see this happening for only the first
>> window and for the subsequent window it should aggregate all the events and
>> then fire.
>>
>> Example : all the records have the same key.
>> current output.
>> record 1 : first event in the window-1 : fired record 2 : last event in
>> the window-1 : fired record 3 : first event in the window-2 : fired record
>> 4, record 5 : - 2 events in the window-2 : fired.
>>
>> expected output.
>> record 1 : first event in the window-1 : fired record 2 : last event in
>> the window-1 : fired record 3,4,5 : all event in the window-2 : fired
>> window-2 should not fire the first event of the same key.
>>
>> I'm reading it here
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
>> but not able to solve it. Any pointers would be helpful.
>>
>> Thanks.
>>
>

Reply via email to