yonatan-sevenai commented on code in PR #21375:
URL: https://github.com/apache/datafusion/pull/21375#discussion_r3253498940


##########
datafusion/sql/src/unparser/plan.rs:
##########
@@ -605,7 +610,181 @@ impl Unparser<'_> {
                 if self.dialect.unnest_as_lateral_flatten() {
                     Self::collect_flatten_aliases(p.input.as_ref(), select);
                 }
-                self.reconstruct_select_statement(plan, p, select)?;
+                let found_agg = self.reconstruct_select_statement(plan, p, select)?;
+
+                // If the Projection claimed an Aggregate by reaching through
+                // one or more Limit / Sort nodes, fold their clauses into the
+                // current query and skip those nodes during recursion.
+                // Otherwise the Limit/Sort arm would see `already_projected`
+                // and wrap everything in a spurious derived subquery, emitting
+                // the aggregate twice.
+                //
+                // Repeated modifiers of the same kind are merged (rather than
+                // breaking out of the loop) so the chain still folds into a
+                // single SQL SELECT regardless of how the inputs were stacked:
+                //   * stacked Limits combine via `combine_limit` semantics —
+                //     the same merge `PushDownLimit` applies in the optimizer
+                //     when both skip/fetch are integer literals;
+                //   * stacked Sorts keep the outermost (top) one, since an
+                //     inner Sort is reordered by the outer one.
+                // If we cannot combine literally (e.g. expression skip/fetch
+                // for the inner Limit), we stop and let recursion handle the
+                // remainder via a derived subquery.
+                if found_agg {
+                    // Walk top-down through Limit / Sort nodes between the
+                    // Projection and the Aggregate, folding each clause into
+                    // the current SQL SELECT. We track combined_* for any
+                    // literal Limits and apply once at the end so repeated
+                    // Limits merge via `combine_limit` (the same rule used by
+                    // the optimizer's `PushDownLimit`). A non-literal Limit
+                    // is set on the query directly; if one is followed by
+                    // another Limit we have no safe way to merge, so the
+                    // loop terminates and recursion handles the remainder.
+                    let mut cur = p.input.as_ref();
+                    let mut combined_skip: usize = 0;
+                    let mut combined_fetch: Option<usize> = None;
+                    let mut have_combined_limit = false;
+                    let mut have_direct_limit = false;
+                    let mut have_order_by = false;
+                    loop {
+                        match cur {
+                            LogicalPlan::Limit(limit) => {

Review Comment:
   Good catch. Reworked the fold to only absorb chains that map cleanly onto
   one SELECT.
   
     The shape that fits a single SELECT (reading outer to inner) is
     `Limit* Sort? Aggregate`: any number of Limits on top (combined via
     `combine_limit`), then at most one Sort, then the Aggregate. Anything else
     needs a subquery boundary.
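
As a rough illustration of how two stacked literal Limits can collapse into one, here is a minimal standalone sketch. It is not DataFusion's `combine_limit` itself; `merge_stacked_limits` is a hypothetical name, and the sketch assumes both skip and fetch are plain integers, with the parent Limit applied to the output of the child Limit.

```rust
/// Merge an outer (parent) Limit with the inner (child) Limit it sits on.
/// Hypothetical helper for illustration; assumes literal skip/fetch values.
fn merge_stacked_limits(
    parent_skip: usize,
    parent_fetch: Option<usize>,
    child_skip: usize,
    child_fetch: Option<usize>,
) -> (usize, Option<usize>) {
    // Rows skipped by the child are gone before the parent skips more.
    let skip = child_skip.saturating_add(parent_skip);

    // Rows the child still offers once the parent has skipped its share.
    let remaining = child_fetch.map(|f| f.saturating_sub(parent_skip));

    // The merged fetch is the tighter of the two bounds, if any exist.
    let fetch = match (parent_fetch, remaining) {
        (Some(p), Some(r)) => Some(p.min(r)),
        (Some(p), None) => Some(p),
        (None, r) => r,
    };

    (skip, fetch)
}
```

For example, a child `OFFSET 5 LIMIT 10` under a parent `OFFSET 3 LIMIT 4` selects rows 8 through 11 of the input, so `merge_stacked_limits(3, Some(4), 5, Some(10))` yields `(8, Some(4))`.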
   
     The algorithm walks down from the Projection through consecutive
     Sort/Limit nodes, greedily absorbing into the outer SELECT and stopping at
     the first node that would violate that order. Specifically:
   
     - Limit after we already absorbed a Sort: stop. That's the unsafe
       transition you flagged.
     - Sort with `fetch` after we already absorbed a Sort: stop. The fetch is a
       hidden Limit we can't drop.
     - Two Sorts in a row with no Limit between them: drop the inner one,
       matching what `EnforceSorting` does on the physical side.
   
     If the walk reaches a non-Sort/non-Limit terminator, the whole chain folds
     into one SELECT and the Aggregate is claimed in place (sort exprs
     unprojected). If it stops early, the boundary node and everything below it
     become a derived subquery via the normal recursion, and the outer SELECT
     uses passthrough column refs plus raw sort exprs that resolve against the
     subquery's output.
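
The walk described above can be sketched over a simplified model of the plan. This is not the code in the PR: `Node`, `Fold`, and `walk_chain` are hypothetical names, the chain is given top-down starting just below the Projection, and a Sort with `fetch` seen before any Sort has been absorbed is assumed to be absorbable (its fetch treated as one more Limit).

```rust
/// Simplified stand-in for the plan nodes below the Projection (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Node {
    Limit,
    Sort { has_fetch: bool },
    Aggregate,
    Other,
}

/// Outcome of the walk (hypothetical type).
#[derive(Debug, PartialEq)]
enum Fold {
    /// Every Limit/Sort was absorbed; the terminator sits at this index.
    Whole { terminator: usize },
    /// The walk stopped early; the node at this index starts a derived subquery.
    Boundary { at: usize },
}

/// Walk the chain top-down, absorbing nodes that fit `Limit* Sort? Aggregate`.
fn walk_chain(chain: &[Node]) -> Fold {
    let mut absorbed_sort = false;
    for (i, node) in chain.iter().enumerate() {
        match node {
            // A Limit below an absorbed Sort must run before that Sort.
            Node::Limit if absorbed_sort => return Fold::Boundary { at: i },
            // Limits on top merge into a single LIMIT/OFFSET.
            Node::Limit => {}
            // A fetch on an inner Sort is a hidden Limit that cannot be dropped.
            Node::Sort { has_fetch: true } if absorbed_sort => {
                return Fold::Boundary { at: i }
            }
            // First Sort is absorbed; a later fetch-less Sort is dropped.
            // Assumption: a first Sort with fetch is absorbable as well.
            Node::Sort { .. } => absorbed_sort = true,
            // Anything else ends the chain, which folds into one SELECT.
            Node::Aggregate | Node::Other => return Fold::Whole { terminator: i },
        }
    }
    Fold::Whole { terminator: chain.len() }
}
```

Under this model, `[Sort, Limit, Sort, Aggregate]` stops with `Boundary { at: 1 }`, so the Limit and everything below it go into the derived subquery, while `[Limit, Limit, Sort, Aggregate]` folds whole.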
   
     Concretely, `Projection -> Sort(ASC) -> Limit(10) -> Sort(DESC) -> Aggregate`
     now emits:
   
     ```sql
     SELECT __agg_0 AS "..." FROM (
       SELECT max(...) AS __agg_0 FROM ... GROUP BY ... ORDER BY ... DESC LIMIT 10
     ) ORDER BY __agg_0 ASC
     ```
   
     which matches the logical plan: take the top 10 by DESC, then reorder
     those 10 ASC.
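
To see why the subquery boundary matters, the two readings can be compared on plain integers. This is only an illustration of the semantics, not unparser code; `nested` and `flattened` are hypothetical names standing in for the emitted query and for a naive single-SELECT fold.

```rust
/// Inner `ORDER BY ... DESC LIMIT n`, then outer `ORDER BY ... ASC`.
fn nested(mut values: Vec<i64>, limit: usize) -> Vec<i64> {
    values.sort_by(|a, b| b.cmp(a)); // inner sort, descending
    values.truncate(limit); // inner limit keeps the largest `limit` values
    values.sort(); // outer sort reorders only those rows, ascending
    values
}

/// Naive fold into one SELECT: `ORDER BY ... ASC LIMIT n`.
fn flattened(mut values: Vec<i64>, limit: usize) -> Vec<i64> {
    values.sort(); // single ascending sort
    values.truncate(limit); // limit now keeps the smallest `limit` values
    values
}
```

With input `[5, 1, 4, 2, 3]` and a limit of 2, `nested` returns `[4, 5]` while `flattened` returns `[1, 2]`, so the flattened form selects different rows.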



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

