zhuqi-lucas commented on code in PR #16641:
URL: https://github.com/apache/datafusion/pull/16641#discussion_r2181487323

##########
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs:
##########
@@ -216,7 +218,25 @@ fn pushdown_sorts_helper(
 fn pushdown_requirement_to_children(
     plan: &Arc<dyn ExecutionPlan>,
     parent_required: OrderingRequirements,
+    parent_fetch: Option<usize>,
 ) -> Result<Option<Vec<Option<OrderingRequirements>>>> {
+    // Only attempt to push down TopK when there is an upstream LIMIT
+    if parent_fetch.is_some() {
+        // 1) Never push a new TopK below an operator that already has its own fetch
+        if plan.fetch().is_some() {
+            return Ok(None);
+        }
+        // 2) Only allow pushdown through operators that do not increase row count
+        //    (equal cardinality). Any other operator (including joins, filter,
+        //    sort-with-limit, or UDTFs that may expand rows) must stop the pushdown.
+        let effect = plan.cardinality_effect();
+        if !matches!(effect, CardinalityEffect::Equal) {
+            return Ok(None);
+        }
+        // At this point, only single-input, non-expanding operators
+        // such as ProjectionExec, CoalesceBatchesExec, are allowed to receive TopK.
+    }

Review Comment:
   After testing, this is not enough. For example, UnionExec supports limit pushdown, but TopK should not be pushed down through UnionExec.

   ```rust
       // If there is a limit on the parent plan, we cannot push it down through
       // operators that change the cardinality.
       // E.g. if LIMIT 2 is applied below a FilterExec that filters out 1/2 of
       // the rows, we'll end up with 1 row instead of 2.
       // If the LIMIT is applied after the FilterExec and the FilterExec returns
       // > 2 rows, we'll end up with 2 rows (correct).
       if parent_fetch.is_some() {
           if !plan.supports_limit_pushdown() {
               return Ok(None);
           }

           // Note: we still need to check the cardinality effect of the plan here,
           // because limit pushdown is not always safe, even if the plan supports it.
           // Here's an example:
           //
           // UnionExec advertises `supports_limit_pushdown() == true` because it can
           // forward a LIMIT k to each of its children, i.e. apply "LIMIT k" separately
           // on each branch before merging them together.
           //
           // However, UnionExec's `cardinality_effect() == GreaterEqual` (it sums up
           // all child row counts), so pushing a global TopK/LIMIT through it would
           // break the semantics of "take the first k rows of the combined result."
           //
           // For example, with two branches A and B and k = 3:
           //   - Global LIMIT: take the first 3 rows from (A ∪ B) after merging.
           //   - Pushed down: take 3 from A, 3 from B, then merge → up to 6 rows!
           //
           // That's why we still block on cardinality: even though UnionExec can
           // push a LIMIT to its children, its GreaterEqual effect means it cannot
           // preserve the global TopK semantics.
           match plan.cardinality_effect() {
               CardinalityEffect::Equal => {
                   // safe: only cardinality-preserving operators
                   // (e.g. CoalesceBatchesExec, ProjectionExec) pass
               }
               _ => return Ok(None),
           }
       }
   ```

   Change the code to the above, @adriangb. Thank you!
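
   For illustration only: the two hazards named in the code comments (a LIMIT applied below a filter, and a per-branch LIMIT under a union) can be reproduced with plain vectors. This is a minimal sketch and not DataFusion code; every function name below is hypothetical, and slices of `i32` stand in for operator output.

   ```rust
   /// Hypothetical stand-in for "LIMIT k pushed below a filter":
   /// the limit runs first, so the filter can shrink the result below k.
   fn limit_then_filter(rows: &[i32], k: usize, keep: fn(&i32) -> bool) -> Vec<i32> {
       rows.iter().copied().take(k).filter(|r| keep(r)).collect()
   }

   /// Hypothetical stand-in for "LIMIT k kept above the filter":
   /// the filter runs first, then at most k surviving rows are returned.
   fn filter_then_limit(rows: &[i32], k: usize, keep: fn(&i32) -> bool) -> Vec<i32> {
       rows.iter().copied().filter(|r| keep(r)).take(k).collect()
   }

   /// Hypothetical stand-in for "LIMIT k forwarded to each union branch"
   /// with no limit left above the union: up to 2 * k rows come out.
   fn per_branch_limit_then_union(a: &[i32], b: &[i32], k: usize) -> Vec<i32> {
       a.iter().take(k).chain(b.iter().take(k)).copied().collect()
   }

   /// Hypothetical stand-in for "global LIMIT k above the union":
   /// at most k rows of the combined stream come out.
   fn union_then_limit(a: &[i32], b: &[i32], k: usize) -> Vec<i32> {
       a.iter().chain(b.iter()).copied().take(k).collect()
   }

   /// Example predicate that drops half of the rows.
   fn is_even(r: &i32) -> bool {
       *r % 2 == 0
   }

   fn main() {
       let rows = [1, 2, 3, 4, 5, 6];
       // Filter case, k = 2.
       println!("limit below filter: {:?}", limit_then_filter(&rows, 2, is_even));
       println!("limit above filter: {:?}", filter_then_limit(&rows, 2, is_even));

       let a = [1, 2, 3, 4];
       let b = [5, 6, 7, 8];
       // Union case, k = 3.
       println!("per-branch limit:   {:?}", per_branch_limit_then_union(&a, &b, 3));
       println!("global limit:       {:?}", union_then_limit(&a, &b, 3));
   }
   ```

   In both cases the pushed-down variant returns a different number of rows than the limit applied at its original position, which is the behavior the `CardinalityEffect::Equal` check is meant to rule out.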