alamb commented on code in PR #22534:
URL: https://github.com/apache/datafusion/pull/22534#discussion_r3313499941


##########
datafusion/optimizer/src/eliminate_outer_join.rs:
##########
@@ -77,90 +104,152 @@ impl OptimizerRule for EliminateOuterJoin {
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Filter(mut filter) => match 
Arc::unwrap_or_clone(filter.input) {
+        let LogicalPlan::Filter(filter) = plan else {
+            return Ok(Transformed::no(plan));
+        };
+
+        // Descend through one or more Projection nodes until we find a Join.
+        // For each Projection we encounter, rewrite a working copy of the
+        // predicate by replacing references to projection output columns with
+        // the expressions that define them. Keep the filter's original
+        // predicate intact for eventual use in the rebuilt plan; the rewritten
+        // predicate is used only for the null-rejection analysis.
+        let mut rewritten_predicate = filter.predicate.clone();
+        let mut projections: Vec<Projection> = Vec::new();
+        let mut cur = Arc::clone(&filter.input);
+
+        let new_join = loop {
+            match cur.as_ref() {
+                LogicalPlan::Projection(p) => {
+                    rewritten_predicate =
+                        inline_through_projection(rewritten_predicate, p)?;
+                    let next = Arc::clone(&p.input);
+                    projections.push(p.clone());
+                    cur = next;
+                }
                 LogicalPlan::Join(join) => {
-                    let mut null_rejecting_cols: Vec<Column> = vec![];
-
-                    extract_null_rejecting_columns(
-                        &filter.predicate,
-                        &mut null_rejecting_cols,
-                        join.left.schema(),
-                        join.right.schema(),
-                        true,
-                    );
-
-                    let new_join_type = if join.join_type.is_outer() {
-                        let mut left_non_nullable = false;
-                        let mut right_non_nullable = false;
-                        for col in null_rejecting_cols.iter() {
-                            if join.left.schema().has_column(col) {
-                                left_non_nullable = true;
-                            }
-                            if join.right.schema().has_column(col) {
-                                right_non_nullable = true;
-                            }
-                        }
-                        eliminate_outer(
-                            join.join_type,
-                            left_non_nullable,
-                            right_non_nullable,
-                        )
-                    } else {
-                        join.join_type
+                    let Some(new_join) = try_simplify_join(join, 
&rewritten_predicate)
+                    else {
+                        return 
Ok(Transformed::no(LogicalPlan::Filter(filter)));
                     };
-
-                    let new_join = Arc::new(LogicalPlan::Join(Join {
-                        left: join.left,
-                        right: join.right,
-                        join_type: new_join_type,
-                        join_constraint: join.join_constraint,
-                        on: join.on.clone(),
-                        filter: join.filter.clone(),
-                        schema: Arc::clone(&join.schema),
-                        null_equality: join.null_equality,
-                        null_aware: join.null_aware,
-                    }));
-                    Filter::try_new(filter.predicate, new_join)
-                        .map(|f| Transformed::yes(LogicalPlan::Filter(f)))
+                    break new_join;
                 }
-                filter_input => {
-                    filter.input = Arc::new(filter_input);
-                    Ok(Transformed::no(LogicalPlan::Filter(filter)))
+                _ => {
+                    return Ok(Transformed::no(LogicalPlan::Filter(filter)));
                 }
-            },
-            _ => Ok(Transformed::no(plan)),
+            }
+        };
+
+        let rebuilt_inner = rewrap_projections(new_join, projections);
+        Filter::try_new(filter.predicate, Arc::new(rebuilt_inner))
+            .map(|f| Transformed::yes(LogicalPlan::Filter(f)))
+    }
+}
+
+/// Run the null-rejection analysis on `predicate` against `join`'s left/right
+/// schemas. Return `Some(new_join_plan)` if the join type can be tightened
+/// (e.g. LEFT → INNER), `None` otherwise.
+fn try_simplify_join(join: &Join, predicate: &Expr) -> Option<LogicalPlan> {
+    if !join.join_type.is_outer() {
+        return None;
+    }
+
+    let mut null_rejecting_cols: Vec<Column> = vec![];
+    extract_null_rejecting_columns(
+        predicate,
+        &mut null_rejecting_cols,
+        join.left.schema(),
+        join.right.schema(),
+        true,
+    );
+
+    let mut left_non_nullable = false;
+    let mut right_non_nullable = false;
+    for col in null_rejecting_cols.iter() {
+        if join.left.schema().has_column(col) {
+            left_non_nullable = true;
+        }
+        if join.right.schema().has_column(col) {
+            right_non_nullable = true;
         }
     }
+
+    let new_join_type =
+        eliminate_outer(join.join_type, left_non_nullable, right_non_nullable);
+    if new_join_type == join.join_type {
+        return None;
+    }
+
+    Some(LogicalPlan::Join(Join {
+        left: Arc::clone(&join.left),
+        right: Arc::clone(&join.right),
+        join_type: new_join_type,
+        join_constraint: join.join_constraint,
+        on: join.on.clone(),
+        filter: join.filter.clone(),
+        schema: Arc::clone(&join.schema),
+        null_equality: join.null_equality,
+        null_aware: join.null_aware,
+    }))
+}
+
+/// Substitute the projection's output column references in `predicate` with
+/// the projection's defining expressions (stripped of any `Alias` wrapper).
+/// The result expresses `predicate` over the projection's *input* schema.
+///
+/// Unlike `PushDownFilter`, this rule does not change expression evaluation
+/// behavior (in fact, the rewritten expressions are only used for analysis
+/// purposes). Therefore, function volatility and `MoveTowardsLeafNodes`
+/// placement can be ignored here.
+fn inline_through_projection(predicate: Expr, p: &Projection) -> Result<Expr> {
+    let mut map: HashMap<String, Expr> = HashMap::new();
+    for ((qualifier, field), expr) in p.schema.iter().zip(p.expr.iter()) {
+        map.insert(
+            qualified_name(qualifier, field.name()),
+            unalias(expr).clone(),
+        );
+    }
+    replace_cols_by_name(predicate, &map)
+}
+
+/// Re-attach a stack of projections above `new_inner`, restoring the original
+/// plan shape with the new (possibly retyped) join at the bottom. Projection
+/// schemas are reused as-is; only nullability of columns sourced from the
+/// formerly-outer side may have changed, and the existing rule already takes
+/// this looser-schema approach at the join itself.
+fn rewrap_projections(
+    new_inner: LogicalPlan,
+    projections: Vec<Projection>,
+) -> LogicalPlan {
+    let mut current = new_inner;
+    for mut p in projections.into_iter().rev() {
+        p.input = Arc::new(current);
+        current = LogicalPlan::Projection(p);
+    }
+    current
+}
+
+fn unalias(expr: &Expr) -> &Expr {
+    if let Expr::Alias(a) = expr {
+        unalias(&a.expr)
+    } else {
+        expr
+    }
 }
 
 pub fn eliminate_outer(
     join_type: JoinType,
     left_non_nullable: bool,
     right_non_nullable: bool,
 ) -> JoinType {
-    let mut new_join_type = join_type;
-    match join_type {
-        JoinType::Left if right_non_nullable => {
-            new_join_type = JoinType::Inner;
-        }
-        JoinType::Left => {}
-        JoinType::Right if left_non_nullable => {
-            new_join_type = JoinType::Inner;
-        }
-        JoinType::Right => {}
-        JoinType::Full => {
-            if left_non_nullable && right_non_nullable {
-                new_join_type = JoinType::Inner;
-            } else if left_non_nullable {
-                new_join_type = JoinType::Left;
-            } else if right_non_nullable {
-                new_join_type = JoinType::Right;
-            }
-        }
-        _ => {}
+    match (join_type, left_non_nullable, right_non_nullable) {

Review Comment:
   That is quite a bit nicer.



##########
datafusion/optimizer/src/eliminate_outer_join.rs:
##########
@@ -15,39 +15,66 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`EliminateOuterJoin`] converts `LEFT/RIGHT/FULL` joins to `INNER` joins
+//! [`EliminateOuterJoin`] rewrites outer joins to simpler join types when
+//! filters make the outer rows unnecessary (e.g. `LEFT`/`RIGHT` to `INNER`,
+//! and `FULL` to `LEFT`/`RIGHT`/`INNER`).
+use crate::push_down_filter::replace_cols_by_name;
 use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::{Column, DFSchema, Result};
-use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan};
+use datafusion_common::{Column, DFSchema, Result, qualified_name};
+use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan, Projection};
 use datafusion_expr::{Expr, Filter, Operator};
 
 use crate::optimizer::ApplyOrder;
 use datafusion_common::tree_node::Transformed;
 use datafusion_expr::expr::{BinaryExpr, Cast, InList, Like, TryCast};
+use std::collections::HashMap;
 use std::sync::Arc;
 
+/// Attempt to simplify outer joins when filters make their null-padded
+/// rows impossible to observe.
 ///
-/// Attempt to replace outer joins with inner joins.
+/// Outer joins are generally more expensive than inner joins and can block
+/// predicate pushdown and other optimizations. When a filter above an outer
+/// join removes every row the join would add for unmatched input rows, the
+/// join can be changed to a cheaper join type.

Review Comment:
   This is a much clearer explanation. Thank you



##########
datafusion/optimizer/src/eliminate_outer_join.rs:
##########
@@ -77,90 +104,152 @@ impl OptimizerRule for EliminateOuterJoin {
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Filter(mut filter) => match 
Arc::unwrap_or_clone(filter.input) {
+        let LogicalPlan::Filter(filter) = plan else {
+            return Ok(Transformed::no(plan));
+        };
+
+        // Descend through one or more Projection nodes until we find a Join.
+        // For each Projection we encounter, rewrite a working copy of the
+        // predicate by replacing references to projection output columns with
+        // the expressions that define them. Keep the filter's original
+        // predicate intact for eventual use in the rebuilt plan; the rewritten
+        // predicate is used only for the null-rejection analysis.
+        let mut rewritten_predicate = filter.predicate.clone();
+        let mut projections: Vec<Projection> = Vec::new();
+        let mut cur = Arc::clone(&filter.input);
+
+        let new_join = loop {
+            match cur.as_ref() {
+                LogicalPlan::Projection(p) => {
+                    rewritten_predicate =
+                        inline_through_projection(rewritten_predicate, p)?;
+                    let next = Arc::clone(&p.input);
+                    projections.push(p.clone());

Review Comment:
   I was wondering if we can avoid this `clone`.
   
   Doing so looks like it will take some fancy footwork (probably one pass to
   decide whether we can rewrite the join, and then a second pass to actually
   do it), so perhaps we could consider it as a follow-on PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to