[ 
https://issues.apache.org/jira/browse/FLINK-29676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luochg updated FLINK-29676:
---------------------------
    Description: 
Create two tables:

{code:java}

drop table if exists TABLE_1;
CREATE TABLE TABLE_1 (
    `indexcalno` STRING,
    `indextype` STRING,
    `agentcode` STRING,
    `managecom` STRING not null,
    `t1` DECIMAL(12,2),  
    PRIMARY KEY (indexcalno,indextype,agentcode) NOT ENFORCED
    ) 
WITH (  
'connector' = 'jdbc',  
'url' = 
'jdbc:mysql://IP:Port/db?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true',
  
'username'='username', 
'password'='password', 
'table-name' = 'TABLE_1'
);

drop table if exists TABLE_2;
CREATE TABLE TABLE_2 (
`id` BIGINT,
`agentcode` STRING,
`managecom` STRING,
`fyc` DECIMAL(12,2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (  
'connector' = 'jdbc',  
'url' = 
'jdbc:mysql://IP:PORT/db?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true',
  
'username'='username',  
'password'='password',  
'table-name' = 'TABLE_2'
);
{code}

Execute the following SQL:

{code:java}
SELECT 
(
  select sum(a.fyc) 
  from TABLE_2  a
  where a.managecom = _t.managecom 
  and a.agentcode=_t.agentcode  
)
from TABLE_1  _t 
{code}

Error output:

{code:java}
Fail to run sql command: SELECT 
(
  select sum(a.fyc) 
  from TABLE_2 a
  where a.managecom = _t.managecom 
  and a.agentcode=_t.agentcode  
)
from TABLE_1 _t
org.apache.flink.table.api.TableException: unexpected correlate variable 
$cor262 in the plan
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:85)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:56)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
        at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601)
        at 
org.apache.zeppelin.flink.Flink114Shims.collectToList(Flink114Shims.java:223)
        at 
org.apache.zeppelin.flink.FlinkZeppelinContext.showData(FlinkZeppelinContext.scala:110)
        at 
org.apache.zeppelin.interpreter.ZeppelinContext.showData(ZeppelinContext.java:67)
        at 
org.apache.zeppelin.flink.FlinkBatchSqlInterpreter.callInnerSelect(FlinkBatchSqlInterpreter.java:60)
        at 
org.apache.zeppelin.flink.FlinkSqlInterpreter.callSelect(FlinkSqlInterpreter.java:494)
        at 
org.apache.zeppelin.flink.FlinkSqlInterpreter.callCommand(FlinkSqlInterpreter.java:257)
        at 
org.apache.zeppelin.flink.FlinkSqlInterpreter.runSqlList(FlinkSqlInterpreter.java:151)
        at 
org.apache.zeppelin.flink.FlinkSqlInterpreter.internalInterpret(FlinkSqlInterpreter.java:109)
        at 
org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55)
        at 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
        at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
        at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
        at 
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
        at 
org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
{code}


  was:
create two tables

{code:java}

drop table if exists TABLE_1;
CREATE TABLE TABLE_1 (
    `indexcalno` STRING,
    `indextype` STRING,
    `agentcode` STRING,
    `managecom` STRING not null,
    `t1` DECIMAL(12,2),  
    PRIMARY KEY (indexcalno,indextype,agentcode) NOT ENFORCED
    ) 
WITH (  
'connector' = 'jdbc',  
'url' = 
'jdbc:mysql://IP:Port/gxdb?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true',
  
'username'='username', 
'password'='password', 
'table-name' = 'TABLE_1'
);

drop table if exists TABLE_2;
CREATE TABLE TABLE_2 (
`id` BIGINT,
`agentcode` STRING,
`managecom` STRING,
`fyc` DECIMAL(12,2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (  
'connector' = 'jdbc',  
'url' = 
'jdbc:mysql://IP:PORT/gxdb?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true',
  
'username'='username',  
'password'='password',  
'table-name' = 'TABLE_2'
);
{code}

exe sql:

{code:java}
SELECT 
(
  select sum(a.fyc) 
  from TABLE_2  a
  where a.managecom = _t.managecom 
  and a.agentcode=_t.agentcode  
)
from TABLE_1  _t 
{code}

error info:

{code:java}
Fail to run sql command: SELECT 
(
  select sum(a.fyc) 
  from TABLE_2 a
  where a.managecom = _t.managecom 
  and a.agentcode=_t.agentcode  
)
from TABLE_1 _t
org.apache.flink.table.api.TableException: unexpected correlate variable 
$cor262 in the plan
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:85)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:56)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
        at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601)
        at 
