Alexander Trushev created FLINK-28567: -----------------------------------------
Summary: Introduce predicate inference from one side of join to the other Key: FLINK-28567 URL: https://issues.apache.org/jira/browse/FLINK-28567 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Alexander Trushev h2. Context There is JoinPushTransitivePredicatesRule in Calcite that infers predicates from on a Join and creates Filters if those predicates can be pushed to its inputs. *Example.* (a0 = b0 AND a0 > 0) => (b0 > 0) h2. Proposal Add org.apache.calcite.rel.rules.JoinPushTransitivePredicatesRule to FlinkStreamRuleSets and to FlinkBatchRuleSets h2. Benefit Before the changes: {code} Flink SQL> explain select * from A join B on a0 = b0 and a0 > 0; Join(joinType=[InnerJoin], where=[=(a0, b0)], select=[a0, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a0]]) : +- Calc(select=[a0], where=[>(a0, 0)]) : +- TableSourceScan(table=[[default_catalog, default_database, A, filter=[]]], fields=[a0]) +- Exchange(distribution=[hash[b0]]) +- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[b0]) {code} After the changes: {code} Join(joinType=[InnerJoin], where=[=(a0, b0)], select=[a0, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a0]]) : +- Calc(select=[a0], where=[>(a0, 0)]) : +- TableSourceScan(table=[[default_catalog, default_database, A, filter=[]]], fields=[a0]) +- Exchange(distribution=[hash[b0]]) +- Calc(select=[b0], where=[>(b0, 0)]) +- TableSourceScan(table=[[default_catalog, default_database, B, filter=[]]], fields=[b0]) {code} i.e., b0 > 0 is inferred and pushed down -- This message was sent by Atlassian Jira (v8.20.10#820010)