adriangb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2004247883


##########
datafusion/common/src/config.rs:
##########
@@ -590,6 +590,9 @@ config_namespace! {
         /// during aggregations, if possible
         pub enable_topk_aggregation: bool, default = true
 
+        /// When set to true, attempts to push down dynamic filters from TopK operations into file scans
+        pub enable_dynamic_filter_pushdown: bool, default = true

Review Comment:
   Probably should be false by default



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -294,3 +345,127 @@ fn create_initial_plan(
     // default to scanning all row groups
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
+
+/// Build a pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a pruning
+/// predicate, return None.
+/// If there is an error creating the pruning predicate it is recorded by incrementing
+/// the `predicate_creation_errors` counter.
+pub(crate) fn build_pruning_predicate(
+    predicate: Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+    match PruningPredicate::try_new(predicate.clone(), 
Arc::clone(file_schema)) {
+        Ok(pruning_predicate) => {
+            if !pruning_predicate.always_true() {
+                return Some(Arc::new(pruning_predicate));
+            }
+        }
+        Err(e) => {
+            debug!("Could not create pruning predicate for: {e}");
+            predicate_creation_errors.add(1);
+        }
+    }
+    None
+}
+
+/// Build a page pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a page 
pruning
+/// predicate, return None.
+pub(crate) fn build_page_pruning_predicate(
+    predicate: &Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+    Arc::new(PagePruningAccessPlanFilter::new(
+        predicate,
+        Arc::clone(file_schema),
+    ))
+}
+
+/// A visitor for a PhysicalExpr that collects all column references to determine what columns the expression needs to be evaluated.
+struct FilterSchemaBuilder<'schema> {
+    filter_schema_fields: BTreeSet<Arc<Field>>,
+    file_schema: &'schema Schema,
+    table_schema: &'schema Schema,
+}
+
+impl<'schema> FilterSchemaBuilder<'schema> {
+    fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> 
Self {
+        Self {
+            filter_schema_fields: BTreeSet::new(),
+            file_schema,
+            table_schema,
+        }
+    }
+
+    fn sort_fields(
+        fields: &mut Vec<Arc<Field>>,
+        table_schema: &Schema,
+        file_schema: &Schema,
+    ) {
+        fields.sort_by_key(|f| f.name().to_string());
+        fields.dedup_by_key(|f| f.name().to_string());
+        fields.sort_by_key(|f| {
+            let table_schema_index =
+                table_schema.index_of(f.name()).unwrap_or(usize::MAX);
+            let file_schema_index = 
file_schema.index_of(f.name()).unwrap_or(usize::MAX);
+            (table_schema_index, file_schema_index)
+        });
+    }
+
+    fn build(self) -> SchemaRef {
+        let mut fields = 
self.filter_schema_fields.into_iter().collect::<Vec<_>>();
+        FilterSchemaBuilder::sort_fields(
+            &mut fields,
+            self.table_schema,
+            self.file_schema,
+        );
+        Arc::new(Schema::new(fields))
+    }
+}
+
+impl<'node> TreeNodeVisitor<'node> for FilterSchemaBuilder<'_> {
+    type Node = Arc<dyn PhysicalExpr>;
+
+    fn f_down(
+        &mut self,
+        node: &'node Arc<dyn PhysicalExpr>,
+    ) -> Result<TreeNodeRecursion> {
+        if let Some(column) = node.as_any().downcast_ref::<Column>() {
+            if let Ok(field) = 
self.table_schema.field_with_name(column.name()) {
+                self.filter_schema_fields.insert(Arc::new(field.clone()));
+            } else if let Ok(field) = 
self.file_schema.field_with_name(column.name()) {
+                self.filter_schema_fields.insert(Arc::new(field.clone()));
+            } else {
+                // valid fields are the table schema's fields + the file 
schema's fields, preferring the table schema's fields when there is a conflict
+                let mut valid_fields = self
+                    .table_schema
+                    .fields()
+                    .iter()
+                    .chain(self.file_schema.fields().iter())
+                    .cloned()
+                    .collect::<Vec<_>>();
+                FilterSchemaBuilder::sort_fields(
+                    &mut valid_fields,
+                    self.table_schema,
+                    self.file_schema,
+                );
+                let valid_fields = valid_fields
+                    .into_iter()
+                    .map(|f| 
datafusion_common::Column::new_unqualified(f.name()))
+                    .collect();
+                let field = 
datafusion_common::Column::new_unqualified(column.name());
+                return Err(datafusion_common::DataFusionError::SchemaError(
+                    SchemaError::FieldNotFound {
+                        field: Box::new(field),
+                        valid_fields,
+                    },
+                    Box::new(None),
+                ));
+            }
+        }
+
+        Ok(TreeNodeRecursion::Continue)
+    }
+}

