Hi Dev Team,

I’m Jeremy, a Solutions Architect supporting the Kinesis Data Analytics for 
Apache Flink service at AWS. I’d like to open a discussion about a particular 
problem our customers have faced, and how new functionality in the Table and 
SQL APIs of Apache Flink could help solve it.

The Problem: suppose a user defines a table that streams data from Kafka, 
Kinesis, etc. and applies a watermark to the timestamp field in their incoming 
data. Today, elements that arrive in this table after the watermark has passed 
their timestamp are considered late and will be dropped.

Some workarounds: I’ve proposed to customers that they either use the 
DataStream API to handle late data via a Side 
Output<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/>,
or omit the watermark from the table definition and implement a UDF or 
similar to flag “late” events, then filter on that flag.
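
As a rough sketch of the second workaround, assuming Orders is declared 
without a WATERMARK clause and LateOrders exists with the same columns: 
FlaggedOrders and is_late are hypothetical names, and the comparison against 
LOCALTIMESTAMP stands in for the UDF. It approximates lateness using wall-clock 
time rather than a real watermark, and assumes order_time is expressed in the 
session time zone.

```sql
-- Hypothetical view: tag each row instead of relying on a watermark.
CREATE VIEW FlaggedOrders AS
SELECT
    `user`,
    product,
    order_time,
    -- Assumption: "late" means more than 5 seconds behind wall-clock time.
    order_time < LOCALTIMESTAMP - INTERVAL '5' SECOND AS is_late
FROM Orders;

-- Route the flagged rows to the late table.
INSERT INTO LateOrders
SELECT `user`, product, order_time
FROM FlaggedOrders
WHERE is_late;
```

On-time rows can be read from the same view with WHERE NOT is_late.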

Discussion: What would side outputs look like in the Table / SQL APIs?

One idea would be to allow a side output to be declared in the table 
definition:

CREATE TABLE LateOrders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH (
   …
);

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
   'late.data.side-output-table' = 'LateOrders',
   …
);

Alternatives and considerations are highly encouraged, as I would like to 
understand what adding this functionality would look like and if there are any 
challenges here.

Thanks,
Jeremy

