This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch agg_order in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit 12771ca4351bfa093d0ded7841d72f43e1c17aa4
Author: xudong.w <[email protected]>
AuthorDate: Wed Mar 25 17:33:36 2026 +0800

    address comments
---
 .../physical-optimizer/src/enforce_distribution.rs | 33 +++++++++++----------
 1 file changed, 16 insertions(+), 17 deletions(-)

diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 16497eb723..5cc1c52294 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -934,39 +934,35 @@ fn add_hash_on_top(
 /// ordering would cause the parent to switch from streaming to blocking,
 /// keeping the order-preserving variant is beneficial.
 ///
-/// Only applicable to single-child operators; returns false for multi-child
-/// operators (e.g. joins) where child substitution semantics are ambiguous.
+/// Only applicable to single-child operators; returns `Ok(false)` for
+/// multi-child operators (e.g. joins) where child substitution semantics are
+/// ambiguous.
 fn preserving_order_enables_streaming(
     parent: &Arc<dyn ExecutionPlan>,
     ordered_child: &Arc<dyn ExecutionPlan>,
-) -> bool {
+) -> Result<bool> {
     // Only applicable to single-child operators that maintain input order
     // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
     // maintain input order (e.g. SortExec) handle ordering themselves —
     // preserving SPM for them is unnecessary.
     if parent.children().len() != 1 {
-        return false;
+        return Ok(false);
     }
     if !parent.maintains_input_order()[0] {
-        return false;
+        return Ok(false);
     }
     // Build parent with the ordered child
     let with_ordered =
-        match Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
-            Ok(p) => p,
-            Err(_) => return false,
-        };
+        Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)])?;
     if with_ordered.pipeline_behavior() == EmissionType::Final {
         // Parent is blocking even with ordering — no benefit
-        return false;
+        return Ok(false);
     }
-    // Build parent with an unordered child (simulating CoalescePartitionsExec)
+    // Build parent with an unordered child via CoalescePartitionsExec.
     let unordered_child: Arc<dyn ExecutionPlan> =
         Arc::new(CoalescePartitionsExec::new(Arc::clone(ordered_child)));
-    match Arc::clone(parent).with_new_children(vec![unordered_child]) {
-        Ok(without_ordered) => without_ordered.pipeline_behavior() == EmissionType::Final,
-        Err(_) => false,
-    }
+    let without_ordered = Arc::clone(parent).with_new_children(vec![unordered_child])?;
+    Ok(without_ordered.pipeline_behavior() == EmissionType::Final)
 }
 
 /// # Returns
@@ -1381,8 +1377,11 @@ pub fn ensure_distribution(
                 }
             };
 
-            let streaming_benefit = child.data
-                && preserving_order_enables_streaming(&plan, &child.plan);
+            let streaming_benefit = if child.data {
+                preserving_order_enables_streaming(&plan, &child.plan)?
+            } else {
+                false
+            };
 
             // There is an ordering requirement of the operator:
             if let Some(required_input_ordering) = required_input_ordering {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
