I'm trying to understand how watermarks work in Flink SQL. I’ve created the
following tables:

      CREATE TABLE currency_rates (
      currency STRING,
      conversion_rate STRING,
      update_time TIMESTAMP(3),
      WATERMARK FOR update_time AS update_time,
      PRIMARY KEY(currency) NOT ENFORCED
      ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'currency_rates',
      'properties.bootstrap.servers' = 'kafka:9092',
      'properties.group.id' = 'currency_rates1',
      'key.format' = 'raw',
      'value.format' = 'avro',
      'properties.auto.offset.reset' = 'earliest',
      'value.fields-include' = 'ALL',
      'scan.watermark.idle-timeout' = '1000'
      );


      CREATE TABLE orders (
      order_id    STRING,
      price       DECIMAL(32,2),
      currency    STRING,
      order_time  TIMESTAMP(3),
      WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'orders',
      'properties.bootstrap.servers' = 'kafka:9092',
      'properties.group.id' = 'orders1',
      'value.format' = 'avro',
      'properties.auto.offset.reset' = 'latest',
      'value.fields-include' = 'ALL'
      );


To test this, I run the following query:

      SELECT
      orders.order_id,
      orders.price,
      orders.currency,
      currency_rates.conversion_rate,
      orders.order_time,
      currency_rates.update_time
      FROM orders
      LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
      ON orders.currency = currency_rates.currency;

These are the inserts I’m making:

    INSERT INTO currency_rates VALUES ('USD', 'value 1', CURRENT_TIMESTAMP);
    INSERT INTO orders VALUES ('ORD001', 100.00, 'USD', CURRENT_TIMESTAMP);
    INSERT INTO orders VALUES ('ORD002', 100.00, 'USD', CURRENT_TIMESTAMP);

The following result is displayed:

    order_id  price    currency  conversion_rate  order_time               update_time
    ORD001    100.00   USD       value 1          2024-10-29 21:42:41.630  2024-10-29 21:41:59.279

Then, I run:

    INSERT INTO currency_rates VALUES ('USD', 'value 2', CURRENT_TIMESTAMP);
    INSERT INTO orders VALUES ('ORD007', 2000.00, 'USD',
        CURRENT_TIMESTAMP - interval '5' minutes);
    INSERT INTO orders VALUES ('ORD003', 100.00, 'USD', CURRENT_TIMESTAMP);

And the result is now:

    order_id  price    currency  conversion_rate  order_time               update_time
    ORD001    100.00   USD       value 1          2024-10-29 21:42:41.630  2024-10-29 21:41:59.279
    ORD002    100.00   USD       value 1          2024-10-29 21:42:46.936  2024-10-29 21:41:59.279
    ORD007    2000.00  USD       <NULL>           2024-10-29 21:39:32.560  <NULL>                   <<<<<-----

I don't understand why record ORD007 appears at all. It was inserted with an
order_time 5 minutes in the past, far beyond the 5-second watermark delay on
orders, so I expected it to be treated as late and left out of the result.
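
One way to check whether ORD007 really is behind the watermark when it
arrives would be to print the watermark next to each row. This is only a
sketch: it assumes the built-in CURRENT_WATERMARK function (available in
Flink 1.14 and later, as far as I know) and the orders table defined above;
the column alias current_wm is just an illustrative name.

    -- Show each order alongside the watermark at the time the row is processed.
    -- CURRENT_WATERMARK returns NULL until a first watermark has been emitted.
    SELECT
      order_id,
      order_time,
      CURRENT_WATERMARK(order_time) AS current_wm
    FROM orders;

If order_time is at or before current_wm for ORD007, the record is late by
the table's watermark definition, even though the join still emits it.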
