This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch agg_order
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit 12771ca4351bfa093d0ded7841d72f43e1c17aa4
Author: xudong.w <[email protected]>
AuthorDate: Wed Mar 25 17:33:36 2026 +0800

    address comments
---
 .../physical-optimizer/src/enforce_distribution.rs | 33 +++++++++++-----------
 1 file changed, 16 insertions(+), 17 deletions(-)

diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 16497eb723..5cc1c52294 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -934,39 +934,35 @@ fn add_hash_on_top(
 /// ordering would cause the parent to switch from streaming to blocking,
 /// keeping the order-preserving variant is beneficial.
 ///
-/// Only applicable to single-child operators; returns false for multi-child
-/// operators (e.g. joins) where child substitution semantics are ambiguous.
+/// Only applicable to single-child operators; returns `Ok(false)` for
+/// multi-child operators (e.g. joins) where child substitution semantics are
+/// ambiguous.
 fn preserving_order_enables_streaming(
     parent: &Arc<dyn ExecutionPlan>,
     ordered_child: &Arc<dyn ExecutionPlan>,
-) -> bool {
+) -> Result<bool> {
     // Only applicable to single-child operators that maintain input order
     // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
     // maintain input order (e.g. SortExec) handle ordering themselves —
     // preserving SPM for them is unnecessary.
     if parent.children().len() != 1 {
-        return false;
+        return Ok(false);
     }
     if !parent.maintains_input_order()[0] {
-        return false;
+        return Ok(false);
     }
     // Build parent with the ordered child
     let with_ordered =
-        match Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
-            Ok(p) => p,
-            Err(_) => return false,
-        };
+        Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)])?;
     if with_ordered.pipeline_behavior() == EmissionType::Final {
         // Parent is blocking even with ordering — no benefit
-        return false;
+        return Ok(false);
     }
-    // Build parent with an unordered child (simulating CoalescePartitionsExec)
+    // Build parent with an unordered child via CoalescePartitionsExec.
     let unordered_child: Arc<dyn ExecutionPlan> =
         Arc::new(CoalescePartitionsExec::new(Arc::clone(ordered_child)));
-    match Arc::clone(parent).with_new_children(vec![unordered_child]) {
-        Ok(without_ordered) => without_ordered.pipeline_behavior() == EmissionType::Final,
-        Err(_) => false,
-    }
+    let without_ordered = Arc::clone(parent).with_new_children(vec![unordered_child])?;
+    Ok(without_ordered.pipeline_behavior() == EmissionType::Final)
 }
 
 /// # Returns
@@ -1381,8 +1377,11 @@ pub fn ensure_distribution(
                 }
             };
 
-            let streaming_benefit = child.data
-                && preserving_order_enables_streaming(&plan, &child.plan);
+            let streaming_benefit = if child.data {
+                preserving_order_enables_streaming(&plan, &child.plan)?
+            } else {
+                false
+            };
 
             // There is an ordering requirement of the operator:
             if let Some(required_input_ordering) = required_input_ordering {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to