[ 
https://issues.apache.org/jira/browse/FLINK-23159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17373458#comment-17373458
 ] 

Yun Gao commented on FLINK-23159:
---------------------------------

Thanks [~wenlong.lwl] for attending to the issue!

> Correlated sql subquery on the source created via fromValues() failed to 
> compile
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-23159
>                 URL: https://issues.apache.org/jira/browse/FLINK-23159
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.0
>            Reporter: Yun Gao
>            Priority: Major
>
> A correlated subquery like
> {code:java}
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.types.DataType;
> import org.apache.flink.types.Row;
> import java.util.ArrayList;
> import java.util.List;
> public class SQLQueryTest {
>   public static void main(String[] args) {
>     EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode()
>       .build();
>     TableEnvironment tableEnvironment = TableEnvironment.create(settings);
>     DataType row = DataTypes.ROW(
>       DataTypes.FIELD("flag", DataTypes.STRING()),
>       DataTypes.FIELD("id", DataTypes.INT()),
>       DataTypes.FIELD("name", DataTypes.STRING())
>     );
>     Table table = tableEnvironment.fromValues(row, new 
> MyListSource("table1").builder());
>     tableEnvironment.createTemporaryView("table1", table);
>     table = tableEnvironment.fromValues(row, new 
> MyListSource("table2").builder());
>     tableEnvironment.createTemporaryView("table2", table);
>     String sql = "select t1.flag from table1 t1 where t1.name in (select 
> t2.name from table2 t2 where t2.id = t1.id)";
>     tableEnvironment.explainSql(sql);
>   }
>   public static class MyListSource {
>     private String flag;
>     public MyListSource(String flag) {
>       this.flag = flag;
>     }
>     public List<Row> builder() {
>       List<Row> rows = new ArrayList<>();
>       for (int i = 2; i < 3; i++) {
>         Row row = new Row(3);
>         row.setField(0, flag);
>         row.setField(1, i);
>         row.setField(2, "me");
>         rows.add(row);
>       }
>       return rows;
>     }
>   }
> }
> {code}
> would throw
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> unexpected correlate variable $cor0 in the plan
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.immutable.Range.foreach(Range.scala:160)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>   at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:101)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:46)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:691)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:677)
>   at test.SQLQueryTeszt.main(SQLQueryTeszt.java:57)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to