Alexander Trushev created FLINK-28567:
-----------------------------------------

             Summary: Introduce predicate inference from one side of join to the other
                 Key: FLINK-28567
                 URL: https://issues.apache.org/jira/browse/FLINK-28567
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Alexander Trushev


h2. Context

Calcite provides JoinPushTransitivePredicatesRule, which infers predicates from a Join and creates Filters on its inputs when those predicates can be pushed down to them.
*Example.* (a0 = b0 AND a0 > 0) => (b0 > 0)
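The inference in the example can be illustrated with a minimal Python sketch. It is not Calcite's implementation; Calcite works on RexNode predicates through its predicate metadata. The sketch makes three assumptions. It assumes an inner join. It encodes conjuncts as hypothetical (op, column, operand) tuples, where a str operand names another column and anything else is a literal. It uses a union-find to group columns that the join condition equates. The function name infer_transitive_predicates is also hypothetical.

```python
from collections import defaultdict


def infer_transitive_predicates(conjuncts, left_cols, right_cols):
    """Infer single-column filters for each input of an INNER join.

    Hypothetical encoding: each conjunct is (op, column, operand); a str
    operand is another column (e.g. ("=", "a0", "b0")), anything else is a
    literal (e.g. (">", "a0", 0)). Returns only newly inferred predicates.
    """
    parent = {}

    def find(col):
        parent.setdefault(col, col)
        while parent[col] != col:
            parent[col] = parent[parent[col]]  # path halving
            col = parent[col]
        return col

    def union(a, b):
        parent[find(a)] = find(b)

    # column = column conjuncts put both columns in one equivalence class
    for op, col, operand in conjuncts:
        if op == "=" and isinstance(operand, str):
            union(col, operand)

    classes = defaultdict(set)
    for col in list(parent):
        classes[find(col)].add(col)

    existing = set(conjuncts)
    inferred = {"left": [], "right": []}
    for op, col, operand in conjuncts:
        if isinstance(operand, str):
            continue  # column-to-column conjunct, not a single-column filter
        members = classes[find(col)] if col in parent else {col}
        # copy the filter onto every other column known to equal `col`
        for other in sorted(members - {col}):
            candidate = (op, other, operand)
            if candidate in existing:
                continue  # already stated in the join condition
            if other in left_cols:
                side = "left"
            elif other in right_cols:
                side = "right"
            else:
                continue  # column belongs to neither input
            if candidate not in inferred[side]:
                inferred[side].append(candidate)
    return inferred


# The example above: (a0 = b0 AND a0 > 0) => (b0 > 0)
print(infer_transitive_predicates(
    [("=", "a0", "b0"), (">", "a0", 0)], left_cols={"a0"}, right_cols={"b0"}))
# {'left': [], 'right': [('>', 'b0', 0)]}
```

The union-find step makes the inference transitive. Given a0 = b0 and b0 = a1, a filter on a0 also reaches b0 and a1. The inner-join assumption matters because copying predicates across an outer join is not always valid. For example, in A LEFT JOIN B, a condition on b0 in the ON clause does not remove rows of A, so it cannot be copied onto A's side.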

h2. Proposal

Add org.apache.calcite.rel.rules.JoinPushTransitivePredicatesRule to FlinkStreamRuleSets and FlinkBatchRuleSets, so that the rule is applied to both streaming and batch queries.

h2. Benefit

Before the changes:

{code}
Flink SQL> explain select * from A join B on a0 = b0 and a0 > 0;

Join(joinType=[InnerJoin], where=[=(a0, b0)], select=[a0, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a0]])
:  +- Calc(select=[a0], where=[>(a0, 0)])
:     +- TableSourceScan(table=[[default_catalog, default_database, A, filter=[]]], fields=[a0])
+- Exchange(distribution=[hash[b0]])
   +- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[b0])
{code}

After the changes:

{code}
Join(joinType=[InnerJoin], where=[=(a0, b0)], select=[a0, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a0]])
:  +- Calc(select=[a0], where=[>(a0, 0)])
:     +- TableSourceScan(table=[[default_catalog, default_database, A, filter=[]]], fields=[a0])
+- Exchange(distribution=[hash[b0]])
   +- Calc(select=[b0], where=[>(b0, 0)])
      +- TableSourceScan(table=[[default_catalog, default_database, B, filter=[]]], fields=[b0])
{code}

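That is, b0 > 0 is inferred from the join condition and pushed down into B's input. Rows of B that cannot match are filtered out before the Exchange instead of being shuffled to the Join.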




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
