rkrishn7 opened a new issue, #21096:
URL: https://github.com/apache/datafusion/issues/21096
### Is your feature request related to a problem or challenge?
When `EnforceDistribution` processes a child that requires `SinglePartition`
or `HashPartitioned` distribution but has no ordering requirement, it
unconditionally calls `replace_order_preserving_variants`, replacing
`SortPreservingMergeExec` with `CoalescePartitionsExec`. This destroys ordering
even when the parent declares `maintains_input_order = true`, meaning the
ordering would have flowed through to ancestors that may benefit from it.
For example, consider a plan where a custom operator (like a remote
execution node) requires `SinglePartition` input and maintains input order,
nested inside `UnionExec` branches:
```
SortRequiredExec [col ASC]
UnionExec
RemoteExec # requires SinglePartition,
maintains order
DataSourceExec (N partitions, sorted [col ASC])
RemoteExec
DataSourceExec (N partitions, sorted [col ASC])
```
`EnforceDistribution` correctly inserts `SortPreservingMergeExec` to satisfy
the `SinglePartition` requirement while preserving per-partition sort ordering.
But it then immediately replaces SPM with `CoalescePartitionsExec`, destroying
the ordering. This cascades upward — `UnionExec` now has unordered children,
and `EnforceSorting` must insert a full `SortExec` to re-establish ordering,
which is significantly more expensive than the merge that was removed.
The `UnspecifiedDistribution` branch already correctly checks
`maintains_input_order` before replacing order-preserving variants, but
`SinglePartition || HashPartitioned` does not.
### Describe the solution you'd like
Add a `!maintains` guard to the `SinglePartition || HashPartitioned` branch
in `ensure_distribution`, aligning it with the existing
`UnspecifiedDistribution` branch:
```rust
Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
if !maintains {
child = replace_order_preserving_variants(child)?;
}
}
```
This ensures that when a parent maintains input order,
`SortPreservingMergeExec` is preserved so ordering can propagate to ancestors
that need it.
### Describe alternatives you've considered
_No response_
### Additional context
This issue is particularly impactful when `prefer_existing_sort` is enabled.
With `prefer_existing_sort = true`, the optimizer avoids stripping
order-preserving variants at ancestor nodes that have ordering requirements.
This means the ordering preserved by SPM inside each branch would successfully
flow through the entire tree — but only if SPM survives the
`EnforceDistribution` pass.
There is more context from the specific case that prompted this here:
https://github.com/massive-com/arrow-datafusion/pull/35
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]