org.apache.zeppelin.flink.Flink114Shims.collectToList(Flink114Shims.java:223)
        at 
org.apache.zeppelin.flink.FlinkZeppelinContext.showData(FlinkZeppelinContext.scala:110)
        at 
org.apache.zeppelin.interpreter.ZeppelinContext.showData(ZeppelinContext.java:67)
        at 
org.apache.zeppelin.flink.FlinkBatchSqlInterpreter.callInnerSelect(FlinkBatchSqlInterpreter.java:60)
        at 
org.apache.zeppelin.flink.FlinkSqlInterpreter.callSelect(FlinkSqlInterpreter.java:494)
        at 
org.apache.zeppelin.flink.FlinkSqlInterpreter.callCommand(FlinkSqlInterpreter.java:257)
        at 
org.apache.zeppelin.flink.FlinkSqlInterpreter.runSqlList(FlinkSqlInterpreter.java:151)
        at 
org.apache.zeppelin.flink.FlinkSqlInterpreter.internalInterpret(FlinkSqlInterpreter.java:109)
        at 
org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55)
        at 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
        at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
        at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
        at 
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
        at 
org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
{code}



> unexpected correlate variable $cor262 in the plan
> -------------------------------------------------
>
>                 Key: FLINK-29676
>                 URL: https://issues.apache.org/jira/browse/FLINK-29676
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Luochg
>            Priority: Major
>
> Create two tables:
> {code:java}
> drop table if exists TABLE_1;
> CREATE TABLE TABLE_1 (
>     `indexcalno` STRING,
>     `indextype` STRING,
>     `agentcode` STRING,
>     `managecom` STRING not null,
>     `t1` DECIMAL(12,2),  
>     PRIMARY KEY (indexcalno,indextype,agentcode) NOT ENFORCED
>     ) 
> WITH (  
> 'connector' = 'jdbc',  
> 'url' = 
> 'jdbc:mysql://IP:Port/db?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true',
>   
> 'username'='username', 
> 'password'='password', 
> 'table-name' = 'TABLE_1'
> );
> drop table if exists TABLE_2;
> CREATE TABLE TABLE_2 (
> `id` BIGINT,
> `agentcode` STRING,
> `managecom` STRING,
> `fyc` DECIMAL(12,2),
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (  
> 'connector' = 'jdbc',  
> 'url' = 
> 'jdbc:mysql://IP:PORT/db?useUnicode=true&characterEncoding=utf8&useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC&allowPublicKeyRetrieval=true',
>   
> 'username'='username',  
> 'password'='password',  
> 'table-name' = 'TABLE_2'
> );
> {code}
> Execute the following SQL:
> {code:java}
> SELECT 
> (
>   select sum(a.fyc) 
>   from TABLE_2  a
>   where a.managecom = _t.managecom 
>   and a.agentcode=_t.agentcode  
> )
> from TABLE_1  _t 
> {code}
> Error output:
> {code:java}
> Fail to run sql command: SELECT 
> (
>   select sum(a.fyc) 
>   from TABLE_2 a
>   where a.managecom = _t.managecom 
>   and a.agentcode=_t.agentcode  
> )
> from TABLE_1 _t
> org.apache.flink.table.api.TableException: unexpected correlate variable 
> $cor262 in the plan
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:85)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:56)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:44)
>       at scala.collection.immutable.List.foreach(List.scala:392)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44)
>       at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
>       at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601)
>       at 
> org.apache.zeppelin.flink.Flink114Shims.collectToList(Flink114Shims.java:223)
>       at 
> org.apache.zeppelin.flink.FlinkZeppelinContext.showData(FlinkZeppelinContext.scala:110)
>       at 
> org.apache.zeppelin.interpreter.ZeppelinContext.showData(ZeppelinContext.java:67)
>       at 
> org.apache.zeppelin.flink.FlinkBatchSqlInterpreter.callInnerSelect(FlinkBatchSqlInterpreter.java:60)
>       at 
> org.apache.zeppelin.flink.FlinkSqlInterpreter.callSelect(FlinkSqlInterpreter.java:494)
>       at 
> org.apache.zeppelin.flink.FlinkSqlInterpreter.callCommand(FlinkSqlInterpreter.java:257)
>       at 
> org.apache.zeppelin.flink.FlinkSqlInterpreter.runSqlList(FlinkSqlInterpreter.java:151)
>       at 
> org.apache.zeppelin.flink.FlinkSqlInterpreter.internalInterpret(FlinkSqlInterpreter.java:109)
>       at 
> org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55)
>       at 
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
>       at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
>       at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
>       at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
>       at 
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
>       at 
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to