nuno-faria commented on code in PR #14026: URL: https://github.com/apache/datafusion/pull/14026#discussion_r1907601841
########## datafusion/optimizer/src/push_down_filter.rs: ########## @@ -1442,6 +1513,227 @@ mod tests { assert_optimized_plan_eq(plan, expected) } + /// verifies that when partitioning by 'a' and 'b', and filtering by 'b', 'b' is pushed + #[test] + fn filter_move_window() -> Result<()> { + let table_scan = test_table_scan()?; + + let window = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("a"), col("b")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window])? + .filter(col("b").gt(lit(10i64)))? + .build()?; + + let expected = "\ + WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test, full_filters=[test.b > Int64(10)]"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that when partitioning by 'a' and 'b', and filtering by 'a' and 'b', both 'a' and + /// 'b' are pushed + #[test] + fn filter_move_complex_window() -> Result<()> { + let table_scan = test_table_scan()?; + + let window = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("a"), col("b")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window])? + .filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))? + .build()?; + + let expected = "\ + WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test, full_filters=[test.a > Int64(10), test.b = Int64(1)]"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that when partitioning by 'a' and filtering by 'a' and 'b', only 'a' is pushed + #[test] + fn filter_move_partial_window() -> Result<()> { + let table_scan = test_table_scan()?; + + let window = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("a")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window])? + .filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))? + .build()?; + + let expected = "\ + Filter: test.b = Int64(1)\ + \n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test, full_filters=[test.a > Int64(10)]"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that filters on partition expressions are not pushed, as the single expression + /// column is not available to the user, unlike with aggregations + #[test] + fn filter_expression_keep_window() -> Result<()> { + let table_scan = test_table_scan()?; + + let window = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![add(col("a"), col("b"))]) // PARTITION BY a + b + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window])? + // unlike with aggregations, single partition column "test.a + test.b" is not available + // to the plan, so we use multiple columns when filtering + .filter(add(col("a"), col("b")).gt(lit(10i64)))? + .build()?; + + let expected = "\ + Filter: test.a + test.b > Int64(10)\ + \n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a + test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that filters are not pushed on order by columns (that are not used in partitioning) + #[test] + fn filter_order_keep_window() -> Result<()> { + let table_scan = test_table_scan()?; + + let window = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("a")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window])? + .filter(col("c").gt(lit(10i64)))? + .build()?; + + let expected = "\ + Filter: test.c > Int64(10)\ + \n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that when we use multiple window functions with a common partition key, the filter + /// on that key is pushed + #[test] + fn filter_multiple_windows_common_partitions() -> Result<()> { + let table_scan = test_table_scan()?; + + let window1 = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("a")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let window2 = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("b"), col("a")]) + .order_by(vec![col("c").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window1, window2])? + .filter(col("a").gt(lit(10i64)))? // a appears in both window functions + .build()?; + + let expected = "\ + WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\ + \n TableScan: test, full_filters=[test.a > Int64(10)]"; + assert_optimized_plan_eq(plan, expected) + } + + /// verifies that when we use multiple window functions with different partitions keys, the Review Comment: Yes, if we try to push the filter of a column that is not used as a partition key in all window functions, it could produce a wrong result set. On the other hand, if the key is used as a partition key in all window functions, then we can safely push it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org