Thanks for the response! It is sad that a side output for late data is not supported in the Table API and SQL. I will start the discussions regarding this.
In the meantime, I am trying to use the built-in function CURRENT_WATERMARK(rowtime) to collect late data. The scenario I have is: I am creating a table with the Kafka connector and defining the watermark in that table (the table definition is in the quoted mail at the bottom of this thread). Next, I apply a tumbling window SQL query on this table, and I want to collect the late data for this window operation. I am not clear how the CURRENT_WATERMARK function would help me get the late data for the window operator.

I am also a bit confused about how we determine whether an event is late for a window operator. From the WindowOperator code:

    protected boolean isElementLate(StreamRecord<IN> element) {
        return (windowAssigner.isEventTime())
                && (element.getTimestamp() + allowedLateness
                        <= internalTimerService.currentWatermark());
    }

it seems the operator maintains a current watermark. I am trying to understand how this currentWatermark changes from the moment the operator receives the first event belonging to a window until the window fires.

Please help me understand these.

Thanks

________________________________
From: Feng Jin <jinfeng1...@gmail.com>
Sent: 06 March 2024 07:08
To: Sunny S <sunny8...@outlook.in>
Cc: user@flink.apache.org
Subject: Re: Handling late events with Table API / SQL

You can use the CURRENT_WATERMARK(rowtime) function for some filtering; please refer to [1] for details.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/

Best,
Feng

On Wed, Mar 6, 2024 at 1:56 AM Sunny S <sunny8...@outlook.in> wrote:

Hi,

I am using Flink SQL to create a table something like this:

    CREATE TABLE `some-table` (
        ...,
        ...,
        ...,
        ...,
        event_timestamp AS TO_TIMESTAMP_LTZ(event_time * 1000, 3),
        WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'some-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'testGroup',
        'value.format' = 'csv'
    )

I want to understand how I can deal with late / out-of-order events when using Flink SQL / Table API. How can I collect the late / out-of-order events to a side output with Table API / SQL?

Thanks
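
For reference, a minimal sketch of the filtering Feng suggests, using the table and rowtime attribute from the DDL above (`some-table`, event_timestamp); `late_events` is a hypothetical sink table, not something from the thread:

    -- Route rows that are already behind the watermark into a hypothetical
    -- `late_events` sink (schema assumed to mirror `some-table`).
    -- CURRENT_WATERMARK(event_timestamp) returns NULL until the first
    -- watermark has been generated, hence the IS NOT NULL guard.
    INSERT INTO late_events
    SELECT *
    FROM `some-table`
    WHERE CURRENT_WATERMARK(event_timestamp) IS NOT NULL
      AND event_timestamp <= CURRENT_WATERMARK(event_timestamp);

With the WATERMARK clause above, the watermark trails the largest event_timestamp seen so far by 30 seconds, so this filter flags a row as late when it arrives more than 30 seconds behind the furthest-ahead row. Note that CURRENT_WATERMARK is evaluated at the filter operator, so this approximates, rather than exactly reproduces, what a downstream window operator would drop.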
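To emulate a side output end-to-end, the two paths can run in one job as a statement set; a sketch, again with hypothetical sink names (`windowed_counts`, `late_events`) and an assumed 1-minute window:

    EXECUTE STATEMENT SET
    BEGIN

      -- Main path: windowed aggregate. The window operator itself discards
      -- rows whose timestamp is at or behind its current watermark.
      INSERT INTO windowed_counts
      SELECT window_start, window_end, COUNT(*) AS cnt
      FROM TABLE(
        TUMBLE(TABLE `some-table`, DESCRIPTOR(event_timestamp), INTERVAL '1' MINUTE))
      GROUP BY window_start, window_end;

      -- Side path: approximate the dropped rows by filtering on the watermark.
      INSERT INTO late_events
      SELECT *
      FROM `some-table`
      WHERE CURRENT_WATERMARK(event_timestamp) IS NOT NULL
        AND event_timestamp <= CURRENT_WATERMARK(event_timestamp);

    END;

On the second question in the thread: the operator's currentWatermark only advances when a Watermark element arrives from upstream. With the DDL above, the source keeps emitting watermarks of max(event_timestamp) - 30 seconds as records flow in, and the window fires once the watermark passes the window's end timestamp.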