[ https://issues.apache.org/jira/browse/FLINK-36808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-36808:
-----------------------------------
    Labels: pull-request-available  (was: )

> UNION ALL after lookup join produces unexpected results
> -------------------------------------------------------
>
>                 Key: FLINK-36808
>                 URL: https://issues.apache.org/jira/browse/FLINK-36808
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0, 1.19.1
>            Reporter: Qingsheng Ren
>            Assignee: Muhammet Orazov
>            Priority: Major
>              Labels: pull-request-available
>
> Here is the SQL to reproduce the issue:
> {code:java}
> -- Data of table `stream`:
> -- (1, Alice)
> -- (2, Bob)
> CREATE TEMPORARY TABLE `stream` (
>     `id` BIGINT,
>     `name` STRING,
>     `txn_time` as proctime(),
>     PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:postgresql://localhost:5432/postgres',
>     'table-name' = 'stream',
>     'username' = 'postgres',
>     'password' = 'postgres'
> );
> -- Data of table `dim`:
> -- (1, OK)
> -- (2, OK)
> CREATE TEMPORARY TABLE `dim` (
>     `id` BIGINT,
>     `status` STRING,
>     PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:postgresql://localhost:5432/postgres',
>     'table-name' = 'dim',
>     'username' = 'postgres',
>     'password' = 'postgres'
> );
> -- Lookup join the two tables twice with different filters, and union the results
> SELECT
>     s.id,
>     s.name,
>     s.txn_time,
>     d.status
> FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
> ON
>     `s`.`id` = `d`.`id`
> WHERE
>     `d`.`status` = 'OK'
> UNION ALL
> SELECT
>     s.id,
>     s.name,
>     s.txn_time,
>     d.status
> FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
> ON
>     `s`.`id` = `d`.`id`
> WHERE
>     `d`.`status` = 'NOT_EXISTS';{code}
> The first lookup join should output:
> {code:java}
> (1, Alice, 2024-11-27 11:52:19.332, OK)
> (2, Bob, 2024-11-27 11:52:19.332, OK) {code}
> The second lookup join should output nothing, as no row in `dim` has status
> 'NOT_EXISTS'.
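
The expected semantics described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not Flink code: the function names are hypothetical, the tables are modelled as in-memory collections holding the rows listed in the SQL comments, and the `txn_time` column is left out because it does not affect which rows qualify.

```python
# Rows taken from the comments in the reproduction SQL.
STREAM = [(1, "Alice"), (2, "Bob")]   # (id, name)
DIM = {1: "OK", 2: "OK"}              # id -> status


def lookup_join_with_filter(stream, dim, wanted_status):
    """Inner lookup join on id that keeps only rows whose dim.status matches."""
    out = []
    for sid, name in stream:
        status = dim.get(sid)  # point lookup by join key
        if status is not None and status == wanted_status:
            out.append((sid, name, status))
    return out


def expected_union_all(stream, dim):
    """UNION ALL of the two filtered lookup joins from the query."""
    return (lookup_join_with_filter(stream, dim, "OK")
            + lookup_join_with_filter(stream, dim, "NOT_EXISTS"))


print(expected_union_all(STREAM, DIM))
# prints [(1, 'Alice', 'OK'), (2, 'Bob', 'OK')]
```

Because each branch filters on `status` before the union, the 'NOT_EXISTS' branch contributes no rows.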
> But the result after union is:
> {code:java}
> 1, Alice, 2024-11-27 11:52:19.332, OK
> 2, Bob, 2024-11-27 11:52:19.332, OK
> 1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
> 2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code}
> There shouldn't be any 'NOT_EXISTS' rows.
> The SQL plan shows that the constants 'OK' and 'NOT_EXISTS' are appended
> directly by the Calc after the lookup join, and no filter on `status` remains
> in the optimized plan, which is not as expected.
> {code:java}
> | == Abstract Syntax Tree ==
> LogicalUnion(all=[true])
> :- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
> :  +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
> :     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
> :        :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
> :        :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
> :        +- LogicalFilter(condition=[=($cor0.id, $0)])
> :           +- LogicalSnapshot(period=[$cor0.txn_time])
> :              +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
> +- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
>    +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
>       +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 2}])
>          :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
>          :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
>          +- LogicalFilter(condition=[=($cor1.id, $0)])
>             +- LogicalSnapshot(period=[$cor1.txn_time])
>                +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
> == Optimized Physical Plan ==
> Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
> +- Union(all=[true], union=[id, name, txn_time, status])
>    :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
>    :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
>    :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
>    :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
>    +- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
>       +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
>          +- Calc(select=[id, name, PROCTIME() AS txn_time])
>             +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
> == Optimized Execution Plan ==
> Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
> +- Union(all=[true], union=[id, name, txn_time, status])
>    :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS status])
>    :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])(reuse_id=[1])
>    :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
>    :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
>    +- Calc(select=[id, name, txn_time, CAST('NOT_EXISTS' AS VARCHAR(2147483647)) AS status])
>       +- Reused(reference_id=[1])
> | {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
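
To make the reported plan easier to reason about, the following Python sketch mimics what the optimized execution plan appears to do, read purely from the printed operators. It is an assumption-laden model, not a trace of Flink's planner or runtime: the function names are hypothetical, the tables are in-memory stand-ins for the rows in the SQL comments, and `txn_time` is omitted.

```python
# Rows taken from the comments in the reproduction SQL.
STREAM = [(1, "Alice"), (2, "Bob")]   # (id, name)
DIM = {1: "OK", 2: "OK"}              # id -> status


def lookup_join_on_key(stream, dim):
    """Models LookupJoin(lookup=[id=id]): matches on id only, no status filter."""
    return [(sid, name) for sid, name in stream if sid in dim]


def calc_append_constant(rows, constant):
    """Models the Calc that emits CAST('<constant>') AS status for every input row."""
    return [(sid, name, constant) for sid, name in rows]


def plan_as_printed(stream, dim):
    # One join result feeds both branches, as with reuse_id=[1] / Reused(reference_id=[1]).
    joined = lookup_join_on_key(stream, dim)
    return (calc_append_constant(joined, "OK")
            + calc_append_constant(joined, "NOT_EXISTS"))


for row in plan_as_printed(STREAM, DIM):
    print(row)
# prints:
# (1, 'Alice', 'OK')
# (2, 'Bob', 'OK')
# (1, 'Alice', 'NOT_EXISTS')
# (2, 'Bob', 'NOT_EXISTS')
```

Under this reading, the status literal is projected onto every joined row instead of being compared against `dim.status`, which reproduces the four rows in the report.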