zhuqi-lucas commented on code in PR #21580:
URL: https://github.com/apache/datafusion/pull/21580#discussion_r3123894244
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1123,13 +1166,157 @@ impl RowGroupsPrunedParquetOpen {
);
}
- // Prepare the access plan (extract row groups and row selection)
+ // Row group ordering optimization (two composable steps):
+ //
+ // 1. reorder_by_statistics: sort RGs by min values (ASC) to align
+ // with the file's declared output ordering. This fixes out-of-order
+ // RGs (e.g., from append-heavy workloads) without changing direction.
+ // Skipped gracefully when statistics are unavailable.
+ //
+ // 2. reverse: flip the order for DESC queries. Applied AFTER reorder
+ // so the reversed order is correct whether or not reorder changed
+ // anything. Also handles row_selection remapping.
+ //
+ // For sorted data: reorder is a no-op, reverse gives perfect DESC.
+ // For unsorted data: reorder fixes the order, reverse flips for DESC.
+ // Build reorder optimizer from sort_order_for_reorder (Inexact path)
+ // or from DynamicFilterPhysicalExpr sort_options (any TopK query).
+ // Fuzz test uses tiebreaker columns so reorder is safe for all TopK.
+ let reorder_optimizer: Option<
+ Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+ > = if let Some(sort_order) = &prepared.sort_order_for_reorder {
+ Some(
+
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+ sort_order.clone(),
+ ))
+ as Box<dyn
crate::access_plan_optimizer::AccessPlanOptimizer>,
+ )
+ } else if let Some(predicate) = &prepared.predicate
+ && let Some(df) = find_dynamic_filter(predicate)
+ && let Some(sort_options) = df.sort_options()
+ && sort_options.len() == 1
+ {
+ // Build a sort order from DynamicFilter for non-sort-pushdown TopK.
+ // Quick bail: check if the sort column exists in file schema.
+ // For GROUP BY + ORDER BY, the sort column is an aggregate output
+ // (not in parquet) — skip to avoid wasted StatisticsConverter work.
+ let children = df.children();
+ if !children.is_empty() {
+ let col = find_column_in_expr(children[0]);
+ if let Some(ref c) = col
+ && prepared
+ .physical_file_schema
+ .field_with_name(c.name())
+ .is_ok()
+ {
+ let sort_expr =
+
datafusion_physical_expr_common::sort_expr::PhysicalSortExpr {
+ expr: Arc::clone(children[0]),
+ options: arrow::compute::SortOptions {
+ descending: false,
+ nulls_first: sort_options[0].nulls_first,
+ },
+ };
+ LexOrdering::new(vec![sort_expr]).map(|order| {
+
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+ order,
+ ))
+ as Box<dyn
crate::access_plan_optimizer::AccessPlanOptimizer>
+ })
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
+ // Reverse for DESC queries. Only when reorder is active (the sort
+ // column exists in parquet stats). Without reorder, reversing RGs
+ // randomly changes I/O patterns with no benefit.
+ let is_descending = prepared.reverse_row_groups
+ || (reorder_optimizer.is_some()
+ && prepared
+ .predicate
+ .as_ref()
+ .and_then(find_dynamic_filter)
+ .and_then(|df| df.sort_options().map(|opts|
opts[0].descending))
+ .unwrap_or(false));
+ let reverse_optimizer: Option<
+ Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+ > = if is_descending {
+ Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups))
+ } else {
+ None
+ };
+
+ // Prepare the access plan and apply optimizers in order:
+ // 1. reorder (fix out-of-order RGs to match declared ordering)
+ // 2. reverse (flip for DESC queries)
let mut prepared_plan = access_plan.prepare(rg_metadata)?;
+ if let Some(opt) = &reorder_optimizer {
+ prepared_plan = opt.optimize(
+ prepared_plan,
+ file_metadata.as_ref(),
+ &prepared.physical_file_schema,
+ )?;
+ }
+ if let Some(opt) = &reverse_optimizer {
+ prepared_plan = opt.optimize(
+ prepared_plan,
+ file_metadata.as_ref(),
+ &prepared.physical_file_schema,
+ )?;
+ }
- // Potentially reverse the access plan for performance.
- // See `ParquetSource::try_pushdown_sort` for the rationale.
- if prepared.reverse_row_groups {
- prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
+ // TopK cumulative pruning: after reorder + reverse, the RGs are in
+ // optimal order. Accumulate rows from the front until >= K, prune rest.
+ //
+ // Only safe when predicate is DynamicFilter-only (no WHERE clause).
+ // With WHERE, raw num_rows overestimates qualifying rows — cumulative
+ // prune may keep too few RGs, returning fewer than K results.
+ //
+ // Additionally requires either sort pushdown (guaranteed non-overlapping)
+ // or verified non-overlap from statistics.
+ let is_pure_dynamic_filter =
prepared.predicate.as_ref().is_some_and(|p| {
+ let any_ref: &dyn std::any::Any = p.as_ref();
+ any_ref
+ .downcast_ref::<DynamicFilterPhysicalExpr>()
+ .is_some()
+ });
+ let has_sort_pushdown = prepared.sort_order_for_reorder.is_some();
+ if is_pure_dynamic_filter
+ && let Some(predicate) = &prepared.predicate
+ && let Some(df) = find_dynamic_filter(predicate)
+ && let Some(fetch) = df.fetch()
+ && (has_sort_pushdown
+ || rgs_are_non_overlapping(
+ &prepared_plan,
+ file_metadata.as_ref(),
+ &prepared.physical_file_schema,
+ &df,
+ ))
+ {
+ let rg_indexes = prepared_plan.row_group_indexes();
+ let mut cumulative = 0usize;
+ let mut keep_count = 0;
+ for &idx in rg_indexes {
+ cumulative += file_metadata.row_group(idx).num_rows() as usize;
Review Comment:
Good point. Currently, fetch is set and create_filter is called in the same
method (with_fetch), and we fixed the ordering so that fetch is set before
create_filter is called. There is no separate code path that updates fetch
without recreating the filter.
But you're right that this coupling is fragile: if a future optimizer calls
with_fetch independently, the filter's fetch would go stale. As a follow-up,
I'll consider making fetch on DynamicFilterPhysicalExpr read directly from
SortExec.fetch (via a shared reference) instead of copying the value at
creation time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]