[ https://issues.apache.org/jira/browse/FLINK-29676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luochg updated FLINK-29676: --------------------------- Description: create two tables {code:java} drop table if exists TABLE_1; CREATE TABLE TABLE_1 ( `indexcalno` STRING, `indextype` STRING, `agentcode` STRING, `managecom` STRING not null, `t1` DECIMAL(12,2), PRIMARY KEY (indexcalno,indextype,agentcode) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://IP:Port/db?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true', 'username'='username', 'password'='password', 'table-name' = 'TABLE_1' ); drop table if exists TABLE_2; CREATE TABLE TABLE_2 ( `id` BIGINT, `agentcode` STRING, `managecom` STRING, `fyc` DECIMAL(12,2), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://IP:PORT/db?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true', 'username'='username', 'password'='password', 'table-name' = 'TABLE_2' ); {code} exe sql: {code:java} SELECT ( select sum(a.fyc) from TABLE_2 a where a.managecom = _t.managecom and a.agentcode=_t.agentcode ) from TABLE_1 _t {code} error info: {code:java} Fail to run sql command: SELECT ( select sum(a.fyc) from TABLE_2 a where a.managecom = _t.managecom and a.agentcode=_t.agentcode ) from TABLE_1 _t org.apache.flink.table.api.TableException: unexpected correlate variable $cor262 in the plan at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:85) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:56) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274) at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601) at org.apache.zeppelin.flink.Flink114Shims.collectToList(Flink114Shims.java:223) at org.apache.zeppelin.flink.FlinkZeppelinContext.showData(FlinkZeppelinContext.scala:110) at org.apache.zeppelin.interpreter.ZeppelinContext.showData(ZeppelinContext.java:67) at org.apache.zeppelin.flink.FlinkBatchSqlInterpreter.callInnerSelect(FlinkBatchSqlInterpreter.java:60) at org.apache.zeppelin.flink.FlinkSqlInterpreter.callSelect(FlinkSqlInterpreter.java:494) at org.apache.zeppelin.flink.FlinkSqlInterpreter.callCommand(FlinkSqlInterpreter.java:257) at org.apache.zeppelin.flink.FlinkSqlInterpreter.runSqlList(FlinkSqlInterpreter.java:151) at org.apache.zeppelin.flink.FlinkSqlInterpreter.internalInterpret(FlinkSqlInterpreter.java:109) at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132) at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) {code} was: create two tables {code:java} drop table if exists TABLE_1; CREATE TABLE TABLE_1 ( `indexcalno` STRING, `indextype` STRING, `agentcode` STRING, `managecom` STRING not null, `t1` DECIMAL(12,2), PRIMARY KEY (indexcalno,indextype,agentcode) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://IP:Port/gxdb?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true', 'username'='username', 'password'='password', 'table-name' = 'TABLE_1' ); drop table if exists TABLE_2; CREATE TABLE TABLE_2 ( `id` BIGINT, `agentcode` STRING, `managecom` STRING, `fyc` DECIMAL(12,2), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://IP:PORT/gxdb?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true', 'username'='username', 'password'='password', 'table-name' = 'TABLE_2' ); {code} exe sql: {code:java} SELECT ( select sum(a.fyc) from TABLE_2 a where a.managecom = _t.managecom and a.agentcode=_t.agentcode ) from TABLE_1 _t {code} error info: {code:java} Fail to run sql command: SELECT ( select sum(a.fyc) from TABLE_2 a where a.managecom = _t.managecom and a.agentcode=_t.agentcode ) from TABLE_1 _t org.apache.flink.table.api.TableException: unexpected correlate variable $cor262 in the plan at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:85) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:56) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274) at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601) at org.apache.zeppelin.flink.Flink114Shims.collectToList(Flink114Shims.java:223) at org.apache.zeppelin.flink.FlinkZeppelinContext.showData(FlinkZeppelinContext.scala:110) at org.apache.zeppelin.interpreter.ZeppelinContext.showData(ZeppelinContext.java:67) at org.apache.zeppelin.flink.FlinkBatchSqlInterpreter.callInnerSelect(FlinkBatchSqlInterpreter.java:60) at org.apache.zeppelin.flink.FlinkSqlInterpreter.callSelect(FlinkSqlInterpreter.java:494) at org.apache.zeppelin.flink.FlinkSqlInterpreter.callCommand(FlinkSqlInterpreter.java:257) at org.apache.zeppelin.flink.FlinkSqlInterpreter.runSqlList(FlinkSqlInterpreter.java:151) at org.apache.zeppelin.flink.FlinkSqlInterpreter.internalInterpret(FlinkSqlInterpreter.java:109) at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132) at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) {code} > unexpected correlate variable $cor262 in the plan > ------------------------------------------------- > > Key: FLINK-29676 > URL: https://issues.apache.org/jira/browse/FLINK-29676 > Project: Flink > Issue Type: Bug > Reporter: Luochg > Priority: Major > > create two tables > {code:java} > drop table if exists TABLE_1; > CREATE TABLE TABLE_1 ( > `indexcalno` STRING, > `indextype` STRING, > `agentcode` STRING, > `managecom` STRING not null, > `t1` DECIMAL(12,2), > PRIMARY KEY (indexcalno,indextype,agentcode) NOT ENFORCED > ) > WITH ( > 'connector' = 'jdbc', > 'url' = > 'jdbc:mysql://IP:Port/db?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true', > > 'username'='username', > 'password'='password', > 'table-name' = 'TABLE_1' > ); > drop table if exists TABLE_2; > CREATE TABLE TABLE_2 ( > `id` BIGINT, > `agentcode` STRING, > `managecom` STRING, > `fyc` DECIMAL(12,2), > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = > 'jdbc:mysql://IP:PORT/db?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true', > > 'username'='username', > 'password'='password', > 'table-name' = 'TABLE_2' > ); > {code} > exe sql: > {code:java} > SELECT > ( > select sum(a.fyc) > from TABLE_2 a > where a.managecom = _t.managecom > and a.agentcode=_t.agentcode > ) > from TABLE_1 _t > {code} > error info: > {code:java} > Fail to run sql command: SELECT > ( > select sum(a.fyc) > from TABLE_2 a > where a.managecom = _t.managecom > and a.agentcode=_t.agentcode > ) > from TABLE_1 _t > org.apache.flink.table.api.TableException: unexpected correlate variable > $cor262 in the plan > at > org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:85) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:56) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274) > at > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601) > at > org.apache.zeppelin.flink.Flink114Shims.collectToList(Flink114Shims.java:223) > at > org.apache.zeppelin.flink.FlinkZeppelinContext.showData(FlinkZeppelinContext.scala:110) > at > org.apache.zeppelin.interpreter.ZeppelinContext.showData(ZeppelinContext.java:67) > at > org.apache.zeppelin.flink.FlinkBatchSqlInterpreter.callInnerSelect(FlinkBatchSqlInterpreter.java:60) > at > org.apache.zeppelin.flink.FlinkSqlInterpreter.callSelect(FlinkSqlInterpreter.java:494) > at > org.apache.zeppelin.flink.FlinkSqlInterpreter.callCommand(FlinkSqlInterpreter.java:257) > at > org.apache.zeppelin.flink.FlinkSqlInterpreter.runSqlList(FlinkSqlInterpreter.java:151) > at > org.apache.zeppelin.flink.FlinkSqlInterpreter.internalInterpret(FlinkSqlInterpreter.java:109) > at > org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55) > at > org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752) > at org.apache.zeppelin.scheduler.Job.run(Job.java:172) > at > org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132) > at > org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:750) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)