morazow opened a new pull request, #26514:
URL: https://github.com/apache/flink/pull/26514

   ## What is the purpose of the change
   
   - Fix a bug in LookupJoin that occurs when a filterable table (one that supports filter pushdown) is used in lookup joins combined with UNION ALL
   
   ### Brief description of the bug
   
   Given a UNION ALL query that:
   - combines the results of two lookup joins
   - applies a different filter to each of these lookup joins
   
   ```
   SELECT
        s.id,
        s.name,
        s.txn_time,
        d.status
   FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
   ON
        `s`.`id` = `d`.`id`
   WHERE
        `d`.`status` = 'OK' 
   UNION ALL
   SELECT
        s.id,
        s.name,
        s.txn_time,
        d.status
   FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
   ON
        `s`.`id` = `d`.`id`
   WHERE
        `d`.`status` = 'NOT_EXISTS';
   ```
   
   In this situation the planner pushes the filter condition down into the table part of the lookup join, but the structure of the lookup joins stays the same, i.e., they have the same `digest` even though their tables / temporal tables differ.
   
   This is a problem because the Calcite Volcano optimizer registers them as equivalent: it decides equivalence [using the digest of the relation nodes](https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java#L1304-L1328).
   
   This causes the bug: when the Union is optimized, both parts of the query are treated as the same relation (even though they have different WHERE clauses), and the cheapest plan found is the same for both lookup joins.
   
   You can also see the effect by putting the non-existing filter first: the result is then empty, because the first lookup join is also used for the second part of the union.
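
   The collision can be illustrated with a toy model of digest-based registration. This is only a sketch, not Calcite or Flink code: `ToyNode`, `register`, and the digest format are hypothetical stand-ins, and the real planner is considerably more involved.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Toy model only: ToyNode and register() are hypothetical stand-ins,
   // not Calcite or Flink classes.
   final class DigestCollisionDemo {

       // A lookup join whose pushed-down filter is NOT part of its digest.
       static final class ToyNode {
           final String table;
           final String lookupKey;
           final String pushedFilter;

           ToyNode(String table, String lookupKey, String pushedFilter) {
               this.table = table;
               this.lookupKey = lookupKey;
               this.pushedFilter = pushedFilter;
           }

           String digest() {
               return "LookupJoin(table=[" + table + "], lookup=[" + lookupKey + "])";
           }
       }

       // Registers a node under its digest. If the digest is already known,
       // the previously registered node is returned instead of the new one.
       static ToyNode register(Map<String, ToyNode> registry, ToyNode node) {
           ToyNode existing = registry.putIfAbsent(node.digest(), node);
           return existing != null ? existing : node;
       }

       public static void main(String[] args) {
           Map<String, ToyNode> registry = new HashMap<>();
           ToyNode ok = new ToyNode("dim", "id=id", "status = 'OK'");
           ToyNode missing = new ToyNode("dim", "id=id", "status = 'NOT_EXISTS'");

           ToyNode first = register(registry, ok);
           ToyNode second = register(registry, missing);

           // Both union branches resolve to the node registered first.
           System.out.println(first.pushedFilter);
           System.out.println(second.pushedFilter);
       }
   }
   ```

   In this model, whichever branch is registered first wins, which mirrors the order dependence described above.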
   
   ### Alternative Solutions
   
   A better solution would be to improve the LookupJoin expression so that the filter condition is also included in the table name. For example,
   ```
   LookupJoin(table=[default_catalog.default_database.dim, filter=[<PUSHED-DOWN-FILTERS>]], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id], upsertKey=[[0]])
   ```
   
   But this would require a lot of refactoring, mainly in the tests.
   
   In this PR, I have opted for adding another `filter` with the pushed-down filter conditions if the LookupJoin contains a table with filter pushdowns.
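
   As a rough sketch of why this helps, the toy model below includes the pushed-down filter in the digest only when one is present. `ToyNode`, `register`, and the exact digest format are hypothetical and chosen for illustration; the format produced by the actual change may differ.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Toy model only: ToyNode and register() are hypothetical stand-ins,
   // not Calcite or Flink classes.
   final class FilterAwareDigestDemo {

       static final class ToyNode {
           final String table;
           final String lookupKey;
           final String pushedFilter; // empty string means no pushed-down filter

           ToyNode(String table, String lookupKey, String pushedFilter) {
               this.table = table;
               this.lookupKey = lookupKey;
               this.pushedFilter = pushedFilter;
           }

           // The filter is appended only when a pushdown exists, so digests
           // of lookup joins without pushdowns stay unchanged.
           String digest() {
               String base = "LookupJoin(table=[" + table + "], lookup=[" + lookupKey + "]";
               return pushedFilter.isEmpty()
                       ? base + ")"
                       : base + ", filter=[" + pushedFilter + "])";
           }
       }

       // Registers a node under its digest and returns the node that now
       // represents that digest (the earlier one if it already existed).
       static ToyNode register(Map<String, ToyNode> registry, ToyNode node) {
           ToyNode existing = registry.putIfAbsent(node.digest(), node);
           return existing != null ? existing : node;
       }

       public static void main(String[] args) {
           Map<String, ToyNode> registry = new HashMap<>();
           ToyNode first = register(registry, new ToyNode("dim", "id=id", "status = 'OK'"));
           ToyNode second = register(registry, new ToyNode("dim", "id=id", "status = 'NOT_EXISTS'"));

           // The two branches are no longer merged into one node.
           System.out.println(first.digest());
           System.out.println(second.digest());
       }
   }
   ```

   Appending the filter only when it is non-empty keeps existing digests stable for lookup joins without pushdowns, which is one plausible way to limit changes to existing plan tests.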
   
   ## Brief change log
   
   - Add a unit test to reproduce the bug
   - Add (one alternative) fix to resolve the bug
   
   ## Verifying this change
   
   This change adds a test case that reproduces the bug and verifies the fix.
   
   ## Does this pull request potentially affect one of the following parts:
   
   - Dependencies (does it add or upgrade a dependency): no
   - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
   - The serializers: no
   - The runtime per-record code paths (performance sensitive): no
   - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
   - The S3 file system connector: no 
   
   ## Documentation
   
   - Does this pull request introduce a new feature? no
   - If yes, how is the feature documented? not applicable
   