nuno-faria commented on code in PR #14026:
URL: https://github.com/apache/datafusion/pull/14026#discussion_r1906812252


##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -985,6 +985,77 @@ impl OptimizerRule for PushDownFilter {
                         }
                     })
             }
+            LogicalPlan::Window(window) => {
+                // Retrieve the set of potential partition keys where we can push filters by.
+                // Unlike aggregations, where there is only one statement per SELECT, there can be
+                // multiple window functions, each with potentially different partition keys.
+                // Therefore, we need to ensure that any potential partition key returned is used in
+                // ALL window functions. Otherwise, filters cannot be pushed by through that column.
+                let potential_partition_keys = window
+                    .window_expr
+                    .iter()
+                    .map(|e| {
+                        if let Expr::WindowFunction(window_expression) = e {
+                            window_expression
+                                .partition_by
+                                .iter()
+                                .map(|c| {
+                                    Column::from_qualified_name(
+                                        c.schema_name().to_string(),
+                                    )
+                                })
+                                .collect::<HashSet<_>>()
+                        } else {
+                            // window functions expressions are only Expr::WindowFunction
+                            unreachable!()
+                        }
+                    })
+                    // performs the set intersection of the partition keys of all window functions,
+                    // returning only the common ones
+                    .reduce(|a, b| &a & &b)
+                    .unwrap_or_default();
+
+                let predicates = split_conjunction_owned(filter.predicate);
+                let mut keep_predicates = vec![];
+                let mut push_predicates = vec![];
+                for expr in predicates {
+                    let cols = expr.column_refs();
+                    if cols.iter().all(|c| potential_partition_keys.contains(c)) {
+                        push_predicates.push(expr);
+                    } else {
+                        keep_predicates.push(expr);
+                    }
+                }
+
+                // Unlike with aggregations, there are no cases where we have to replace, e.g.,
+                // `a+b` with Column(a)+Column(b). This is because partition expressions are not
+                // available as standalone columns to the user. For example, while an aggregation on
+                // `a+b` becomes Column(a + b), in a window partition it becomes
+                // `func() PARTITION BY [a + b] ...`. Thus, filters on expressions always remain in
+                // place, so we can use `push_predicates` directly. This is consistent with other
+                // optimizers, such as the one used by Postgres.

Review Comment:
   That's right. Unlike with aggregations, the partition expression `a+b` is not projected as a column.
   Interestingly enough, Postgres also does not push filters in these cases, so it presumably follows a similar approach.
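
To make the rule in the hunk above concrete, here is a minimal, standalone sketch of the same idea using plain strings instead of DataFusion types. The function names `common_partition_keys` and `split_predicates`, and the modelling of a predicate as a label plus its referenced columns, are assumptions made for illustration only; they are not part of the PR or of the DataFusion API.

```rust
use std::collections::HashSet;

/// Intersect the PARTITION BY keys of every window function.
/// Each inner Vec holds the partition keys of one window function.
/// Returns an empty set when there are no window functions.
/// (Hypothetical helper, not a DataFusion function.)
fn common_partition_keys(windows: &[Vec<&str>]) -> HashSet<String> {
    windows
        .iter()
        .map(|keys| keys.iter().map(|k| k.to_string()).collect::<HashSet<String>>())
        // Keep only the keys shared by all window functions.
        .reduce(|a, b| &a & &b)
        .unwrap_or_default()
}

/// Split predicates into (push, keep).
/// A predicate is modelled as (label, referenced columns); it is pushed
/// only if every referenced column is a common partition key.
/// (Hypothetical helper, not a DataFusion function.)
fn split_predicates<'a>(
    predicates: &[(&'a str, Vec<&str>)],
    keys: &HashSet<String>,
) -> (Vec<&'a str>, Vec<&'a str>) {
    let mut push = vec![];
    let mut keep = vec![];
    for (label, cols) in predicates {
        if cols.iter().all(|c| keys.contains(*c)) {
            push.push(*label);
        } else {
            keep.push(*label);
        }
    }
    (push, keep)
}
```

With two window functions partitioned by `[a, b]` and `[b, c]`, only `b` is common, so a filter on `b` is pushed while filters touching `a` or `c` stay in place. With a single window partitioned by the expression `a + b`, the only key is the name `a + b`; a filter on `a + b > 1` references the columns `a` and `b`, neither of which is a key, so it is kept, which matches the behaviour described in the comment.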



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

