[ https://issues.apache.org/jira/browse/FLINK-23159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17373458#comment-17373458 ]
Yun Gao commented on FLINK-23159: --------------------------------- Thanks [~wenlong.lwl] for attending to the issue~ > Correlated sql subquery on the source created via fromValues() failed to > compile > -------------------------------------------------------------------------------- > > Key: FLINK-23159 > URL: https://issues.apache.org/jira/browse/FLINK-23159 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.13.0 > Reporter: Yun Gao > Priority: Major > > Correlated subquery like > {code:java} > import org.apache.flink.table.api.DataTypes; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.TableEnvironment; > import org.apache.flink.table.types.DataType; > import org.apache.flink.types.Row; > import java.util.ArrayList; > import java.util.List; > public class SQLQueryTest { > public static void main(String[] args) { > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode() > .build(); > TableEnvironment tableEnvironment = TableEnvironment.create(settings); > DataType row = DataTypes.ROW( > DataTypes.FIELD("flag", DataTypes.STRING()), > DataTypes.FIELD("id", DataTypes.INT()), > DataTypes.FIELD("name", DataTypes.STRING()) > ); > Table table = tableEnvironment.fromValues(row, new > MyListSource("table1").builder()); > tableEnvironment.createTemporaryView("table1", table); > table = tableEnvironment.fromValues(row, new > MyListSource("table2").builder()); > tableEnvironment.createTemporaryView("table2", table); > String sql = "select t1.flag from table1 t1 where t1.name in (select > t2.name from table2 t2 where t2.id = t1.id)"; > tableEnvironment.explainSql(sql); > } > public static class MyListSource { > private String flag; > public MyListSource(String flag) { > this.flag = flag; > } > public List<Row> builder() { > List<Row> rows = new ArrayList<>(); > for (int i = 2; i < 3; i++) { > Row row = new 
Row(3); > row.setField(0, flag); > row.setField(1, i); > row.setField(2, "me"); > rows.add(row); > } > return rows; > } > } > } > {code} > would throw > {code:java} > Exception in thread "main" org.apache.flink.table.api.TableException: > unexpected correlate variable $cor0 in the plan > at > org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.immutable.Range.foreach(Range.scala:160) > at > 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:101) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:46) > at > 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:691) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:677) > at test.SQLQueryTeszt.main(SQLQueryTeszt.java:57) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)