Qingsheng Ren created FLINK-36808:
-------------------------------------

             Summary: UNION ALL after lookup join produces unexpected results
                 Key: FLINK-36808
                 URL: https://issues.apache.org/jira/browse/FLINK-36808
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.19.1, 1.20.0
            Reporter: Qingsheng Ren

Here is the SQL to reproduce the issue:
{code:java}
-- Data of table `stream`:
-- (1, Alice)
-- (2, Bob)
CREATE TEMPORARY TABLE `stream` (
    `id` BIGINT,
    `name` STRING,
    `txn_time` AS PROCTIME(),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://localhost:5432/postgres',
    'table-name' = 'stream',
    'username' = 'postgres',
    'password' = 'postgres'
);

-- Data of table `dim`:
-- (1, OK)
-- (2, OK)
CREATE TEMPORARY TABLE `dim` (
    `id` BIGINT,
    `status` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://localhost:5432/postgres',
    'table-name' = 'dim',
    'username' = 'postgres',
    'password' = 'postgres'
);

-- Lookup join the two tables twice with different filters, and union the results
SELECT
    s.id,
    s.name,
    s.txn_time,
    d.status
FROM `stream` AS `s`
INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON `s`.`id` = `d`.`id`
WHERE `d`.`status` = 'OK'
UNION ALL
SELECT
    s.id,
    s.name,
    s.txn_time,
    d.status
FROM `stream` AS `s`
INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON `s`.`id` = `d`.`id`
WHERE `d`.`status` = 'NOT_EXISTS';
{code}
The first lookup join should output:
{code:java}
(1, Alice, 2024-11-27 11:52:19.332, OK)
(2, Bob, 2024-11-27 11:52:19.332, OK)
{code}
The second lookup join should output nothing, because no row in `dim` has the status 'NOT_EXISTS'. But the result after the union is:
{code:java}
1, Alice, 2024-11-27 11:52:19.332, OK
2, Bob, 2024-11-27 11:52:19.332, OK
1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS
{code}
There should not be any 'NOT_EXISTS' rows.

The SQL plan shows that the constants 'OK' and 'NOT_EXISTS' are appended directly by the Calc after the lookup join operation, which is not expected.

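The three sections of the plan that follows ("Abstract Syntax Tree", "Optimized Physical Plan", "Optimized Execution Plan") match the output layout of Flink SQL's EXPLAIN statement. The report does not say how the plan was captured, so the following is only a sketch of one way to obtain it, assuming the `stream` and `dim` tables from the reproduction script are already registered in the same session:
{code:sql}
-- Assumption: executed in the same SQL session that created `stream` and `dim`.
-- EXPLAIN PLAN FOR prints the plan without running the query.
EXPLAIN PLAN FOR
SELECT s.id, s.name, s.txn_time, d.status
FROM `stream` AS `s`
INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON `s`.`id` = `d`.`id`
WHERE `d`.`status` = 'OK'
UNION ALL
SELECT s.id, s.name, s.txn_time, d.status
FROM `stream` AS `s`
INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON `s`.`id` = `d`.`id`
WHERE `d`.`status` = 'NOT_EXISTS';
{code}
When reading the output, the points to check are whether each LookupJoin node carries the `status` condition and whether the two branches share one LookupJoin (a `reuse_id` on one node and a `Reused` reference in the other branch) despite their different WHERE clauses.
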
{code:java}
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
:  +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
:     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
:        :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
:        :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
:        +- LogicalFilter(condition=[=($cor0.id, $0)])
:           +- LogicalSnapshot(period=[$cor0.txn_time])
:              +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
   +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
      +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 2}])
         :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
         +- LogicalFilter(condition=[=($cor1.id, $0)])
            +- LogicalSnapshot(period=[$cor1.txn_time])
               +- LogicalTableScan(table=[[default_catalog, default_database, dim]])

== Optimized Physical Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
      +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
         +- Calc(select=[id, name, PROCTIME() AS txn_time])
            +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])

== Optimized Execution Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])(reuse_id=[1])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, CAST('NOT_EXISTS' AS VARCHAR(2147483647)) AS status])
      +- Reused(reference_id=[1])
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)