alamb commented on code in PR #14026:
URL: https://github.com/apache/datafusion/pull/14026#discussion_r1907385995


##########
datafusion/optimizer/Cargo.toml:
##########
@@ -43,6 +43,7 @@ arrow = { workspace = true }
 chrono = { workspace = true }
 datafusion-common = { workspace = true, default-features = true }
 datafusion-expr = { workspace = true }
+datafusion-functions-window = { workspace = true }

Review Comment:
   This is something we have been trying to avoid -- explicit dependencies on 
functions in the optimizer (avoiding them allows faster compilation times).
   
   Since the package is only used in tests, can you please move it into the 
`dev-dependencies` section below?



##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -1442,6 +1513,227 @@ mod tests {
         assert_optimized_plan_eq(plan, expected)
     }
 
+    /// verifies that when partitioning by 'a' and 'b', and filtering by 'b', 
'b' is pushed
+    #[test]
+    fn filter_move_window() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let window = Expr::WindowFunction(WindowFunction::new(
+            WindowFunctionDefinition::WindowUDF(
+                datafusion_functions_window::rank::rank_udwf(),
+            ),
+            vec![],
+        ))
+        .partition_by(vec![col("a"), col("b")])
+        .order_by(vec![col("c").sort(true, true)])
+        .build()
+        .unwrap();
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .window(vec![window])?
+            .filter(col("b").gt(lit(10i64)))?
+            .build()?;
+
+        let expected = "\
+        WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY 
[test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
+        \n  TableScan: test, full_filters=[test.b > Int64(10)]";
+        assert_optimized_plan_eq(plan, expected)
+    }
+
+    /// verifies that when partitioning by 'a' and 'b', and filtering by 'a' 
and 'b', both 'a' and
+    /// 'b' are pushed
+    #[test]
+    fn filter_move_complex_window() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let window = Expr::WindowFunction(WindowFunction::new(
+            WindowFunctionDefinition::WindowUDF(
+                datafusion_functions_window::rank::rank_udwf(),
+            ),
+            vec![],
+        ))
+        .partition_by(vec![col("a"), col("b")])
+        .order_by(vec![col("c").sort(true, true)])
+        .build()
+        .unwrap();
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .window(vec![window])?
+            .filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
+            .build()?;
+
+        let expected = "\
+            WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] 
ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW]]\
+            \n  TableScan: test, full_filters=[test.a > Int64(10), test.b = 
Int64(1)]";
+        assert_optimized_plan_eq(plan, expected)
+    }
+
+    /// verifies that when partitioning by 'a' and filtering by 'a' and 'b', 
only 'a' is pushed
+    #[test]
+    fn filter_move_partial_window() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let window = Expr::WindowFunction(WindowFunction::new(
+            WindowFunctionDefinition::WindowUDF(
+                datafusion_functions_window::rank::rank_udwf(),
+            ),
+            vec![],
+        ))
+        .partition_by(vec![col("a")])
+        .order_by(vec![col("c").sort(true, true)])
+        .build()
+        .unwrap();
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .window(vec![window])?
+            .filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
+            .build()?;
+
+        let expected = "\
+        Filter: test.b = Int64(1)\
+        \n  WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY 
[test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
+        \n    TableScan: test, full_filters=[test.a > Int64(10)]";
+        assert_optimized_plan_eq(plan, expected)
+    }
+
+    /// verifies that filters on partition expressions are not pushed, as the 
single expression
+    /// column is not available to the user, unlike with aggregations
+    #[test]
+    fn filter_expression_keep_window() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let window = Expr::WindowFunction(WindowFunction::new(
+            WindowFunctionDefinition::WindowUDF(
+                datafusion_functions_window::rank::rank_udwf(),
+            ),
+            vec![],
+        ))
+        .partition_by(vec![add(col("a"), col("b"))]) // PARTITION BY a + b
+        .order_by(vec![col("c").sort(true, true)])
+        .build()
+        .unwrap();
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .window(vec![window])?
+            // unlike with aggregations, single partition column "test.a + 
test.b" is not available
+            // to the plan, so we use multiple columns when filtering
+            .filter(add(col("a"), col("b")).gt(lit(10i64)))?
+            .build()?;
+
+        let expected = "\
+        Filter: test.a + test.b > Int64(10)\
+        \n  WindowAggr: windowExpr=[[rank() PARTITION BY [test.a + test.b] 
ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW]]\
+        \n    TableScan: test";
+        assert_optimized_plan_eq(plan, expected)
+    }
+
+    /// verifies that filters are not pushed on order by columns (that are not 
used in partitioning)
+    #[test]
+    fn filter_order_keep_window() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let window = Expr::WindowFunction(WindowFunction::new(
+            WindowFunctionDefinition::WindowUDF(
+                datafusion_functions_window::rank::rank_udwf(),
+            ),
+            vec![],
+        ))
+        .partition_by(vec![col("a")])
+        .order_by(vec![col("c").sort(true, true)])
+        .build()
+        .unwrap();
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .window(vec![window])?
+            .filter(col("c").gt(lit(10i64)))?
+            .build()?;
+
+        let expected = "\
+        Filter: test.c > Int64(10)\
+        \n  WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY 
[test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
+        \n    TableScan: test";
+        assert_optimized_plan_eq(plan, expected)
+    }
+
+    /// verifies that when we use multiple window functions with a common 
partition key, the filter
+    /// on that key is pushed
+    #[test]
+    fn filter_multiple_windows_common_partitions() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let window1 = Expr::WindowFunction(WindowFunction::new(
+            WindowFunctionDefinition::WindowUDF(
+                datafusion_functions_window::rank::rank_udwf(),
+            ),
+            vec![],
+        ))
+        .partition_by(vec![col("a")])
+        .order_by(vec![col("c").sort(true, true)])
+        .build()
+        .unwrap();
+
+        let window2 = Expr::WindowFunction(WindowFunction::new(
+            WindowFunctionDefinition::WindowUDF(
+                datafusion_functions_window::rank::rank_udwf(),
+            ),
+            vec![],
+        ))
+        .partition_by(vec![col("b"), col("a")])
+        .order_by(vec![col("c").sort(true, true)])
+        .build()
+        .unwrap();
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .window(vec![window1, window2])?
+            .filter(col("a").gt(lit(10i64)))? // a appears in both window 
functions
+            .build()?;
+
+        let expected = "\
+            WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY 
[test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
+            \n  TableScan: test, full_filters=[test.a > Int64(10)]";
+        assert_optimized_plan_eq(plan, expected)
+    }
+
+    /// verifies that when we use multiple window functions with different 
partitions keys, the

