Hi, which Flink version are you running when you hit this exception? I tested your SQL on Flink master and it works. My test SQL is the following:

create table table_1 (
  `f0` STRING NOT NULL,
  `f1` DOUBLE NOT NULL,
  `rowtime` TIMESTAMP(3),
  WATERMARK FOR `rowtime` AS rowtime - INTERVAL '10' SECOND
) with( ... )

create table table_2 (
  `f0` STRING NOT NULL,
  `f1` DOUBLE NOT NULL,
  `rowtime` TIMESTAMP(3),
  WATERMARK FOR `rowtime` AS rowtime - INTERVAL '10' SECOND,
  CONSTRAINT `PK_f0` PRIMARY KEY (`f0`) NOT ENFORCED
) with( ... )

SELECT
  table_1.f0,
  table_1.f1 AS table_1_value,
  table_2.f1 AS table_2_value
FROM table_1
JOIN table_2 FOR SYSTEM_TIME AS OF table_1.rowtime
ON table_1.f0 = table_2.f0

At 2022-04-28 14:01:17, "Matthew Brown" <m...@matthewbrown.io> wrote:

Hi all,

I'm trying to join the following two tables using a temporal join:

table_1 (
  `f0` STRING NOT NULL,
  `f1` DOUBLE NOT NULL,
  `rowtime` TIMESTAMP(3) METADATA,
  WATERMARK FOR `rowtime`: TIMESTAMP(3) AS rowtime - INTERVAL '10' SECOND
)

table_2: (
  `f0` STRING NOT NULL,
  `f1` DOUBLE NOT NULL,
  `rowtime` TIMESTAMP(3) METADATA,
  WATERMARK FOR `rowtime`: TIMESTAMP(3) AS rowtime - INTERVAL '10' SECOND,
  CONSTRAINT `PK_f0` PRIMARY KEY (`f0`) NOT ENFORCED
)

using the following query:

--
SELECT
  table_1.f0,
  table_1.f1 AS table_1_value,
  table_2.f1 AS table_2_value,
FROM table_1
JOIN table_2 FOR SYSTEM_TIME AS OF table_1.rowtime
ON table_1.f0 = table_2.f0
--

and it's raising the following exception

---
Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor0 in the plan
    at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
---

Has anybody come across this before?
Any debugging tips?

Cheers,
Matt.