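Hi Guillermo,

ORD007 is included because of the LEFT JOIN semantics. A LEFT JOIN returns every row from the left table (orders), together with the matching rows from the right table (currency_rates). When no match exists, the right-side columns are NULL. ORD007's order_time (2024-10-29 21:39:32.560) is earlier than the update_time of the first USD rate (2024-10-29 21:41:59.279), so no version of the rate was valid at that moment. The order is still emitted, with conversion_rate and update_time set to NULL.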
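To make the lookup concrete, here is a simplified in-memory model in Python. It is only an illustration under stated assumptions, not how Flink implements temporal joins. Each order picks the latest rate version whose update_time is at or before its order_time, and the left join keeps orders that find no version. The timestamps come from your output. Watermarks, state retention and the later 'value 2' version are deliberately left out, and the names `rate_as_of` and `temporal_join` are made up for this sketch.

```python
from datetime import datetime
from typing import List, Optional, Tuple

# Hypothetical model of the versioned table: currency -> [(update_time, conversion_rate)].
# Only the 'value 1' version is included, because its update_time appears in the output.
rates = {
    "USD": [(datetime(2024, 10, 29, 21, 41, 59, 279000), "value 1")],
}

# Orders as (order_id, currency, order_time), using the timestamps from the output.
orders = [
    ("ORD001", "USD", datetime(2024, 10, 29, 21, 42, 41, 630000)),
    ("ORD002", "USD", datetime(2024, 10, 29, 21, 42, 46, 936000)),
    ("ORD007", "USD", datetime(2024, 10, 29, 21, 39, 32, 560000)),
]


def rate_as_of(currency: str, ts: datetime) -> Optional[Tuple[datetime, str]]:
    """Return the latest version with update_time <= ts, or None if none existed yet."""
    candidates = [v for v in rates.get(currency, []) if v[0] <= ts]
    return max(candidates, default=None)


def temporal_join(left: bool = True) -> List[tuple]:
    """Join each order with the rate valid at its order_time."""
    rows = []
    for order_id, currency, order_time in orders:
        version = rate_as_of(currency, order_time)
        if version is None:
            if left:
                # LEFT JOIN: keep the order and pad the right-side columns with None (NULL).
                rows.append((order_id, None, order_time, None))
            # Inner join: an order without a valid version is dropped.
            continue
        update_time, conversion_rate = version
        rows.append((order_id, conversion_rate, order_time, update_time))
    return rows


if __name__ == "__main__":
    for row in temporal_join():
        print(row[0], row[1])
```

Running it prints `ORD001 value 1`, `ORD002 value 1` and `ORD007 None`. With `left=False`, ORD007 disappears, which mirrors how a plain (inner) join would treat an order with no valid rate version.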
The watermark configuration on the tables and "FOR SYSTEM_TIME AS OF" do not override the normal LEFT JOIN behavior here.

Best regards,
Alexey

On Wed, Oct 30, 2024 at 8:18 AM Guillermo <konstt2...@gmail.com> wrote:

> I'm trying to understand how watermarks work in FlinkSQL. I've created the
> following tables:
>
> CREATE TABLE currency_rates (
>   currency STRING,
>   conversion_rate STRING,
>   update_time TIMESTAMP(3),
>   WATERMARK FOR update_time AS update_time,
>   PRIMARY KEY(currency) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'currency_rates',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'properties.group.id' = 'currency_rates1',
>   'key.format' = 'raw',
>   'value.format' = 'avro',
>   'properties.auto.offset.reset' = 'earliest',
>   'value.fields-include' = 'ALL',
>   'scan.watermark.idle-timeout' = '1000'
> );
>
> CREATE TABLE orders (
>   order_id STRING,
>   price DECIMAL(32,2),
>   currency STRING,
>   order_time TIMESTAMP(3),
>   WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'orders',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'properties.group.id' = 'orders1',
>   'value.format' = 'avro',
>   'properties.auto.offset.reset' = 'latest',
>   'value.fields-include' = 'ALL'
> );
>
> To test this, I run the following query:
>
> SELECT
>   orders.order_id,
>   orders.price,
>   orders.currency,
>   currency_rates.conversion_rate,
>   orders.order_time,
>   currency_rates.update_time
> FROM orders
> LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
> ON orders.currency = currency_rates.currency;
>
> These are the inserts I'm making:
>
> INSERT INTO currency_rates VALUES ('USD', 'value 1', CURRENT_TIMESTAMP);
> INSERT INTO orders VALUES ('ORD001', 100.00, 'USD', CURRENT_TIMESTAMP);
> INSERT INTO orders VALUES ('ORD002', 100.00, 'USD', CURRENT_TIMESTAMP);
>
> The following result is displayed:
>
> order_id  price    currency  conversion_rate  order_time               update_time
> ORD001    100.00   USD       value 1          2024-10-29 21:42:41.630  2024-10-29 21:41:59.279
>
> Then, I run:
>
> INSERT INTO currency_rates VALUES ('USD', 'value 2', CURRENT_TIMESTAMP);
> INSERT INTO orders VALUES ('ORD007', 2000.00, 'USD', CURRENT_TIMESTAMP - interval '5' minutes);
> INSERT INTO orders VALUES ('ORD003', 100.00, 'USD', CURRENT_TIMESTAMP);
>
> And:
>
> order_id  price    currency  conversion_rate  order_time               update_time
> ORD001    100.00   USD       value 1          2024-10-29 21:42:41.630  2024-10-29 21:41:59.279
> ORD002    100.00   USD       value 1          2024-10-29 21:42:46.936  2024-10-29 21:41:59.279
> ORD007    2000.00  USD       <NULL>           2024-10-29 21:39:32.560  <NULL>                   <<<<<-----
>
> I don't understand why record ORD007 appears, as it was inserted with a
> 5-minute delay, so I thought it should not be included because it came late.