Hi,

I've noticed that you are using an event-time window, but the trigger fires
based on processing time only: onEventTime() always returns CONTINUE.
You should also register an event-time timer for the window end, so that
trigger.onEventTime() is called and can fire the window.
It's also safer to check that the state (firstSeen) value is true, not just
that it exists.
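
To make the two suggestions above concrete, here is a minimal,
dependency-free sketch of the trigger decisions for one key and one window.
It is a plain-Java model, not Flink code: the class
FirstEventTriggerSketch, its Result enum and advanceWatermark() are
hypothetical stand-ins. In the real trigger the corresponding calls would be
ctx.registerEventTimeTimer(window.maxTimestamp()) in onElement() and
ctx.deleteEventTimeTimer(window.maxTimestamp()) in clear().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java model of the trigger logic for ONE key and ONE window (not Flink API). */
public class FirstEventTriggerSketch {

    /** Stand-in for the two TriggerResult values used in this thread. */
    public enum Result { CONTINUE, FIRE_AND_PURGE }

    // Plays the role of window.maxTimestamp(), i.e. window end minus 1 ms.
    private final long windowMaxTimestamp;
    // Plays the role of the event-time timers registered through the trigger context.
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();
    // Plays the role of ValueState<Boolean>; null means "never written".
    private Boolean firstSeen;

    public FirstEventTriggerSketch(long windowMaxTimestamp) {
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    public Result onElement() {
        // In Flink: ctx.registerEventTimeTimer(window.maxTimestamp());
        // registering the same timestamp repeatedly keeps a single timer.
        eventTimeTimers.add(windowMaxTimestamp);
        // Check that the value is true, not only that some value exists.
        if (!Boolean.TRUE.equals(firstSeen)) {
            firstSeen = true;
            return Result.FIRE_AND_PURGE; // early result for the first event
        }
        return Result.CONTINUE; // later events wait for the window-end timer
    }

    public Result onEventTime(long time) {
        // Final evaluation once the watermark reaches the window end.
        return time == windowMaxTimestamp ? Result.FIRE_AND_PURGE : Result.CONTINUE;
    }

    /** Simulates a watermark: runs every registered timer at or before it. */
    public List<Result> advanceWatermark(long watermark) {
        List<Result> results = new ArrayList<>();
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= watermark) {
            results.add(onEventTime(eventTimeTimers.pollFirst()));
        }
        return results;
    }

    public void clear() {
        // In Flink: ctx.deleteEventTimeTimer(window.maxTimestamp()); firstSeen.clear();
        eventTimeTimers.remove(windowMaxTimestamp);
        firstSeen = null;
    }

    public static void main(String[] args) {
        // Window [0, 60000) has a max timestamp of 59999.
        FirstEventTriggerSketch trigger = new FirstEventTriggerSketch(59_999L);
        System.out.println(trigger.onElement());
        System.out.println(trigger.onElement());
        System.out.println(trigger.advanceWatermark(59_999L));
    }
}
```

In this model the first element fires immediately, later elements return
CONTINUE, and the window-end timer produces the final FIRE_AND_PURGE.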

Regards,
Roman


On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha <diwakar.n...@gmail.com> wrote:

>
> Hello,
>
> I'm trying to use a custom trigger for one of my use cases. My basic
> logic (shown below) uses keyBy on the input stream and a tumbling
> event-time window of 1 min.
>
> .keyBy(<key selector>)
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .trigger(new CustomTrigger())
> .aggregate(Input.getAggregationFunction(), new AggregationProcessingWindow());
>
>
> My custom trigger is expected to fire immediately on the first event for
> a key; any subsequent events should be aggregated in the window.
>
> .trigger(new Trigger<Record, TimeWindow>() {
>>     @Override
>>     public TriggerResult onElement(Record record, long l, TimeWindow timeWindow,
>>             TriggerContext triggerContext) throws Exception {
>>         ValueState<Boolean> firstSeen =
>>                 triggerContext.getPartitionedState(firstSceenDescriptor);
>>         if (firstSeen.value() == null) {
>>             firstSeen.update(true);
>>             // fire trigger to early evaluate window and purge that event.
>>             return TriggerResult.FIRE_AND_PURGE;
>>         }
>>         // Continue. Do not evaluate window per element
>>         return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>>             TriggerContext triggerContext) throws Exception {
>>         // final evaluation and purge window state
>>         return TriggerResult.FIRE_AND_PURGE;
>>     }
>>
>>     @Override
>>     public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>>             TriggerContext triggerContext) throws Exception {
>>         return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
>>             throws Exception {
>>     }
>> })
>
>
>
>
> Currently, I see that for each window (and the same key) the first event
> of the window is always fired. But I want this to happen only for the
> first window; for subsequent windows it should aggregate all the events
> and then fire.
>
> Example: all the records have the same key.
> Current output:
> record 1: first event in the window-1: fired
> record 2: last event in the window-1: fired
> record 3: first event in the window-2: fired
> record 4, record 5: 2 events in the window-2: fired
>
> Expected output:
> record 1: first event in the window-1: fired
> record 2: last event in the window-1: fired
> record 3, 4, 5: all events in the window-2: fired
> window-2 should not fire the first event of the same key.
>
> I've been reading about this here
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
> but haven't been able to solve it. Any pointers would be helpful.
>
> Thanks.
>
