This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 02ce571910 Push even local limits past windows (#20752)
02ce571910 is described below

commit 02ce571910e6be643b29f6b282c86ab137dbfd2d
Author: Brent Gardner <[email protected]>
AuthorDate: Fri Mar 6 14:56:40 2026 -0700

    Push even local limits past windows (#20752)
    
    ## Which issue does this PR close?
    
    - Closes #20751.
    
    ## Rationale for this change
    
    Described in issue
    
    ## What changes are included in this PR?
    
    A simple change, and a unit test
    
    ## Are these changes tested?
    
    With a unit test
    
    ## Are there any user-facing changes?
    
    Some queries should go faster, especially when distributed
    
    **Note: AI assistance was used in this PR**
---
 Cargo.lock                                         |   1 +
 datafusion/physical-optimizer/Cargo.toml           |   1 +
 .../src/limit_pushdown_past_window.rs              | 119 ++++++++++++++++++++-
 3 files changed, 120 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 38fa83dd12..9c8f2c5935 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2449,6 +2449,7 @@ dependencies = [
  "datafusion-expr",
  "datafusion-expr-common",
  "datafusion-functions",
+ "datafusion-functions-window",
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "datafusion-physical-plan",
diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml
index 395da10d62..38c8a7c372 100644
--- a/datafusion/physical-optimizer/Cargo.toml
+++ b/datafusion/physical-optimizer/Cargo.toml
@@ -56,5 +56,6 @@ recursive = { workspace = true, optional = true }
 [dev-dependencies]
 datafusion-expr = { workspace = true }
 datafusion-functions = { workspace = true }
+datafusion-functions-window = { workspace = true }
 insta = { workspace = true }
 tokio = { workspace = true }
diff --git a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
index c23fa4faef..729b600da7 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
@@ -25,7 +25,7 @@ use datafusion_physical_expr::window::{
     StandardWindowFunctionExpr, WindowExpr,
 };
 use datafusion_physical_plan::execution_plan::CardinalityEffect;
-use datafusion_physical_plan::limit::GlobalLimitExec;
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::repartition::RepartitionExec;
 use datafusion_physical_plan::sorts::sort::SortExec;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -206,6 +206,12 @@ fn get_limit(node: &Arc<dyn ExecutionPlan>, ctx: &mut TraverseState) -> bool {
         ctx.reset_limit(limit.fetch().map(|fetch| fetch + limit.skip()));
         return true;
     }
+    // In distributed execution, GlobalLimitExec becomes LocalLimitExec
+    // per partition. Handle it the same way (LocalLimitExec has no skip).
+    if let Some(limit) = node.as_any().downcast_ref::<LocalLimitExec>() {
+        ctx.reset_limit(Some(limit.fetch()));
+        return true;
+    }
     if let Some(limit) = node.as_any().downcast_ref::<SortPreservingMergeExec>() {
         ctx.reset_limit(limit.fetch());
         return true;
@@ -254,3 +260,114 @@ fn bound_to_usize(bound: &WindowFrameBound) -> Option<usize> {
         _ => None,
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+    use datafusion_functions_window::row_number::row_number_udwf;
+    use datafusion_physical_expr::expressions::col;
+    use datafusion_physical_expr::window::StandardWindowExpr;
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+    use datafusion_physical_plan::InputOrderMode;
+    use datafusion_physical_plan::displayable;
+    use datafusion_physical_plan::limit::LocalLimitExec;
+    use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
+    use datafusion_physical_plan::windows::{
+        BoundedWindowAggExec, create_udwf_window_expr,
+    };
+    use insta::assert_snapshot;
+    use std::sync::Arc;
+
+    fn plan_str(plan: &dyn ExecutionPlan) -> String {
+        displayable(plan).indent(true).to_string()
+    }
+
+    fn schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
+    }
+
+    /// Build: LocalLimitExec or GlobalLimitExec → BoundedWindowAggExec(row_number) → SortExec
+    fn build_window_plan(
+        use_local_limit: bool,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let s = schema();
+        let input: Arc<dyn ExecutionPlan> =
+            Arc::new(PlaceholderRowExec::new(Arc::clone(&s)));
+
+        let ordering =
+            LexOrdering::new(vec![PhysicalSortExpr::new_default(col("a", &s)?).asc()])
+                .unwrap();
+
+        let sort: Arc<dyn ExecutionPlan> = Arc::new(
+            SortExec::new(ordering.clone(), input).with_preserve_partitioning(true),
+        );
+
+        let window_expr = Arc::new(StandardWindowExpr::new(
+            create_udwf_window_expr(
+                &row_number_udwf(),
+                &[],
+                &s,
+                "row_number".to_string(),
+                false,
+            )?,
+            &[],
+            ordering.as_ref(),
+            Arc::new(WindowFrame::new_bounds(
+                WindowFrameUnits::Rows,
+                WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+                WindowFrameBound::CurrentRow,
+            )),
+        ));
+
+        let window: Arc<dyn ExecutionPlan> = Arc::new(BoundedWindowAggExec::try_new(
+            vec![window_expr],
+            sort,
+            InputOrderMode::Sorted,
+            true,
+        )?);
+
+        let limit: Arc<dyn ExecutionPlan> = if use_local_limit {
+            Arc::new(LocalLimitExec::new(window, 100))
+        } else {
+            Arc::new(GlobalLimitExec::new(window, 0, Some(100)))
+        };
+
+        Ok(limit)
+    }
+
+    fn optimize(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        let mut config = ConfigOptions::new();
+        config.optimizer.enable_window_limits = true;
+        LimitPushPastWindows::new().optimize(plan, &config).unwrap()
+    }
+
+    /// GlobalLimitExec above a windowed sort should push fetch into the SortExec.
+    #[test]
+    fn global_limit_pushes_past_window() {
+        let plan = build_window_plan(false).unwrap();
+        let optimized = optimize(plan);
+        assert_snapshot!(plan_str(optimized.as_ref()), @r#"
+        GlobalLimitExec: skip=0, fetch=100
+          BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+            SortExec: TopK(fetch=100), expr=[a@0 ASC], preserve_partitioning=[true]
+              PlaceholderRowExec
+        "#);
+    }
+
+    /// LocalLimitExec above a windowed sort should also push fetch into the SortExec.
+    /// This is the case in distributed execution where GlobalLimitExec becomes LocalLimitExec.
+    #[test]
+    fn local_limit_pushes_past_window() {
+        let plan = build_window_plan(true).unwrap();
+        let optimized = optimize(plan);
+        assert_snapshot!(plan_str(optimized.as_ref()), @r#"
+        LocalLimitExec: fetch=100
+          BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+            SortExec: TopK(fetch=100), expr=[a@0 ASC], preserve_partitioning=[true]
+              PlaceholderRowExec
+        "#);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to