alamb commented on code in PR #21976:
URL: https://github.com/apache/datafusion/pull/21976#discussion_r3243911160


##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -970,7 +813,10 @@ fn preserving_order_enables_streaming(
 ///
 /// Updated node with an execution plan, where the desired single distribution
 /// requirement is satisfied.
-fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
+fn add_merge_on_top(

Review Comment:
   As a follow-on, it seems like some of this code could be consolidated into 
`ensure_requirements`.



##########
datafusion/core/src/optimizer_rule_reference.md:
##########
@@ -67,27 +67,26 @@ Rule order matters. The default pipeline may change between 
releases.
 The same rule name may appear more than once when the default pipeline runs it
 in multiple phases.
 
-| order | rule                           | phase                   | summary   
                                                                                
                   |
-| ----- | ------------------------------ | ----------------------- | 
------------------------------------------------------------------------------------------------------------
 |
-| 1     | `OutputRequirements`           | add phase               | Adds 
helper nodes so output requirements survive later physical rewrites.            
                        |
-| 2     | `aggregate_statistics`         | -                       | Uses 
exact source statistics to answer some aggregates without scanning data.        
                        |
-| 3     | `join_selection`               | -                       | Chooses 
join implementation, build side, and partition mode from statistics and stream 
properties.           |
-| 4     | `LimitedDistinctAggregation`   | -                       | Pushes 
limit hints into grouped distinct-style aggregations when only a small result 
is needed.              |
-| 5     | `FilterPushdown`               | pre-optimization phase  | Pushes 
supported physical filters down toward data sources before distribution and 
sorting are enforced.     |
-| 6     | `EnforceDistribution`          | -                       | Adds 
repartitioning only where needed to satisfy physical distribution requirements. 
                        |
-| 7     | `CombinePartialFinalAggregate` | -                       | Collapses 
adjacent partial and final aggregates when the distributed shape makes them 
redundant.             |
-| 8     | `EnforceSorting`               | -                       | Adds or 
removes local sorts to satisfy required input orderings.                        
                     |
-| 9     | `OptimizeAggregateOrder`       | -                       | Updates 
aggregate expressions to use the best ordering once sort requirements are 
known.                     |
-| 10    | `WindowTopN`                   | -                       | Replaces 
eligible row-number window and filter patterns with per-partition TopK 
execution.                   |
-| 11    | `ProjectionPushdown`           | early pass              | Pushes 
projections toward inputs before later physical rewrites add more limit and 
TopK structure.           |
-| 12    | `OutputRequirements`           | remove phase            | Removes 
the temporary output-requirement helper nodes after requirement-sensitive 
planning is done.          |
-| 13    | `LimitAggregation`             | -                       | Passes a 
limit hint into eligible aggregations so they can keep fewer accumulator 
buckets.                   |
-| 14    | `LimitPushPastWindows`         | -                       | Pushes 
fetch limits through bounded window operators when doing so keeps the result 
correct.                 |
-| 15    | `HashJoinBuffering`            | -                       | Adds 
buffering on the probe side of hash joins so probing can start before build 
completion.                 |
-| 16    | `LimitPushdown`                | -                       | Moves 
physical limits into child operators or fetch-enabled variants to cut data 
early.                      |
-| 17    | `TopKRepartition`              | -                       | Pushes 
TopK below hash repartition when the partition key is a prefix of the sort key. 
                      |
-| 18    | `ProjectionPushdown`           | late pass               | Runs 
projection pushdown again after limit and TopK rewrites expose new pruning 
opportunities.               |
-| 19    | `PushdownSort`                 | -                       | Pushes 
sort requirements into data sources that can already return sorted output.      
                      |
-| 20    | `EnsureCooperative`            | -                       | Wraps 
non-cooperative plan parts so long-running tasks yield fairly.                  
                       |
-| 21    | `FilterPushdown(Post)`         | post-optimization phase | Pushes 
dynamic filters at the end of optimization, after plan references stop moving.  
                      |
-| 22    | `SanityCheckPlan`              | -                       | Validates 
that the final physical plan meets ordering, distribution, and infinite-input 
safety requirements. |
+| order | rule                           | phase                   | summary   
                                                                                
                                       |
+| ----- | ------------------------------ | ----------------------- | 
--------------------------------------------------------------------------------------------------------------------------------
 |
+| 1     | `OutputRequirements`           | add phase               | Adds 
helper nodes so output requirements survive later physical rewrites.            
                                            |
+| 2     | `aggregate_statistics`         | -                       | Uses 
exact source statistics to answer some aggregates without scanning data.        
                                            |
+| 3     | `join_selection`               | -                       | Chooses 
join implementation, build side, and partition mode from statistics and stream 
properties.                               |
+| 4     | `LimitedDistinctAggregation`   | -                       | Pushes 
limit hints into grouped distinct-style aggregations when only a small result 
is needed.                                  |
+| 5     | `FilterPushdown`               | pre-optimization phase  | Pushes 
supported physical filters down toward data sources before distribution and 
sorting are enforced.                         |
+| 6     | `EnsureRequirements`           | -                       | Enforces 
both distribution and sorting requirements in a single idempotent rule 
(replaces EnforceDistribution + EnforceSorting). |

Review Comment:
   We probably don't have to say it "replaces" them: by the time someone reads 
this, the code will no longer have references to EnforceDistribution + EnforceSorting.



##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1814 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's 
`EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s 
`pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for 
details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//! │
+//! ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//! │   └─ Same as existing adjust_input_keys_ordering
+//! │
+//! └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!     └─ For each node (bottom-up), for each child:
+//!         Step 1: Ensure distribution requirement
+//!           └─ Add RepartitionExec / CoalescePartitionsExec / 
SortPreservingMergeExec
+//!         Step 2: Ensure ordering requirement (distribution-aware)
+//!           └─ Add SortExec with correct preserve_partitioning + SPM if 
needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution 
context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the 
bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid 
position.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+
+// Internal functions used directly instead of calling 
EnforceDistribution/EnforceSorting
+// as opaque boxes. This gives us control over the pass ordering and enables
+// future merging into a true single-pass architecture.
+
+// For the no-pushdown variant (Phase 3)
+use crate::enforce_sorting::replace_with_order_preserving_variants::{
+    OrderPreservationContext, replace_with_order_preserving_variants,
+};
+use crate::enforce_sorting::{
+    PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, 
ensure_sorting,
+    parallelize_sorts, replace_with_partial_sort,
+};
+
+/// Optimizer rule that enforces both distribution and sorting requirements.
+///
+/// This rule combines the functionality of `EnforceDistribution` and
+/// `EnforceSorting` into a coordinated sequence where distribution is
+/// always settled before sorting for each operator, preventing the
+/// non-idempotent interactions between the two separate rules.
+///
+/// See [module level documentation](self) for more details.
+#[derive(Default, Debug)]
+pub struct EnsureRequirements {}
+
+impl EnsureRequirements {
+    /// Create a new `EnsureRequirements` optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirements {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Phase 1: Join key reordering (top-down, from EnforceDistribution)
+        use crate::enforce_distribution::{
+            PlanWithKeyRequirements, adjust_input_keys_ordering,
+        };
+        let top_down_join_key_reordering = 
config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down_join_key_reordering {
+            let ctx = PlanWithKeyRequirements::new_default(plan);
+            ctx.transform_down(adjust_input_keys_ordering).data()?.plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| 
Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+
+        // Phase 2: Combined distribution + sorting enforcement (single 
bottom-up pass)

Review Comment:
   Given that this rule calls transform_up / transform_down several times, I am 
not sure calling it a "single pass" is quite accurate. That said, it seems 
like a very nice formulation, and maybe in follow-on PR(s) we can consolidate 
the code more / combine some of the passes.
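
   To make the "single pass" distinction concrete, here is a toy sketch. It is 
not DataFusion code: `Node`, `visit_up`, `two_traversals`, `fused_traversal` and 
`sample_plan` are hypothetical names used only for illustration. It contrasts 
two sequential bottom-up traversals (roughly the shape of Step 2a followed by 
Step 2b) with one fused traversal that handles distribution and then sorting 
at each node.

```rust
/// Toy plan node. This is an assumption for illustration, not an ExecutionPlan.
struct Node {
    name: &'static str,
    children: Vec<Node>,
}

/// Post-order (bottom-up) traversal: children first, then the node itself.
fn visit_up<F: FnMut(&Node)>(node: &Node, f: &mut F) {
    for child in &node.children {
        visit_up(child, f);
    }
    f(node);
}

/// Two separate bottom-up traversals: every node gets its distribution
/// step before any node gets its sorting step.
fn two_traversals(root: &Node) -> Vec<String> {
    let mut log = Vec::new();
    visit_up(root, &mut |n: &Node| log.push(format!("dist:{}", n.name)));
    visit_up(root, &mut |n: &Node| log.push(format!("sort:{}", n.name)));
    log
}

/// One fused bottom-up traversal: each node gets distribution, then sorting,
/// before its parent is visited.
fn fused_traversal(root: &Node) -> Vec<String> {
    let mut log = Vec::new();
    visit_up(root, &mut |n: &Node| {
        log.push(format!("dist:{}", n.name));
        log.push(format!("sort:{}", n.name));
    });
    log
}

/// A two-node plan: `limit` on top of `scan`.
fn sample_plan() -> Node {
    Node {
        name: "limit",
        children: vec![Node { name: "scan", children: vec![] }],
    }
}
```

   In the two-traversal form the log reads `dist:scan, dist:limit, sort:scan, 
sort:limit`. In the fused form it reads `dist:scan, sort:scan, dist:limit, 
sort:limit`, so a parent's distribution step would see children whose sorting 
is already settled. Whether the real rule can be fused this way is an open 
question for a follow-on.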



##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1814 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's 
`EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s 
`pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for 
details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//! │
+//! ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//! │   └─ Same as existing adjust_input_keys_ordering
+//! │
+//! └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!     └─ For each node (bottom-up), for each child:
+//!         Step 1: Ensure distribution requirement
+//!           └─ Add RepartitionExec / CoalescePartitionsExec / 
SortPreservingMergeExec
+//!         Step 2: Ensure ordering requirement (distribution-aware)
+//!           └─ Add SortExec with correct preserve_partitioning + SPM if 
needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution 
context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the 
bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid 
position.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+
+// Internal functions used directly instead of calling 
EnforceDistribution/EnforceSorting
+// as opaque boxes. This gives us control over the pass ordering and enables
+// future merging into a true single-pass architecture.
+
+// For the no-pushdown variant (Phase 3)
+use crate::enforce_sorting::replace_with_order_preserving_variants::{
+    OrderPreservationContext, replace_with_order_preserving_variants,
+};
+use crate::enforce_sorting::{
+    PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, 
ensure_sorting,
+    parallelize_sorts, replace_with_partial_sort,
+};
+
+/// Optimizer rule that enforces both distribution and sorting requirements.
+///
+/// This rule combines the functionality of `EnforceDistribution` and
+/// `EnforceSorting` into a coordinated sequence where distribution is
+/// always settled before sorting for each operator, preventing the
+/// non-idempotent interactions between the two separate rules.
+///
+/// See [module level documentation](self) for more details.
+#[derive(Default, Debug)]
+pub struct EnsureRequirements {}
+
+impl EnsureRequirements {
+    /// Create a new `EnsureRequirements` optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirements {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Phase 1: Join key reordering (top-down, from EnforceDistribution)
+        use crate::enforce_distribution::{
+            PlanWithKeyRequirements, adjust_input_keys_ordering,
+        };
+        let top_down_join_key_reordering = 
config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down_join_key_reordering {
+            let ctx = PlanWithKeyRequirements::new_default(plan);
+            ctx.transform_down(adjust_input_keys_ordering).data()?.plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| 
Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+
+        // Phase 2: Combined distribution + sorting enforcement (single 
bottom-up pass)
+        // For each node: distribution first, then sorting.
+        use crate::enforce_distribution::{DistributionContext, 
ensure_distribution};
+        use crate::enforce_sorting::{PlanWithCorrespondingSort, 
ensure_sorting};
+
+        // Step 2a: Distribution enforcement (bottom-up)
+        let dist_ctx = DistributionContext::new_default(plan);
+        let dist_ctx = dist_ctx
+            .transform_up(|ctx| ensure_distribution(ctx, config))
+            .data()?;
+
+        // Step 2b: Sorting enforcement (bottom-up) — runs on 
distribution-fixed plan
+        let sort_ctx = PlanWithCorrespondingSort::new_default(dist_ctx.plan);
+        let sort_ctx = sort_ctx.transform_up(ensure_sorting)?.data;
+
+        // Phase 3: Optimization passes
+        // 3a: Parallelize sorts (Coalesce+Sort → SPM+Sort)
+        use crate::enforce_sorting::{
+            PlanWithCorrespondingCoalescePartitions, parallelize_sorts,
+            replace_with_partial_sort,
+        };
+        let plan = if config.optimizer.repartition_sorts {
+            let ctx = 
PlanWithCorrespondingCoalescePartitions::new_default(sort_ctx.plan);
+            ctx.transform_up(parallelize_sorts).data()?.plan
+        } else {
+            sort_ctx.plan
+        };
+
+        // 3b: Order-preserving variants
+        use crate::enforce_sorting::replace_with_order_preserving_variants::{
+            OrderPreservationContext, replace_with_order_preserving_variants,
+        };
+        let ctx = OrderPreservationContext::new_default(plan);
+        let plan = ctx
+            .transform_up(|c| {
+                replace_with_order_preserving_variants(c, false, true, config)
+            })
+            .data()?
+            .plan;
+
+        // 3c: Sort pushdown (distribution-aware)
+        use crate::enforce_sorting::sort_pushdown::{
+            SortPushDown, assign_initial_requirements, pushdown_sorts,
+        };
+        let mut sort_pushdown = SortPushDown::new_default(plan);
+        assign_initial_requirements(&mut sort_pushdown);
+        let adjusted = pushdown_sorts(sort_pushdown)?;
+
+        // 3d: Partial sort
+        adjusted
+            .plan
+            .transform_up(|p| 
Ok(Transformed::yes(replace_with_partial_sort(p)?)))
+            .data()
+    }
+
+    fn name(&self) -> &str {
+        "EnsureRequirements"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Phase 3 variant: no `pushdown_sorts`, sort placement handled entirely
+/// by bottom-up passes. Currently experimental — some plan shapes differ
+/// from the `pushdown_sorts` variant (less optimal but still correct).
+#[derive(Default, Debug)]
+pub struct EnsureRequirementsNoPushdown {}
+
+impl EnsureRequirementsNoPushdown {
+    /// Create a new rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirementsNoPushdown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Step 1: Distribution enforcement
+        use crate::enforce_distribution::{
+            DistributionContext as DistCtx, PlanWithKeyRequirements as KeyReqs,
+            adjust_input_keys_ordering as adj_keys, ensure_distribution as 
ensure_dist,
+        };
+        let top_down = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down {
+            KeyReqs::new_default(plan)
+                .transform_down(adj_keys)
+                .data()?
+                .plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| 
Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+        let dist_ctx = DistCtx::new_default(plan);
+        let plan = dist_ctx
+            .transform_up(|ctx| ensure_dist(ctx, config))
+            .data()?
+            .plan;
+
+        // Step 2: ensure_sorting (bottom-up, NO pushdown_sorts)
+        let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
+        let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
+
+        // Step 3: parallelize_sorts (optional)
+        let plan = if config.optimizer.repartition_sorts {
+            let ctx = 
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
+            ctx.transform_up(parallelize_sorts).data()?.plan
+        } else {
+            adjusted.plan
+        };
+
+        // Step 4: order-preserving variants
+        let ctx = OrderPreservationContext::new_default(plan);
+        let plan = ctx
+            .transform_up(|c| {
+                replace_with_order_preserving_variants(c, false, true, config)
+            })
+            .data()?
+            .plan;
+
+        // Step 5: partial sort
+        let plan = plan
+            .transform_up(|p| 
Ok(Transformed::yes(replace_with_partial_sort(p)?)))
+            .data()?;
+
+        // NO pushdown_sorts — sort placement is purely bottom-up.
+        // Step 6: Final distribution enforcement
+        let dist_ctx2 = DistCtx::new_default(plan);
+        let plan = dist_ctx2
+            .transform_up(|ctx| ensure_dist(ctx, config))
+            .data()?
+            .plan;
+
+        // Step 7: Fix any sorting violations the final distribution pass 
introduced.
+        let sort_ctx2 = PlanWithCorrespondingSort::new_default(plan);
+        let adjusted2 = sort_ctx2.transform_up(ensure_sorting)?.data;
+
+        Ok(adjusted2.plan)
+    }
+
+    fn name(&self) -> &str {
+        "EnsureRequirementsNoPushdown"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {

Review Comment:
   There are over 1600 lines of tests in this module and I am finding it very 
hard to review. I wonder:
   1. Why add tests in this module rather than following the existing pattern of 
`core/tests/physical_optimizer/` (where you can use actual ExecutionPlans 
rather than mocking everything out)?
   2. Are all these tests necessary? Since this seems mostly like an internal 
refactor and the existing code is covered by the existing tests, I wonder if 
there was a big testing gap you found? Or maybe the tests were intermediate 
work product that may not be necessary to commit to the repo?
   
   I asked Claude what was redundant and it said a non-trivial number are (I 
didn't double check, but its answers are here):
   
   <details>
   
   Tier 1 — Clearly redundant (delete)
   
     These use the identical plan shape SortExec(DESC) → 
MockMultiPartitionExec(N) → GlobalLimit(fetch) and the identical assertion as 
another test that strictly subsumes them:
   
     1. test_idempotent_various_partition_counts (mod.rs:691, sweeps 
{2,4,8,16,32,64}) — strictly subsumed by 
test_idempotent_all_partition_counts_1_to_64 (mod.rs:1188, sweeps 1..=64). Same 
topology, same
     assertion. Delete.
     2. test_idempotent_multi_partition_sort_limit (mod.rs:477, N=16) — 
subsumed by test_idempotent_all_partition_counts_1_to_64 at N=16. Delete.
     3. test_multi_partition_sort_limit_sanity_check (mod.rs:432, N=32) — 
subsumed by test_various_partition_counts_all_pass_sanity_check (mod.rs:663, 
sweeps {2,4,8,16,32,64}) at N=32. Same topology, same
     optimize_and_sanity_check assertion. Delete.
     4. test_triple_optimize_stable (mod.rs:1208, N=32, three passes) — same 
topology again. assert_idempotent proves f(f(x)) == f(x); for a deterministic 
optimizer that implies f^N(x) == f(x) for all N, so adding
      a third pass is mathematically redundant. The partition sweep already 
covers N=32. Delete.
   
     Tier 2 — Substantial overlap (consider combining)
   
     5. test_fetch_preserved_across_passes (mod.rs:629) — checks fetch=5 
appears after 1 pass on Sort → multi(4) → GlobalLimit(5). 
test_issue_14150_fetch_survives_multiple_passes (mod.rs:1259) checks the same
     property after 3 passes on a richer plan (Repartition → Sort(fetch=5) → 
GlobalLimit(5)) and is more representative of the actual bug. The single-pass 
test adds little once the multi-pass one exists.
     6. test_no_extra_spm_when_already_optimal (mod.rs:1006) and 
test_idempotent_spm_sort_multi_partition (mod.rs:1114) — identical input plan 
(SPM → Sort(preserve=true) → multi(10) → GlobalLimit(21)). First
     asserts SPM count ≤ 1; second asserts 2-pass equality. Since the input 
already has exactly one SPM, idempotency on this input implies no extra SPM is 
added. Either fold the SPM-count assertion into the
     idempotency test (so one test covers both), or drop one.
   
     Tier 3 — Mathematically over-strong but topology unique (trim, don't 
delete)
   
     7. test_idempotent_10x_complex (mod.rs:1763) — the union + projection + 
sort topology is unique among the new tests, so this should stay. But 10 passes 
is unnecessary — a 2-pass assert_idempotent call on the
     same plan gives the same guarantee. Consider just reducing it to use 
assert_idempotent.
   
     Tests that look similar but are NOT redundant
   
     - test_issue_14150_fetch_survives_multiple_passes vs. 
test_issue_14150_fetch_survives_with_input_spm — both target #14150 but with 
deliberately different input shapes (the latter is the "sharper reproduce"
     with an SPM(fetch) already in the input — the historical bug shape). Keep 
both.
     - test_projection_over_multi_partition_sort_limit vs. 
test_idempotent_projection_over_multi_partition_with_single_partition_requirement
 — first uses GlobalLimit, second uses
     OutputRequirementExec(SinglePartition). Different requirement source, 
different code path. Keep both.
     - test_output_requirement_single_partition_over_multi_partition_source vs. 
test_idempotent_output_requirement_single_partition — same topology but one 
checks SanityCheckPlan, the other checks idempotency.
     Mild overlap; could be merged into a single test asserting both, but each 
captures a distinct property.
   
   </details>
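
   The Tier 1, item 4 argument (a two-pass idempotency check already covers any 
number of passes for a deterministic function) can be checked on a toy example. 
This is a sketch only: `normalize`, `apply_n` and `is_idempotent_at` are 
hypothetical helpers, not the PR's `assert_idempotent`. The reasoning is that 
if f(f(x)) == f(x) and f is deterministic, then 
f(f(f(x))) == f(f(x)) == f(x), and so on by induction.

```rust
/// Toy deterministic "optimizer": sort and de-duplicate a vector.
fn normalize(mut v: Vec<i32>) -> Vec<i32> {
    v.sort();
    v.dedup();
    v
}

/// Apply `f` to `x` exactly `n` times.
fn apply_n<T, F: Fn(T) -> T>(f: F, x: T, n: usize) -> T {
    (0..n).fold(x, |acc, _| f(acc))
}

/// Two-pass check: does a second application of `f` change the result
/// of the first application on this particular input?
fn is_idempotent_at<T: Clone + PartialEq, F: Fn(T) -> T>(f: F, x: T) -> bool {
    let once = f(x);
    let twice = f(once.clone());
    once == twice
}
```

   Once `is_idempotent_at(normalize, input)` holds, `apply_n(normalize, input, n)` 
equals `normalize(input)` for every `n >= 1`. A 3-pass or 10-pass test on the 
same input therefore adds no coverage, provided the function really is 
deterministic.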



##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1814 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's 
`EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s 
`pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for 
details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//! │
+//! ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//! │   └─ Same as existing adjust_input_keys_ordering
+//! │
+//! └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!     └─ For each node (bottom-up), for each child:
+//!         Step 1: Ensure distribution requirement
+//!           └─ Add RepartitionExec / CoalescePartitionsExec / 
SortPreservingMergeExec
+//!         Step 2: Ensure ordering requirement (distribution-aware)
+//!           └─ Add SortExec with correct preserve_partitioning + SPM if 
needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution 
context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the 
bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid 
position.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+
+// Internal functions used directly instead of calling 
EnforceDistribution/EnforceSorting
+// as opaque boxes. This gives us control over the pass ordering and enables
+// future merging into a true single-pass architecture.
+
+// For the no-pushdown variant (Phase 3)
+use crate::enforce_sorting::replace_with_order_preserving_variants::{
+    OrderPreservationContext, replace_with_order_preserving_variants,
+};
+use crate::enforce_sorting::{
+    PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, 
ensure_sorting,
+    parallelize_sorts, replace_with_partial_sort,
+};
+
+/// Optimizer rule that enforces both distribution and sorting requirements.
+///
+/// This rule combines the functionality of `EnforceDistribution` and
+/// `EnforceSorting` into a coordinated sequence where distribution is
+/// always settled before sorting for each operator, preventing the
+/// non-idempotent interactions between the two separate rules.
+///
+/// See [module level documentation](self) for more details.
+#[derive(Default, Debug)]
+pub struct EnsureRequirements {}
+
+impl EnsureRequirements {
+    /// Create a new `EnsureRequirements` optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirements {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Phase 1: Join key reordering (top-down, from EnforceDistribution)
+        use crate::enforce_distribution::{
+            PlanWithKeyRequirements, adjust_input_keys_ordering,
+        };
+        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down_join_key_reordering {
+            let ctx = PlanWithKeyRequirements::new_default(plan);
+            ctx.transform_down(adjust_input_keys_ordering).data()?.plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+
+        // Phase 2: Combined distribution + sorting enforcement (single bottom-up pass)
+        // For each node: distribution first, then sorting.
+        use crate::enforce_distribution::{DistributionContext, ensure_distribution};
+        use crate::enforce_sorting::{PlanWithCorrespondingSort, ensure_sorting};
+
+        // Step 2a: Distribution enforcement (bottom-up)
+        let dist_ctx = DistributionContext::new_default(plan);
+        let dist_ctx = dist_ctx
+            .transform_up(|ctx| ensure_distribution(ctx, config))
+            .data()?;
+
+        // Step 2b: Sorting enforcement (bottom-up) — runs on 
distribution-fixed plan
+        let sort_ctx = PlanWithCorrespondingSort::new_default(dist_ctx.plan);
+        let sort_ctx = sort_ctx.transform_up(ensure_sorting)?.data;
+
+        // Phase 3: Optimization passes
+        // 3a: Parallelize sorts (Coalesce+Sort → SPM+Sort)
+        use crate::enforce_sorting::{
+            PlanWithCorrespondingCoalescePartitions, parallelize_sorts,
+            replace_with_partial_sort,
+        };
+        let plan = if config.optimizer.repartition_sorts {
+            let ctx = PlanWithCorrespondingCoalescePartitions::new_default(sort_ctx.plan);
+            ctx.transform_up(parallelize_sorts).data()?.plan
+        } else {
+            sort_ctx.plan
+        };
+
+        // 3b: Order-preserving variants
+        use crate::enforce_sorting::replace_with_order_preserving_variants::{
+            OrderPreservationContext, replace_with_order_preserving_variants,
+        };
+        let ctx = OrderPreservationContext::new_default(plan);
+        let plan = ctx
+            .transform_up(|c| {
+                replace_with_order_preserving_variants(c, false, true, config)
+            })
+            .data()?
+            .plan;
+
+        // 3c: Sort pushdown (distribution-aware)
+        use crate::enforce_sorting::sort_pushdown::{
+            SortPushDown, assign_initial_requirements, pushdown_sorts,
+        };
+        let mut sort_pushdown = SortPushDown::new_default(plan);
+        assign_initial_requirements(&mut sort_pushdown);
+        let adjusted = pushdown_sorts(sort_pushdown)?;
+
+        // 3d: Partial sort
+        adjusted
+            .plan
+            .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?)))
+            .data()
+    }
+
+    fn name(&self) -> &str {
+        "EnsureRequirements"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Phase 3 variant: no `pushdown_sorts`, sort placement handled entirely
+/// by bottom-up passes. Currently experimental — some plan shapes differ
+/// from the `pushdown_sorts` variant (less optimal but still correct).
+#[derive(Default, Debug)]
+pub struct EnsureRequirementsNoPushdown {}
+
+impl EnsureRequirementsNoPushdown {
+    /// Create a new rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirementsNoPushdown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Step 1: Distribution enforcement
+        use crate::enforce_distribution::{
+            DistributionContext as DistCtx, PlanWithKeyRequirements as KeyReqs,
+            adjust_input_keys_ordering as adj_keys, ensure_distribution as ensure_dist,
+        };
+        let top_down = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down {
+            KeyReqs::new_default(plan)
+                .transform_down(adj_keys)
+                .data()?
+                .plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+        let dist_ctx = DistCtx::new_default(plan);
+        let plan = dist_ctx
+            .transform_up(|ctx| ensure_dist(ctx, config))
+            .data()?
+            .plan;
+
+        // Step 2: ensure_sorting (bottom-up, NO pushdown_sorts)
+        let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
+        let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
+
+        // Step 3: parallelize_sorts (optional)
+        let plan = if config.optimizer.repartition_sorts {
+            let ctx = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
+            ctx.transform_up(parallelize_sorts).data()?.plan
+        } else {
+            adjusted.plan
+        };
+
+        // Step 4: order-preserving variants
+        let ctx = OrderPreservationContext::new_default(plan);
+        let plan = ctx
+            .transform_up(|c| {
+                replace_with_order_preserving_variants(c, false, true, config)
+            })
+            .data()?
+            .plan;
+
+        // Step 5: partial sort
+        let plan = plan
+            .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?)))
+            .data()?;
+
+        // NO pushdown_sorts — sort placement is purely bottom-up.
+        // Step 6: Final distribution enforcement
+        let dist_ctx2 = DistCtx::new_default(plan);
+        let plan = dist_ctx2
+            .transform_up(|ctx| ensure_dist(ctx, config))
+            .data()?
+            .plan;
+
+        // Step 7: Fix any sorting violations the final distribution pass introduced.
+        let sort_ctx2 = PlanWithCorrespondingSort::new_default(plan);
+        let adjusted2 = sort_ctx2.transform_up(ensure_sorting)?.data;
+
+        Ok(adjusted2.plan)
+    }
+
+    fn name(&self) -> &str {
+        "EnsureRequirementsNoPushdown"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::sync::Arc;
+
+    use arrow::compute::SortOptions;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::Result;
+    use datafusion_common::tree_node::TreeNodeRecursion;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr::{
+        EquivalenceProperties, LexOrdering, PhysicalExpr, PhysicalSortExpr,
+    };
+    use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+    use datafusion_physical_plan::limit::GlobalLimitExec;
+    use datafusion_physical_plan::sorts::sort::SortExec;
+    use datafusion_physical_plan::union::UnionExec;
+    use datafusion_physical_plan::{
+        DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
+        Partitioning, PlanProperties, SendableRecordBatchStream,
+    };
+
+    use crate::output_requirements::OutputRequirementExec;
+    use crate::sanity_checker::SanityCheckPlan;
+
+    use datafusion_common::{JoinType, NullEquality};
+    use datafusion_physical_expr::Distribution;
+    use datafusion_physical_expr_common::sort_expr::OrderingRequirements;
+    use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use datafusion_physical_plan::joins::{
+        HashJoinExec, PartitionMode, SortMergeJoinExec,
+    };
+    use datafusion_physical_plan::projection::ProjectionExec;
+    use datafusion_physical_plan::repartition::RepartitionExec;
+    use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+
+    /// Mock ExecutionPlan with configurable partition count and output ordering.
+    #[derive(Debug)]
+    struct MockMultiPartitionExec {
+        properties: Arc<PlanProperties>,
+    }
+
+    impl MockMultiPartitionExec {
+        fn new(partition_count: usize) -> Self {
+            let schema = Arc::new(Schema::new(vec![
+                Field::new("a", DataType::Int64, false),
+                Field::new("b", DataType::Int64, false),
+            ]));
+            let mut eq = EquivalenceProperties::new(Arc::clone(&schema));
+            if let Some(ordering) = LexOrdering::new(vec![PhysicalSortExpr::new(
+                Arc::new(Column::new("a", 0)),
+                SortOptions {
+                    descending: false,
+                    nulls_first: false,
+                },
+            )]) {
+                eq.add_orderings(vec![ordering.into_iter().collect::<Vec<_>>()]);
+            }
+            let properties = PlanProperties::new(
+                eq,
+                Partitioning::UnknownPartitioning(partition_count),
+                EmissionType::Incremental,
+                Boundedness::Bounded,
+            );
+            Self {
+                properties: Arc::new(properties),
+            }
+        }
+    }
+
+    impl DisplayAs for MockMultiPartitionExec {
+        fn fmt_as(
+            &self,
+            _t: DisplayFormatType,
+            f: &mut std::fmt::Formatter,
+        ) -> std::fmt::Result {
+            write!(f, "MockMultiPartitionExec")
+        }
+    }
+
+    impl ExecutionPlan for MockMultiPartitionExec {
+        fn name(&self) -> &str {
+            "MockMultiPartitionExec"
+        }
+        fn properties(&self) -> &Arc<PlanProperties> {
+            &self.properties
+        }
+        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+            vec![]
+        }
+        fn with_new_children(
+            self: Arc<Self>,
+            _children: Vec<Arc<dyn ExecutionPlan>>,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Ok(self)
+        }
+        fn apply_expressions(
+            &self,
+            _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
+        ) -> Result<TreeNodeRecursion> {
+            Ok(TreeNodeRecursion::Continue)
+        }
+        fn execute(
+            &self,
+            _partition: usize,
+            _context: Arc<datafusion_execution::TaskContext>,
+        ) -> Result<SendableRecordBatchStream> {
+            unimplemented!()
+        }
+    }
+
+    /// Helper: run EnsureRequirements and verify SanityCheckPlan passes
+    fn optimize_and_sanity_check(
+        plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = ConfigOptions::default();
+        let optimized = EnsureRequirements::new().optimize(plan, &config)?;
+        // SanityCheckPlan must pass
+        SanityCheckPlan::new().optimize(Arc::clone(&optimized), &config)?;
+        Ok(optimized)
+    }
+
+    /// Helper: verify idempotency — running twice produces the same plan
+    fn assert_idempotent(plan: Arc<dyn ExecutionPlan>) {
+        let config = ConfigOptions::default();
+        let p1 = EnsureRequirements::new()
+            .optimize(plan, &config)
+            .expect("first optimize failed");
+        let p2 = EnsureRequirements::new()
+            .optimize(Arc::clone(&p1), &config)
+            .expect("second optimize failed");
+
+        let s1 = datafusion_physical_plan::displayable(p1.as_ref())
+            .indent(true)
+            .to_string();
+        let s2 = datafusion_physical_plan::displayable(p2.as_ref())
+            .indent(true)
+            .to_string();
+        assert_eq!(
+            s1, s2,
+            "EnsureRequirements is NOT idempotent!\nFirst:\n{s1}\nSecond:\n{s2}"
+        );
+
+        // Both must pass SanityCheckPlan
+        SanityCheckPlan::new()
+            .optimize(p1, &config)
+            .expect("SanityCheckPlan failed on first pass");
+        SanityCheckPlan::new()
+            .optimize(p2, &config)
+            .expect("SanityCheckPlan failed on second pass");
+    }
+
+    /// Multi-partition sort + `GlobalLimitExec` must produce a valid plan.
+    /// Regression for the `SanityCheckPlan` failure that motivated this PR:
+    /// `pushdown_sorts` setting `preserve_partitioning=true` on multi-partition
+    /// input without inserting `SortPreservingMergeExec` violated the
+    /// `SinglePartition` requirement from `GlobalLimitExec`.
+    #[test]
+    fn test_multi_partition_sort_limit_sanity_check() {
+        let source = Arc::new(MockMultiPartitionExec::new(32));
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, source));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(21)));
+
+        let result = optimize_and_sanity_check(limit);
+        assert!(result.is_ok(), "SanityCheckPlan failed: {:?}", result.err());
+    }
+
+    /// Union with mixed partition counts + sort + limit.
+    #[test]
+    fn test_union_mixed_partitions_sort_limit() {
+        let live = Arc::new(MockMultiPartitionExec::new(32));
+        let historical = Arc::new(MockMultiPartitionExec::new(1));
+
+        let union = UnionExec::try_new(vec![live as _, historical as _]).unwrap();
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, union));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(21)));
+
+        let result = optimize_and_sanity_check(limit);
+        assert!(result.is_ok(), "SanityCheckPlan failed: {:?}", result.err());
+    }
+
+    /// Idempotency: multi-partition sort + limit
+    #[test]
+    fn test_idempotent_multi_partition_sort_limit() {
+        let source = Arc::new(MockMultiPartitionExec::new(16));
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, source));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(10)));
+
+        assert_idempotent(limit);
+    }
+
+    /// Idempotency: union with mixed partitions
+    #[test]
+    fn test_idempotent_union_mixed_partitions() {
+        let live = Arc::new(MockMultiPartitionExec::new(8));
+        let hist = Arc::new(MockMultiPartitionExec::new(1));
+        let union = UnionExec::try_new(vec![live as _, hist as _]).unwrap();
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, union));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(5)));
+
+        assert_idempotent(limit);
+    }
+
+    // ========================================================================
+    // Projection + multi-partition tests (pushdown_sorts trigger path)
+    // ========================================================================
+
+    /// ProjectionExec over multi-partition + sort DESC + limit.
+    /// This is the topology where pushdown_sorts pushes sort through projection
+    /// onto the multi-partition source. The optimizer must still produce a valid plan.
+    #[test]
+    fn test_projection_over_multi_partition_sort_limit() {
+        let source = Arc::new(MockMultiPartitionExec::new(16));
+        // Identity projection
+        let proj_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![
+            (Arc::new(Column::new("a", 0)), "a".to_string()),
+            (Arc::new(Column::new("b", 1)), "b".to_string()),
+        ];
+        let projection =
+            Arc::new(ProjectionExec::try_new(proj_exprs, source as _).unwrap());
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, projection));
+        let limit: Arc<dyn ExecutionPlan> =
+            Arc::new(GlobalLimitExec::new(sort, 0, Some(21)));
+
+        let result = optimize_and_sanity_check(Arc::clone(&limit));
+        assert!(

Review Comment:
   I think this could be more concise using `unwrap()` (or `expect(...)` if you want the messages)
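
As a rough illustration of that suggestion (a standalone sketch, not the PR's code): `check` below is a hypothetical stand-in for `optimize_and_sanity_check`, and the two helpers compare the `assert!(result.is_ok(), ...)` pattern with `expect(...)`, which already appends the error's `Debug` output to its panic message.

```rust
/// Hypothetical stand-in for `optimize_and_sanity_check`; not part of the PR.
fn check(partitions: usize) -> Result<usize, String> {
    if partitions == 0 {
        Err("plan has no partitions".to_string())
    } else {
        Ok(partitions)
    }
}

/// Verbose form: test `is_ok()` and format the error by hand.
fn verbose(partitions: usize) -> usize {
    let result = check(partitions);
    assert!(
        result.is_ok(),
        "sanity check failed: {:?}",
        result.as_ref().err()
    );
    result.unwrap()
}

/// Concise form: `expect` panics with the given message plus the error's
/// `Debug` representation, so no manual formatting is needed.
fn concise(partitions: usize) -> usize {
    check(partitions).expect("sanity check failed")
}
```

Both helpers return the same value on success and panic on failure; the concise form also hands back the unwrapped plan directly, which may be convenient if the test goes on to inspect it.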



##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -1672,17 +1669,16 @@ fn multi_smj_joins() -> Result<()> {
                         // TODO(wiedld): show different test result if enforce distribution first.
                         assert_plan!(plan_sort, @r"
                         SortMergeJoinExec: join_type=..., on=[(a@0, c@2)]
-                          RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true
-                            SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
-                              CoalescePartitionsExec
-                                SortMergeJoinExec: join_type=..., on=[(a@0, b1@1)]
-                                  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true
-                                    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+                          RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC

Review Comment:
   this seems like a better plan to me (rather than sorting in a single partition, it is sorting in parallel across all partitions)
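
To illustrate why per-partition sorting can be preferable, here is a minimal standalone sketch in plain Rust. It uses no DataFusion APIs; `coalesce_then_sort` and `sort_then_merge` are hypothetical names that only model the two plan shapes (coalesce then sort once, versus sort every partition concurrently and merge the sorted runs).

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Models "CoalescePartitionsExec + SortExec": gather everything into one
/// partition, then run a single sort over all rows.
fn coalesce_then_sort(partitions: &[Vec<i64>]) -> Vec<i64> {
    let mut all: Vec<i64> = partitions.iter().flatten().copied().collect();
    all.sort();
    all
}

/// Models per-partition sorting followed by an order-preserving merge:
/// each partition is sorted on its own thread, then a k-way merge
/// combines the sorted runs.
fn sort_then_merge(partitions: &[Vec<i64>]) -> Vec<i64> {
    let mut sorted: Vec<Vec<i64>> = partitions.to_vec();

    // Sort every partition concurrently; the scope joins all threads on exit.
    std::thread::scope(|s| {
        for p in sorted.iter_mut() {
            s.spawn(move || p.sort());
        }
    });

    // Min-heap of (value, partition index, offset within that partition).
    let mut heap = BinaryHeap::new();
    for (i, p) in sorted.iter().enumerate() {
        if let Some(&v) = p.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((v, i, j))) = heap.pop() {
        out.push(v);
        if let Some(&next) = sorted[i].get(j + 1) {
            heap.push(Reverse((next, i, j + 1)));
        }
    }
    out
}
```

Both functions produce the same ordered output; the second spreads the sorting work across partitions, which is the property the new plan appears to have.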



##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1804 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//! │
+//! ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//! │   └─ Same as existing adjust_input_keys_ordering
+//! │
+//! └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!     └─ For each node (bottom-up), for each child:
+//!         Step 1: Ensure distribution requirement
+//!           └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec
+//!         Step 2: Ensure ordering requirement (distribution-aware)
+//!           └─ Add SortExec with correct preserve_partitioning + SPM if needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the 
bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid position.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+
+// Internal functions used directly instead of calling EnforceDistribution/EnforceSorting
+// as opaque boxes. This gives us control over the pass ordering and enables
+// future merging into a true single-pass architecture.
+
+// For the no-pushdown variant (Phase 3)
+use crate::enforce_sorting::replace_with_order_preserving_variants::{
+    OrderPreservationContext, replace_with_order_preserving_variants,
+};
+use crate::enforce_sorting::{
+    PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, ensure_sorting,
+    parallelize_sorts, replace_with_partial_sort,
+};
+
+/// Optimizer rule that enforces both distribution and sorting requirements.
+///
+/// This rule combines the functionality of `EnforceDistribution` and
+/// `EnforceSorting` into a coordinated sequence where distribution is
+/// always settled before sorting for each operator, preventing the
+/// non-idempotent interactions between the two separate rules.
+///
+/// See [module level documentation](self) for more details.
+#[derive(Default, Debug)]
+pub struct EnsureRequirements {}
+
+impl EnsureRequirements {
+    /// Create a new `EnsureRequirements` optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirements {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Phase 1: Join key reordering (top-down, from EnforceDistribution)
+        use crate::enforce_distribution::{
+            PlanWithKeyRequirements, adjust_input_keys_ordering,
+        };
+        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down_join_key_reordering {
+            let ctx = PlanWithKeyRequirements::new_default(plan);
+            ctx.transform_down(adjust_input_keys_ordering).data()?.plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+
+        // Phase 2: Combined distribution + sorting enforcement (single bottom-up pass)
+        // For each node: distribution first, then sorting.
+        use crate::enforce_distribution::{DistributionContext, ensure_distribution};
+        use crate::enforce_sorting::{PlanWithCorrespondingSort, ensure_sorting};
+
+        // Step 2a: Distribution enforcement (bottom-up)
+        let dist_ctx = DistributionContext::new_default(plan);
+        let dist_ctx = dist_ctx
+            .transform_up(|ctx| ensure_distribution(ctx, config))
+            .data()?;
+
+        // Step 2b: Sorting enforcement (bottom-up) — runs on 
distribution-fixed plan
+        let sort_ctx = PlanWithCorrespondingSort::new_default(dist_ctx.plan);
+        let sort_ctx = sort_ctx.transform_up(ensure_sorting)?.data;
+
+        // Phase 3: Optimization passes
+        // 3a: Parallelize sorts (Coalesce+Sort → SPM+Sort)
+        use crate::enforce_sorting::{
+            PlanWithCorrespondingCoalescePartitions, parallelize_sorts,
+            replace_with_partial_sort,
+        };
+        let plan = if config.optimizer.repartition_sorts {
+            let ctx = PlanWithCorrespondingCoalescePartitions::new_default(sort_ctx.plan);
+            ctx.transform_up(parallelize_sorts).data()?.plan
+        } else {
+            sort_ctx.plan
+        };
+
+        // 3b: Order-preserving variants
+        use crate::enforce_sorting::replace_with_order_preserving_variants::{
+            OrderPreservationContext, replace_with_order_preserving_variants,
+        };
+        let ctx = OrderPreservationContext::new_default(plan);
+        let plan = ctx
+            .transform_up(|c| {
+                replace_with_order_preserving_variants(c, false, true, config)
+            })
+            .data()?
+            .plan;
+
+        // 3c: Sort pushdown (distribution-aware)
+        use crate::enforce_sorting::sort_pushdown::{
+            SortPushDown, assign_initial_requirements, pushdown_sorts,
+        };
+        let mut sort_pushdown = SortPushDown::new_default(plan);
+        assign_initial_requirements(&mut sort_pushdown);
+        let adjusted = pushdown_sorts(sort_pushdown)?;
+
+        // 3d: Partial sort
+        adjusted
+            .plan
+            .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?)))
+            .data()
+    }
+
+    fn name(&self) -> &str {
+        "EnsureRequirements"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Phase 3 variant: no `pushdown_sorts`, sort placement handled entirely
+/// by bottom-up passes. Currently experimental — some plan shapes differ
+/// from the `pushdown_sorts` variant (less optimal but still correct).
+#[derive(Default, Debug)]
+pub struct EnsureRequirementsNoPushdown {}
+
+impl EnsureRequirementsNoPushdown {
+    /// Create a new rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirementsNoPushdown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Step 1: Distribution enforcement
+        use crate::enforce_distribution::{
+            DistributionContext as DistCtx, PlanWithKeyRequirements as KeyReqs,
+            adjust_input_keys_ordering as adj_keys, ensure_distribution as ensure_dist,
+        };
+        let top_down = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down {
+            KeyReqs::new_default(plan)
+                .transform_down(adj_keys)
+                .data()?
+                .plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+        let dist_ctx = DistCtx::new_default(plan);
+        let plan = dist_ctx
+            .transform_up(|ctx| ensure_dist(ctx, config))
+            .data()?
+            .plan;
+
+        // Step 2: ensure_sorting (bottom-up, NO pushdown_sorts)
+        let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
+        let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
+
+        // Step 3: parallelize_sorts (optional)
+        let plan = if config.optimizer.repartition_sorts {
+            let ctx = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
+            ctx.transform_up(parallelize_sorts).data()?.plan
+        } else {
+            adjusted.plan
+        };
+
+        // Step 4: order-preserving variants
+        let ctx = OrderPreservationContext::new_default(plan);
+        let plan = ctx
+            .transform_up(|c| {
+                replace_with_order_preserving_variants(c, false, true, config)
+            })
+            .data()?
+            .plan;
+
+        // Step 5: partial sort
+        let plan = plan
+            .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?)))
+            .data()?;
+
+        // NO pushdown_sorts — sort placement is purely bottom-up.
+        // Step 6: Final distribution enforcement
+        let dist_ctx2 = DistCtx::new_default(plan);
+        let plan = dist_ctx2
+            .transform_up(|ctx| ensure_dist(ctx, config))
+            .data()?
+            .plan;
+
+        // Step 7: Fix any sorting violations the final distribution pass introduced.
+        let sort_ctx2 = PlanWithCorrespondingSort::new_default(plan);
+        let adjusted2 = sort_ctx2.transform_up(ensure_sorting)?.data;
+
+        Ok(adjusted2.plan)
+    }
+
+    fn name(&self) -> &str {
+        "EnsureRequirementsNoPushdown"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+#[cfg(test)]
+mod new_tests;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::sync::Arc;
+
+    use arrow::compute::SortOptions;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::Result;
+    use datafusion_common::tree_node::TreeNodeRecursion;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr::{
+        EquivalenceProperties, LexOrdering, PhysicalExpr, PhysicalSortExpr,
+    };
+    use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+    use datafusion_physical_plan::limit::GlobalLimitExec;
+    use datafusion_physical_plan::sorts::sort::SortExec;
+    use datafusion_physical_plan::union::UnionExec;
+    use datafusion_physical_plan::{
+        DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
+        Partitioning, PlanProperties, SendableRecordBatchStream,
+    };
+
+    use crate::enforce_distribution::EnforceDistribution;
+    use crate::output_requirements::OutputRequirementExec;
+    use crate::sanity_checker::SanityCheckPlan;
+
+    use datafusion_common::{JoinType, NullEquality};
+    use datafusion_physical_expr::Distribution;
+    use datafusion_physical_expr_common::sort_expr::OrderingRequirements;
+    use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use datafusion_physical_plan::joins::{
+        HashJoinExec, PartitionMode, SortMergeJoinExec,
+    };
+    use datafusion_physical_plan::projection::ProjectionExec;
+    use datafusion_physical_plan::repartition::RepartitionExec;
+    use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+
+    /// Mock ExecutionPlan with configurable partition count and output ordering.
+    #[derive(Debug)]
+    struct MockMultiPartitionExec {
+        properties: Arc<PlanProperties>,
+    }
+
+    impl MockMultiPartitionExec {
+        fn new(partition_count: usize) -> Self {
+            let schema = Arc::new(Schema::new(vec![
+                Field::new("a", DataType::Int64, false),
+                Field::new("b", DataType::Int64, false),
+            ]));
+            let mut eq = EquivalenceProperties::new(Arc::clone(&schema));
+            if let Some(ordering) = LexOrdering::new(vec![PhysicalSortExpr::new(
+                Arc::new(Column::new("a", 0)),
+                SortOptions {
+                    descending: false,
+                    nulls_first: false,
+                },
+            )]) {
+                eq.add_orderings(vec![ordering.into_iter().collect::<Vec<_>>()]);
+            }
+            let properties = PlanProperties::new(
+                eq,
+                Partitioning::UnknownPartitioning(partition_count),
+                EmissionType::Incremental,
+                Boundedness::Bounded,
+            );
+            Self {
+                properties: Arc::new(properties),
+            }
+        }
+    }
+
+    impl DisplayAs for MockMultiPartitionExec {
+        fn fmt_as(
+            &self,
+            _t: DisplayFormatType,
+            f: &mut std::fmt::Formatter,
+        ) -> std::fmt::Result {
+            write!(f, "MockMultiPartitionExec")
+        }
+    }
+
+    impl ExecutionPlan for MockMultiPartitionExec {
+        fn name(&self) -> &str {
+            "MockMultiPartitionExec"
+        }
+        fn properties(&self) -> &Arc<PlanProperties> {
+            &self.properties
+        }
+        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+            vec![]
+        }
+        fn with_new_children(
+            self: Arc<Self>,
+            _children: Vec<Arc<dyn ExecutionPlan>>,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Ok(self)
+        }
+        fn apply_expressions(
+            &self,
+            _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
+        ) -> Result<TreeNodeRecursion> {
+            Ok(TreeNodeRecursion::Continue)
+        }
+        fn execute(
+            &self,
+            _partition: usize,
+            _context: Arc<datafusion_execution::TaskContext>,
+        ) -> Result<SendableRecordBatchStream> {
+            unimplemented!()
+        }
+    }
+
+    /// Helper: run EnsureRequirements and verify SanityCheckPlan passes
+    fn optimize_and_sanity_check(
+        plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = ConfigOptions::default();
+        let optimized = EnsureRequirements::new().optimize(plan, &config)?;
+        // SanityCheckPlan must pass
+        SanityCheckPlan::new().optimize(Arc::clone(&optimized), &config)?;
+        Ok(optimized)
+    }
+
+    /// Helper: verify idempotency — running twice produces the same plan
+    fn assert_idempotent(plan: Arc<dyn ExecutionPlan>) {
+        let config = ConfigOptions::default();
+        let p1 = EnsureRequirements::new()
+            .optimize(plan, &config)
+            .expect("first optimize failed");
+        let p2 = EnsureRequirements::new()
+            .optimize(Arc::clone(&p1), &config)
+            .expect("second optimize failed");
+
+        let s1 = datafusion_physical_plan::displayable(p1.as_ref())
+            .indent(true)
+            .to_string();
+        let s2 = datafusion_physical_plan::displayable(p2.as_ref())
+            .indent(true)
+            .to_string();
+        assert_eq!(
+            s1, s2,
+            "EnsureRequirements is NOT idempotent!\nFirst:\n{s1}\nSecond:\n{s2}"
+        );
+
+        // Both must pass SanityCheckPlan
+        SanityCheckPlan::new()
+            .optimize(p1, &config)
+            .expect("SanityCheckPlan failed on first pass");
+        SanityCheckPlan::new()
+            .optimize(p2, &config)
+            .expect("SanityCheckPlan failed on second pass");
+    }
+
+    /// Multi-partition sort + GlobalLimitExec must produce valid plan.
+    /// Regression test for UXX0/HRC 502s (April 2026).
+    #[test]
+    fn test_multi_partition_sort_limit_sanity_check() {
+        let source = Arc::new(MockMultiPartitionExec::new(32));
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, source));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(21)));
+
+        let result = optimize_and_sanity_check(limit);
+        assert!(result.is_ok(), "SanityCheckPlan failed: {:?}", result.err());
+    }
+
+    /// Union with mixed partition counts + sort + limit.
+    #[test]
+    fn test_union_mixed_partitions_sort_limit() {
+        let live = Arc::new(MockMultiPartitionExec::new(32));
+        let historical = Arc::new(MockMultiPartitionExec::new(1));
+
+        let union = UnionExec::try_new(vec![live as _, historical as _]).unwrap();
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, union));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(21)));
+
+        let result = optimize_and_sanity_check(limit);
+        assert!(result.is_ok(), "SanityCheckPlan failed: {:?}", result.err());
+    }
+
+    /// Idempotency: multi-partition sort + limit
+    #[test]
+    fn test_idempotent_multi_partition_sort_limit() {
+        let source = Arc::new(MockMultiPartitionExec::new(16));
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, source));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(10)));
+
+        assert_idempotent(limit);
+    }
+
+    /// Idempotency: union with mixed partitions
+    #[test]
+    fn test_idempotent_union_mixed_partitions() {
+        let live = Arc::new(MockMultiPartitionExec::new(8));
+        let hist = Arc::new(MockMultiPartitionExec::new(1));
+        let union = UnionExec::try_new(vec![live as _, hist as _]).unwrap();
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, union));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(5)));
+
+        assert_idempotent(limit);
+    }
+
+    // ========================================================================
+    // Projection + multi-partition tests (pushdown_sorts trigger path)
+    // ========================================================================
+
+    /// ProjectionExec over multi-partition + sort DESC + limit.
+    /// This is the topology where pushdown_sorts pushes sort through projection
+    /// onto the multi-partition source. The optimizer must still produce a valid plan.
+    #[test]
+    fn test_projection_over_multi_partition_sort_limit() {
+        let source = Arc::new(MockMultiPartitionExec::new(16));
+        // Identity projection
+        let proj_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![
+            (Arc::new(Column::new("a", 0)), "a".to_string()),
+            (Arc::new(Column::new("b", 1)), "b".to_string()),
+        ];
+        let projection =
+            Arc::new(ProjectionExec::try_new(proj_exprs, source as _).unwrap());
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, projection));
+        let limit: Arc<dyn ExecutionPlan> =
+            Arc::new(GlobalLimitExec::new(sort, 0, Some(21)));
+
+        let result = optimize_and_sanity_check(Arc::clone(&limit));
+        assert!(
+            result.is_ok(),
+            "SanityCheckPlan failed for projection over multi-partition: {:?}",
+            result.err()
+        );
+        assert_idempotent(limit);
+    }
+
+    // ========================================================================
+    // Single partition tests (no unnecessary operators)
+    // ========================================================================
+
+    /// Single partition source + sort + limit should NOT add SortPreservingMergeExec.
+    #[test]
+    fn test_single_partition_no_unnecessary_spm() {
+        let source = Arc::new(MockMultiPartitionExec::new(1));
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, source));
+        let limit: Arc<dyn ExecutionPlan> =
+            Arc::new(GlobalLimitExec::new(sort, 0, Some(10)));
+
+        let optimized = optimize_and_sanity_check(limit).unwrap();
+        let plan_str = datafusion_physical_plan::displayable(optimized.as_ref())
+            .indent(true)
+            .to_string();
+
+        // Single partition should not have SortPreservingMergeExec
+        assert!(
+            !plan_str.contains("SortPreservingMergeExec"),
+            "Unnecessary SortPreservingMergeExec for single partition:\n{plan_str}"
+        );
+    }
+
+    /// Source already has correct ordering → should not add SortExec.
+    #[test]
+    fn test_sort_already_satisfied_no_extra_sort() {
+        let source = Arc::new(MockMultiPartitionExec::new(1));
+
+        // Sort ASC matches MockMultiPartitionExec's output ordering (a ASC)
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, source));
+        let limit: Arc<dyn ExecutionPlan> =
+            Arc::new(GlobalLimitExec::new(sort, 0, Some(10)));
+
+        let optimized = optimize_and_sanity_check(limit).unwrap();
+        let plan_str = datafusion_physical_plan::displayable(optimized.as_ref())
+            .indent(true)
+            .to_string();
+
+        // Sort should be eliminated since source already satisfies ordering
+        // The plan should just be limit + source (or limit + local limit + source)
+        assert!(
+            !plan_str.contains("SortExec: expr=[a@0 ASC"),
+            "Unnecessary SortExec when ordering already satisfied:\n{plan_str}"
+        );
+    }
+
+    // ========================================================================

Review Comment:
   (see comment above about redundancy)



##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1814 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//! │
+//! ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//! │   └─ Same as existing adjust_input_keys_ordering
+//! │
+//! └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!     └─ For each node (bottom-up), for each child:
+//!         Step 1: Ensure distribution requirement
+//!           └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec
+//!         Step 2: Ensure ordering requirement (distribution-aware)
+//!           └─ Add SortExec with correct preserve_partitioning + SPM if needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid position.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+
+// Internal functions used directly instead of calling EnforceDistribution/EnforceSorting
+// as opaque boxes. This gives us control over the pass ordering and enables
+// future merging into a true single-pass architecture.
+
+// For the no-pushdown variant (Phase 3)
+use crate::enforce_sorting::replace_with_order_preserving_variants::{
+    OrderPreservationContext, replace_with_order_preserving_variants,
+};
+use crate::enforce_sorting::{
+    PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, ensure_sorting,
+    parallelize_sorts, replace_with_partial_sort,
+};
+
+/// Optimizer rule that enforces both distribution and sorting requirements.
+///
+/// This rule combines the functionality of `EnforceDistribution` and
+/// `EnforceSorting` into a coordinated sequence where distribution is
+/// always settled before sorting for each operator, preventing the
+/// non-idempotent interactions between the two separate rules.
+///
+/// See [module level documentation](self) for more details.
+#[derive(Default, Debug)]
+pub struct EnsureRequirements {}
+
+impl EnsureRequirements {
+    /// Create a new `EnsureRequirements` optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirements {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Phase 1: Join key reordering (top-down, from EnforceDistribution)
+        use crate::enforce_distribution::{
+            PlanWithKeyRequirements, adjust_input_keys_ordering,
+        };
+        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down_join_key_reordering {
+            let ctx = PlanWithKeyRequirements::new_default(plan);
+            ctx.transform_down(adjust_input_keys_ordering).data()?.plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+
+        // Phase 2: Combined distribution + sorting enforcement (single bottom-up pass)
+        // For each node: distribution first, then sorting.
+        use crate::enforce_distribution::{DistributionContext, ensure_distribution};

Review Comment:
   As a follow-on PR, it might be nice to move `enforce_sorting` and `enforce_distribution` into submodules of `ensure_requirements` (just moving the code and helpers around, since they are now internal implementation details).



##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -695,17 +695,14 @@ impl TestConfig {
             // TODO: End state payloads will be checked here.
         }
 
-        for run in optimizers_to_run {
-            optimized = match run {
-                Run::Distribution => {
-                    let optimizer = EnforceDistribution::new();
-                    optimizer.optimize(optimized, &self.config)?
-                }
-                Run::Sorting => {
-                    let optimizer = EnforceSorting::new();
-                    optimizer.optimize(optimized, &self.config)?
-                }
-            };
+        // With `EnsureRequirements`, distribution and sorting enforcement are
+        // composed into a single idempotent pass, so the historical sequence
+        // of `Run::Distribution` / `Run::Sorting` collapses to repeated calls
+        // of the same rule. The sequences are preserved so existing test
+        // assertions (which encode legacy run orders) remain stable.

Review Comment:
   It would be a nice follow-on PR to consolidate these tests so that each runs `EnsureRequirements` only once (or runs it once, then runs it again and checks that the result is idempotent).
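
   For illustration, the "run it once, then run it again" shape could look roughly like the sketch below. The helper `run_once_and_check_idempotent` and the toy rule `dedup_sorted` are hypothetical names, not part of this PR or of DataFusion. The sketch compares values with `PartialEq`; the real tests would presumably compare rendered plan strings instead, as the `assert_idempotent` helper added in this PR does.

```rust
use std::fmt::Debug;

/// Hypothetical helper (not from the PR): applies `rule` once, applies it
/// again to the result, and asserts that the second application is a no-op.
/// Returns the result of the first application so callers can assert on it.
fn run_once_and_check_idempotent<P, E>(rule: impl Fn(P) -> Result<P, E>, input: P) -> P
where
    P: Clone + PartialEq + Debug,
    E: Debug,
{
    let once = rule(input).expect("first run failed");
    let twice = rule(once.clone()).expect("second run failed");
    assert_eq!(once, twice, "rule is not idempotent");
    once
}

/// Toy stand-in for an optimizer rule: sorting and de-duplicating a vector
/// is idempotent, so a second run leaves the output unchanged.
fn dedup_sorted(mut v: Vec<i32>) -> Result<Vec<i32>, String> {
    v.sort();
    v.dedup();
    Ok(v)
}

fn main() {
    let out = run_once_and_check_idempotent(dedup_sorted, vec![3, 1, 3, 2]);
    println!("{out:?}"); // prints [1, 2, 3]
}
```

   Returning the first-pass result keeps the existing plan assertions usable: each test would assert on the returned plan and get the idempotency check for free.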



##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1814 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//! │
+//! ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//! │   └─ Same as existing adjust_input_keys_ordering
+//! │
+//! └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!     └─ For each node (bottom-up), for each child:
+//!         Step 1: Ensure distribution requirement
+//!           └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec
+//!         Step 2: Ensure ordering requirement (distribution-aware)
+//!           └─ Add SortExec with correct preserve_partitioning + SPM if needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid position.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+
+// Internal functions used directly instead of calling EnforceDistribution/EnforceSorting
+// as opaque boxes. This gives us control over the pass ordering and enables
+// future merging into a true single-pass architecture.
+
+// For the no-pushdown variant (Phase 3)
+use crate::enforce_sorting::replace_with_order_preserving_variants::{
+    OrderPreservationContext, replace_with_order_preserving_variants,
+};
+use crate::enforce_sorting::{
+    PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, ensure_sorting,
+    parallelize_sorts, replace_with_partial_sort,
+};
+
+/// Optimizer rule that enforces both distribution and sorting requirements.
+///
+/// This rule combines the functionality of `EnforceDistribution` and
+/// `EnforceSorting` into a coordinated sequence where distribution is
+/// always settled before sorting for each operator, preventing the
+/// non-idempotent interactions between the two separate rules.
+///
+/// See [module level documentation](self) for more details.
+#[derive(Default, Debug)]
+pub struct EnsureRequirements {}
+
+impl EnsureRequirements {
+    /// Create a new `EnsureRequirements` optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirements {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Phase 1: Join key reordering (top-down, from EnforceDistribution)
+        use crate::enforce_distribution::{
+            PlanWithKeyRequirements, adjust_input_keys_ordering,
+        };
+        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down_join_key_reordering {
+            let ctx = PlanWithKeyRequirements::new_default(plan);
+            ctx.transform_down(adjust_input_keys_ordering).data()?.plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+
+        // Phase 2: Combined distribution + sorting enforcement (single bottom-up pass)
+        // For each node: distribution first, then sorting.
+        use crate::enforce_distribution::{DistributionContext, ensure_distribution};
+        use crate::enforce_sorting::{PlanWithCorrespondingSort, ensure_sorting};
+
+        // Step 2a: Distribution enforcement (bottom-up)
+        let dist_ctx = DistributionContext::new_default(plan);
+        let dist_ctx = dist_ctx
+            .transform_up(|ctx| ensure_distribution(ctx, config))
+            .data()?;
+
+        // Step 2b: Sorting enforcement (bottom-up) — runs on distribution-fixed plan
+        let sort_ctx = PlanWithCorrespondingSort::new_default(dist_ctx.plan);
+        let sort_ctx = sort_ctx.transform_up(ensure_sorting)?.data;
+
+        // Phase 3: Optimization passes
+        // 3a: Parallelize sorts (Coalesce+Sort → SPM+Sort)
+        use crate::enforce_sorting::{
+            PlanWithCorrespondingCoalescePartitions, parallelize_sorts,
+            replace_with_partial_sort,
+        };
+        let plan = if config.optimizer.repartition_sorts {
+            let ctx = PlanWithCorrespondingCoalescePartitions::new_default(sort_ctx.plan);
+            ctx.transform_up(parallelize_sorts).data()?.plan
+        } else {
+            sort_ctx.plan
+        };
+
+        // 3b: Order-preserving variants
+        use crate::enforce_sorting::replace_with_order_preserving_variants::{
+            OrderPreservationContext, replace_with_order_preserving_variants,
+        };
+        let ctx = OrderPreservationContext::new_default(plan);
+        let plan = ctx
+            .transform_up(|c| {
+                replace_with_order_preserving_variants(c, false, true, config)
+            })
+            .data()?
+            .plan;
+
+        // 3c: Sort pushdown (distribution-aware)
+        use crate::enforce_sorting::sort_pushdown::{
+            SortPushDown, assign_initial_requirements, pushdown_sorts,
+        };
+        let mut sort_pushdown = SortPushDown::new_default(plan);
+        assign_initial_requirements(&mut sort_pushdown);
+        let adjusted = pushdown_sorts(sort_pushdown)?;
+
+        // 3d: Partial sort
+        adjusted
+            .plan
+            .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?)))
+            .data()
+    }
+
+    fn name(&self) -> &str {
+        "EnsureRequirements"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Phase 3 variant: no `pushdown_sorts`, sort placement handled entirely
+/// by bottom-up passes. Currently experimental — some plan shapes differ
+/// from the `pushdown_sorts` variant (less optimal but still correct).
+#[derive(Default, Debug)]
+pub struct EnsureRequirementsNoPushdown {}

Review Comment:
   Is `EnsureRequirementsNoPushdown` used anywhere? I don't see it



##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -1621,18 +1618,18 @@ fn multi_smj_joins() -> Result<()> {
                     _ => {
                         assert_plan!(plan_distrib, @r"
                         SortMergeJoinExec: join_type=..., on=[(a@0, c@2)]
-                          SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
-                            RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
+                          RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC

Review Comment:
   It looks like the repartitioning was pulled up after the join (the plans 
look very similar to me)



##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1814 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//! │
+//! ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//! │   └─ Same as existing adjust_input_keys_ordering
+//! │
+//! └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!     └─ For each node (bottom-up), for each child:
+//!         Step 1: Ensure distribution requirement
+//!           └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec
+//!         Step 2: Ensure ordering requirement (distribution-aware)
+//!           └─ Add SortExec with correct preserve_partitioning + SPM if needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid position.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+
+// Internal functions used directly instead of calling EnforceDistribution/EnforceSorting
+// as opaque boxes. This gives us control over the pass ordering and enables
+// future merging into a true single-pass architecture.
+
+// For the no-pushdown variant (Phase 3)
+use crate::enforce_sorting::replace_with_order_preserving_variants::{
+    OrderPreservationContext, replace_with_order_preserving_variants,
+};
+use crate::enforce_sorting::{
+    PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, ensure_sorting,
+    parallelize_sorts, replace_with_partial_sort,
+};
+
+/// Optimizer rule that enforces both distribution and sorting requirements.
+///
+/// This rule combines the functionality of `EnforceDistribution` and
+/// `EnforceSorting` into a coordinated sequence where distribution is
+/// always settled before sorting for each operator, preventing the
+/// non-idempotent interactions between the two separate rules.
+///
+/// See [module level documentation](self) for more details.
+#[derive(Default, Debug)]
+pub struct EnsureRequirements {}
+
+impl EnsureRequirements {
+    /// Create a new `EnsureRequirements` optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirements {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Phase 1: Join key reordering (top-down, from EnforceDistribution)
+        use crate::enforce_distribution::{
+            PlanWithKeyRequirements, adjust_input_keys_ordering,
+        };
+        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down_join_key_reordering {
+            let ctx = PlanWithKeyRequirements::new_default(plan);
+            ctx.transform_down(adjust_input_keys_ordering).data()?.plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+
+        // Phase 2: Combined distribution + sorting enforcement (single bottom-up pass)
+        // For each node: distribution first, then sorting.
+        use crate::enforce_distribution::{DistributionContext, ensure_distribution};
+        use crate::enforce_sorting::{PlanWithCorrespondingSort, ensure_sorting};
+
+        // Step 2a: Distribution enforcement (bottom-up)
+        let dist_ctx = DistributionContext::new_default(plan);
+        let dist_ctx = dist_ctx
+            .transform_up(|ctx| ensure_distribution(ctx, config))
+            .data()?;
+
+        // Step 2b: Sorting enforcement (bottom-up) — runs on distribution-fixed plan
+        let sort_ctx = PlanWithCorrespondingSort::new_default(dist_ctx.plan);
+        let sort_ctx = sort_ctx.transform_up(ensure_sorting)?.data;
+
+        // Phase 3: Optimization passes
+        // 3a: Parallelize sorts (Coalesce+Sort → SPM+Sort)
+        use crate::enforce_sorting::{
+            PlanWithCorrespondingCoalescePartitions, parallelize_sorts,
+            replace_with_partial_sort,
+        };
+        let plan = if config.optimizer.repartition_sorts {
+            let ctx = PlanWithCorrespondingCoalescePartitions::new_default(sort_ctx.plan);
+            ctx.transform_up(parallelize_sorts).data()?.plan
+        } else {
+            sort_ctx.plan
+        };
+
+        // 3b: Order-preserving variants
+        use crate::enforce_sorting::replace_with_order_preserving_variants::{
+            OrderPreservationContext, replace_with_order_preserving_variants,
+        };
+        let ctx = OrderPreservationContext::new_default(plan);
+        let plan = ctx
+            .transform_up(|c| {
+                replace_with_order_preserving_variants(c, false, true, config)
+            })
+            .data()?
+            .plan;
+
+        // 3c: Sort pushdown (distribution-aware)
+        use crate::enforce_sorting::sort_pushdown::{
+            SortPushDown, assign_initial_requirements, pushdown_sorts,
+        };
+        let mut sort_pushdown = SortPushDown::new_default(plan);
+        assign_initial_requirements(&mut sort_pushdown);
+        let adjusted = pushdown_sorts(sort_pushdown)?;
+
+        // 3d: Partial sort
+        adjusted
+            .plan
+            .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?)))
+            .data()
+    }
+
+    fn name(&self) -> &str {
+        "EnsureRequirements"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Phase 3 variant: no `pushdown_sorts`, sort placement handled entirely
+/// by bottom-up passes. Currently experimental — some plan shapes differ
+/// from the `pushdown_sorts` variant (less optimal but still correct).
+#[derive(Default, Debug)]
+pub struct EnsureRequirementsNoPushdown {}
+
+impl EnsureRequirementsNoPushdown {
+    /// Create a new rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirementsNoPushdown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Step 1: Distribution enforcement
+        use crate::enforce_distribution::{
+            DistributionContext as DistCtx, PlanWithKeyRequirements as KeyReqs,
+            adjust_input_keys_ordering as adj_keys, ensure_distribution as ensure_dist,
+        };
+        let top_down = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down {
+            KeyReqs::new_default(plan)
+                .transform_down(adj_keys)
+                .data()?
+                .plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+        let dist_ctx = DistCtx::new_default(plan);
+        let plan = dist_ctx
+            .transform_up(|ctx| ensure_dist(ctx, config))
+            .data()?
+            .plan;
+
+        // Step 2: ensure_sorting (bottom-up, NO pushdown_sorts)
+        let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
+        let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
+
+        // Step 3: parallelize_sorts (optional)
+        let plan = if config.optimizer.repartition_sorts {
+            let ctx = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
+            ctx.transform_up(parallelize_sorts).data()?.plan
+        } else {
+            adjusted.plan
+        };
+
+        // Step 4: order-preserving variants
+        let ctx = OrderPreservationContext::new_default(plan);
+        let plan = ctx
+            .transform_up(|c| {
+                replace_with_order_preserving_variants(c, false, true, config)
+            })
+            .data()?
+            .plan;
+
+        // Step 5: partial sort
+        let plan = plan
+            .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?)))
+            .data()?;
+
+        // NO pushdown_sorts — sort placement is purely bottom-up.
+        // Step 6: Final distribution enforcement
+        let dist_ctx2 = DistCtx::new_default(plan);
+        let plan = dist_ctx2
+            .transform_up(|ctx| ensure_dist(ctx, config))
+            .data()?
+            .plan;
+
+        // Step 7: Fix any sorting violations the final distribution pass introduced.
+        let sort_ctx2 = PlanWithCorrespondingSort::new_default(plan);
+        let adjusted2 = sort_ctx2.transform_up(ensure_sorting)?.data;
+
+        Ok(adjusted2.plan)
+    }
+
+    fn name(&self) -> &str {
+        "EnsureRequirementsNoPushdown"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::sync::Arc;
+
+    use arrow::compute::SortOptions;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::Result;
+    use datafusion_common::tree_node::TreeNodeRecursion;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr::{
+        EquivalenceProperties, LexOrdering, PhysicalExpr, PhysicalSortExpr,
+    };
+    use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+    use datafusion_physical_plan::limit::GlobalLimitExec;
+    use datafusion_physical_plan::sorts::sort::SortExec;
+    use datafusion_physical_plan::union::UnionExec;
+    use datafusion_physical_plan::{
+        DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
+        Partitioning, PlanProperties, SendableRecordBatchStream,
+    };
+
+    use crate::output_requirements::OutputRequirementExec;
+    use crate::sanity_checker::SanityCheckPlan;
+
+    use datafusion_common::{JoinType, NullEquality};
+    use datafusion_physical_expr::Distribution;
+    use datafusion_physical_expr_common::sort_expr::OrderingRequirements;
+    use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use datafusion_physical_plan::joins::{
+        HashJoinExec, PartitionMode, SortMergeJoinExec,
+    };
+    use datafusion_physical_plan::projection::ProjectionExec;
+    use datafusion_physical_plan::repartition::RepartitionExec;
+    use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+
+    /// Mock ExecutionPlan with configurable partition count and output ordering.
+    #[derive(Debug)]
+    struct MockMultiPartitionExec {
+        properties: Arc<PlanProperties>,
+    }
+
+    impl MockMultiPartitionExec {
+        fn new(partition_count: usize) -> Self {
+            let schema = Arc::new(Schema::new(vec![
+                Field::new("a", DataType::Int64, false),
+                Field::new("b", DataType::Int64, false),
+            ]));
+            let mut eq = EquivalenceProperties::new(Arc::clone(&schema));
+            if let Some(ordering) = LexOrdering::new(vec![PhysicalSortExpr::new(
+                Arc::new(Column::new("a", 0)),
+                SortOptions {
+                    descending: false,
+                    nulls_first: false,
+                },
+            )]) {
+                eq.add_orderings(vec![ordering.into_iter().collect::<Vec<_>>()]);
+            }
+            let properties = PlanProperties::new(
+                eq,
+                Partitioning::UnknownPartitioning(partition_count),
+                EmissionType::Incremental,
+                Boundedness::Bounded,
+            );
+            Self {
+                properties: Arc::new(properties),
+            }
+        }
+    }
+
+    impl DisplayAs for MockMultiPartitionExec {
+        fn fmt_as(
+            &self,
+            _t: DisplayFormatType,
+            f: &mut std::fmt::Formatter,
+        ) -> std::fmt::Result {
+            write!(f, "MockMultiPartitionExec")
+        }
+    }
+
+    impl ExecutionPlan for MockMultiPartitionExec {
+        fn name(&self) -> &str {
+            "MockMultiPartitionExec"
+        }
+        fn properties(&self) -> &Arc<PlanProperties> {
+            &self.properties
+        }
+        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+            vec![]
+        }
+        fn with_new_children(
+            self: Arc<Self>,
+            _children: Vec<Arc<dyn ExecutionPlan>>,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Ok(self)
+        }
+        fn apply_expressions(
+            &self,
+            _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
+        ) -> Result<TreeNodeRecursion> {
+            Ok(TreeNodeRecursion::Continue)
+        }
+        fn execute(
+            &self,
+            _partition: usize,
+            _context: Arc<datafusion_execution::TaskContext>,
+        ) -> Result<SendableRecordBatchStream> {
+            unimplemented!()
+        }
+    }
+
+    /// Helper: run EnsureRequirements and verify SanityCheckPlan passes
+    fn optimize_and_sanity_check(
+        plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = ConfigOptions::default();
+        let optimized = EnsureRequirements::new().optimize(plan, &config)?;
+        // SanityCheckPlan must pass
+        SanityCheckPlan::new().optimize(Arc::clone(&optimized), &config)?;
+        Ok(optimized)
+    }
+
+    /// Helper: verify idempotency — running twice produces the same plan
+    fn assert_idempotent(plan: Arc<dyn ExecutionPlan>) {
+        let config = ConfigOptions::default();
+        let p1 = EnsureRequirements::new()
+            .optimize(plan, &config)
+            .expect("first optimize failed");
+        let p2 = EnsureRequirements::new()
+            .optimize(Arc::clone(&p1), &config)
+            .expect("second optimize failed");
+
+        let s1 = datafusion_physical_plan::displayable(p1.as_ref())
+            .indent(true)
+            .to_string();
+        let s2 = datafusion_physical_plan::displayable(p2.as_ref())
+            .indent(true)
+            .to_string();
+        assert_eq!(
+            s1, s2,
+            "EnsureRequirements is NOT idempotent!\nFirst:\n{s1}\nSecond:\n{s2}"
+        );
+
+        // Both must pass SanityCheckPlan
+        SanityCheckPlan::new()
+            .optimize(p1, &config)
+            .expect("SanityCheckPlan failed on first pass");
+        SanityCheckPlan::new()
+            .optimize(p2, &config)
+            .expect("SanityCheckPlan failed on second pass");
+    }
+
+    /// Multi-partition sort + `GlobalLimitExec` must produce a valid plan.
+    /// Regression for the `SanityCheckPlan` failure that motivated this PR:
+    /// `pushdown_sorts` setting `preserve_partitioning=true` on multi-partition
+    /// input without inserting `SortPreservingMergeExec` violated the
+    /// `SinglePartition` requirement from `GlobalLimitExec`.
+    #[test]
+    fn test_multi_partition_sort_limit_sanity_check() {
+        let source = Arc::new(MockMultiPartitionExec::new(32));
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, source));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(21)));
+
+        let result = optimize_and_sanity_check(limit);
+        assert!(result.is_ok(), "SanityCheckPlan failed: {:?}", result.err());
+    }
+
+    /// Union with mixed partition counts + sort + limit.
+    #[test]
+    fn test_union_mixed_partitions_sort_limit() {
+        let live = Arc::new(MockMultiPartitionExec::new(32));
+        let historical = Arc::new(MockMultiPartitionExec::new(1));
+
+        let union = UnionExec::try_new(vec![live as _, historical as _]).unwrap();
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, union));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(21)));
+
+        let result = optimize_and_sanity_check(limit);
+        assert!(result.is_ok(), "SanityCheckPlan failed: {:?}", result.err());

Review Comment:
   These tests now also just use `assert`, whereas most of the existing tests use
   `insta` snapshots to make updating easier (which may not be a thing anymore now
   that AI coding tools are around), but consistency would be nice
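
To illustrate the difference between the two test styles, here is a minimal, self-contained sketch. It assumes a toy `Node` type and hypothetical `plan_string` / `sample_plan` helpers; none of these are DataFusion APIs, and the plan shape is only an example, not the actual output of the rule. A snapshot-style test compares the whole rendered plan against an expected string rather than only asserting that optimization succeeded:

```rust
// Toy stand-in for a physical plan node (hypothetical; not DataFusion's
// `ExecutionPlan`).
struct Node {
    name: &'static str,
    children: Vec<Node>,
}

// Render the tree one node per line, indenting two spaces per level,
// similar in spirit to an indented plan display.
fn render(node: &Node, depth: usize, out: &mut String) {
    out.push_str(&"  ".repeat(depth));
    out.push_str(node.name);
    out.push('\n');
    for child in &node.children {
        render(child, depth + 1, out);
    }
}

// Produce the full rendered plan as a single string.
fn plan_string(root: &Node) -> String {
    let mut out = String::new();
    render(root, 0, &mut out);
    out
}

// An illustrative plan shape only; the operator names are borrowed from the
// tests in this PR, but the nesting is an assumption made for the example.
fn sample_plan() -> Node {
    Node {
        name: "GlobalLimitExec",
        children: vec![Node {
            name: "SortPreservingMergeExec",
            children: vec![Node {
                name: "SortExec",
                children: vec![Node {
                    name: "MockMultiPartitionExec",
                    children: vec![],
                }],
            }],
        }],
    }
}
```

With `insta`, the comparison against the expected string would typically be written as an inline `assert_snapshot!` so the expected text can be regenerated when plans change; the plain `assert_eq!` on the rendered string is the dependency-free equivalent.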
   
   



##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -1875,25 +1870,21 @@ fn smj_join_key_ordering() -> Result<()> {
     let plan_sort = test_config.to_plan(join, &SORT_DISTRIB_DISTRIB);
     assert_plan!(plan_sort, @r"
     SortMergeJoinExec: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]
-      RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=1, maintains_sort_order=true
-        SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[false]
-          CoalescePartitionsExec
-            ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]
-              ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]
-                AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]
-                  RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10
-                    AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]
-                      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
-                        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-      RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=1, maintains_sort_order=true
-        SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[false]
-          CoalescePartitionsExec
-            ProjectionExec: expr=[a@1 as a2, b@0 as b2]
-              AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]
-                RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10
-                  AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]
-                    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
-                      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+      SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[true]

Review Comment:
   same here -- this plan is better



##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1814 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//! │
+//! ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//! │   └─ Same as existing adjust_input_keys_ordering
+//! │
+//! └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!     └─ For each node (bottom-up), for each child:
+//!         Step 1: Ensure distribution requirement
+//!           └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec
+//!         Step 2: Ensure ordering requirement (distribution-aware)
+//!           └─ Add SortExec with correct preserve_partitioning + SPM if needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid position.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+
+// Internal functions used directly instead of calling EnforceDistribution/EnforceSorting
+// as opaque boxes. This gives us control over the pass ordering and enables
+// future merging into a true single-pass architecture.
+
+// For the no-pushdown variant (Phase 3)
+use crate::enforce_sorting::replace_with_order_preserving_variants::{
+    OrderPreservationContext, replace_with_order_preserving_variants,
+};
+use crate::enforce_sorting::{
+    PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, ensure_sorting,
+    parallelize_sorts, replace_with_partial_sort,
+};
+
+/// Optimizer rule that enforces both distribution and sorting requirements.
+///
+/// This rule combines the functionality of `EnforceDistribution` and
+/// `EnforceSorting` into a coordinated sequence where distribution is
+/// always settled before sorting for each operator, preventing the
+/// non-idempotent interactions between the two separate rules.
+///
+/// See [module level documentation](self) for more details.
+#[derive(Default, Debug)]
+pub struct EnsureRequirements {}
+
+impl EnsureRequirements {
+    /// Create a new `EnsureRequirements` optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirements {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Phase 1: Join key reordering (top-down, from EnforceDistribution)
+        use crate::enforce_distribution::{
+            PlanWithKeyRequirements, adjust_input_keys_ordering,
+        };
+        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down_join_key_reordering {
+            let ctx = PlanWithKeyRequirements::new_default(plan);
+            ctx.transform_down(adjust_input_keys_ordering).data()?.plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+
+        // Phase 2: Combined distribution + sorting enforcement (single bottom-up pass)
+        // For each node: distribution first, then sorting.
+        use crate::enforce_distribution::{DistributionContext, ensure_distribution};
+        use crate::enforce_sorting::{PlanWithCorrespondingSort, ensure_sorting};
+
+        // Step 2a: Distribution enforcement (bottom-up)
+        let dist_ctx = DistributionContext::new_default(plan);
+        let dist_ctx = dist_ctx
+            .transform_up(|ctx| ensure_distribution(ctx, config))
+            .data()?;
+
+        // Step 2b: Sorting enforcement (bottom-up) — runs on distribution-fixed plan
+        let sort_ctx = PlanWithCorrespondingSort::new_default(dist_ctx.plan);
+        let sort_ctx = sort_ctx.transform_up(ensure_sorting)?.data;
+
+        // Phase 3: Optimization passes
+        // 3a: Parallelize sorts (Coalesce+Sort → SPM+Sort)
+        use crate::enforce_sorting::{
+            PlanWithCorrespondingCoalescePartitions, parallelize_sorts,
+            replace_with_partial_sort,
+        };
+        let plan = if config.optimizer.repartition_sorts {
+            let ctx = PlanWithCorrespondingCoalescePartitions::new_default(sort_ctx.plan);
+            ctx.transform_up(parallelize_sorts).data()?.plan
+        } else {
+            sort_ctx.plan
+        };
+
+        // 3b: Order-preserving variants
+        use crate::enforce_sorting::replace_with_order_preserving_variants::{
+            OrderPreservationContext, replace_with_order_preserving_variants,
+        };
+        let ctx = OrderPreservationContext::new_default(plan);
+        let plan = ctx
+            .transform_up(|c| {
+                replace_with_order_preserving_variants(c, false, true, config)
+            })
+            .data()?
+            .plan;
+
+        // 3c: Sort pushdown (distribution-aware)
+        use crate::enforce_sorting::sort_pushdown::{
+            SortPushDown, assign_initial_requirements, pushdown_sorts,
+        };
+        let mut sort_pushdown = SortPushDown::new_default(plan);
+        assign_initial_requirements(&mut sort_pushdown);
+        let adjusted = pushdown_sorts(sort_pushdown)?;
+
+        // 3d: Partial sort
+        adjusted
+            .plan
+            .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?)))
+            .data()
+    }
+
+    fn name(&self) -> &str {
+        "EnsureRequirements"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Phase 3 variant: no `pushdown_sorts`, sort placement handled entirely
+/// by bottom-up passes. Currently experimental — some plan shapes differ
+/// from the `pushdown_sorts` variant (less optimal but still correct).
+#[derive(Default, Debug)]
+pub struct EnsureRequirementsNoPushdown {}
+
+impl EnsureRequirementsNoPushdown {
+    /// Create a new rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirementsNoPushdown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Step 1: Distribution enforcement
+        use crate::enforce_distribution::{
+            DistributionContext as DistCtx, PlanWithKeyRequirements as KeyReqs,
+            adjust_input_keys_ordering as adj_keys, ensure_distribution as ensure_dist,
+        };
+        let top_down = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down {
+            KeyReqs::new_default(plan)
+                .transform_down(adj_keys)
+                .data()?
+                .plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+        let dist_ctx = DistCtx::new_default(plan);
+        let plan = dist_ctx
+            .transform_up(|ctx| ensure_dist(ctx, config))
+            .data()?
+            .plan;
+
+        // Step 2: ensure_sorting (bottom-up, NO pushdown_sorts)
+        let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
+        let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
+
+        // Step 3: parallelize_sorts (optional)
+        let plan = if config.optimizer.repartition_sorts {
+            let ctx = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
+            ctx.transform_up(parallelize_sorts).data()?.plan
+        } else {
+            adjusted.plan
+        };
+
+        // Step 4: order-preserving variants
+        let ctx = OrderPreservationContext::new_default(plan);
+        let plan = ctx
+            .transform_up(|c| {
+                replace_with_order_preserving_variants(c, false, true, config)
+            })
+            .data()?
+            .plan;
+
+        // Step 5: partial sort
+        let plan = plan
+            .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?)))
+            .data()?;
+
+        // NO pushdown_sorts — sort placement is purely bottom-up.
+        // Step 6: Final distribution enforcement
+        let dist_ctx2 = DistCtx::new_default(plan);
+        let plan = dist_ctx2
+            .transform_up(|ctx| ensure_dist(ctx, config))
+            .data()?
+            .plan;
+
+        // Step 7: Fix any sorting violations the final distribution pass introduced.
+        let sort_ctx2 = PlanWithCorrespondingSort::new_default(plan);
+        let adjusted2 = sort_ctx2.transform_up(ensure_sorting)?.data;
+
+        Ok(adjusted2.plan)
+    }
+
+    fn name(&self) -> &str {
+        "EnsureRequirementsNoPushdown"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::sync::Arc;
+
+    use arrow::compute::SortOptions;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::Result;
+    use datafusion_common::tree_node::TreeNodeRecursion;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr::{
+        EquivalenceProperties, LexOrdering, PhysicalExpr, PhysicalSortExpr,
+    };
+    use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+    use datafusion_physical_plan::limit::GlobalLimitExec;
+    use datafusion_physical_plan::sorts::sort::SortExec;
+    use datafusion_physical_plan::union::UnionExec;
+    use datafusion_physical_plan::{
+        DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
+        Partitioning, PlanProperties, SendableRecordBatchStream,
+    };
+
+    use crate::output_requirements::OutputRequirementExec;
+    use crate::sanity_checker::SanityCheckPlan;
+
+    use datafusion_common::{JoinType, NullEquality};
+    use datafusion_physical_expr::Distribution;
+    use datafusion_physical_expr_common::sort_expr::OrderingRequirements;
+    use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use datafusion_physical_plan::joins::{
+        HashJoinExec, PartitionMode, SortMergeJoinExec,
+    };
+    use datafusion_physical_plan::projection::ProjectionExec;
+    use datafusion_physical_plan::repartition::RepartitionExec;
+    use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+
+    /// Mock ExecutionPlan with configurable partition count and output ordering.
+    #[derive(Debug)]
+    struct MockMultiPartitionExec {
+        properties: Arc<PlanProperties>,
+    }
+
+    impl MockMultiPartitionExec {
+        fn new(partition_count: usize) -> Self {
+            let schema = Arc::new(Schema::new(vec![
+                Field::new("a", DataType::Int64, false),
+                Field::new("b", DataType::Int64, false),
+            ]));
+            let mut eq = EquivalenceProperties::new(Arc::clone(&schema));
+            if let Some(ordering) = LexOrdering::new(vec![PhysicalSortExpr::new(
+                Arc::new(Column::new("a", 0)),
+                SortOptions {
+                    descending: false,
+                    nulls_first: false,
+                },
+            )]) {
+                eq.add_orderings(vec![ordering.into_iter().collect::<Vec<_>>()]);
+            }
+            let properties = PlanProperties::new(
+                eq,
+                Partitioning::UnknownPartitioning(partition_count),
+                EmissionType::Incremental,
+                Boundedness::Bounded,
+            );
+            Self {
+                properties: Arc::new(properties),
+            }
+        }
+    }
+
+    impl DisplayAs for MockMultiPartitionExec {
+        fn fmt_as(
+            &self,
+            _t: DisplayFormatType,
+            f: &mut std::fmt::Formatter,
+        ) -> std::fmt::Result {
+            write!(f, "MockMultiPartitionExec")
+        }
+    }
+
+    impl ExecutionPlan for MockMultiPartitionExec {
+        fn name(&self) -> &str {
+            "MockMultiPartitionExec"
+        }
+        fn properties(&self) -> &Arc<PlanProperties> {
+            &self.properties
+        }
+        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+            vec![]
+        }
+        fn with_new_children(
+            self: Arc<Self>,
+            _children: Vec<Arc<dyn ExecutionPlan>>,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Ok(self)
+        }
+        fn apply_expressions(
+            &self,
+            _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
+        ) -> Result<TreeNodeRecursion> {
+            Ok(TreeNodeRecursion::Continue)
+        }
+        fn execute(
+            &self,
+            _partition: usize,
+            _context: Arc<datafusion_execution::TaskContext>,
+        ) -> Result<SendableRecordBatchStream> {
+            unimplemented!()
+        }
+    }
+
+    /// Helper: run EnsureRequirements and verify SanityCheckPlan passes
+    fn optimize_and_sanity_check(
+        plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = ConfigOptions::default();
+        let optimized = EnsureRequirements::new().optimize(plan, &config)?;
+        // SanityCheckPlan must pass
+        SanityCheckPlan::new().optimize(Arc::clone(&optimized), &config)?;
+        Ok(optimized)
+    }
+
+    /// Helper: verify idempotency — running twice produces the same plan
+    fn assert_idempotent(plan: Arc<dyn ExecutionPlan>) {
+        let config = ConfigOptions::default();
+        let p1 = EnsureRequirements::new()
+            .optimize(plan, &config)
+            .expect("first optimize failed");
+        let p2 = EnsureRequirements::new()
+            .optimize(Arc::clone(&p1), &config)
+            .expect("second optimize failed");
+
+        let s1 = datafusion_physical_plan::displayable(p1.as_ref())
+            .indent(true)
+            .to_string();
+        let s2 = datafusion_physical_plan::displayable(p2.as_ref())
+            .indent(true)
+            .to_string();
+        assert_eq!(
+            s1, s2,
+            "EnsureRequirements is NOT idempotent!\nFirst:\n{s1}\nSecond:\n{s2}"
+        );
+
+        // Both must pass SanityCheckPlan
+        SanityCheckPlan::new()
+            .optimize(p1, &config)
+            .expect("SanityCheckPlan failed on first pass");
+        SanityCheckPlan::new()
+            .optimize(p2, &config)
+            .expect("SanityCheckPlan failed on second pass");
+    }
+
+    /// Multi-partition sort + `GlobalLimitExec` must produce a valid plan.
+    /// Regression for the `SanityCheckPlan` failure that motivated this PR:
+    /// `pushdown_sorts` setting `preserve_partitioning=true` on multi-partition
+    /// input without inserting `SortPreservingMergeExec` violated the
+    /// `SinglePartition` requirement from `GlobalLimitExec`.
+    #[test]
+    fn test_multi_partition_sort_limit_sanity_check() {
+        let source = Arc::new(MockMultiPartitionExec::new(32));
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, source));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(21)));
+
+        let result = optimize_and_sanity_check(limit);
+        assert!(result.is_ok(), "SanityCheckPlan failed: {:?}", result.err());
+    }
+
+    /// Union with mixed partition counts + sort + limit.
+    #[test]
+    fn test_union_mixed_partitions_sort_limit() {
+        let live = Arc::new(MockMultiPartitionExec::new(32));
+        let historical = Arc::new(MockMultiPartitionExec::new(1));
+
+        let union = UnionExec::try_new(vec![live as _, historical as _]).unwrap();
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, union));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(21)));
+
+        let result = optimize_and_sanity_check(limit);
+        assert!(result.is_ok(), "SanityCheckPlan failed: {:?}", result.err());
+    }
+
+    /// Idempotency: multi-partition sort + limit
+    #[test]
+    fn test_idempotent_multi_partition_sort_limit() {
+        let source = Arc::new(MockMultiPartitionExec::new(16));
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, source));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(10)));
+
+        assert_idempotent(limit);
+    }
+
+    /// Idempotency: union with mixed partitions
+    #[test]
+    fn test_idempotent_union_mixed_partitions() {
+        let live = Arc::new(MockMultiPartitionExec::new(8));
+        let hist = Arc::new(MockMultiPartitionExec::new(1));
+        let union = UnionExec::try_new(vec![live as _, hist as _]).unwrap();
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, union));
+        let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(5)));
+
+        assert_idempotent(limit);
+    }
+
+    // ========================================================================
+    // Projection + multi-partition tests (pushdown_sorts trigger path)
+    // ========================================================================
+
+    /// ProjectionExec over multi-partition + sort DESC + limit.
+    /// This is the topology where pushdown_sorts pushes sort through projection
+    /// onto the multi-partition source. The optimizer must still produce a valid plan.
+    #[test]
+    fn test_projection_over_multi_partition_sort_limit() {
+        let source = Arc::new(MockMultiPartitionExec::new(16));
+        // Identity projection
+        let proj_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![
+            (Arc::new(Column::new("a", 0)), "a".to_string()),
+            (Arc::new(Column::new("b", 1)), "b".to_string()),
+        ];
+        let projection =
+            Arc::new(ProjectionExec::try_new(proj_exprs, source as _).unwrap());
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, projection));
+        let limit: Arc<dyn ExecutionPlan> =
+            Arc::new(GlobalLimitExec::new(sort, 0, Some(21)));
+
+        let result = optimize_and_sanity_check(Arc::clone(&limit));
+        assert!(
+            result.is_ok(),
+            "SanityCheckPlan failed for projection over multi-partition: {:?}",
+            result.err()
+        );
+        assert_idempotent(limit);
+    }
+
+    // ========================================================================
+    // Single partition tests (no unnecessary operators)
+    // ========================================================================
+
+    /// Single partition source + sort + limit should NOT add SortPreservingMergeExec.
+    #[test]
+    fn test_single_partition_no_unnecessary_spm() {
+        let source = Arc::new(MockMultiPartitionExec::new(1));
+
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let sort = Arc::new(SortExec::new(sort_expr, source));
+        let limit: Arc<dyn ExecutionPlan> =
+            Arc::new(GlobalLimitExec::new(sort, 0, Some(10)));
+
+        let optimized = optimize_and_sanity_check(limit).unwrap();
+        let plan_str = datafusion_physical_plan::displayable(optimized.as_ref())
+            .indent(true)
+            .to_string();
+
+        // Single partition should not have SortPreservingMergeExec
+        assert!(
+            !plan_str.contains("SortPreservingMergeExec"),
+            "Unnecessary SortPreservingMergeExec for single partition:\n{plan_str}"
+        );

Review Comment:
   This pattern is often combined into a single macro or function using
insta_assert in other tests to reduce test repetition and make it easier to
update
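
   For illustration only, a minimal std-only sketch of what such a
consolidation could look like. Everything here is hypothetical: `Plan`,
`sort_limit`, `render`, and `assert_plan!` are stand-ins, not DataFusion or
`insta` APIs. In the real tests the builder would construct `SortExec` and
`GlobalLimitExec`, and the comparison would presumably be a snapshot assertion
rather than `assert_eq!`.

```rust
// Hypothetical stand-in for the physical plan types used in the tests.
enum Plan {
    Source { partitions: usize },
    Sort { input: Box<Plan> },
    Limit { fetch: usize, input: Box<Plan> },
}

/// Shared builder for the "sort + limit over some input" shape that each
/// test currently spells out by hand.
fn sort_limit(input: Plan, fetch: usize) -> Plan {
    Plan::Limit {
        fetch,
        input: Box::new(Plan::Sort { input: Box::new(input) }),
    }
}

/// Render a plan as an indented tree, one operator per line.
fn render(plan: &Plan) -> String {
    fn go(plan: &Plan, depth: usize, out: &mut String) {
        let pad = "  ".repeat(depth);
        match plan {
            Plan::Source { partitions } => {
                out.push_str(&format!("{pad}Source: partitions={partitions}\n"));
            }
            Plan::Sort { input } => {
                out.push_str(&format!("{pad}Sort\n"));
                go(input, depth + 1, out);
            }
            Plan::Limit { fetch, input } => {
                out.push_str(&format!("{pad}Limit: fetch={fetch}\n"));
                go(input, depth + 1, out);
            }
        }
    }
    let mut out = String::new();
    go(plan, 0, &mut out);
    out
}

/// Std-only stand-in for a snapshot assertion: compare the rendered plan
/// against an inline expected string, ignoring surrounding whitespace.
macro_rules! assert_plan {
    ($plan:expr, $expected:expr) => {
        assert_eq!(render(&$plan).trim_end(), $expected.trim(), "plan mismatch");
    };
}

fn main() {
    // Each test case shrinks to one builder call plus one assertion.
    let plan = sort_limit(Plan::Source { partitions: 16 }, 10);
    assert_plan!(plan, "Limit: fetch=10\n  Sort\n    Source: partitions=16");
}
```

   With a helper like this, each test reduces to the input it varies
(partition count, union vs. projection) plus the expected plan text, so a
change in optimizer output only requires updating the expected strings.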



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

