Hey there Antonio! I'm just bringing what we discussed over slack to here.
Got it. I think the missing bit may be this. > I'll edit your trigger code and show what I think would work. Non tested > code @Override public TriggerResult onElement(BigquerySnowplow element, long timestamp, W window, TriggerContext ctx) throws Exception { ValueState<Long> firstEventTimestamp = ctx.getPartitionedState(descriptor); if (firstEventTimestamp.value() == null) { firstEventTimestamp.update(timestamp); ctx.registerEventTimeTimer(timestamp + maxSessionLengthMillis); } /* * We do NOT fire the window right away here because * we only want to fire after the maximum session length passes, * or if the inactivity gap triggers a session window close * at the operator level. */ return TriggerResult.CONTINUE; } /** * Called when an event-time timer you've registered fires. */ @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { // If an event-time timer fires, it implies that the (firstEventTs + maxSessionLength) has been reached. // We want to fire AND purge the window immediately. return TriggerResult.FIRE_AND_PURGE; } /** * If you don't need any special handling on processing-time timers, * you can simply ignore or do nothing here. */ @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } You said this code indeed merged both windows but the last event still arrived on window1. And then my next suggestion was So what you actually want to do is a system timer? Because the timer will > trigger based on your watermark. If you want it to be strict 10 seconds I'd > suggest you do it with system time rtather than event time. But this may > bring some other pains. Bringing it here so other people may benefit from it :D Thread <https://apache-flink.slack.com/archives/C03G7LJTS2G/p1740498241124159> Att, Pedro Mázala +31 (06) 3819 3814 Be awesome On Wed, 26 Feb 2025 at 10:45, Antonio Davide Cali via user < user@flink.apache.org> wrote: > Hello team, I'm trying to figure out how to correctly move on this. > > Scenario, I am using a EventTimeSessionWindows with a fixed gap. > I need, also, to trigger a *FIRE_AND_PURGE* for closing a window after a > fixed amount of time since the first event received for the window, *no > matter what*. > > So for doing so I'm implementing a Trigger > > Not full code provided, but what I expect is that if gap in between *the > current event timestamp* and the *first event arrived for the window* > saved via the *descriptor* is greater than my *gap*, It fires and purge > the window. > > > @Override > public TriggerResult onElement(BigquerySnowplow element, long > timestamp, W window, TriggerContext ctx) throws Exception { > // Get the timestamp of the first event in the window > ValueState<Long> firstEventTimestamp = > ctx.getPartitionedState(descriptor); > > if (firstEventTimestamp.value() == null) { > firstEventTimestamp.update(timestamp); > ctx.registerEventTimeTimer(timestamp + gapDuration); > } else if (timestamp - firstEventTimestamp.value() > gapDuration) { > > return TriggerResult.FIRE_AND_PURGE; > } > > // Continue adding elements to the window > return TriggerResult.CONTINUE; > } > > > This *kind of* works. > Sample of the result below > > Window1: {"startTime":"2025-02-20 15:12:06.065","endTime":"2025-02-20 > 15:12:25.605","firstEventTime":"2025-02-20 > 15:12:06.065","lastEventTime":"2025-02-20 > 15:12:18.605","sessionEventsCount":10} > > Window2: {"startTime":"2025-02-20 15:12:06.065","endTime":"2025-02-20 > 15:12:25.605","firstEventTime":"2025-02-20 > 15:12:17.099","lastEventTime":"2025-02-20 > 15:12:17.099","sessionEventsCount":1} > > Window3: {"startTime":"2025-02-20 15:12:25.992","endTime":"2025-02-20 > 15:12:34.346","firstEventTime":"2025-02-20 > 15:12:25.992","lastEventTime":"2025-02-20 > 15:12:27.346","sessionEventsCount":7} > > The second window is created but I would actually expected different > behavior: > > 1. startTime and endTime, respectively ctx.window.getStart() and > ctx.window.getEnd(), > should be different for *Window1* and *Window2* > 2. *Window2* and *Window3* should be "*merged*" since the second one > should have started a new window that events of the third one should have > fallen into > > > I'm trying to figure out correctly how to properly understand what I'm > doing wrong. > Any help would be very great > > Thank you > Antonio > > > ------------------------------ > This email and any files transmitted with it contain confidential > information and/or privileged or personal advice. This email is intended > for the addressee(s) stated above only. If you are not the addressee of the > email please do not copy or forward it or otherwise use it or any part of > it in any form whatsoever. If you have received this email in error please > notify the sender and remove the e-mail from your system. Thank you. > > This is an email from the company Just Eat Takeaway.com N.V., a public > limited liability company with corporate seat in Amsterdam, the > Netherlands, and address at Piet Heinkade 61, 1019GM Amsterdam, registered > with the Dutch Chamber of Commerce with number 08142836 and where the > context requires, includes its subsidiaries and associated undertakings. >