eejbyfeldt commented on code in PR #13249:
URL: https://github.com/apache/datafusion/pull/13249#discussion_r1828347437


##########
datafusion/optimizer/src/eliminate_outer_join.rs:
##########
@@ -443,4 +361,214 @@ mod tests {
         \n    TableScan: t2";
         assert_optimized_plan_equal(plan, expected)
     }
+
+    #[test]
+    fn eliminate_full_with_hybrid_filter() -> Result<()> {
+        let t1 = test_table_scan_with_name("t1")?;
+        let t2 = test_table_scan_with_name("t2")?;
+
+        // eliminate to inner join
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Full,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,
+            )?
+            .filter(binary_expr(col("t1.b"), Gt, col("t2.b")))?
+            .build()?;
+        let expected = "\
+        Filter: t1.b > t2.b\
+        \n  Inner Join: t1.a = t2.a\
+        \n    TableScan: t1\
+        \n    TableScan: t2";
+        assert_optimized_plan_equal(plan, expected)
+    }
+
+    #[derive(Debug)]
+    struct DoNothingUdf {
+        signature: Signature,
+    }
+
+    impl DoNothingUdf {
+        pub fn new() -> Self {
+            Self {
+                signature: Signature::any(1, Volatility::Immutable),
+            }
+        }
+    }
+
+    impl ScalarUDFImpl for DoNothingUdf {
+        fn as_any(&self) -> &dyn std::any::Any {
+            self
+        }
+
+        fn name(&self) -> &str {
+            "do_nothing"
+        }
+
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+
+        fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+            Ok(arg_types[0].clone())
+        }
+
+        fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+            Ok(args[0].clone())
+        }
+    }
+
+    #[test]
+    fn eliminate_right_with_udf() -> Result<()> {
+        let t1 = test_table_scan_with_name("t1")?;
+        let t2 = test_table_scan_with_name("t2")?;
+        let fun = Arc::new(ScalarUDF::new_from_impl(DoNothingUdf::new()));
+
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Right,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,
+            )?
+            .filter(
+                Expr::ScalarFunction(ScalarFunction::new_udf(fun, 
vec![col("t1.b")]))
+                    .gt(lit(10u32)),
+            )?
+            .build()?;
+
+        let expected = "\
+        Filter: do_nothing(t1.b) > UInt32(10)\
+        \n  Inner Join: t1.a = t2.a\
+        \n    TableScan: t1\
+        \n    TableScan: t2";
+        assert_optimized_plan_equal(plan, expected)
+    }
+
+    #[derive(Debug)]
+    struct AlwaysNullUdf {
+        signature: Signature,
+    }
+
+    impl AlwaysNullUdf {
+        pub fn new() -> Self {
+            Self {
+                signature: Signature::any(1, Volatility::Immutable),
+            }
+        }
+    }
+
+    impl ScalarUDFImpl for AlwaysNullUdf {
+        fn as_any(&self) -> &dyn std::any::Any {
+            self
+        }
+
+        fn name(&self) -> &str {
+            "always_null"
+        }
+
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+
+        fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+            Ok(DataType::Null)
+        }
+
+        fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+            Ok(match &args[0] {
+                ColumnarValue::Array(array) => {
+                    ColumnarValue::create_null_array(array.len())
+                }
+                ColumnarValue::Scalar(_) => 
ColumnarValue::Scalar(ScalarValue::Null),
+            })
+        }
+    }
+
+    #[test]
+    fn eliminate_right_with_null_udf() -> Result<()> {
+        let t1 = test_table_scan_with_name("t1")?;
+        let t2 = test_table_scan_with_name("t2")?;
+        let fun = Arc::new(ScalarUDF::new_from_impl(AlwaysNullUdf::new()));
+
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Right,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,
+            )?
+            .filter(
+                Expr::ScalarFunction(ScalarFunction::new_udf(fun, 
vec![col("t1.b")]))
+                    .is_null(),
+            )?
+            .build()?;
+
+        let expected = "\
+        Filter: always_null(t1.b) IS NULL\
+        \n  Right Join: t1.a = t2.a\
+        \n    TableScan: t1\
+        \n    TableScan: t2";
+        assert_optimized_plan_equal(plan, expected)
+    }
+
+    #[derive(Debug)]
+    struct VolatileUdf {
+        signature: Signature,
+    }
+
+    impl VolatileUdf {
+        pub fn new() -> Self {
+            Self {
+                signature: Signature::any(1, Volatility::Volatile),
+            }
+        }
+    }
+
+    impl ScalarUDFImpl for VolatileUdf {
+        fn as_any(&self) -> &dyn std::any::Any {
+            self
+        }
+
+        fn name(&self) -> &str {
+            "volatile_func"
+        }
+
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+
+        fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+            Ok(DataType::Boolean)
+        }

