alamb commented on code in PR #14026: URL: https://github.com/apache/datafusion/pull/14026#discussion_r1907385995
########## datafusion/optimizer/Cargo.toml: ########## @@ -43,6 +43,7 @@ arrow = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +datafusion-functions-window = { workspace = true } Review Comment: This is something we have been trying to avoid -- explicit dependencies on functions in the optimizer (avoiding them allows faster compilation times). Since the package is only used in tests, can you please move it into the `dev-dependencies` section below? ########## datafusion/optimizer/src/push_down_filter.rs: ########## @@ -1442,6 +1513,227 @@ mod tests { assert_optimized_plan_eq(plan, expected) } + /// verifies that when partitioning by 'a' and 'b', and filtering by 'b', 'b' is pushed + #[test] + fn filter_move_window() -> Result<()> { + let table_scan = test_table_scan()?; + + let window = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("a"), col("b")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window])? + .filter(col("b").gt(lit(10i64)))? 
+ .build()?; + + let expected = "\ + WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test, full_filters=[test.b > Int64(10)]"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that when partitioning by 'a' and 'b', and filtering by 'a' and 'b', both 'a' and + /// 'b' are pushed + #[test] + fn filter_move_complex_window() -> Result<()> { + let table_scan = test_table_scan()?; + + let window = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("a"), col("b")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window])? + .filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))? + .build()?; + + let expected = "\ + WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test, full_filters=[test.a > Int64(10), test.b = Int64(1)]"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that when partitioning by 'a' and filtering by 'a' and 'b', only 'a' is pushed + #[test] + fn filter_move_partial_window() -> Result<()> { + let table_scan = test_table_scan()?; + + let window = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("a")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window])? + .filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))? 
+ .build()?; + + let expected = "\ + Filter: test.b = Int64(1)\ + \n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test, full_filters=[test.a > Int64(10)]"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that filters on partition expressions are not pushed, as the single expression + /// column is not available to the user, unlike with aggregations + #[test] + fn filter_expression_keep_window() -> Result<()> { + let table_scan = test_table_scan()?; + + let window = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![add(col("a"), col("b"))]) // PARTITION BY a + b + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window])? + // unlike with aggregations, single partition column "test.a + test.b" is not available + // to the plan, so we use multiple columns when filtering + .filter(add(col("a"), col("b")).gt(lit(10i64)))? 
+ .build()?; + + let expected = "\ + Filter: test.a + test.b > Int64(10)\ + \n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a + test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that filters are not pushed on order by columns (that are not used in partitioning) + #[test] + fn filter_order_keep_window() -> Result<()> { + let table_scan = test_table_scan()?; + + let window = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("a")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window])? + .filter(col("c").gt(lit(10i64)))? + .build()?; + + let expected = "\ + Filter: test.c > Int64(10)\ + \n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that when we use multiple window functions with a common partition key, the filter + /// on that key is pushed + #[test] + fn filter_multiple_windows_common_partitions() -> Result<()> { + let table_scan = test_table_scan()?; + + let window1 = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("a")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let window2 = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("b"), col("a")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = 
LogicalPlanBuilder::from(table_scan) + .window(vec![window1, window2])? + .filter(col("a").gt(lit(10i64)))? // a appears in both window functions + .build()?; + + let expected = "\ + WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test, full_filters=[test.a > Int64(10)]"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that when we use multiple window functions with different partitions keys, the Review Comment: I am pretty sure the rationale for not pushing in this case is that the filtered rows may contribute to or affect the values of other partitions. ########## datafusion/optimizer/src/push_down_filter.rs: ########## @@ -985,6 +985,77 @@ impl OptimizerRule for PushDownFilter { } }) } + LogicalPlan::Window(window) => { + // Retrieve the set of potential partition keys where we can push filters by. + // Unlike aggregations, where there is only one statement per SELECT, there can be + // multiple window functions, each with potentially different partition keys. + // Therefore, we need to ensure that any potential partition key returned is used in + // ALL window functions. Otherwise, filters cannot be pushed by through that column. 
+ let potential_partition_keys = window + .window_expr + .iter() + .map(|e| { + if let Expr::WindowFunction(window_expression) = e { + window_expression + .partition_by + .iter() + .map(|c| { + Column::from_qualified_name( + c.schema_name().to_string(), + ) + }) + .collect::<HashSet<_>>() + } else { + // window functions expressions are only Expr::WindowFunction + unreachable!() + } + }) + // performs the set intersection of the partition keys of all window functions, + // returning only the common ones + .reduce(|a, b| &a & &b) + .unwrap_or_default(); + + let predicates = split_conjunction_owned(filter.predicate); + let mut keep_predicates = vec![]; + let mut push_predicates = vec![]; + for expr in predicates { + let cols = expr.column_refs(); + if cols.iter().all(|c| potential_partition_keys.contains(c)) { + push_predicates.push(expr); + } else { + keep_predicates.push(expr); + } + } + + // Unlike with aggregations, there are no cases where we have to replace, e.g., + // `a+b` with Column(a)+Column(b). This is because partition expressions are not + // available as standalone columns to the user. For example, while an aggregation on + // `a+b` becomes Column(a + b), in a window partition it becomes + // `func() PARTITION BY [a + b] ...`. Thus, filters on expressions always remain in + // place, so we can use `push_predicates` directly. This is consistent with other + // optimizers, such as the one used by Postgres. Review Comment: I filed a PR to try and document the output schema more clearly: - https://github.com/apache/datafusion/pull/14047 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org