kosiew commented on code in PR #21375:
URL: https://github.com/apache/datafusion/pull/21375#discussion_r3254002175


##########
datafusion/sql/src/unparser/plan.rs:
##########
@@ -680,8 +711,219 @@ impl Unparser<'_> {
                 if self.dialect.unnest_as_lateral_flatten() {
                     Self::collect_flatten_aliases(p.input.as_ref(), select);
                 }
-                self.reconstruct_select_statement(plan, p, select)?;
-                self.select_to_sql_recursively(p.input.as_ref(), query, 
select, relation)
+                // Walk down through consecutive Sort/Limit nodes, greedily
+                // absorbing what can be folded into the SELECT we're
+                // building around the Aggregate. A single SQL SELECT can
+                // carry at most one `ORDER BY` (applied before `LIMIT`),
+                // so the safe shape between us and the Aggregate is
+                // `Limit* Sort?` (outer→inner). We stop at the first node
+                // that would violate this; that node becomes the
+                // subquery boundary, and recursion (seeing
+                // `already_projected = true`) wraps it in a derived
+                // relation. If we walk all the way to a non-Sort/non-Limit
+                // terminator, the entire chain folds into one SELECT.
+                //
+                // Stacked Sorts with nothing between them collapse to the
+                // outermost — the same simplification `EnforceSorting`
+                // applies on the physical side — but only when no Limit
+                // has been absorbed since the previous Sort, since the
+                // inner Sort would otherwise be determining which rows
+                // the Limit keeps.
+                //
+                // The fold is collected here without touching `query`
+                // (apart from non-literal direct Limits, which don't
+                // depend on projection form). Once we know whether every
+                // Sort/Limit was absorbed we can pick the right
+                // projection form and emit `ORDER BY` with or without
+                // unprojection.
+                let mut cur = p.input.as_ref();
+                let mut absorbed_sort: Option<&Sort> = None;
+                let mut combined_skip: usize = 0;
+                let mut combined_fetch: Option<usize> = None;
+                let mut have_combined_limit = false;
+                let mut have_direct_limit = false;
+                let mut have_order_by = false;
+                loop {
+                    match cur {
+                        LogicalPlan::Limit(limit) => {
+                            if have_order_by {
+                                // Limit-below-Sort: `ORDER BY … LIMIT N`
+                                // would apply the sort first, but the
+                                // logical plan applies the Limit first.
+                                break;
+                            }
+                            let skip_lit = limit.get_skip_type()?;
+                            let fetch_lit = limit.get_fetch_type()?;
+                            match (skip_lit, fetch_lit) {
+                                (SkipType::Literal(s), FetchType::Literal(f)) 
=> {
+                                    if have_direct_limit {
+                                        break;
+                                    }
+                                    if have_combined_limit {
+                                        // outer = already-accumulated;
+                                        // inner = this Limit. Same merge
+                                        // rule as the optimizer.
+                                        let (cs, cf) = combine_limit(
+                                            combined_skip,
+                                            combined_fetch,
+                                            s,
+                                            f,
+                                        );
+                                        combined_skip = cs;
+                                        combined_fetch = cf;
+                                    } else {
+                                        combined_skip = s;
+                                        combined_fetch = f;
+                                        have_combined_limit = true;
+                                    }
+                                }
+                                _ => {
+                                    if have_combined_limit || 
have_direct_limit {
+                                        // Cannot safely merge a
+                                        // non-literal Limit with a prior
+                                        // one; let recursion handle it.
+                                        break;
+                                    }
+                                    let Some(query_ref) = query.as_mut() else {
+                                        return internal_err!(
+                                            "Limit operator only valid in a 
statement context."
+                                        );
+                                    };
+                                    if let Some(fetch) = &limit.fetch {
+                                        
query_ref.limit(Some(self.expr_to_sql(fetch)?));
+                                    }
+                                    if let Some(skip) = &limit.skip {
+                                        query_ref.offset(Some(ast::Offset {
+                                            rows: ast::OffsetRows::None,
+                                            value: self.expr_to_sql(skip)?,
+                                        }));
+                                    }
+                                    have_direct_limit = true;
+                                }
+                            }
+                            cur = limit.input.as_ref();
+                        }
+                        LogicalPlan::Sort(sort) => {
+                            if have_order_by {
+                                if sort.fetch.is_some() {
+                                    // Inner Sort with `fetch` acts as
+                                    // Sort+Limit; dropping it would lose
+                                    // the hidden Limit.
+                                    break;
+                                }
+                                // Outer Sort already absorbed; the inner
+                                // Sort is reordered by it and is
+                                // conventionally dropped — matches
+                                // `EnforceSorting` on the physical side.
+                                cur = sort.input.as_ref();
+                                continue;
+                            }
+                            // First Sort. Capture `fetch` as a combined
+                            // Limit (since `Sort { fetch }` is equivalent
+                            // to Sort + Limit). Defer emitting `ORDER BY`
+                            // until we know whether the chain fully
+                            // absorbed.
+                            if let Some(fetch) = sort.fetch
+                                && !have_direct_limit

Review Comment:
   I think there is still one unsafe case here.
   
   If we have a non-literal outer `Limit` followed by an inner `Sort { fetch 
}`, the inner fetch can still be dropped. In the `Sort` arm, `sort.fetch` is 
only folded when `!have_direct_limit`. When the walk has already absorbed a 
direct non-literal `Limit`, this condition skips the `Sort.fetch`, but the code 
still absorbs the `Sort` and continues recursing.
   
   For example:
   
   ```text
   Projection -> Limit(<non-literal>) -> Sort(fetch = 5) -> Aggregate
   ```
   
   The logical semantics are: sort the aggregate output, keep 5 rows, then 
apply the outer limit. The emitted SQL appears to preserve only the outer 
non-literal limit, which drops the top-k limit from `Sort.fetch`.
   
   Since these two limits cannot be safely combined, I think we should stop 
here and introduce a derived subquery boundary, or otherwise make sure both 
limits are preserved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to