alamb commented on code in PR #16424:
URL: https://github.com/apache/datafusion/pull/16424#discussion_r2150916597


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +511,91 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
+/// Prune based on partition values and file-level statistics.
+pub struct FilePruner {
+    predicate: Arc<dyn PhysicalExpr>,
+    pruning_schema: Arc<Schema>,

Review Comment:
   Could we maybe add some comments about what a `pruning_schema` is? And how it relates to `partition_fields`?
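   
   For example, something like this (just a sketch of the kind of comment I mean; exact wording up to you):
   
   ```rust
   /// Prune based on partition values and file-level statistics.
   pub struct FilePruner {
       predicate: Arc<dyn PhysicalExpr>,
       /// Schema the pruning predicate is evaluated against for this file:
       /// the logical file schema with `partition_fields` appended at the
       /// end, so the predicate can reference both file columns and
       /// partition columns.
       pruning_schema: Arc<Schema>,
       file: PartitionedFile,
       /// Fields of the table's partition columns; their values come from
       /// `file.partition_values`, not from the file itself.
       partition_fields: Vec<FieldRef>,
       predicate_creation_errors: Count,
   }
   ```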



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +511,91 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
+/// Prune based on partition values and file-level statistics.
+pub struct FilePruner {
+    predicate: Arc<dyn PhysicalExpr>,
+    pruning_schema: Arc<Schema>,
+    file: PartitionedFile,
+    partition_fields: Vec<FieldRef>,
+    predicate_creation_errors: Count,
+}
+
+impl FilePruner {
+    pub fn new_opt(

Review Comment:
   Could we document under what circumstances it returns `None`? I think it is when there are no dynamic predicates.
   
   It would also be good to document **why** it would return `None` (in this case because we assume that files have already been pruned using any non-dynamic predicates, so additional pruning may happen ONLY when new dynamic predicates are available??)
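   
   Something along these lines, perhaps (the "why" wording is just a suggestion):
   
   ```rust
   /// Create a `FilePruner`, or `None` if pruning cannot be useful for this file.
   ///
   /// Returns `None` when the predicate is not dynamic: files have already
   /// been pruned during planning using all non-dynamic predicates, so
   /// re-checking them here cannot prune anything new. Only a dynamic
   /// predicate, whose value can change after planning, justifies
   /// re-evaluating pruning as the scan runs.
   pub fn new_opt(
       predicate: Arc<dyn PhysicalExpr>,
       logical_file_schema: &SchemaRef,
       partition_fields: Vec<FieldRef>,
       file: PartitionedFile,
       predicate_creation_errors: Count,
   ) -> Result<Option<Self>> {
       // ...
   }
   ```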



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -384,6 +353,24 @@ impl FileOpener for ParquetOpener {
                 .map(move |maybe_batch| {
                     maybe_batch
                         .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
+                })
+                .take_while(move |_| {
+                    if let Some(file_pruner) = file_pruner.as_ref() {
+                        match file_pruner.should_prune() {

Review Comment:
   This is basically applying the filter on each record batch, right? 
   
   I think once we can actually push the filters into the parquet scan (which I realize I have been talking about for months...) this could become entirely redundant.
   
   On the other hand, it also stops the input immediately if we find out the file should be pruned 🤔 
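   
   To make the "stops the input immediately" part concrete, a minimal self-contained sketch of the `take_while` behavior (toy stream, not the real `ParquetOpener` types):
   
   ```rust
   use futures::executor::block_on;
   use futures::stream::{self, StreamExt};
   
   fn main() {
       // Stand-in for `file_pruner.should_prune()`: like the closure in the
       // PR it ignores the batch contents, and here it pretends a dynamic
       // filter made the rest of the file prunable after two batches.
       let mut checks = 0;
       let batches = stream::iter(vec!["b1", "b2", "b3", "b4"]).take_while(move |_| {
           checks += 1;
           futures::future::ready(checks <= 2)
       });
   
       // Only b1 and b2 are yielded; b3/b4 are never pulled from the input,
       // so decoding stops as soon as the pruner says to.
       let yielded: Vec<&str> = block_on(batches.collect::<Vec<_>>());
       assert_eq!(yielded, vec!["b1", "b2"]);
   }
   ```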
   
   



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +512,91 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
+/// Prune based on partition values and file-level statistics.
+pub struct FilePruner {
+    predicate: Arc<dyn PhysicalExpr>,
+    pruning_schema: Arc<Schema>,
+    file: PartitionedFile,
+    partition_fields: Vec<FieldRef>,
+    predicate_creation_errors: Count,
+}
+
+impl FilePruner {
+    pub fn new_opt(
+        predicate: Arc<dyn PhysicalExpr>,
+        logical_file_schema: &SchemaRef,
+        partition_fields: Vec<FieldRef>,
+        file: PartitionedFile,
+        predicate_creation_errors: Count,
+    ) -> Result<Option<Self>> {
+        // If there is no dynamic predicate, we don't need to prune
+        if !is_dynamic_physical_expr(Arc::clone(&predicate))? {
+            return Ok(None);
+        }
+        // Build a pruning schema that combines the file fields and partition fields.
+        // Partition fields are always at the end.
+        let pruning_schema = Arc::new(
+            Schema::new(
+                logical_file_schema
+                    .fields()
+                    .iter()
+                    .cloned()
+                    .chain(partition_fields.iter().cloned())
+                    .collect_vec(),
+            )
+            .with_metadata(logical_file_schema.metadata().clone()),
+        );
+        Ok(Some(Self {
+            predicate,
+            pruning_schema,
+            file,
+            partition_fields,
+            predicate_creation_errors,
+        }))
+    }
+
+    pub fn should_prune(&self) -> Result<bool> {
+        let pruning_predicate = build_pruning_predicate(
+            Arc::clone(&self.predicate),
+            &self.pruning_schema,
+            &self.predicate_creation_errors,
+        );

Review Comment:
   it happens once per file, right? 
   
   If so I agree that doing it as a follow on optimization sounds good. 
   
   However, I recommend we file a ticket while this is all in our heads / we 
have the full context otherwise we'll forget what to do
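   
   For the ticket: one shape the optimization could take is memoizing the built predicate per `FilePruner`, e.g. with `OnceLock` (generic sketch, not the DataFusion types — and worth double-checking that rebuilding per call isn't deliberate, since each rebuild re-snapshots the current value of the dynamic predicate):
   
   ```rust
   use std::sync::OnceLock;
   
   // Stand-in for the result of the expensive `build_pruning_predicate` call.
   struct PruningPredicate(String);
   
   struct Pruner {
       expr: String,
       cached: OnceLock<PruningPredicate>, // built at most once per file
   }
   
   impl Pruner {
       fn pruning_predicate(&self) -> &PruningPredicate {
           self.cached.get_or_init(|| {
               println!("building predicate (expensive)");
               PruningPredicate(self.expr.clone())
           })
       }
   }
   
   fn main() {
       let p = Pruner { expr: "a > 5".into(), cached: OnceLock::new() };
       p.pruning_predicate(); // builds
       p.pruning_predicate(); // reuses the cached value, no rebuild
   }
   ```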


