adriangb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2015101716
########## datafusion/datasource-parquet/src/opener.rs: ##########

```diff
@@ -295,3 +350,104 @@ fn create_initial_plan(
     // default to scanning all row groups
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
+
+/// Build a pruning predicate from a predicate expression.
+/// If the predicate cannot be converted to a pruning predicate, or the
+/// resulting predicate is always true, return None.
+/// If there is an error creating the pruning predicate it is recorded by
+/// incrementing the `predicate_creation_errors` counter.
+pub(crate) fn build_pruning_predicate(
+    predicate: Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+        Ok(pruning_predicate) => {
+            if !pruning_predicate.always_true() {
+                return Some(Arc::new(pruning_predicate));
+            }
+        }
+        Err(e) => {
+            debug!("Could not create pruning predicate for: {e}");
+            predicate_creation_errors.add(1);
+        }
+    }
+    None
+}
+
+/// Build a page pruning access plan filter from a predicate expression.
+pub(crate) fn build_page_pruning_predicate(
+    predicate: &Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+    Arc::new(PagePruningAccessPlanFilter::new(
+        predicate,
+        Arc::clone(file_schema),
+    ))
+}
+
+/// A visitor for a PhysicalExpr that collects all column references, to
+/// determine which columns the expression needs in order to be evaluated.
+struct FilterSchemaBuilder<'schema> {
+    filter_schema_fields: BTreeSet<Arc<Field>>,
+    file_schema: &'schema Schema,
+    table_schema: &'schema Schema,
+}
+
+impl<'schema> FilterSchemaBuilder<'schema> {
+    fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> Self {
+        Self {
+            filter_schema_fields: BTreeSet::new(),
+            file_schema,
+            table_schema,
+        }
+    }
+
+    fn sort_fields(
+        fields: &mut Vec<Arc<Field>>,
+        table_schema: &Schema,
+        file_schema: &Schema,
+    ) {
+        fields.sort_by_key(|f| f.name().to_string());
+        fields.dedup_by_key(|f| f.name().to_string());
+        fields.sort_by_key(|f| {
+            let table_schema_index =
+                table_schema.index_of(f.name()).unwrap_or(usize::MAX);
+            let file_schema_index = file_schema.index_of(f.name()).unwrap_or(usize::MAX);
+            (table_schema_index, file_schema_index)
+        });
+    }
+
+    fn build(self) -> SchemaRef {
+        let mut fields = self.filter_schema_fields.into_iter().collect::<Vec<_>>();
+        FilterSchemaBuilder::sort_fields(
+            &mut fields,
+            self.table_schema,
+            self.file_schema,
+        );
+        Arc::new(Schema::new(fields))
+    }
+}
+
+impl TreeNodeRewriter for FilterSchemaBuilder<'_> {
+    type Node = Arc<dyn PhysicalExpr>;
+
+    fn f_down(&mut self, node: Arc<dyn PhysicalExpr>) -> Result<Transformed<Self::Node>> {
+        if let Some(column) = node.as_any().downcast_ref::<Column>() {
+            if let Ok(field) = self.table_schema.field_with_name(column.name()) {
+                self.filter_schema_fields.insert(Arc::new(field.clone()));
+            } else if let Ok(field) = self.file_schema.field_with_name(column.name()) {
+                self.filter_schema_fields.insert(Arc::new(field.clone()));
+            } else {
+                // If it's not in either schema it must be a partition column
```

Review Comment:
   Hmm, not sure I understand. This doesn't push down filters into partitioning columns. I think the issue is that, as far as TopK (or other operators above the scan level) are concerned, there is no such thing as a partitioning column, only a column, so they have no way to decide which filters are safe to push down.

   The filtering has to happen within the opener/scan... and this seems like as good a place as any?
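   To make the distinction concrete, here is a minimal, self-contained sketch (a toy `Expr` type, not DataFusion's API) of the classification only the opener can do: split pushed-down conjuncts by whether all of their columns exist in the file schema. A column that is in the table schema but missing from the file must be a partition column.

   ```rust
   use std::collections::HashSet;

   /// Toy predicate: a column reference or a conjunction of two predicates.
   enum Expr {
       Column(String),
       And(Box<Expr>, Box<Expr>),
   }

   /// Collect every column name referenced by the expression.
   fn columns(expr: &Expr, out: &mut HashSet<String>) {
       match expr {
           Expr::Column(name) => {
               out.insert(name.clone());
           }
           Expr::And(l, r) => {
               columns(l, out);
               columns(r, out);
           }
       }
   }

   /// Partition conjuncts: those evaluable against the file alone vs. those
   /// that reference at least one column the file does not have.
   fn split_by_file_schema(
       conjuncts: Vec<Expr>,
       file_columns: &HashSet<String>,
   ) -> (Vec<Expr>, Vec<Expr>) {
       conjuncts.into_iter().partition(|c| {
           let mut cols = HashSet::new();
           columns(c, &mut cols);
           cols.is_subset(file_columns)
       })
   }

   fn main() {
       let file_columns: HashSet<String> =
           ["a", "b"].iter().map(|s| s.to_string()).collect();
       let conjuncts = vec![
           Expr::Column("a".into()), // in the file: safe to evaluate in the scan
           Expr::Column("year".into()), // not in the file: a partition column
           // Mixed conjunct: touches a partition column, so it is not
           // evaluable against the file data alone.
           Expr::And(
               Box::new(Expr::Column("b".into())),
               Box::new(Expr::Column("year".into())),
           ),
       ];
       let (file_filters, partition_filters) =
           split_by_file_schema(conjuncts, &file_columns);
       println!(
           "{} file filter(s), {} partition filter(s)",
           file_filters.len(),
           partition_filters.len()
       );
   }
   ```

   An operator like TopK sees only the flat table schema, so it cannot perform this split; the opener is the first place that has both schemas in hand.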
########## datafusion/datasource-parquet/src/opener.rs: ##########

```diff
@@ -102,25 +110,52 @@ impl FileOpener for ParquetOpener {
         let batch_size = self.batch_size;
+        let dynamic_filters = self
+            .dynamic_filters
+            .iter()
+            .map(|f| f.current_filters())
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Collect dynamic_filters into a single predicate by reducing with AND
+        let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
+            Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))
+        });
+        let enable_page_index = should_enable_page_index(
+            self.enable_page_index,
+            &self.page_pruning_predicate,
+            dynamic_predicate.is_some(),
+        );
+        let predicate = self.predicate.clone();
+        let predicate = match (predicate, dynamic_predicate) {
```

Review Comment:
   agreed 😄
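   For reference, a hedged, standalone sketch of the reduce-with-AND pattern in this hunk, written against what I believe the public `datafusion-physical-expr` / `datafusion-expr` / `arrow-schema` APIs look like (crate layout and signatures assumed, not taken from this PR):

   ```rust
   use std::sync::Arc;

   use arrow_schema::{DataType, Field, Schema};
   use datafusion_common::Result;
   use datafusion_expr::Operator;
   use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
   use datafusion_physical_expr::PhysicalExpr;

   fn main() -> Result<()> {
       let schema = Schema::new(vec![
           Field::new("a", DataType::Int32, false),
           Field::new("b", DataType::Int32, false),
       ]);

       // Two independent filters, e.g. one static and one dynamic (assumed
       // example expressions, not the PR's actual filters).
       let filters: Vec<Arc<dyn PhysicalExpr>> = vec![
           Arc::new(BinaryExpr::new(col("a", &schema)?, Operator::Gt, lit(1))),
           Arc::new(BinaryExpr::new(col("b", &schema)?, Operator::Lt, lit(10))),
       ];

       // Fold the filters into one conjunction, the same shape as the hunk
       // above. An empty input yields None ("no predicate") rather than a
       // literal `true`.
       let combined: Option<Arc<dyn PhysicalExpr>> = filters
           .into_iter()
           .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::And, b)));

       println!("combined predicate: {combined:?}");
       Ok(())
   }
   ```

   Using `reduce` rather than `fold` is a nice touch: it avoids manufacturing a `true` literal as the identity element when there are no dynamic filters at all.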