Godfrey, I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am still seeing the same issue. Note that I am using the JDBC connector for the input tables, and table1 and table2 are actually created from queries on those connector tables rather than used directly.
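For reference, the setup looks roughly like this (a minimal sketch only; the table names, columns, and connection options below are placeholders rather than my actual definitions):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// JDBC-backed source table; columns and connection options are placeholders
tEnv.executeSql(
  """
    |CREATE TABLE jdbc_table1 (
    |  id STRING,
    |  attr1 STRING,
    |  attr2 STRING,
    |  attr3 STRING,
    |  attr4 STRING
    |) WITH (
    |  'connector' = 'jdbc',
    |  'url' = 'jdbc:postgresql://host:5432/db',
    |  'table-name' = 'jdbc_table1'
    |)
    |""".stripMargin)

// table1 and table2 are then registered as views over queries on the connector tables
tEnv.createTemporaryView(
  "table1",
  tEnv.sqlQuery("SELECT id, attr1, attr2, attr3, attr4 FROM jdbc_table1"))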
Since you indicated what I did should work, I played around a bit more and determined it’s something inside of the table2 query that is triggering the error. The id field there is generated by a table function, and removing that piece made the plan start working. table2 is formulated as follows:

SELECT T.id, attr2, attr3, attr4
FROM table3 t3,
  LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)

where SplitStringToRows is defined as:

@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}

Removing the LATERAL TABLE bit in that first table made the original query plan work correctly.

I greatly appreciate your assistance!

Regards,
Dylan Forciea

From: godfrey he <godfre...@gmail.com>
Date: Wednesday, November 18, 2020 at 7:33 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Lateral join not finding correlate variable

Hi Dylan,

Could you tell us which Flink version you are seeing the problem with? I tested the above query on master and I get the plan; no errors occur. Here is my test case:

@Test
def testLateralJoin(): Unit = {
  util.addTableSource[(String, String, String, String, String)]("table1", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  util.addTableSource[(String, String, String, String, String)]("table2", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  val query =
    """
      |SELECT
      |  t1.id,
      |  t1.attr1,
      |  t2.attr2
      |FROM table1 t1
      |LEFT JOIN LATERAL (
      |  SELECT
      |    id,
      |    attr2
      |  FROM (
      |    SELECT
      |      id,
      |      attr2,
      |      ROW_NUMBER() OVER (
      |        PARTITION BY id
      |        ORDER BY
      |          attr3 DESC,
      |          t1.attr4 = attr4 DESC
      |      ) AS row_num
      |    FROM table2)
      |  WHERE row_num = 1) t2
      |ON t1.id = t2.id
      |""".stripMargin
  util.verifyPlan(query)
}

Best,
Godfrey

Dylan Forciea <dy...@oseberg.io> wrote on Wed, Nov 18, 2020 at 7:44 AM:

This may be due to not understanding lateral joins in Flink – perhaps you can only do so on temporal variables – but I figured I’d ask since the error message isn’t intuitive. I am trying to do a combination of a lateral join and a top-N query. Part of my ordering is based upon whether a value in the left side of the query matches up.
I’m trying to do this in the general form of:

SELECT
  t1.id,
  t1.attr1,
  t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT
    id,
    attr2
  FROM (
    SELECT
      id,
      attr2,
      ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY
          attr3 DESC,
          t1.attr4 = attr4 DESC
      ) AS row_num
    FROM table2)
  WHERE row_num = 1) t2
ON (t1.id = t2.id)

I am getting an error that looks like:

Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
  at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
  at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
  at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
  at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
  at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
  at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
  at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
  at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
  at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
  at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
  at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
  at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
  at io.oseberg.flink.well.ok.Job.main(Job.scala)

The only other thing I can think of doing is creating a table aggregate function to pull this off. But I wanted to check first to make sure I wasn’t doing something wrong in the above, or that there isn’t something else I’m not thinking of.

Regards,
Dylan Forciea
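P.S. In case it clarifies the table aggregate function alternative I mentioned, it would look something along these lines (an untested sketch only; the class name, the accumulator, and the attr4Matches flag, which would carry the result of comparing against the outer row’s attr4, are illustrative, and null handling is omitted):

import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.util.Collector

// Accumulator holding the single "best" row seen so far (illustrative)
case class Top1Acc(
  var attr2: String = null,
  var attr3: String = null,
  var attr4Matches: Boolean = false,
  var hasValue: Boolean = false)

class Top1ByAttr extends TableAggregateFunction[String, Top1Acc] {

  override def createAccumulator(): Top1Acc = Top1Acc()

  // Keep the row that sorts first under: attr3 DESC, attr4-match DESC
  def accumulate(acc: Top1Acc, attr2: String, attr3: String, attr4Matches: Boolean): Unit = {
    val better = !acc.hasValue ||
      attr3 > acc.attr3 ||
      (attr3 == acc.attr3 && attr4Matches && !acc.attr4Matches)
    if (better) {
      acc.attr2 = attr2
      acc.attr3 = attr3
      acc.attr4Matches = attr4Matches
      acc.hasValue = true
    }
  }

  // Emit only the attr2 of the retained row
  def emitValue(acc: Top1Acc, out: Collector[String]): Unit = {
    if (acc.hasValue) {
      out.collect(acc.attr2)
    }
  }
}

If I understand correctly, using it would mean moving that part of the pipeline to the Table API (groupBy followed by flatAggregate), since table aggregate functions aren’t callable from SQL.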