zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2963178882

   > > But most other operators (Projection, Filter, HashAggregate, HashJoin, 
WindowAgg, simple TableScans, etc.) do not use an MPSC channel.
   > > That means, we still need to insert explicit yield points 
YieldStream/PollBudget to avoid starving the thread.
   > 
   > You're indeed 100% dependent on your child streams which is what makes the 
current solution somewhat brittle. If that happens to use a Receiver (or some 
other implementation that consumes budget) it will work. If it's some other 
stream that does not you may have issues again. Because the sources are user 
definable, I think it's wise to take a defensive stance in the implementation 
of operators and assume you don't know what they will or will not do. The 
current implementation attempts to fix this by ensuring the sources have yield 
points. That breaks when streams are swapped dynamically because you no longer 
have a way to ensure they contain the necessary yield points. This is a point 
the DataFusion library cannot currently intercept. The current implementation 
with the non-task-wise budget also breaks when an intermediate operator uses 
`select!` (or something similar where you read from whatever stream happens to 
be ready) since this can obscure the Pending result from a stream.
  There's no way to guarantee that Pending bubbles all the way up.
   > 
   > > I believe no major difference for it? Please correct me if i am wrong.
   > 
   > The point of contention was where you put these yield points. Do you 
instrument all leaf nodes, or do you instrument consumers that may refuse to 
yield? To make the system robust I really think you need to do this in the 
consumers. It's also beneficial for locality of reasoning. You can look at the 
implementation of an operator and assess that it's correct from a cooperative 
scheduling point of view without having to look at any other code. The 
objection was that there are many, many operators out there in the wild 
downstream of DataFusion. That's one that I do not have an answer for. How many 
people are building custom pipeline blocking operators?
   > 
   > It's important to note that you would only need to take action in 
operators where you can see from the implementation that it may not return 
_any_ value, either Ready or Pending, relatively quickly. That's basically 
anything that loops over input streams an unbounded number of times.
   > 
   > * Project (or any other simple transformation operator) doesn't need to do 
anything since it takes one record batch in and immediately emits another one.
   > * Table scans shouldn't either. They'll yield naturally if their input is 
not ready, and otherwise they'll return a RecordBatch.
   > * Filter in theory should not do anything, the exception being dropping 
lots of batches entirely.
   > * Joins depend. A build/probe style implementation probably should 
consume during build, not during probe. But it depends on the implementation.
   > * Aggregation and sorting do need to consume since those can block for an 
extended period of time.
   
   Thank you, I think I get your point. I was thinking of optimizing the rule 
along these lines; is that a similar idea?
   
   ```rust
   // Traverse all nodes, not just leaves.
   plan.transform_down(|plan| {
       // Wrap if the node is a leaf OR long-running.
       if plan.children().is_empty() || is_long_running(plan.as_ref()) {
           // Use the existing cooperative variant if available.
           let wrapped = plan
               .clone()
               .with_cooperative_yields()
               .unwrap_or_else(|| {
                   Arc::new(YieldStreamExec::new(Arc::clone(&plan), yield_period))
               });
           Ok(Transformed::new(wrapped, true, TreeNodeRecursion::Jump))
       } else {
           Ok(Transformed::no(plan))
       }
   })
   .map(|t| t.data)
   ```
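
To make the intent of the rule easier to check, here is a minimal, self-contained sketch on a toy plan tree. Everything in it is hypothetical: the `Plan` enum, `is_leaf`, `is_long_running`, and `wrap` are stand-ins, not DataFusion APIs, and `Plan::Yield` only plays the role of `YieldStreamExec`. One deliberate difference: as far as I understand `TreeNodeRecursion::Jump`, a top-down transform stops descending below a node once it has been wrapped, so this toy version rewrites children first to make sure leaves under a long-running operator are still covered.

```rust
// Toy stand-ins for physical plan nodes; every name here is hypothetical.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Projection(Box<Plan>),
    Filter(Box<Plan>),
    Aggregate(Box<Plan>),
    Sort(Box<Plan>),
    // Plays the role of YieldStreamExec in this sketch.
    Yield(Box<Plan>),
}

// A leaf has no children; in this toy tree that is only `Scan`.
fn is_leaf(plan: &Plan) -> bool {
    matches!(plan, Plan::Scan)
}

// Assumed "long-running" list: operators that loop over their whole input.
fn is_long_running(plan: &Plan) -> bool {
    matches!(plan, Plan::Aggregate(_) | Plan::Sort(_))
}

// Wrap leaves and long-running operators in a yield node.
fn wrap(plan: Plan) -> Plan {
    // Rewrite children first so wrapping a parent never hides its subtree.
    let rewritten = match plan {
        Plan::Scan => Plan::Scan,
        Plan::Projection(c) => Plan::Projection(Box::new(wrap(*c))),
        Plan::Filter(c) => Plan::Filter(Box::new(wrap(*c))),
        Plan::Aggregate(c) => Plan::Aggregate(Box::new(wrap(*c))),
        Plan::Sort(c) => Plan::Sort(Box::new(wrap(*c))),
        // Already cooperative: leave untouched to avoid double wrapping.
        Plan::Yield(c) => return Plan::Yield(c),
    };
    if is_leaf(&rewritten) || is_long_running(&rewritten) {
        Plan::Yield(Box::new(rewritten))
    } else {
        rewritten
    }
}
```

With this sketch, `Aggregate(Projection(Scan))` becomes `Yield(Aggregate(Projection(Yield(Scan))))`: the projection is left alone, while the scan and the aggregation each get a yield point.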
   
   1. Leaf-only wrapping can be bypassed if someone plugs in a custom Stream or 
uses select!‑style combinators.
   2. By also wrapping every consumer that does heavy looping—aggregations, 
sorts, joins, window funcs—you guarantee that no matter how the streams are 
composed, there’s always an explicit YieldStreamExec (or the built‑in 
cooperative variant) in the path. (This can be optimized to PollBudget if 
possible)
   3. We still avoid unnecessary overhead on “simple” intermediate operators 
like Projection, because they are neither leaves nor in the “long‑running” 
list. (A leaf TableScan is still wrapped by the `children().is_empty()` check.)
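
For the consumer-side variant (point 2), the idea can be sketched without any DataFusion types. This is only an illustration under stated assumptions: `BudgetedSum`, `NoopWaker`, and `run_counting_yields` are hypothetical names, the input is a plain iterator standing in for a child stream that is always ready, and the per-poll counter is a simplified stand-in for a PollBudget-style mechanism rather than its actual design.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical blocking consumer: sums an always-ready input, but returns
// Pending after `budget` items per poll so other tasks get a turn.
struct BudgetedSum<I> {
    input: I,
    budget: usize,
    total: u64,
}

impl<I: Iterator<Item = u64> + Unpin> Future for BudgetedSum<I> {
    type Output = u64;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let this = &mut *self;
        for _ in 0..this.budget {
            match this.input.next() {
                Some(v) => this.total += v,
                None => return Poll::Ready(this.total),
            }
        }
        // Budget exhausted: request another poll, then yield to the scheduler.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Waker that does nothing; the driver below simply polls again.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Tiny single-future driver that returns the output together with the number
// of Pending results it saw (each one is a point where a real scheduler
// could have run another task).
fn run_counting_yields<F: Future + Unpin>(mut fut: F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut yields = 0;
    loop {
        match Pin::new(&mut fut).poll(&mut cx) {
            Poll::Ready(out) => return (out, yields),
            Poll::Pending => yields += 1,
        }
    }
}
```

Summing `1..=10` with a budget of 4 yields twice before completing, even though the input never returns Pending itself. The operator is cooperative on its own, regardless of what its child stream does.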
   
   
   Thanks!

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

