[ https://issues.apache.org/jira/browse/FLINK-31164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser closed FLINK-31164. ---------------------------------- Resolution: Duplicate > Unexpected correlate variable $cor0 in the plan error in where clause > --------------------------------------------------------------------- > > Key: FLINK-31164 > URL: https://issues.apache.org/jira/browse/FLINK-31164 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.16.0 > Reporter: P Rohan Kumar > Priority: Major > > {code:java} > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > val accountsTd = > TableDescriptor.forConnector("datagen").option("rows-per-second", "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("account_num", DataTypes.VARCHAR(2147483647)) > .column("acc_name", DataTypes.VARCHAR(2147483647)) > .column("acc_phone_num", DataTypes.VARCHAR(2147483647)) > .build()) > .build() > val accountsTable = tableEnv.from(accountsTd) > tableEnv.createTemporaryView("accounts", accountsTable) > val transactionsTd = > TableDescriptor.forConnector("datagen").option("rows-per-second", "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("account_num", DataTypes.VARCHAR(2147483647)) > .column("transaction_place", DataTypes.VARCHAR(2147483647)) > .column("transaction_time", DataTypes.BIGINT()) > .column("amount", DataTypes.INT()) > .build()) > .build() > val transactionsTable = tableEnv.from(transactionsTd) > tableEnv.createTemporaryView("transaction_data", transactionsTable) > val newTable = tableEnv.sqlQuery("select acc.account_num, (select count(*) > from transaction_data where transaction_place = trans.transaction_place and > account_num = acc.account_num) from accounts acc,transaction_data trans") > tableEnv.toChangelogStream(newTable).print() > env.execute() {code} > I get the following error if I run the above code. > > {code:java} > Exception in thread "main" org.apache.flink.table.api.TableException: > unexpected correlate variable $cor0 in the plan > at > org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:59) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.immutable.Range.foreach(Range.scala:158) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219) > at > org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160) > at org.example.WhereClauseBug$.main(WhereClauseBug.scala:50) > at org.example.WhereClauseBug.main(WhereClauseBug.scala) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)