2010YOUY01 commented on code in PR #14026: URL: https://github.com/apache/datafusion/pull/14026#discussion_r1906209324
########## datafusion/optimizer/src/push_down_filter.rs: ########## @@ -985,6 +985,77 @@ impl OptimizerRule for PushDownFilter { } }) } + LogicalPlan::Window(window) => { + // Retrieve the set of potential partition keys where we can push filters by. + // Unlike aggregations, where there is only one statement per SELECT, there can be + // multiple window functions, each with potentially different partition keys. + // Therefore, we need to ensure that any potential partition key returned is used in + // ALL window functions. Otherwise, filters cannot be pushed by through that column. + let potential_partition_keys = window + .window_expr + .iter() + .map(|e| { + if let Expr::WindowFunction(window_expression) = e { + window_expression + .partition_by + .iter() + .map(|c| { + Column::from_qualified_name( + c.schema_name().to_string(), + ) + }) + .collect::<HashSet<_>>() + } else { + // window functions expressions are only Expr::WindowFunction + unreachable!() + } + }) + // performs the set intersection of the partition keys of all window functions, + // returning only the common ones + .reduce(|a, b| &a & &b) + .unwrap_or_default(); + + let predicates = split_conjunction_owned(filter.predicate); + let mut keep_predicates = vec![]; + let mut push_predicates = vec![]; + for expr in predicates { + let cols = expr.column_refs(); + if cols.iter().all(|c| potential_partition_keys.contains(c)) { + push_predicates.push(expr); + } else { + keep_predicates.push(expr); + } + } + + // Unlike with aggregations, there are no cases where we have to replace, e.g., + // `a+b` with Column(a)+Column(b). This is because partition expressions are not + // available as standalone columns to the user. For example, while an aggregation on + // `a+b` becomes Column(a + b), in a window partition it becomes + // `func() PARTITION BY [a + b] ...`. Thus, filters on expressions always remain in + // place, so we can use `push_predicates` directly. 
This is consistent with other + // optimizers, such as the one used by Postgres.

Review Comment:
Do I understand this comment block correctly?
```
Filter: a+b > 0
-- Window: func() PARTITION BY [a+b] ...
```
In theory, the filter can be pushed down in this plan, but we are not able to implement it due to an engineering issue (I think it's related to your concern, @comphead).

########## datafusion/optimizer/src/push_down_filter.rs: ########## @@ -985,6 +985,77 @@ impl OptimizerRule for PushDownFilter { } }) } + LogicalPlan::Window(window) => {

Review Comment:
I recommend adding an example here to make this easier to understand; I think the rule applied here is not straightforward. Something like:
```
Before:
Filter: (a > 1) and (b > 1) and (c > 1)
-- Window: func() PARTITION BY [a] ... func() PARTITION BY [a, b] ...
After:
Filter: (b > 1) and (c > 1)
-- Window: func() PARTITION BY [a] ... func() PARTITION BY [a, b] ...
---- Filter: (a > 1)
```

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org