Review Comment:
   Copied from https://github.com/apache/datafusion/pull/15057
   
   The point I'm trying to make is that this is a useful change in its own right, and that the real diff in either PR will be smaller once the other is merged



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1655,4 +1656,46 @@ mod tests {
         assert_eq!(calls.len(), 2);
         assert_eq!(calls, vec![Some(123), Some(456)]);
     }
+
+    #[tokio::test]
+    async fn test_topk_predicate_pushdown() {
+        let ctx = SessionContext::new();
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
+            // We need to force 1 partition because TopK predicate pushdown 
happens on a per-partition basis
+            // If we had 1 file per partition (as an example) no pushdown 
would happen
+            .with_target_partitions(1);
+
+        let tmp_dir = TempDir::new().unwrap();
+        let path = tmp_dir.path().to_str().unwrap().to_string();
+        // The point here is that we write many, many files.
+        // So when we scan after we processed the first one we should be able 
to skip the rest
+        // because of the TopK predicate pushdown.
+        for file in 0..100 {
+            let name = format!("test{:02}.parquet", file);
+            write_file(&format!("{path}/{name}"));
+        }
+        ctx.register_listing_table("base_table", path, opt, None, None)
+            .await
+            .unwrap();
+
+        let query = "select name from base_table order by id desc limit 3";
+
+        let batches = ctx.sql(query).await.unwrap().collect().await.unwrap();
+        #[rustfmt::skip]
+        let expected = [
+            "+--------+",
+            "| name   |",
+            "+--------+",
+            "| test02 |",
+            "| test02 |",
+            "| test02 |",
+            "+--------+",
+        ];
+        assert_batches_eq!(expected, &batches);
+
+        let sql = format!("explain analyze {query}");
+        let batches = ctx.sql(&sql).await.unwrap().collect().await.unwrap();
+        let explain_plan = format!("{}", 
pretty_format_batches(&batches).unwrap());
+        assert_contains!(explain_plan, "row_groups_pruned_statistics=96");

Review Comment:
   Proof this works!



##########
datafusion/datasource-parquet/src/mod.rs:
##########
@@ -541,11 +541,13 @@ impl ExecutionPlan for ParquetExec {
 fn should_enable_page_index(
     enable_page_index: bool,
     page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
+    has_dynamic_filters: bool,
 ) -> bool {
     enable_page_index
-        && page_pruning_predicate.is_some()
-        && page_pruning_predicate
-            .as_ref()
-            .map(|p| p.filter_number() > 0)
-            .unwrap_or(false)
+        && (page_pruning_predicate.is_some()
+            && page_pruning_predicate
+                .as_ref()
+                .map(|p| p.filter_number() > 0)
+                .unwrap_or(false))
+        || has_dynamic_filters
 }

Review Comment:
   Copied from https://github.com/apache/datafusion/pull/15057



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -587,4 +578,17 @@ impl FileSource for ParquetSource {
             }
         }
     }
+
+    fn supports_dynamic_filter_pushdown(&self) -> bool {
+        true
+    }