Review Comment:
   Do we not need to implement an `invoke` method here? Or how does one know 
the test passes due to us properly checking volatility and not because the 
invoke method produces an error?



##########
datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs:
##########
@@ -1675,12 +1675,34 @@ impl<'a, S: SimplifyInfo> TreeNodeRewriter for 
Simplifier<'a, S> {
                 }
             }
 
+            // null <op> A --> null,
+            Expr::BinaryExpr(BinaryExpr {
+                left,
+                op: Eq | NotEq | Gt | GtEq | Lt | LtEq,
+                right: _,
+            }) if always_null(&left, info) => 
Transformed::yes(lit_bool_null()),
+
+            // A <op> null --> null,
+            Expr::BinaryExpr(BinaryExpr {
+                left: _,
+                op: Eq | NotEq | Gt | GtEq | Lt | LtEq,
+                right,
+            }) if always_null(&right, info) => 
Transformed::yes(lit_bool_null()),
+
             // no additional rewrites possible
             expr => Transformed::no(expr),
         })
     }
 }
 
+fn always_null<S: SimplifyInfo>(expr: &Expr, info: &S) -> bool {
+    is_null(expr)

Review Comment:
   Could we have some comment explaining when to use `always_null` over just 
`is_null`? Because to me it seems like one would always want `always_null`? 



##########
datafusion/optimizer/src/utils.rs:
##########
@@ -129,52 +128,32 @@ pub fn log_plan(description: &str, plan: &LogicalPlan) {
 /// `c0 > 8` return true;
 /// `c0 IS NULL` return false.
 pub fn is_restrict_null_predicate<'a>(
+    input_schema: &DFSchema,
     predicate: Expr,
-    join_cols_of_predicate: impl IntoIterator<Item = &'a Column>,
+    cols_of_predicate: impl IntoIterator<Item = &'a Column>,
 ) -> Result<bool> {
     if matches!(predicate, Expr::Column(_)) {
         return Ok(true);
     }
 
-    static DUMMY_COL_NAME: &str = "?";
-    let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, 
true)]);
-    let input_schema = DFSchema::try_from(schema.clone())?;
-    let column = new_null_array(&DataType::Null, 1);
-    let input_batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![column])?;
     let execution_props = ExecutionProps::default();
-    let null_column = Column::from_name(DUMMY_COL_NAME);
+    let replace_columns = cols_of_predicate.into_iter().collect();
+    let replaced_predicate = replace_expr_with_null(predicate, 
&replace_columns)?;
+    let coerced_predicate = coerce(replaced_predicate, input_schema)?;
 
-    let join_cols_to_replace = join_cols_of_predicate
-        .into_iter()
-        .map(|column| (column, &null_column))
-        .collect::<HashMap<_, _>>();
+    let info = SimplifyContext::new(&execution_props)
+        .with_schema(Arc::new(input_schema.clone()));
+    let simplifier = ExprSimplifier::new(info).with_canonicalize(false);
+    let expr = simplifier.simplify(coerced_predicate)?;
 