Review Comment:
   I am pretty sure the rationale for not pushing in this case is that the 
filtered rows may contribute to or affect the values of other partitions.
   



##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -985,6 +985,77 @@ impl OptimizerRule for PushDownFilter {
                         }
                     })
             }
+            LogicalPlan::Window(window) => {
+                // Retrieve the set of potential partition keys where we can 
push filters by.
+                // Unlike aggregations, where there is only one statement per 
SELECT, there can be
+                // multiple window functions, each with potentially different 
partition keys.
+                // Therefore, we need to ensure that any potential partition 
key returned is used in
+                // ALL window functions. Otherwise, filters cannot be pushed 
by through that column.
+                let potential_partition_keys = window
+                    .window_expr
+                    .iter()
+                    .map(|e| {
+                        if let Expr::WindowFunction(window_expression) = e {
+                            window_expression
+                                .partition_by
+                                .iter()
+                                .map(|c| {
+                                    Column::from_qualified_name(
+                                        c.schema_name().to_string(),
+                                    )
+                                })
+                                .collect::<HashSet<_>>()
+                        } else {
+                            // window functions expressions are only 
Expr::WindowFunction
+                            unreachable!()
+                        }
+                    })
+                    // performs the set intersection of the partition keys of 
all window functions,
+                    // returning only the common ones
+                    .reduce(|a, b| &a & &b)
+                    .unwrap_or_default();
+
+                let predicates = split_conjunction_owned(filter.predicate);
+                let mut keep_predicates = vec![];
+                let mut push_predicates = vec![];
+                for expr in predicates {
+                    let cols = expr.column_refs();
+                    if cols.iter().all(|c| 
potential_partition_keys.contains(c)) {
+                        push_predicates.push(expr);
+                    } else {
+                        keep_predicates.push(expr);
+                    }
+                }
+
+                // Unlike with aggregations, there are no cases where we have 
to replace, e.g.,
+                // `a+b` with Column(a)+Column(b). This is because partition 
expressions are not
+                // available as standalone columns to the user. For example, 
while an aggregation on
+                // `a+b` becomes Column(a + b), in a window partition it 
becomes
+                // `func() PARTITION BY [a + b] ...`. Thus, filters on 
expressions always remain in
+                // place, so we can use `push_predicates` directly. This is 
consistent with other
+                // optimizers, such as the one used by Postgres.

Review Comment:
   I filed a PR to try and document the output schema more clearly:
   - https://github.com/apache/datafusion/pull/14047



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to