Review Comment:
   I guess I could nix this method and just treat an `Ok(None)` from `push_down_dynamic_filter` as not supported



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -147,6 +180,24 @@ impl FileOpener for ParquetOpener {
 
             let file_schema = Arc::clone(builder.schema());
 
+            let mut pruning_predicate = pruning_predicate;
+            let mut page_pruning_predicate = page_pruning_predicate;
+
+            if let Some(predicate) = predicate.as_ref() {

Review Comment:
   Similar to https://github.com/apache/datafusion/pull/15057



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -282,6 +327,54 @@ impl TopKHeap {
         }
     }
 
+    /// Get threshold values for all columns in the given sort expressions.
+    /// If the heap does not yet have k items, returns None.
+    /// Otherwise, returns the threshold values from the max row in the heap.
+    pub fn get_threshold_values(
+        &self,
+        sort_exprs: &[PhysicalSortExpr],
+    ) -> Result<Option<Vec<ColumnThreshold>>> {
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        let max_row = match self.max() {
+            Some(row) => row,
+            None => return Ok(None),
+        };
+
+        // Get the batch that contains the max row
+        let batch_entry = match self.store.get(max_row.batch_id) {
+            Some(entry) => entry,
+            None => return internal_err!("Invalid batch ID in TopKRow"),
+        };
+
+        // Extract threshold values for each sort expression
+        let mut thresholds = Vec::with_capacity(sort_exprs.len());
+        for sort_expr in sort_exprs {
+            // Extract the value for this column from the max row
+            let expr = Arc::clone(&sort_expr.expr);
+            let value = expr.evaluate(&batch_entry.batch.slice(max_row.index, 
1))?;
+
+            // Convert to scalar value - should be a single value since we're 
evaluating on a single row batch
+            let scalar = match value {
+                ColumnarValue::Scalar(scalar) => scalar,
+                ColumnarValue::Array(array) if array.len() == 1 => {
+                    // Extract the first (and only) value from the array
+                    datafusion_common::ScalarValue::try_from_array(&array, 0)?
+                }
+                array => {
+                    return internal_err!("Expected a scalar value, got {:?}", 
array)
+                }
+            };
+
+            thresholds.push(ColumnThreshold {
+                expr,
+                value: scalar,
+                sort_options: sort_expr.options,
+            });
+        }
+
+        Ok(Some(thresholds))
+    }

Review Comment:
   This deserves some scrutiny. I'm not sure if there's a better way to do 
this, or if it's wrong for some cases, etc.



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -644,10 +737,72 @@ impl RecordBatchStore {
     }
 }
 
+struct TopKDynamicFilterSource {
+    /// The TopK heap that provides the current filters
+    heap: Arc<RwLock<TopKHeap>>,
+    /// The sort expressions used to create the TopK
+    expr: Arc<[PhysicalSortExpr]>,
+}
+
+impl std::fmt::Debug for TopKDynamicFilterSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TopKDynamicFilterSource")
+            .field("expr", &self.expr)
+            .finish()
+    }
+}
+
+impl DynamicFilterSource for TopKDynamicFilterSource {
+    fn current_filters(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+        let heap_guard = self.heap.read().map_err(|_| {
+            DataFusionError::Internal(
+                "Failed to acquire read lock on TopK heap".to_string(),
+            )
+        })?;
+
+        // Get the threshold values for all sort expressions
+        let Some(thresholds) = heap_guard.get_threshold_values(&self.expr)? 
else {
+            return Ok(vec![]); // No thresholds available yet
+        };
+
+        // Create filter expressions for each threshold
+        let mut filters: Vec<Arc<dyn PhysicalExpr>> =
+            Vec::with_capacity(thresholds.len());
+
+        for threshold in thresholds {
+            // Skip null threshold values - can't create a meaningful filter
+            if threshold.value.is_null() {
+                continue;
+            }
+
+            // Create the appropriate operator based on sort order
+            let op = if threshold.sort_options.descending {
+                // For descending sort, we want col > threshold (exclude 
smaller values)
+                Operator::Gt
+            } else {
+                // For ascending sort, we want col < threshold (exclude larger 
values)
+                Operator::Lt
+            };
+
+            let comparison = Arc::new(BinaryExpr::new(
+                Arc::clone(&threshold.expr),
+                op,
+                lit(threshold.value.clone()),
+            ));
+
+            // TODO: handle nulls first/last?

Review Comment:
   !!
   Not sure exactly how to translate that into a dynamic filter...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to