pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2963029503

   > But most other operators (Projection, Filter, HashAggregate, HashJoin, 
WindowAgg, simple TableScans, etc.) do not use an MPSC channel.
   > That means, we still need to insert explicit yield points 
YieldStream/PollBudget to avoid starving the thread.
   
   You're indeed 100% dependent on your child streams, which is what makes the 
current solution somewhat brittle. If a child happens to use a Receiver (or some 
other implementation that consumes budget), it will work. If it's some other 
stream that does not, you may have issues again. Because the sources are 
user-definable, I think it's wise to take a defensive stance in the 
implementation of operators and assume you don't know what they will or will 
not do.
   The current implementation attempts to fix this by ensuring the sources have 
yield points. That breaks when streams are swapped dynamically because you no 
longer have a way to ensure they contain the necessary yield points. This is a 
point the DataFusion library cannot currently intercept.
   The current implementation with the non-task-wise budget also breaks when an 
intermediate operator uses `select!` (or something similar where you read from 
whatever stream happens to be ready), since this can obscure the Pending result 
from a stream. There's no way to guarantee that Pending bubbles all the way up.
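   
   To illustrate how a Pending result can get lost, here is a minimal std-only 
sketch. It is not DataFusion code: `BatchSource`, `Yielding`, `Busy` and `Merge` 
are hypothetical names, and the hand-rolled `Merge` only stands in for a 
`select!`-style operator that returns whichever child is ready.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a child stream; not DataFusion's API.
pub trait BatchSource {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
}

/// A source that is always ready and never yields on its own.
pub struct Busy;

impl BatchSource for Busy {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        Poll::Ready(Some(1))
    }
}

/// A source with a built-in yield point: every other poll returns Pending.
#[derive(Default)]
pub struct Yielding {
    pub yield_now: bool,
}

impl BatchSource for Yielding {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        self.yield_now = !self.yield_now;
        if self.yield_now {
            // Ask to be polled again, then hand control back.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(Some(1))
        }
    }
}

/// select!-style merge: returns an item from whichever child is ready.
pub struct Merge<A, B> {
    pub a: A,
    pub b: B,
}

impl<A: BatchSource, B: BatchSource> BatchSource for Merge<A, B> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if let Poll::Ready(item) = self.a.poll_next(cx) {
            return Poll::Ready(item);
        }
        // `a` was Pending, but that is hidden whenever `b` is ready.
        self.b.poll_next(cx)
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `source` a fixed number of times and counts the Pending results
/// that are visible to the caller.
pub fn count_pending<S: BatchSource>(mut source: S, polls: u32) -> u32 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut pending = 0;
    for _ in 0..polls {
        if source.poll_next(&mut cx).is_pending() {
            pending += 1;
        }
    }
    pending
}
```

   Polled directly, `Yielding` reports Pending on half of the polls. Wrapped in 
`Merge` next to `Busy`, the consumer above it never sees a Pending at all, so a 
loop that relies on its input to yield would keep running.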
   
   > I believe no major difference for it? Please correct me if i am wrong.
   
   The point of contention was where you put these yield points. Do you 
instrument all leaf nodes, or do you instrument consumers that may refuse to 
yield? To make the system robust I really think you need to do this in the 
consumers. It's also beneficial for locality of reasoning. You can look at the 
implementation of an operator and assess that it's correct from a cooperative 
scheduling point of view without having to look at any other code.
   The objection was that there are many, many operators out there in the wild 
downstream of DataFusion. That's one that I do not have an answer for. How many 
people are building custom pipeline blocking operators?
   
   It's important to note that you would only need to take action in operators 
where you can see from the implementation that it may not return _any_ value, 
either Ready or Pending, relatively quickly. That's basically anything that 
loops over input streams an unbounded number of times.
   - Project (or any other simple transformation operator) doesn't need to do 
anything since it takes one record batch in and immediately emits another one.
   - Table scans shouldn't either. They'll yield naturally if their input is 
not ready, and otherwise they'll return a RecordBatch.
   - Filter in theory should not do anything, the exception being dropping lots 
of batches entirely.
   - Joins depend. A build/probe style implementation probably should consume 
during build, not during probe. But it depends on the implementation.
   - Aggregation and sorting do need to consume since those can block for an 
extended period of time.
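   
   As a rough sketch of what "consuming budget in the consumer" could look like 
for a pipeline-blocking operator, assuming a simple per-poll counter rather than 
Tokio's task budget: `BatchSource`, `AlwaysReady`, `SumAll` and `drive` are 
hypothetical names for illustration, not DataFusion types.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a child stream; not DataFusion's API.
pub trait BatchSource {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
}

/// A source that never returns Pending, e.g. data already in memory.
pub struct AlwaysReady {
    pub remaining: u64,
}

impl BatchSource for AlwaysReady {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        self.remaining -= 1;
        Poll::Ready(Some(1)) // one "batch" carrying a row count of 1
    }
}

/// Pipeline-blocking consumer: sums all input before emitting a result.
pub struct SumAll<S> {
    input: S,
    total: u64,
    budget_per_poll: u32, // assumed to be greater than zero
}

impl<S: BatchSource> SumAll<S> {
    pub fn new(input: S, budget_per_poll: u32) -> Self {
        SumAll { input, total: 0, budget_per_poll }
    }

    pub fn poll_total(&mut self, cx: &mut Context<'_>) -> Poll<u64> {
        let mut budget = self.budget_per_poll;
        loop {
            if budget == 0 {
                // Budget spent: request another poll and yield the thread,
                // regardless of what the input would have done.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            match self.input.poll_next(cx) {
                Poll::Ready(Some(rows)) => {
                    self.total += rows;
                    budget -= 1;
                }
                Poll::Ready(None) => return Poll::Ready(self.total),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drives the operator by hand; returns the total and how often it yielded.
pub fn drive(batches: u64, budget: u32) -> (u64, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut op = SumAll::new(AlwaysReady { remaining: batches }, budget);
    let mut yields = 0;
    loop {
        match op.poll_total(&mut cx) {
            Poll::Ready(total) => return (total, yields),
            Poll::Pending => yields += 1,
        }
    }
}
```

   With this shape, `drive(10, 4)` yields twice before producing the total even 
though the input never returns Pending, and you can check that by reading 
`poll_total` alone.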


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

