[ https://issues.apache.org/jira/browse/FLINK-36808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-36808:
-----------------------------------
    Labels: pull-request-available  (was: )

> UNION ALL after lookup join produces unexpected results
> -------------------------------------------------------
>
>                 Key: FLINK-36808
>                 URL: https://issues.apache.org/jira/browse/FLINK-36808
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0, 1.19.1
>            Reporter: Qingsheng Ren
>            Assignee: Muhammet Orazov
>            Priority: Major
>              Labels: pull-request-available
>
> Here is the SQL to reproduce the issue:
> {code:java}
> -- Data of table `stream`:
> -- (1, Alice)
> -- (2, Bob)
> CREATE TEMPORARY TABLE `stream` (
>     `id` BIGINT,
>     `name` STRING,
>     `txn_time` as proctime(),
>     PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:postgresql://localhost:5432/postgres',
>   'table-name' = 'stream',
>   'username' = 'postgres',
>   'password' = 'postgres'
> );
> -- Data of table `dim`:
> -- (1, OK)
> -- (2, OK)
> CREATE TEMPORARY TABLE `dim` (
>     `id` BIGINT,
>     `status` STRING,
>     PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:postgresql://localhost:5432/postgres',
>   'table-name' = 'dim',
>   'username' = 'postgres',
>   'password' = 'postgres'
> );
> -- Lookup-join the two tables twice with different filters, and union the results
> SELECT
>      s.id,
>      s.name,
>      s.txn_time,
>      d.status
> FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
> ON
>      `s`.`id` = `d`.`id`
> WHERE
>      `d`.`status` = 'OK' 
> UNION ALL
> SELECT
>      s.id,
>      s.name,
>      s.txn_time,
>      d.status
> FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
> ON
>      `s`.`id` = `d`.`id`
> WHERE
>      `d`.`status` = 'NOT_EXISTS';{code}
> The first lookup join should output:
> {code:java}
> (1, Alice, 2024-11-27 11:52:19.332, OK)
> (2, Bob, 2024-11-27 11:52:19.332, OK) {code}
> The second lookup join should output nothing, as there is no row with status 
> 'NOT_EXISTS'.
> But the result after union is:
> {code:java}
> 1, Alice, 2024-11-27 11:52:19.332, OK
> 2, Bob, 2024-11-27 11:52:19.332, OK
> 1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
> 2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code}
> There should be no rows with status 'NOT_EXISTS'.
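> The SQL plan shows that the constant conditions 'OK' and 'NOT_EXISTS' are 
> emitted directly as the status value by the Calc after each lookup join, with 
> no filter on the looked-up status visible in the plan, which is not as 
> expected. In the optimized execution plan, the two LookupJoin nodes are also 
> merged into a single reused node (Reused(reference_id=[1])).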
> {code:java}
> == Abstract Syntax Tree ==
> LogicalUnion(all=[true])
> :- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
> :  +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
> :     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
> :        :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
> :        :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
> :        +- LogicalFilter(condition=[=($cor0.id, $0)])
> :           +- LogicalSnapshot(period=[$cor0.txn_time])
> :              +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
> +- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
>    +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
>       +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 2}])
>          :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
>          :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
>          +- LogicalFilter(condition=[=($cor1.id, $0)])
>             +- LogicalSnapshot(period=[$cor1.txn_time])
>                +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
> == Optimized Physical Plan ==
> Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
> +- Union(all=[true], union=[id, name, txn_time, status])
>    :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
>    :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
>    :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
>    :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
>    +- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
>       +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
>          +- Calc(select=[id, name, PROCTIME() AS txn_time])
>             +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
> == Optimized Execution Plan ==
> Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
> +- Union(all=[true], union=[id, name, txn_time, status])
>    :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS status])
>    :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])(reuse_id=[1])
>    :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
>    :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
>    +- Calc(select=[id, name, txn_time, CAST('NOT_EXISTS' AS VARCHAR(2147483647)) AS status])
>       +- Reused(reference_id=[1])
> {code}
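For reference, the expected semantics of the reported query can be modeled in plain Java: each branch keeps a stream row only if the status looked up from dim equals that branch's constant, and UNION ALL simply concatenates the two branches. This is a hypothetical sketch, not Flink code and not the planner fix; the class name ExpectedUnionSemantics, the Row record and the helper methods are made up for illustration, the table contents are the sample data from the description, and txn_time is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of the expected query semantics (not Flink code).
// Table contents are hard-coded from the issue's sample data; txn_time is omitted.
class ExpectedUnionSemantics {

    // One output row of the query (id, name, status).
    record Row(long id, String name, String status) {}

    // Models: stream s JOIN dim d ON s.id = d.id WHERE d.status = wantedStatus.
    // Each stream row is looked up in dim by key, and the WHERE filter is applied
    // to the looked-up status, so rows whose status differs are dropped.
    static List<Row> lookupJoinWhereStatus(
            Map<Long, String> stream, Map<Long, String> dim, String wantedStatus) {
        List<Row> out = new ArrayList<>();
        for (Map.Entry<Long, String> s : stream.entrySet()) {
            String status = dim.get(s.getKey()); // null means no match, so the inner join drops it
            if (wantedStatus.equals(status)) {
                out.add(new Row(s.getKey(), s.getValue(), status));
            }
        }
        return out;
    }

    // UNION ALL of the two branches: plain concatenation, no deduplication.
    static List<Row> expectedResult() {
        // TreeMap keeps the rows ordered by id for deterministic output.
        Map<Long, String> stream = new TreeMap<>(Map.of(1L, "Alice", 2L, "Bob"));
        Map<Long, String> dim = new TreeMap<>(Map.of(1L, "OK", 2L, "OK"));
        List<Row> result = new ArrayList<>(lookupJoinWhereStatus(stream, dim, "OK"));
        result.addAll(lookupJoinWhereStatus(stream, dim, "NOT_EXISTS"));
        return result;
    }

    public static void main(String[] args) {
        // Only the two 'OK' rows are expected; the 'NOT_EXISTS' branch contributes nothing.
        expectedResult().forEach(System.out::println);
    }
}
```

Comparing this model with the actual output above makes the discrepancy concrete: the real plan emits the second branch's rows with the constant 'NOT_EXISTS' even though no dim row carries that status.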



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
