P Rohan Kumar created FLINK-31164:
-------------------------------------

             Summary: Unexpected correlate variable $cor0 in the plan error in where clause
                 Key: FLINK-31164
                 URL: https://issues.apache.org/jira/browse/FLINK-31164
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
   Affects Versions: 1.16.0
            Reporter: P Rohan Kumar

{code:scala} val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) val accountsTd = TableDescriptor.forConnector("datagen").option("rows-per-second", "10") .option("number-of-rows", "10") .schema(Schema .newBuilder() .column("account_num", DataTypes.VARCHAR(2147483647)) .column("acc_name", DataTypes.VARCHAR(2147483647)) .column("acc_phone_num", DataTypes.VARCHAR(2147483647)) .build()) .build() val accountsTable = tableEnv.from(accountsTd) tableEnv.createTemporaryView("accounts", accountsTable) val transactionsTd = TableDescriptor.forConnector("datagen").option("rows-per-second", "10") .option("number-of-rows", "10") .schema(Schema .newBuilder() .column("account_num", DataTypes.VARCHAR(2147483647)) .column("transaction_place", DataTypes.VARCHAR(2147483647)) .column("transaction_time", DataTypes.BIGINT()) .column("amount", DataTypes.INT()) .build()) .build() val transactionsTable = tableEnv.from(transactionsTd) tableEnv.createTemporaryView("transaction_data", transactionsTable) val newTable = tableEnv.sqlQuery("select acc.account_num, (select count(*) from transaction_data where transaction_place = trans.transaction_place and account_num = acc.account_num) from accounts acc,transaction_data trans") tableEnv.toChangelogStream(newTable).print() env.execute() {code} I get the following error if I run the above code. 
{code:java} Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor0 in the plan at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:59) at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59) at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51) at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) at scala.collection.immutable.Range.foreach(Range.scala:158) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195) at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219) at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160) at org.example.WhereClauseBug$.main(WhereClauseBug.scala:50) at org.example.WhereClauseBug.main(WhereClauseBug.scala) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)