yonatan-sevenai commented on code in PR #21375:
URL: https://github.com/apache/datafusion/pull/21375#discussion_r3255110337


##########
datafusion/sql/src/unparser/plan.rs:
##########
@@ -680,8 +711,219 @@ impl Unparser<'_> {
                 if self.dialect.unnest_as_lateral_flatten() {
                     Self::collect_flatten_aliases(p.input.as_ref(), select);
                 }
-                self.reconstruct_select_statement(plan, p, select)?;
-                self.select_to_sql_recursively(p.input.as_ref(), query, 
select, relation)
+                // Walk down through consecutive Sort/Limit nodes, greedily
+                // absorbing what can be folded into the SELECT we're
+                // building around the Aggregate. A single SQL SELECT can
+                // carry at most one `ORDER BY` (applied before `LIMIT`),
+                // so the safe shape between us and the Aggregate is
+                // `Limit* Sort?` (outer→inner). We stop at the first node
+                // that would violate this; that node becomes the
+                // subquery boundary, and recursion (seeing
+                // `already_projected = true`) wraps it in a derived
+                // relation. If we walk all the way to a non-Sort/non-Limit
+                // terminator, the entire chain folds into one SELECT.
+                //
+                // Stacked Sorts with nothing between them collapse to the
+                // outermost — the same simplification `EnforceSorting`
+                // applies on the physical side — but only when no Limit
+                // has been absorbed since the previous Sort, since the
+                // inner Sort would otherwise be determining which rows
+                // the Limit keeps.
+                //
+                // The fold is collected here without touching `query`
+                // (apart from non-literal direct Limits, which don't
+                // depend on projection form). Once we know whether every
+                // Sort/Limit was absorbed we can pick the right
+                // projection form and emit `ORDER BY` with or without
+                // unprojection.
+                let mut cur = p.input.as_ref();
+                let mut absorbed_sort: Option<&Sort> = None;
+                let mut combined_skip: usize = 0;
+                let mut combined_fetch: Option<usize> = None;
+                let mut have_combined_limit = false;
+                let mut have_direct_limit = false;
+                let mut have_order_by = false;
+                loop {
+                    match cur {
+                        LogicalPlan::Limit(limit) => {
+                            if have_order_by {
+                                // Limit-below-Sort: `ORDER BY … LIMIT N`
+                                // would apply the sort first, but the
+                                // logical plan applies the Limit first.
+                                break;
+                            }
+                            let skip_lit = limit.get_skip_type()?;
+                            let fetch_lit = limit.get_fetch_type()?;
+                            match (skip_lit, fetch_lit) {
+                                (SkipType::Literal(s), FetchType::Literal(f)) 
=> {
+                                    if have_direct_limit {
+                                        break;
+                                    }
+                                    if have_combined_limit {
+                                        // outer = already-accumulated;
+                                        // inner = this Limit. Same merge
+                                        // rule as the optimizer.
+                                        let (cs, cf) = combine_limit(
+                                            combined_skip,
+                                            combined_fetch,
+                                            s,
+                                            f,
+                                        );
+                                        combined_skip = cs;
+                                        combined_fetch = cf;
+                                    } else {
+                                        combined_skip = s;
+                                        combined_fetch = f;
+                                        have_combined_limit = true;
+                                    }
+                                }
+                                _ => {
+                                    if have_combined_limit || 
have_direct_limit {
+                                        // Cannot safely merge a
+                                        // non-literal Limit with a prior
+                                        // one; let recursion handle it.
+                                        break;
+                                    }
+                                    let Some(query_ref) = query.as_mut() else {
+                                        return internal_err!(
+                                            "Limit operator only valid in a 
statement context."
+                                        );
+                                    };
+                                    if let Some(fetch) = &limit.fetch {
+                                        
query_ref.limit(Some(self.expr_to_sql(fetch)?));
+                                    }
+                                    if let Some(skip) = &limit.skip {
+                                        query_ref.offset(Some(ast::Offset {
+                                            rows: ast::OffsetRows::None,
+                                            value: self.expr_to_sql(skip)?,
+                                        }));
+                                    }
+                                    have_direct_limit = true;
+                                }
+                            }
+                            cur = limit.input.as_ref();
+                        }
+                        LogicalPlan::Sort(sort) => {
+                            if have_order_by {
+                                if sort.fetch.is_some() {
+                                    // Inner Sort with `fetch` acts as
+                                    // Sort+Limit; dropping it would lose
+                                    // the hidden Limit.
+                                    break;
+                                }
+                                // Outer Sort already absorbed; the inner
+                                // Sort is reordered by it and is
+                                // conventionally dropped — matches
+                                // `EnforceSorting` on the physical side.
+                                cur = sort.input.as_ref();
+                                continue;
+                            }
+                            // First Sort. Capture `fetch` as a combined
+                            // Limit (since `Sort { fetch }` is equivalent
+                            // to Sort + Limit). Defer emitting `ORDER BY`
+                            // until we know whether the chain fully
+                            // absorbed.
+                            if let Some(fetch) = sort.fetch
+                                && !have_direct_limit

Review Comment:
   Thanks for catching this. 
   
   Decided to simplify the state machine by splitting the arms for `Sort { 
fetch }` from the plain `Sort` arm:
     - `Sort { fetch }` is logically `Limit(fetch) -> Sort`. The new arm makes 
that explicit: it runs the virtual Limit checks first (Limit-below-Sort, 
can't-combine-with-direct-limit), and only proceeds to
     absorb the Sort if the virtual Limit was absorbable. If either check fails 
we stop without touching anything, so the derived subquery boundary lands at 
the Sort node and both limits survive.
     - `Sort` (no fetch) is the plain case: drop the inner Sort when an outer 
one was already absorbed, matching `EnforceSorting`.
   
     For `Projection -> Limit(non-literal) -> Sort { fetch = 5 } -> Aggregate`, 
the emitted SQL is now:
   
     ```sql
     SELECT __agg_0 AS "..." FROM (
       SELECT max(...) AS __agg_0 FROM ... GROUP BY ... ORDER BY ... LIMIT 5
     ) LIMIT CAST(7 AS INTEGER)
     ```
   
     which preserves both limits. Added a regression test for the exact case 
you described.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to