I'm trying to understand how watermarks work in FlinkSQL. I’ve created the following tables:

    CREATE TABLE currency_rates (
        currency STRING,
        conversion_rate STRING,
        update_time TIMESTAMP(3),
        WATERMARK FOR update_time AS update_time,
        PRIMARY KEY(currency) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'currency_rates',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'currency_rates1',
        'key.format' = 'raw',
        'value.format' = 'avro',
        'properties.auto.offset.reset' = 'earliest',
        'value.fields-include' = 'ALL',
        'scan.watermark.idle-timeout' = '1000'
    );

    CREATE TABLE orders (
        order_id STRING,
        price DECIMAL(32,2),
        currency STRING,
        order_time TIMESTAMP(3),
        WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'orders1',
        'value.format' = 'avro',
        'properties.auto.offset.reset' = 'latest',
        'value.fields-include' = 'ALL'
    );

To test this, I run the following query:

    SELECT
        orders.order_id,
        orders.price,
        orders.currency,
        currency_rates.conversion_rate,
        orders.order_time,
        currency_rates.update_time
    FROM orders
    LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
        ON orders.currency = currency_rates.currency;

These are the inserts I’m making:

    INSERT INTO currency_rates VALUES ('USD', 'value 1', CURRENT_TIMESTAMP);
    INSERT INTO orders VALUES ('ORD001', 100.00, 'USD', CURRENT_TIMESTAMP);
    INSERT INTO orders VALUES ('ORD002', 100.00, 'USD', CURRENT_TIMESTAMP);

The following result is displayed:

| order_id | price | currency | conversion_rate | order_time | update_time |
|---|---|---|---|---|---|
| ORD001 | 100.00 | USD | value 1 | 2024-10-29 21:42:41.630 | 2024-10-29 21:41:59.279 |

Then, I run:

    INSERT INTO currency_rates VALUES ('USD', 'value 2', CURRENT_TIMESTAMP);
    INSERT INTO orders VALUES ('ORD007', 2000.00, 'USD', CURRENT_TIMESTAMP - interval '5' minutes);
    INSERT INTO orders VALUES ('ORD003', 100.00, 'USD', CURRENT_TIMESTAMP);

And the result becomes:

| order_id | price | currency | conversion_rate | order_time | update_time |
|---|---|---|---|---|---|
| ORD001 | 100.00 | USD | value 1 | 2024-10-29 21:42:41.630 | 2024-10-29 21:41:59.279 |
| ORD002 | 100.00 | USD | value 1 | 2024-10-29 21:42:46.936 | 2024-10-29 21:41:59.279 |
| ORD007 | 2000.00 | USD | `<NULL>` | 2024-10-29 21:39:32.560 | `<NULL>` |

I don't understand why record ORD007 (the last row) appears. It was inserted with a 5-minute delay, so I thought it should not be included because it came late.
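
To check whether ORD007 really arrives behind the watermark, a diagnostic query along these lines might help. This is only a sketch: it assumes a Flink version that provides the built-in `CURRENT_WATERMARK` function (1.14 or later, as far as I know), and the `current_wm` and `is_late` column aliases are names made up for illustration.

```sql
-- Show each order next to the watermark that is current when the row is processed.
-- CURRENT_WATERMARK returns NULL until a first watermark has been emitted,
-- in which case is_late is NULL as well.
SELECT
    order_id,
    order_time,
    CURRENT_WATERMARK(order_time) AS current_wm,            -- hypothetical alias
    order_time <= CURRENT_WATERMARK(order_time) AS is_late  -- hypothetical alias
FROM orders;
```

If `is_late` is true for ORD007, the row is behind the watermark of `orders` when it is read. That would suggest the temporal join itself does not discard such rows, although this query alone does not confirm how the join operator treats them.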