Hi,

I have been trying to write a temporal join in SQL against a rolling aggregate view. However, it does not work and throws:

org.apache.flink.table.api.ValidationException: Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found.

It seems that after the aggregation the table loses its watermark, and it is not possible to add one with the SQL API because it is a view.

CREATE TABLE orders (
    order_id INT,
    price DECIMAL(6, 2),
    currency_id INT,
    order_time AS NOW(),
    WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.order_id.kind' = 'sequence',
    'fields.order_id.start' = '1',
    'fields.order_id.end' = '100000',
    'fields.currency_id.min' = '1',
    'fields.currency_id.max' = '20'
);

CREATE TABLE currency_rates (
    currency_id INT,
    conversion_rate DECIMAL(4, 3),
    PRIMARY KEY (currency_id) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.currency_id.min' = '1',
    'fields.currency_id.max' = '20'
);

CREATE TEMPORARY VIEW max_rates AS (
    SELECT
        currency_id,
        MAX(conversion_rate) AS max_rate
    FROM currency_rates
    GROUP BY currency_id
);

CREATE TEMPORARY VIEW temporal_join AS (
    SELECT
        order_id,
        max_rates.max_rate
    FROM orders
    LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
        ON orders.currency_id = max_rates.currency_id
);

SELECT * FROM temporal_join;

Am I missing something? What would be a good starting point to address this?

Thanks in advance,
Sébastien Chevalley