alamb commented on code in PR #16424:
URL: https://github.com/apache/datafusion/pull/16424#discussion_r2150916597
########## datafusion/datasource-parquet/src/opener.rs: ##########

@@ -524,6 +511,91 @@ fn should_enable_page_index(
     .unwrap_or(false)
 }
 
+/// Prune based on partition values and file-level statistics.
+pub struct FilePruner {
+    predicate: Arc<dyn PhysicalExpr>,
+    pruning_schema: Arc<Schema>,

Review Comment:
Could we maybe add some comments about what a `pruning_schema` is, and how it relates to `partition_fields`?

########## datafusion/datasource-parquet/src/opener.rs: ##########

@@ -524,6 +511,91 @@ fn should_enable_page_index(
     .unwrap_or(false)
 }
 
+/// Prune based on partition values and file-level statistics.
+pub struct FilePruner {
+    predicate: Arc<dyn PhysicalExpr>,
+    pruning_schema: Arc<Schema>,
+    file: PartitionedFile,
+    partition_fields: Vec<FieldRef>,
+    predicate_creation_errors: Count,
+}
+
+impl FilePruner {
+    pub fn new_opt(

Review Comment:
Could we document under what circumstances it returns `None`? I think it is when there are no dynamic predicates.

It would also be good to document **why** it would return `None` (in this case because we assume that files have already been pruned using any non-dynamic predicates, so additional pruning may happen ONLY when new dynamic predicates are available??)

########## datafusion/datasource-parquet/src/opener.rs: ##########

@@ -384,6 +353,24 @@ impl FileOpener for ParquetOpener {
             .map(move |maybe_batch| {
                 maybe_batch
                     .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
+            })
+            .take_while(move |_| {
+                if let Some(file_pruner) = file_pruner.as_ref() {
+                    match file_pruner.should_prune() {

Review Comment:
This is basically applying the filter on each record batch, right? I think once we can actually push the filters into the parquet scan (which I realize I have been talking about for months...) this could become entirely redundant.

On the other hand, it also stops the input immediately if we find out the file should stop 🤔

########## datafusion/datasource-parquet/src/opener.rs: ##########

@@ -524,6 +512,91 @@ fn should_enable_page_index(
     .unwrap_or(false)
 }
 
+/// Prune based on partition values and file-level statistics.
+pub struct FilePruner {
+    predicate: Arc<dyn PhysicalExpr>,
+    pruning_schema: Arc<Schema>,
+    file: PartitionedFile,
+    partition_fields: Vec<FieldRef>,
+    predicate_creation_errors: Count,
+}
+
+impl FilePruner {
+    pub fn new_opt(
+        predicate: Arc<dyn PhysicalExpr>,
+        logical_file_schema: &SchemaRef,
+        partition_fields: Vec<FieldRef>,
+        file: PartitionedFile,
+        predicate_creation_errors: Count,
+    ) -> Result<Option<Self>> {
+        // If there is no dynamic predicate, we don't need to prune
+        if !is_dynamic_physical_expr(Arc::clone(&predicate))? {
+            return Ok(None);
+        }
+        // Build a pruning schema that combines the file fields and partition fields.
+        // Partition fields are always at the end.
+        let pruning_schema = Arc::new(
+            Schema::new(
+                logical_file_schema
+                    .fields()
+                    .iter()
+                    .cloned()
+                    .chain(partition_fields.iter().cloned())
+                    .collect_vec(),
+            )
+            .with_metadata(logical_file_schema.metadata().clone()),
+        );
+        Ok(Some(Self {
+            predicate,
+            pruning_schema,
+            file,
+            partition_fields,
+            predicate_creation_errors,
+        }))
+    }
+
+    pub fn should_prune(&self) -> Result<bool> {
+        let pruning_predicate = build_pruning_predicate(
+            Arc::clone(&self.predicate),
+            &self.pruning_schema,
+            &self.predicate_creation_errors,
+        );

Review Comment:
It happens once per file, right? If so, I agree that doing it as a follow-on optimization sounds good. However, I recommend we file a ticket while this is all in our heads / we have the full context; otherwise we'll forget what to do.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org
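Editor's aside on the `pruning_schema` question above: in the quoted `new_opt`, the pruning schema is simply the logical file schema with the partition fields appended at the end, so a single predicate can reference both file columns (via statistics) and partition columns (via partition values). A minimal stdlib-only sketch of that concatenation, using field names in place of the real arrow-rs `Schema`/`Field` types (`build_pruning_fields` is a hypothetical helper, not DataFusion API):

```rust
/// Hedged sketch: combine file fields and partition fields into one
/// "pruning schema", with partition fields always appended at the end
/// (mirroring the `chain(...)` in `FilePruner::new_opt`).
fn build_pruning_fields(file_fields: &[&str], partition_fields: &[&str]) -> Vec<String> {
    file_fields
        .iter()
        .chain(partition_fields.iter())
        .map(|f| f.to_string())
        .collect()
}

fn main() {
    let pruning = build_pruning_fields(&["a", "b"], &["year", "month"]);
    // Partition fields land at the end, after all file fields.
    assert_eq!(pruning, ["a", "b", "year", "month"]);
    println!("{:?}", pruning);
}
```

Keeping partition fields at a fixed position (the tail) means the pruner can resolve a partition column by offset from the end of the schema, regardless of how many file fields precede it.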
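Editor's aside on the `.take_while(...)` discussion above: the point of the pattern is early termination, cutting off the batch stream as soon as the pruner reports the rest of the file can no longer match (e.g. a dynamic TopK filter tightened). A hedged stdlib-only sketch of that control flow, with `consume_until_pruned` and its tightening `threshold` standing in for the real `FilePruner` and dynamic predicate:

```rust
/// Hedged sketch of stopping a batch stream early via `take_while`.
/// Each "batch" here is just an i32; the closure re-checks a cheap
/// pruning decision before every element, like `should_prune` does.
fn consume_until_pruned(batches: Vec<i32>, mut threshold: i32) -> Vec<i32> {
    batches
        .into_iter()
        .take_while(move |_| {
            // Decide whether the rest of the "file" can still match.
            let should_prune = threshold < 5;
            threshold -= 4; // simulate a dynamic filter tightening over time
            !should_prune
        })
        .collect()
}

fn main() {
    let consumed = consume_until_pruned(vec![3, 7, 12, 1, 20], 10);
    // Only the first two batches are emitted; the remaining input is
    // never pulled, which is the early-stop benefit the review notes.
    assert_eq!(consumed, vec![3, 7]);
    println!("{:?}", consumed);
}
```

This is also why the two mechanisms are complementary: pushing filters into the parquet scan would make per-batch filtering redundant, but `take_while`-style cancellation is what lets the opener abandon a file mid-stream.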