alamb commented on code in PR #21107:
URL: https://github.com/apache/datafusion/pull/21107#discussion_r2981773938
##########
datafusion/sqllogictest/test_files/unnest.slt:
##########
@@ -987,15 +987,14 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as
generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
-03)----SortExec: expr=[generated_id@0 ASC NULLS LAST],
preserve_partitioning=[true]
-04)------RepartitionExec: partitioning=Hash([generated_id@0], 4),
input_partitions=4
-05)--------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id],
aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
-06)----------ProjectionExec: expr=[generated_id@0 as generated_id,
__unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
-07)------------UnnestExec
-08)--------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as
__unnest_placeholder(make_array(range().value))]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1, maintains_sort_order=true
-10)------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
-11)--------------------LazyMemoryExec: partitions=1, batch_generators=[range:
start=1, end=5, batch_size=8192]
+03)----RepartitionExec: partitioning=Hash([generated_id@0], 4),
input_partitions=4, preserve_order=true, sort_exprs=generated_id@0 ASC NULLS
LAST
+04)------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id],
aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
Review Comment:
This plan looks better to me (it is still fully parallelized and now
avoids an unnecessary hash repartition).
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -928,6 +928,47 @@ fn add_hash_on_top(
///
/// * `input`: Current node.
///
+/// Checks whether preserving the child's ordering enables the parent to
+/// run in streaming mode. Compares the parent's pipeline behavior with
+/// the ordered child vs. an unordered (coalesced) child. If removing the
+/// ordering would cause the parent to switch from streaming to blocking,
+/// keeping the order-preserving variant is beneficial.
+///
+/// Only applicable to single-child operators; returns false for multi-child
+/// operators (e.g. joins) where child substitution semantics are ambiguous.
+fn preserving_order_enables_streaming(
+ parent: &Arc<dyn ExecutionPlan>,
+ ordered_child: &Arc<dyn ExecutionPlan>,
+) -> bool {
+ // Only applicable to single-child operators that maintain input order
+ // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
+ // maintain input order (e.g. SortExec) handle ordering themselves —
+ // preserving SPM for them is unnecessary.
+ if parent.children().len() != 1 {
+ return false;
+ }
+ if !parent.maintains_input_order()[0] {
+ return false;
+ }
+ // Build parent with the ordered child
+ let with_ordered =
+ match
Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
+ Ok(p) => p,
+ Err(_) => return false,
+ };
+ if with_ordered.pipeline_behavior() == EmissionType::Final {
+ // Parent is blocking even with ordering — no benefit
+ return false;
+ }
+ // Build parent with an unordered child (simulating CoalescePartitionsExec)
Review Comment:
This comment is strange to me: the code adds a CoalescePartitionsExec, so
I don't think it is "simulating" anything.
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -928,6 +928,47 @@ fn add_hash_on_top(
///
/// * `input`: Current node.
///
+/// Checks whether preserving the child's ordering enables the parent to
+/// run in streaming mode. Compares the parent's pipeline behavior with
+/// the ordered child vs. an unordered (coalesced) child. If removing the
+/// ordering would cause the parent to switch from streaming to blocking,
+/// keeping the order-preserving variant is beneficial.
+///
+/// Only applicable to single-child operators; returns false for multi-child
+/// operators (e.g. joins) where child substitution semantics are ambiguous.
+fn preserving_order_enables_streaming(
+ parent: &Arc<dyn ExecutionPlan>,
+ ordered_child: &Arc<dyn ExecutionPlan>,
+) -> bool {
+ // Only applicable to single-child operators that maintain input order
+ // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
+ // maintain input order (e.g. SortExec) handle ordering themselves —
+ // preserving SPM for them is unnecessary.
+ if parent.children().len() != 1 {
+ return false;
+ }
+ if !parent.maintains_input_order()[0] {
+ return false;
+ }
+ // Build parent with the ordered child
+ let with_ordered =
+ match
Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
+ Ok(p) => p,
+ Err(_) => return false,
Review Comment:
I don't think we should ignore the Err here or below as it could mask real
errors / a bug with this code
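To illustrate the kind of change I mean, here is a minimal, self-contained sketch. It does not use the real DataFusion types: `Node`, `Emission`, and `rebuilt_parent_streams` are hypothetical stand-ins invented for this example, and the `String` error type is an assumption (the real helper would presumably return DataFusion's own `Result<bool>`). The only point is that the helper returns a `Result` and propagates the failure with `?` rather than mapping it to `false`.

```rust
use std::sync::Arc;

/// Toy stand-in for an emission/pipeline-behavior enum (hypothetical, simplified).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Emission {
    Incremental,
    Final,
}

/// Toy stand-in for a plan node (hypothetical; not the real `ExecutionPlan`).
#[derive(Debug)]
struct Node {
    emission: Emission,
    /// When false, rebuilding this node with new children fails.
    accepts_children: bool,
}

impl Node {
    /// Mimics a fallible `with_new_children`: rebuilding the node may fail.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<Node>>,
    ) -> Result<Arc<Node>, String> {
        if !self.accepts_children || children.len() != 1 {
            return Err("failed to rebuild node with new children".to_string());
        }
        Ok(Arc::new(Node {
            emission: self.emission,
            accepts_children: true,
        }))
    }
}

/// Returns `Ok(true)` when the rebuilt parent is not blocking.
/// An error from `with_new_children` is propagated with `?` instead of
/// being silently turned into `false`.
fn rebuilt_parent_streams(
    parent: &Arc<Node>,
    child: &Arc<Node>,
) -> Result<bool, String> {
    let rebuilt = Arc::clone(parent).with_new_children(vec![Arc::clone(child)])?;
    Ok(rebuilt.emission != Emission::Final)
}
```

With this shape, a caller can distinguish "the parent is blocking" (`Ok(false)`) from "rebuilding the parent failed" (`Err(..)`), which is the distinction that `Err(_) => return false` hides.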
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]