adriangb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2015101716


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -295,3 +350,104 @@ fn create_initial_plan(
     // default to scanning all row groups
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
+
+/// Build a pruning predicate from a predicate expression.
+/// If the predicate cannot be converted to a pruning predicate, or if the
+/// resulting pruning predicate is trivially true (prunes nothing), return None.
+/// If there is an error creating the pruning predicate it is recorded by
+/// incrementing the `predicate_creation_errors` counter.
+pub(crate) fn build_pruning_predicate(
+    predicate: Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+        Ok(pruning_predicate) => {
+            if !pruning_predicate.always_true() {
+                return Some(Arc::new(pruning_predicate));
+            }
+        }
+        Err(e) => {
+            debug!("Could not create pruning predicate for: {e}");
+            predicate_creation_errors.add(1);
+        }
+    }
+    None
+}
+
+/// Build a page pruning predicate from a predicate expression and the
+/// file schema.
+pub(crate) fn build_page_pruning_predicate(
+    predicate: &Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+    Arc::new(PagePruningAccessPlanFilter::new(
+        predicate,
+        Arc::clone(file_schema),
+    ))
+}
+
+/// A visitor for a `PhysicalExpr` that collects all column references, used to
+/// determine which columns the expression needs in order to be evaluated.
+struct FilterSchemaBuilder<'schema> {
+    filter_schema_fields: BTreeSet<Arc<Field>>,
+    file_schema: &'schema Schema,
+    table_schema: &'schema Schema,
+}
+
+impl<'schema> FilterSchemaBuilder<'schema> {
+    fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> Self {
+        Self {
+            filter_schema_fields: BTreeSet::new(),
+            file_schema,
+            table_schema,
+        }
+    }
+
+    fn sort_fields(
+        fields: &mut Vec<Arc<Field>>,
+        table_schema: &Schema,
+        file_schema: &Schema,
+    ) {
+        fields.sort_by_key(|f| f.name().to_string());
+        fields.dedup_by_key(|f| f.name().to_string());
+        fields.sort_by_key(|f| {
+            let table_schema_index =
+                table_schema.index_of(f.name()).unwrap_or(usize::MAX);
+            let file_schema_index =
+                file_schema.index_of(f.name()).unwrap_or(usize::MAX);
+            (table_schema_index, file_schema_index)
+        });
+    }
+
+    fn build(self) -> SchemaRef {
+        let mut fields =
+            self.filter_schema_fields.into_iter().collect::<Vec<_>>();
+        FilterSchemaBuilder::sort_fields(
+            &mut fields,
+            self.table_schema,
+            self.file_schema,
+        );
+        Arc::new(Schema::new(fields))
+    }
+}
+
+impl TreeNodeRewriter for FilterSchemaBuilder<'_> {
+    type Node = Arc<dyn PhysicalExpr>;
+
+    fn f_down(&mut self, node: Arc<dyn PhysicalExpr>) -> Result<Transformed<Self::Node>> {
+        if let Some(column) = node.as_any().downcast_ref::<Column>() {
+            if let Ok(field) = self.table_schema.field_with_name(column.name()) {
+                self.filter_schema_fields.insert(Arc::new(field.clone()));
+            } else if let Ok(field) = self.file_schema.field_with_name(column.name()) {
+                self.filter_schema_fields.insert(Arc::new(field.clone()));
+            } else {
+                // If it's not in either schema it must be a partition column

Review Comment:
   Hmm, not sure I understand. This doesn't push filters down into partitioning
columns. I think the issue is that, as far as TopK (or other operators above the
scan level) are concerned, there is no such thing as a partitioning column, just
a column. So they can't decide which filters to push down. The filtering has to
happen within the opener/scan... and this seems like as good a place as any?
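   For concreteness, here is a rough sketch of the kind of check the opener
could perform before evaluating a pushed-down filter against a file. The helper
name and wiring are illustrative only, not part of this PR; it assumes
DataFusion's `TreeNode` traversal API for `PhysicalExpr`:

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical helper: true if every column referenced by `predicate`
/// resolves against the file schema, i.e. the filter touches no partition
/// columns and can be evaluated entirely inside the parquet scan.
fn references_only_file_columns(
    predicate: &Arc<dyn PhysicalExpr>,
    file_schema: &Schema,
) -> Result<bool> {
    let mut only_file_columns = true;
    predicate.apply(|expr| {
        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
            if file_schema.field_with_name(column.name()).is_err() {
                // The column is not in the file, so it must come from
                // somewhere else (e.g. a partition column): stop early.
                only_file_columns = false;
                return Ok(TreeNodeRecursion::Stop);
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })?;
    Ok(only_file_columns)
}
```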



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -102,25 +110,52 @@ impl FileOpener for ParquetOpener {
 
         let batch_size = self.batch_size;
 
+        let dynamic_filters = self
+            .dynamic_filters
+            .iter()
+            .map(|f| f.current_filters())
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Collect dynamic_filters into a single predicate by reducing with AND
+        let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
+            Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))
+        });
+        let enable_page_index = should_enable_page_index(
+            self.enable_page_index,
+            &self.page_pruning_predicate,
+            dynamic_predicate.is_some(),
+        );
+        let predicate = self.predicate.clone();
+        let predicate = match (predicate, dynamic_predicate) {

Review Comment:
   agreed 😄 
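   To make the reduction in the diff concrete: `reduce` folds `[a, b, c]` into
`(a AND b) AND c`, and an empty filter list yields `None`. A minimal standalone
sketch of that fold plus the subsequent merge with the optional static
predicate (helper names are illustrative, not part of the PR):

```rust
use std::sync::Arc;

use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::PhysicalExpr;

/// Fold a list of filters into one conjunction: [a, b, c] becomes
/// (a AND b) AND c. An empty list yields None.
fn conjunction(filters: Vec<Arc<dyn PhysicalExpr>>) -> Option<Arc<dyn PhysicalExpr>> {
    filters
        .into_iter()
        .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::And, b)))
}

/// Combine the optional static predicate with the optional dynamic one,
/// mirroring the `match (predicate, dynamic_predicate)` in the diff above.
fn merge_predicates(
    static_predicate: Option<Arc<dyn PhysicalExpr>>,
    dynamic_predicate: Option<Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn PhysicalExpr>> {
    match (static_predicate, dynamic_predicate) {
        (Some(s), Some(d)) => Some(Arc::new(BinaryExpr::new(s, Operator::And, d))),
        (Some(p), None) | (None, Some(p)) => Some(p),
        (None, None) => None,
    }
}
```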



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

