alamb commented on code in PR #21107:
URL: https://github.com/apache/datafusion/pull/21107#discussion_r2981773938


##########
datafusion/sqllogictest/test_files/unnest.slt:
##########
@@ -987,15 +987,14 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
 02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
-03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
-04)------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
-05)--------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
-06)----------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
-07)------------UnnestExec
-08)--------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-10)------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
-11)--------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
+03)----RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4, preserve_order=true, sort_exprs=generated_id@0 ASC NULLS LAST
+04)------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted

Review Comment:
   This plan looks better to me (it is still fully parallelized and now 
avoids an unnecessary hash repartition).



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -928,6 +928,47 @@ fn add_hash_on_top(
 ///
 /// * `input`: Current node.
 ///
+/// Checks whether preserving the child's ordering enables the parent to
+/// run in streaming mode. Compares the parent's pipeline behavior with
+/// the ordered child vs. an unordered (coalesced) child. If removing the
+/// ordering would cause the parent to switch from streaming to blocking,
+/// keeping the order-preserving variant is beneficial.
+///
+/// Only applicable to single-child operators; returns false for multi-child
+/// operators (e.g. joins) where child substitution semantics are ambiguous.
+fn preserving_order_enables_streaming(
+    parent: &Arc<dyn ExecutionPlan>,
+    ordered_child: &Arc<dyn ExecutionPlan>,
+) -> bool {
+    // Only applicable to single-child operators that maintain input order
+    // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
+    // maintain input order (e.g. SortExec) handle ordering themselves —
+    // preserving SPM for them is unnecessary.
+    if parent.children().len() != 1 {
+        return false;
+    }
+    if !parent.maintains_input_order()[0] {
+        return false;
+    }
+    // Build parent with the ordered child
+    let with_ordered =
+        match Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
+            Ok(p) => p,
+            Err(_) => return false,
+        };
+    if with_ordered.pipeline_behavior() == EmissionType::Final {
+        // Parent is blocking even with ordering — no benefit
+        return false;
+    }
+    // Build parent with an unordered child (simulating CoalescePartitionsExec)

Review Comment:
   This comment is strange to me -- the code adds a CoalescePartitionsExec -- so 
I don't think it is "simulating" anything.



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -928,6 +928,47 @@ fn add_hash_on_top(
 ///
 /// * `input`: Current node.
 ///
+/// Checks whether preserving the child's ordering enables the parent to
+/// run in streaming mode. Compares the parent's pipeline behavior with
+/// the ordered child vs. an unordered (coalesced) child. If removing the
+/// ordering would cause the parent to switch from streaming to blocking,
+/// keeping the order-preserving variant is beneficial.
+///
+/// Only applicable to single-child operators; returns false for multi-child
+/// operators (e.g. joins) where child substitution semantics are ambiguous.
+fn preserving_order_enables_streaming(
+    parent: &Arc<dyn ExecutionPlan>,
+    ordered_child: &Arc<dyn ExecutionPlan>,
+) -> bool {
+    // Only applicable to single-child operators that maintain input order
+    // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
+    // maintain input order (e.g. SortExec) handle ordering themselves —
+    // preserving SPM for them is unnecessary.
+    if parent.children().len() != 1 {
+        return false;
+    }
+    if !parent.maintains_input_order()[0] {
+        return false;
+    }
+    // Build parent with the ordered child
+    let with_ordered =
+        match Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
+            Ok(p) => p,
+            Err(_) => return false,

Review Comment:
   I don't think we should ignore the Err here or below, as it could mask real 
errors or a bug in this code.
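
   A minimal sketch of what propagating the error could look like, assuming the 
helper's return type changes from `bool` to a `Result<bool, _>`. Everything here 
is a hypothetical stand-in (`Node`, `Emission`, `PlanError`, the `rebuild_fails` 
flag) rather than the DataFusion API, and the check is simplified to the ordered 
child only; the point is just the `?` in place of `Err(_) => return false`.

```rust
use std::fmt;

// Hypothetical stand-in for the plan's emission/pipeline behavior.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Emission {
    Incremental,
    Final,
}

// Hypothetical error type standing in for the real plan error.
#[derive(Debug)]
struct PlanError(String);

impl fmt::Display for PlanError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "plan error: {}", self.0)
    }
}

impl std::error::Error for PlanError {}

// Hypothetical stand-in for a plan node.
#[derive(Debug, Clone)]
struct Node {
    child_count: usize,
    emission: Emission,
    // Illustration-only switch that makes the rebuild fail.
    rebuild_fails: bool,
}

impl Node {
    // Stand-in for rebuilding the parent with a new child; may fail.
    fn with_new_child(&self) -> Result<Node, PlanError> {
        if self.rebuild_fails {
            Err(PlanError("could not rebuild parent".to_string()))
        } else {
            Ok(self.clone())
        }
    }
}

// Returns Ok(false) for "no benefit", but surfaces a failed rebuild as Err
// instead of silently reporting "no benefit".
fn preserving_order_enables_streaming(parent: &Node) -> Result<bool, PlanError> {
    if parent.child_count != 1 {
        return Ok(false);
    }
    // `?` hands the error to the caller rather than swallowing it.
    let with_ordered = parent.with_new_child()?;
    Ok(with_ordered.emission != Emission::Final)
}
```

   With this shape the caller has to decide what to do on failure, so a bug in 
the rebuild path shows up as an error rather than as a different plan.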



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

