itsjunetime commented on code in PR #12135:
URL: https://github.com/apache/datafusion/pull/12135#discussion_r1737065470


##########
datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs:
##########
@@ -268,90 +259,183 @@ impl<'a> FilterCandidateBuilder<'a> {
     /// * `Ok(Some(candidate))` if the expression can be used as an ArrowFilter
     /// * `Ok(None)` if the expression cannot be used as an ArrowFilter
     /// * `Err(e)` if an error occurs while building the candidate
-    pub fn build(
-        mut self,
-        metadata: &ParquetMetaData,
-    ) -> Result<Option<FilterCandidate>> {
-        let expr = self.expr.clone().rewrite(&mut self).data()?;
-
-        if self.non_primitive_columns || self.projected_columns {
-            Ok(None)
-        } else {
-            let required_bytes =
-                size_of_columns(&self.required_column_indices, metadata)?;
-            let can_use_index = columns_sorted(&self.required_column_indices, 
metadata)?;
-
-            Ok(Some(FilterCandidate {
-                expr,
-                required_bytes,
-                can_use_index,
-                projection: self.required_column_indices.into_iter().collect(),
-            }))
+    pub fn build(self, metadata: &ParquetMetaData) -> 
Result<Option<FilterCandidate>> {
+        let Some((projection, rewritten_expr)) = non_pushdown_columns(
+            Arc::clone(&self.expr),
+            self.file_schema,
+            self.table_schema,
+        )?
+        else {
+            return Ok(None);
+        };
+
+        let required_bytes = size_of_columns(&self.required_column_indices, 
metadata)?;
+        let can_use_index = columns_sorted(&self.required_column_indices, 
metadata)?;
+
+        Ok(Some(FilterCandidate {
+            expr: rewritten_expr,
+            required_bytes,
+            can_use_index,
+            projection,
+        }))
+    }
+}
+
+// a struct that implements TreeNodeRewriter to traverse a PhysicalExpr tree 
structure to determine
+// if any column references in the expression would prevent it from being 
predicate-pushed-down.
+// if non_primitive_columns || projected_columns, it can't be pushed down.
+// can't be reused between calls to `rewrite`; each construction must be used 
only once.
+struct PushdownChecker<'schema> {
+    /// Does the expression require any non-primitive columns (like structs)?
+    non_primitive_columns: bool,
+    /// Does the expression reference any columns that are in the table
+    /// schema but not in the file schema?
+    projected_columns: bool,
+    // the indices of all the columns found within the given expression which 
exist inside the given
+    // [`file_schema`]
+    required_column_indices: BTreeSet<usize>,
+    file_schema: &'schema Schema,
+    table_schema: &'schema Schema,
+}
+
+impl<'schema> PushdownChecker<'schema> {
+    fn check_single_column(&mut self, column_name: &str) -> 
Option<TreeNodeRecursion> {

Review Comment:
   I made this a method on `PushdownChecker` so that it can be called on its 
own or as part of `PushdownChecker`'s tree node recursion functions. It sets 
some state on the `PushdownChecker` that is needed to track projected columns. 
We could make it a free function instead, but then every caller would have to 
pass in all the fields of `PushdownChecker` for it to do its job, which would 
negate the benefit of it being a free function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to