[ https://issues.apache.org/jira/browse/FLINK-20255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524261#comment-17524261 ]
Nico Kruber commented on FLINK-20255: ------------------------------------- The following example (which was simplified from a more complex join that makes more sense than this version) also seems to be an incarnation of the described problem (tested in Flink 1.14.3): {code} CREATE TEMPORARY TABLE Messages ( `id` CHAR(1), `userId` TINYINT, `relatedUserIds` ARRAY<TINYINT> ) WITH ( 'connector' = 'datagen', 'fields.id.length' = '10', 'fields.userId.kind' = 'random', 'fields.userId.min' = '1', 'fields.userId.max' = '10', 'fields.relatedUserIds.kind' = 'random', 'fields.relatedUserIds.element.min' = '1', 'fields.relatedUserIds.element.max' = '10', 'rows-per-second' = '1000' ); -- the non-working version: SELECT * FROM Messages outer_message WHERE outer_message.userId IN ( SELECT relatedUserId FROM Messages inner_message CROSS JOIN UNNEST(inner_message.relatedUserIds) AS t (relatedUserId) WHERE inner_message.id = outer_message.id ) -- this one is working: /* SELECT * FROM Messages CROSS JOIN UNNEST(relatedUserIds) AS t (relatedUserId) WHERE userId = t.relatedUserId */ {code} It produces the following exception: {code} org.apache.flink.table.api.TableException: unexpected correlate variable $cor1 in the plan at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) ~[flink-table_2.11-1.14.3.jar:1.14.3] at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) ~[flink-table_2.11-1.14.3.jar:1.14.3] at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.immutable.Range.foreach(Range.scala:160) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) ~[flink-table_2.11-1.14.3.jar:1.14.3] at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) ~[flink-table_2.11-1.14.3.jar:1.14.3] at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:161) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:75) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) 
~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:209) ~[flink-sql-client_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:209) ~[flink-sql-client_2.11-1.14.3.jar:1.14.3] ... 
12 more {code} > Nested decorrelate failed > ------------------------- > > Key: FLINK-20255 > URL: https://issues.apache.org/jira/browse/FLINK-20255 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.11.0, 1.12.0 > Reporter: godfrey he > Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > This issue is from ML > https://www.mail-archive.com/user@flink.apache.org/msg37746.html > We can reproduce the issue through the following code > {code:java} > @FunctionHint(output = new DataTypeHint("ROW<val STRING>")) > class SplitStringToRows extends TableFunction[Row] { > def eval(str: String, separator: String = ";"): Unit = { > if (str != null) { > str.split(separator).foreach(s => collect(Row.of(s.trim()))) > } > } > } > object Job { > def main(args: Array[String]): Unit = { > val settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment > val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings) > streamTableEnv.createTemporarySystemFunction( > "SplitStringToRows", > classOf[SplitStringToRows] > ) // Class defined in previous email > streamTableEnv.executeSql( > """ > CREATE TABLE table2 ( > attr1 STRING, > attr2 STRING, > attr3 DECIMAL, > attr4 DATE > ) WITH ( > 'connector' = 'datagen' > )""") > val q2 = streamTableEnv.sqlQuery( > """ > SELECT > a.attr1 AS attr1, > attr2, > attr3, > attr4 > FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1, ';')) AS > a(attr1) > """) > streamTableEnv.createTemporaryView("view2", q2) > val q3 = > """ > SELECT > w.attr1, > p.attr3 > FROM table2 w > LEFT JOIN LATERAL ( > SELECT > attr1, > attr3 > FROM ( > SELECT > attr1, > attr3, > ROW_NUMBER() OVER ( > PARTITION BY attr1 > ORDER BY > attr4 DESC NULLS LAST, > w.attr2 = attr2 DESC NULLS LAST > ) AS row_num > FROM view2) > WHERE row_num = 1) p > ON (w.attr1 = p.attr1) > """ > 
println(streamTableEnv.explainSql(q3)) > } > } > {code} > The reason is that {{RelDecorrelator}} in Calcite cannot yet handle such a nested > decorrelation pattern. -- This message was sent by Atlassian Jira (v8.20.1#820001)