Thanks for sharing this, Martijn--I will test it out! - Jeremy
On 1/19/23, 12:59 PM, "Martijn Visser" <martijnvis...@apache.org> wrote:

Hi Jeremy,

Have you looked into the CURRENT_WATERMARK function [1], which allows one to
operate on late events without resorting to the DataStream API?

Best regards,

Martijn

[1] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/functions/systemfunctions/#temporal-functions

On Thu, 19 Jan 2023 at 19:26, Ber, Jeremy <jd...@amazon.com.invalid> wrote:

> Hi Dev Team,
>
> I'm Jeremy, a Solutions Architect supporting the Kinesis Data Analytics
> for Apache Flink service at AWS, and I wanted to open a discussion about a
> particular problem our customers have faced, and how new functionality
> in the Table and SQL APIs of Apache Flink could help solve it.
>
> The Problem: suppose a user defines a table that streams data from Kafka /
> Kinesis / etc. and applies a watermark to the timestamp field in their
> incoming data. Today, elements that stream into this table that would be
> considered late by a passing watermark will be dropped.
>
> Some workarounds: I've proposed to customers that they either use the
> DataStream API for handling late data via a Side Output
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/>,
> or simply do not assign a watermark during table definition and implement
> a UDF or similar to identify "late" events, then filter based on an added
> flag, for example.
>
> Discussion: What would side outputs look like in the Table / SQL APIs?
>
> An idea I had would be to allow for the definition of a Side Output in the
> table definition:
>
> CREATE TABLE LateOrders (
>   `user` BIGINT,
>   product STRING,
>   order_time TIMESTAMP(3)
> ) WITH (
>   …
> );
>
> CREATE TABLE Orders (
>   `user` BIGINT,
>   product STRING,
>   order_time TIMESTAMP(3),
>   WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
> ) WITH (
>   'late.data.side-output-table' = 'LateOrders',
>   …
> );
>
> Alternatives and considerations are highly encouraged, as I would like to
> understand what adding this functionality would look like and whether there
> are any challenges here.
>
> Thanks,
> Jeremy
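
For reference, the CURRENT_WATERMARK approach Martijn suggests could look roughly like the query below, reusing the Orders and LateOrders tables from the quoted proposal. This is a minimal, untested sketch. It assumes that Orders declares a watermark on order_time, that LateOrders is backed by a connector that accepts INSERT INTO, and that "late" means the row's order_time is at or behind the current watermark. CURRENT_WATERMARK returns NULL until a watermark has been emitted, hence the explicit NULL check.

```sql
-- Sketch only: copy rows that arrive behind the watermark into LateOrders.
-- Assumption: Orders has WATERMARK FOR order_time, as in the proposal above.
-- Assumption: LateOrders uses a sink connector that supports INSERT INTO.
INSERT INTO LateOrders
SELECT `user`, product, order_time
FROM Orders
WHERE CURRENT_WATERMARK(order_time) IS NOT NULL
  AND order_time <= CURRENT_WATERMARK(order_time);
```

The on-time path would use the complementary predicate, CURRENT_WATERMARK(order_time) IS NULL OR order_time > CURRENT_WATERMARK(order_time), so that each row is routed to exactly one of the two outputs.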