Hey there Antonio!

I'm just bringing what we discussed on Slack over here.

> Got it. I think the missing bit may be this.
> I'll edit your trigger code and show what I think would work. Untested
> code:


    @Override
    public TriggerResult onElement(BigquerySnowplow element, long timestamp,
            W window, TriggerContext ctx) throws Exception {
        ValueState<Long> firstEventTimestamp = ctx.getPartitionedState(descriptor);
        if (firstEventTimestamp.value() == null) {
            firstEventTimestamp.update(timestamp);
            ctx.registerEventTimeTimer(timestamp + maxSessionLengthMillis);
        }
        /*
         * We do NOT fire the window right away here because
         * we only want to fire after the maximum session length passes,
         * or if the inactivity gap triggers a session window close
         * at the operator level.
         */
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when an event-time timer you've registered fires.
     */
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
            TriggerContext ctx) throws Exception {
        // If an event-time timer fires, it implies that the
        // (firstEventTs + maxSessionLength) has been reached.
        // We want to fire AND purge the window immediately.
        return TriggerResult.FIRE_AND_PURGE;
    }

    /**
     * If you don't need any special handling on processing-time timers,
     * you can simply ignore or do nothing here.
     */
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
            TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }
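
For anyone reading along, here is a minimal plain-Java sketch (no Flink involved) of the behavior I understand we are aiming for: a session closes either when the inactivity gap is exceeded or when an event arrives at or after firstEventTs + maxSessionLength. The class name SessionCapSketch, the method split, and the sample timestamps are all made up for illustration, and it assumes the timestamps arrive already sorted, which a real stream does not guarantee.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionCapSketch {

    /**
     * Splits sorted event timestamps into sessions. A session is closed when
     * the gap to the next event exceeds gapMillis, or when the next event is
     * at or after (first event of the session + maxSessionLengthMillis).
     */
    static List<List<Long>> split(List<Long> sortedTimestamps,
                                  long gapMillis,
                                  long maxSessionLengthMillis) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty()) {
                long first = current.get(0);
                long last = current.get(current.size() - 1);
                boolean gapExceeded = ts - last > gapMillis;
                boolean capReached = ts >= first + maxSessionLengthMillis;
                if (gapExceeded || capReached) {
                    // Close the running session and start a new one.
                    sessions.add(current);
                    current = new ArrayList<>();
                }
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Hypothetical data: 5 s inactivity gap, 10 s maximum session length.
        List<Long> events =
                List.of(0L, 3_000L, 6_000L, 9_000L, 11_000L, 12_000L, 30_000L);
        System.out.println(split(events, 5_000L, 10_000L));
    }
}
```

With that sample data the first session is cut by the 10 s cap, the second by the inactivity gap, and the last event ends up alone in a third session.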

You said that with this trigger both windows were indeed merged, but the
last event still arrived in window1. My next suggestion was:

> So what you actually want is a system timer? Because the event-time timer
> will trigger based on your watermark. If you want it to be a strict 10
> seconds, I'd suggest you do it with system time rather than event time. But
> this may bring some other pains.
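
To illustrate why an event-time timer is not a strict wall-clock deadline, here is a small plain-Java simulation (again, not Flink code). It assumes a simplified watermark equal to the highest timestamp seen minus a fixed out-of-orderness bound; the class EventTimeTimerSketch and its methods are hypothetical names. A registered timer fires only once the watermark reaches it, so if events stop arriving, the timer stays pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class EventTimeTimerSketch {

    private final long maxOutOfOrdernessMillis;
    private final TreeSet<Long> timers = new TreeSet<>();
    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    EventTimeTimerSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /**
     * Advances the simplified watermark from the event timestamp and returns
     * the timers that fire as a result, in ascending order.
     */
    List<Long> onEvent(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
        watermark = maxSeenTimestamp - maxOutOfOrdernessMillis;
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    long currentWatermark() {
        return watermark;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch sim = new EventTimeTimerSketch(1_000L);
        sim.onEvent(0L);                      // first event of the session
        sim.registerEventTimeTimer(10_000L);  // first event + 10 s cap
        System.out.println(sim.onEvent(9_000L));   // watermark 8000: nothing fires
        System.out.println(sim.onEvent(12_000L));  // watermark 11000: timer fires
    }
}
```

In this sketch the timer for 10 000 only fires when the event at 12 000 pushes the watermark past it; no amount of elapsed wall-clock time would do that on its own, which is the reason for suggesting a processing-time timer when a strict deadline is needed.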


Bringing it here so other people may benefit from it :D

Thread
<https://apache-flink.slack.com/archives/C03G7LJTS2G/p1740498241124159>


Att,
Pedro Mázala
+31 (06) 3819 3814
Be awesome


On Wed, 26 Feb 2025 at 10:45, Antonio Davide Cali via user <
user@flink.apache.org> wrote:

> Hello team, I'm trying to figure out how to proceed correctly with this.
>
> Scenario: I am using EventTimeSessionWindows with a fixed gap.
> I also need to trigger a *FIRE_AND_PURGE* to close a window after a
> fixed amount of time since the first event received for the window, *no
> matter what*.
>
> To do so, I'm implementing a Trigger.
>
> The full code is not provided, but what I expect is that if the gap between
> *the current event timestamp* and the timestamp of the *first event that
> arrived for the window*, saved via the *descriptor*, is greater than my
> *gap*, it fires and purges the window.
>
>
>     @Override
>     public TriggerResult onElement(BigquerySnowplow element, long timestamp,
>             W window, TriggerContext ctx) throws Exception {
>         // Get the timestamp of the first event in the window
>         ValueState<Long> firstEventTimestamp =
>                 ctx.getPartitionedState(descriptor);
>
>         if (firstEventTimestamp.value() == null) {
>             firstEventTimestamp.update(timestamp);
>             ctx.registerEventTimeTimer(timestamp + gapDuration);
>         } else if (timestamp - firstEventTimestamp.value() > gapDuration) {
>             return TriggerResult.FIRE_AND_PURGE;
>         }
>
>         // Continue adding elements to the window
>         return TriggerResult.CONTINUE;
>     }
>
>
> This *kind of* works.
> Sample of the result below
>
> Window1: {"startTime":"2025-02-20 15:12:06.065",
>           "endTime":"2025-02-20 15:12:25.605",
>           "firstEventTime":"2025-02-20 15:12:06.065",
>           "lastEventTime":"2025-02-20 15:12:18.605",
>           "sessionEventsCount":10}
>
> Window2: {"startTime":"2025-02-20 15:12:06.065",
>           "endTime":"2025-02-20 15:12:25.605",
>           "firstEventTime":"2025-02-20 15:12:17.099",
>           "lastEventTime":"2025-02-20 15:12:17.099",
>           "sessionEventsCount":1}
>
> Window3: {"startTime":"2025-02-20 15:12:25.992",
>           "endTime":"2025-02-20 15:12:34.346",
>           "firstEventTime":"2025-02-20 15:12:25.992",
>           "lastEventTime":"2025-02-20 15:12:27.346",
>           "sessionEventsCount":7}
>
> The second window is created, but I would actually have expected different
> behavior:
>
>    1. startTime and endTime, respectively ctx.window.getStart() and 
> ctx.window.getEnd(),
>    should be different for *Window1* and *Window2*
>    2. *Window2* and *Window3* should be "*merged*" since the second one
>    should have started a new window that events of the third one should have
>    fallen into
>
>
> I'm trying to understand what I'm doing wrong.
> Any help would be greatly appreciated.
>
> Thank you
> Antonio