-    let replaced_predicate = replace_col(predicate, &join_cols_to_replace)?;
-    let coerced_predicate = coerce(replaced_predicate, &input_schema)?;
-    let phys_expr =
-        create_physical_expr(&coerced_predicate, &input_schema, 
&execution_props)?;
+    let ret = match &expr {
+        Expr::Literal(scalar) if scalar.is_null() => true,
+        Expr::Literal(ScalarValue::Boolean(Some(b))) => !b,
+        _ if matches!(expr.get_type(input_schema)?, DataType::Null) => true,

Review Comment:
   Since this does not depend on evaluating expr, should we make this condition 
be an early return at the top of the method instead?



##########
datafusion/optimizer/src/utils.rs:
##########
@@ -129,52 +128,32 @@ pub fn log_plan(description: &str, plan: &LogicalPlan) {
 /// `c0 > 8` return true;
 /// `c0 IS NULL` return false.
 pub fn is_restrict_null_predicate<'a>(
+    input_schema: &DFSchema,
     predicate: Expr,
-    join_cols_of_predicate: impl IntoIterator<Item = &'a Column>,
+    cols_of_predicate: impl IntoIterator<Item = &'a Column>,
 ) -> Result<bool> {
     if matches!(predicate, Expr::Column(_)) {
         return Ok(true);
     }
 
-    static DUMMY_COL_NAME: &str = "?";
-    let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, 
true)]);
-    let input_schema = DFSchema::try_from(schema.clone())?;
-    let column = new_null_array(&DataType::Null, 1);
-    let input_batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![column])?;
     let execution_props = ExecutionProps::default();
-    let null_column = Column::from_name(DUMMY_COL_NAME);
+    let replace_columns = cols_of_predicate.into_iter().collect();
+    let replaced_predicate = replace_expr_with_null(predicate, 
&replace_columns)?;
+    let coerced_predicate = coerce(replaced_predicate, input_schema)?;
 
-    let join_cols_to_replace = join_cols_of_predicate
-        .into_iter()
-        .map(|column| (column, &null_column))
-        .collect::<HashMap<_, _>>();
+    let info = SimplifyContext::new(&execution_props)
+        .with_schema(Arc::new(input_schema.clone()));
+    let simplifier = ExprSimplifier::new(info).with_canonicalize(false);
+    let expr = simplifier.simplify(coerced_predicate)?;
 
-    let replaced_predicate = replace_col(predicate, &join_cols_to_replace)?;
-    let coerced_predicate = coerce(replaced_predicate, &input_schema)?;
-    let phys_expr =
-        create_physical_expr(&coerced_predicate, &input_schema, 
&execution_props)?;
+    let ret = match &expr {
+        Expr::Literal(scalar) if scalar.is_null() => true,
+        Expr::Literal(ScalarValue::Boolean(Some(b))) => !b,
+        _ if matches!(expr.get_type(input_schema)?, DataType::Null) => true,
+        _ => false,
+    };
 
-    let result_type = phys_expr.data_type(&schema)?;
-    if !matches!(&result_type, DataType::Boolean) {
-        return Ok(false);
-    }
-
-    // If result is single `true`, return false;
-    // If result is single `NULL` or `false`, return true;
-    Ok(match phys_expr.evaluate(&input_batch)? {
-        ColumnarValue::Array(array) => {

Review Comment:
   Why do we no longer need to handle exprs returning single element arrays?



##########
datafusion/optimizer/src/utils.rs:
##########
@@ -129,52 +128,32 @@ pub fn log_plan(description: &str, plan: &LogicalPlan) {
 /// `c0 > 8` return true;
 /// `c0 IS NULL` return false.
 pub fn is_restrict_null_predicate<'a>(
+    input_schema: &DFSchema,
     predicate: Expr,
-    join_cols_of_predicate: impl IntoIterator<Item = &'a Column>,
+    cols_of_predicate: impl IntoIterator<Item = &'a Column>,
 ) -> Result<bool> {
     if matches!(predicate, Expr::Column(_)) {
         return Ok(true);
     }
 
-    static DUMMY_COL_NAME: &str = "?";
-    let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, 
true)]);
-    let input_schema = DFSchema::try_from(schema.clone())?;
-    let column = new_null_array(&DataType::Null, 1);
-    let input_batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![column])?;
     let execution_props = ExecutionProps::default();
-    let null_column = Column::from_name(DUMMY_COL_NAME);
+    let replace_columns = cols_of_predicate.into_iter().collect();
+    let replaced_predicate = replace_expr_with_null(predicate, 
&replace_columns)?;
+    let coerced_predicate = coerce(replaced_predicate, input_schema)?;

Review Comment:
   Since we have access to the schema here could could we replace the exprs 
with nulls with correct types instead and therefore avoiding needing to 
`coerce` the expr after changing it? 
   
   Or at least when it looked at the coerce logic it seemed quite complex and I 
have a hard time convincing my self that it can never affect the result of this 
function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to