zhuqi-lucas commented on code in PR #16641:
URL: https://github.com/apache/datafusion/pull/16641#discussion_r2181487323


##########
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs:
##########
@@ -216,7 +218,25 @@ fn pushdown_sorts_helper(
 fn pushdown_requirement_to_children(
     plan: &Arc<dyn ExecutionPlan>,
     parent_required: OrderingRequirements,
+    parent_fetch: Option<usize>,
 ) -> Result<Option<Vec<Option<OrderingRequirements>>>> {
+    // Only attempt to push down TopK when there is an upstream LIMIT
+    if parent_fetch.is_some() {
+        // 1) Never push a new TopK below an operator that already has its own fetch
+        if plan.fetch().is_some() {
+            return Ok(None);
+        }
+        // 2) Only allow pushdown through operators that preserve the row count
+        //    (equal cardinality). Any other operator (including joins, filter,
+        //    sort-with-limit, or UDTFs that may expand rows) must stop the pushdown.
+        let effect = plan.cardinality_effect();
+        if !matches!(effect, CardinalityEffect::Equal) {
+            return Ok(None);
+        }
+        // At this point, only single-input, row-count-preserving operators
+        // such as ProjectionExec and CoalesceBatchesExec are allowed to receive TopK.
+    }
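The guard added in this hunk can be modeled in isolation to check which operators it lets a TopK through. This is a minimal, hypothetical sketch: `PlanNode`, its fields, and `may_push_topk_below` are illustrative names, not DataFusion's API, and only the `CardinalityEffect` variant names are assumed to mirror DataFusion's enum. The effect assigned to each operator in the checks is an illustrative assumption.

```rust
// Hypothetical, simplified stand-ins for DataFusion types; not the real API.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CardinalityEffect {
    Unknown,
    Equal,
    LowerEqual,
    GreaterEqual,
}

/// Hypothetical plan node carrying only the two properties the guard inspects.
struct PlanNode {
    fetch: Option<usize>,
    cardinality_effect: CardinalityEffect,
}

/// Mirrors the guard in the diff. Returns `false` where the diff returns
/// `Ok(None)`, and `true` where the guard lets the pushdown continue.
fn may_push_topk_below(plan: &PlanNode, parent_fetch: Option<usize>) -> bool {
    // No upstream LIMIT: this guard does not restrict plain sort pushdown.
    if parent_fetch.is_none() {
        return true;
    }
    // 1) Never push a new TopK below an operator that already has its own fetch.
    if plan.fetch.is_some() {
        return false;
    }
    // 2) Only operators that preserve the row count may receive the TopK.
    plan.cardinality_effect == CardinalityEffect::Equal
}
```

With a parent fetch, a projection-like node (`Equal`) passes, while a filter-like node (`LowerEqual`) or any node with its own `fetch` is rejected.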

Review Comment:
   After testing, this is not enough. For example, UnionExec supports limit pushdown, but TopK should not be pushed down through UnionExec.
   ```rust
   // If there is a limit on the parent plan, we cannot push it down through
   // operators that change the cardinality. E.g. if LIMIT 2 is applied below a
   // FilterExec that filters out half of the rows, we end up with 1 row instead
   // of 2. If the LIMIT is applied after the FilterExec and the FilterExec
   // returns > 2 rows, we end up with 2 rows (correct).
   if parent_fetch.is_some() {
       if !plan.supports_limit_pushdown() {
           return Ok(None);
       }

       // Note: we still need to check the cardinality effect of the plan here,
       // because limit pushdown is not always safe, even if the plan supports it.
       // Here's an example:
       //
       // UnionExec advertises `supports_limit_pushdown() == true` because it can
       // forward a LIMIT k to each of its children, i.e. apply "LIMIT k"
       // separately on each branch before merging them together.
       //
       // However, UnionExec's `cardinality_effect() == GreaterEqual` (it sums up
       // all child row counts), so pushing a global TopK/LIMIT through it would
       // break the semantics of "take the first k rows of the combined result."
       //
       // For example, with two branches A and B and k = 3:
       //   - Global LIMIT: take the first 3 rows from (A ∪ B) after merging.
       //   - Pushed down: take 3 from A, 3 from B, then merge → up to 6 rows!
       //
       // That's why we still block on cardinality: even though UnionExec can
       // push a LIMIT to its children, its GreaterEqual effect means it cannot
       // preserve the global TopK semantics.
       match plan.cardinality_effect() {
           CardinalityEffect::Equal => {
               // Safe: only row-count-preserving operators
               // (e.g. CoalesceBatchesExec, ProjectionExec) pass.
           }
           _ => return Ok(None),
       }
   }
   ```
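The revised guard combines two independent conditions, and the UnionExec case only fails the second one. A minimal, hypothetical sketch of that decision: `PlanNode`, its boolean `supports_limit_pushdown` field, and `guard_allows_pushdown` are illustrative names, not DataFusion's API; the per-operator values in the checks (UnionExec: supports pushdown, `GreaterEqual`; CoalesceBatchesExec: supports pushdown, `Equal`) follow the comment above, and the FilterExec values are an assumption.

```rust
// Hypothetical, simplified model of the revised guard; not DataFusion's API.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CardinalityEffect {
    Unknown,
    Equal,
    LowerEqual,
    GreaterEqual,
}

/// Only the two properties the revised guard consults.
struct PlanNode {
    supports_limit_pushdown: bool,
    cardinality_effect: CardinalityEffect,
}

/// Returns `false` where the revised code returns `Ok(None)`.
fn guard_allows_pushdown(plan: &PlanNode, parent_fetch: Option<usize>) -> bool {
    if parent_fetch.is_some() {
        // First condition: the operator must accept a pushed-down limit at all.
        if !plan.supports_limit_pushdown {
            return false;
        }
        // Second condition: it must also preserve the row count. A union
        // passes the first check but fails this one (GreaterEqual).
        if plan.cardinality_effect != CardinalityEffect::Equal {
            return false;
        }
    }
    true
}
```

Checking `supports_limit_pushdown` alone would let the union through; the cardinality check is what blocks it.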
   
   Suggest changing the code to the above, @adriangb. Thank you!
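The two row-count examples in the review comment (LIMIT below a filter, and LIMIT pushed into each union branch) can be reproduced with plain vectors. This is a minimal sketch, assuming in-memory slices stand in for operator outputs and that no limit is re-applied after the union; the function names are hypothetical.

```rust
/// Global LIMIT k over the concatenation of two branches (A then B).
fn global_limit(a: &[i32], b: &[i32], k: usize) -> Vec<i32> {
    a.iter().chain(b.iter()).take(k).copied().collect()
}

/// Naive pushdown: LIMIT k on each branch, then concatenate with no final limit.
fn per_branch_limit(a: &[i32], b: &[i32], k: usize) -> Vec<i32> {
    a.iter().take(k).chain(b.iter().take(k)).copied().collect()
}

/// LIMIT applied below a filter: take k rows first, then filter them.
fn limit_then_filter(input: &[i32], k: usize, pred: impl Fn(i32) -> bool) -> Vec<i32> {
    input.iter().copied().take(k).filter(|&x| pred(x)).collect()
}

/// LIMIT applied above a filter: filter first, then take k rows.
fn filter_then_limit(input: &[i32], k: usize, pred: impl Fn(i32) -> bool) -> Vec<i32> {
    input.iter().copied().filter(|&x| pred(x)).take(k).collect()
}
```

With two 4-row branches and k = 3, the global limit yields 3 rows while the per-branch version yields 6. With input `0..10`, an even-number filter, and k = 2, limiting first yields `[0]` (1 row), while filtering first yields `[0, 2]` (2 rows).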
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

