Qingsheng Ren created FLINK-36808:
-------------------------------------

             Summary: UNION ALL after lookup join produces unexpected results
                 Key: FLINK-36808
                 URL: https://issues.apache.org/jira/browse/FLINK-36808
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.19.1, 1.20.0
            Reporter: Qingsheng Ren


Here is the SQL to reproduce the issue:
{code:sql}
-- Data of table `stream`:
-- (1, Alice)
-- (2, Bob)
CREATE TEMPORARY TABLE `stream` (
    `id` BIGINT,
    `name` STRING,
    `txn_time` as proctime(),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'stream',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Data of table `dim`:
-- (1, OK)
-- (2, OK)
CREATE TEMPORARY TABLE `dim` (
    `id` BIGINT,
    `status` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'dim',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Lookup-join the two tables twice with different filters, then UNION ALL the results
SELECT
     s.id,
     s.name,
     s.txn_time,
     d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON
     `s`.`id` = `d`.`id`
WHERE
     `d`.`status` = 'OK' 
UNION ALL
SELECT
     s.id,
     s.name,
     s.txn_time,
     d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON
     `s`.`id` = `d`.`id`
WHERE
     `d`.`status` = 'NOT_EXISTS';{code}
The first lookup join should output:
{code:java}
1, Alice, 2024-11-27 11:52:19.332, OK
2, Bob, 2024-11-27 11:52:19.332, OK {code}
The second lookup join should output nothing, as no row in `dim` has status 'NOT_EXISTS'.
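To make the expected semantics concrete, here is a minimal Python model of the query (a simplified sketch, not Flink code): each branch does an inner join of `stream` with `dim` on `id`, keeps only rows whose `dim.status` matches the branch's predicate, and UNION ALL concatenates the branches. The `txn_time` column is left out because it only carries the processing time, and the helper names `lookup_join_with_filter` and `union_all` are hypothetical.

{code:python}
from typing import Dict, List, Tuple

# Sample data from the report; txn_time (PROCTIME) is omitted from this model.
STREAM: List[Tuple[int, str]] = [(1, "Alice"), (2, "Bob")]
DIM: Dict[int, str] = {1: "OK", 2: "OK"}  # id -> status


def lookup_join_with_filter(status_filter: str) -> List[Tuple[int, str, str]]:
    """Hypothetical model of one branch: inner lookup join on id, then WHERE d.status = status_filter."""
    out = []
    for sid, name in STREAM:
        status = DIM.get(sid)  # look up the dim row by primary key
        if status is not None and status == status_filter:  # inner join + filter
            out.append((sid, name, status))
    return out


def union_all(*branches: List[Tuple[int, str, str]]) -> List[Tuple[int, str, str]]:
    """UNION ALL keeps duplicates: concatenate the branch outputs in order."""
    result: List[Tuple[int, str, str]] = []
    for branch in branches:
        result.extend(branch)
    return result


expected = union_all(lookup_join_with_filter("OK"), lookup_join_with_filter("NOT_EXISTS"))
print(expected)  # [(1, 'Alice', 'OK'), (2, 'Bob', 'OK')]
{code}

Only the 'OK' branch contributes rows, because the 'NOT_EXISTS' predicate rejects every joined row.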

However, the result of the UNION ALL is:
{code:java}
1, Alice, 2024-11-27 11:52:19.332, OK
2, Bob, 2024-11-27 11:52:19.332, OK
1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code}
There should be no rows with status 'NOT_EXISTS'.

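The SQL plan shows that the filter conditions on `d.status` are never applied: neither LookupJoin carries the status predicate (only `lookup=[id=id]`), and the constants 'OK' and 'NOT_EXISTS' are instead projected directly as the `status` column by the Calc after each LookupJoin, which is not as expected. Since the two LookupJoin nodes are then identical, the execution plan also reuses a single LookupJoin for both branches (`reuse_id=[1]`).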
{code:java}
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
:  +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
:     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
:        :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
:        :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
:        +- LogicalFilter(condition=[=($cor0.id, $0)])
:           +- LogicalSnapshot(period=[$cor0.txn_time])
:              +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
   +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
      +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 2}])
         :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
         +- LogicalFilter(condition=[=($cor1.id, $0)])
            +- LogicalSnapshot(period=[$cor1.txn_time])
               +- LogicalTableScan(table=[[default_catalog, default_database, dim]])

== Optimized Physical Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
      +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
         +- Calc(select=[id, name, PROCTIME() AS txn_time])
            +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])

== Optimized Execution Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])(reuse_id=[1])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, CAST('NOT_EXISTS' AS VARCHAR(2147483647)) AS status])
      +- Reused(reference_id=[1])
{code}
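The observed output can be reproduced by modelling what the optimized plans compute, again as a simplified Python sketch that assumes the plans above are executed literally (this is not Flink's implementation). The LookupJoin joins on `id` only, so the status predicate is never evaluated; each branch's Calc then projects its filter literal as `status`, and both branches read the same reused LookupJoin output. The helper names `lookup_join_on_id` and `calc_with_literal` are hypothetical, and `txn_time` is again omitted.

{code:python}
from typing import Dict, List, Tuple

# Sample data from the report; txn_time (PROCTIME) is omitted from this model.
STREAM: List[Tuple[int, str]] = [(1, "Alice"), (2, "Bob")]
DIM: Dict[int, str] = {1: "OK", 2: "OK"}  # id -> status


def lookup_join_on_id() -> List[Tuple[int, str]]:
    """Models LookupJoin(lookup=[id=id]) with no status predicate: inner join on id only."""
    return [(sid, name) for sid, name in STREAM if sid in DIM]


def calc_with_literal(rows: List[Tuple[int, str]], literal: str) -> List[Tuple[int, str, str]]:
    """Models Calc(select=[..., CAST(literal) AS status]): the literal replaces d.status."""
    return [(sid, name, literal) for sid, name in rows]


# Execution plan: a single LookupJoin (reuse_id=[1]) feeds both Union inputs.
shared = lookup_join_on_id()
observed = calc_with_literal(shared, "OK") + calc_with_literal(shared, "NOT_EXISTS")
for row in observed:
    print(row)
{code}

This yields the same four (id, name, status) rows as the reported result, including the two spurious 'NOT_EXISTS' rows.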



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
