pepijnve commented on issue #16353: URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2963029503
> But most other operators (Projection, Filter, HashAggregate, HashJoin, WindowAgg, simple TableScans, etc.) do not use an MPSC channel.
> That means we still need to insert explicit yield points (YieldStream/PollBudget) to avoid starving the thread.

You're indeed 100% dependent on your child streams, which is what makes the current solution somewhat brittle. If a child happens to use a `Receiver` (or some other implementation that consumes budget), it will work. If it's some other stream that does not, you may have issues again. Because sources are user-definable, I think it's wise to take a defensive stance in the implementation of operators and assume you don't know what they will or will not do.

The current implementation attempts to fix this by ensuring the sources have yield points. That breaks when streams are swapped dynamically, because you no longer have a way to ensure they contain the necessary yield points; this is a point the DataFusion library cannot currently intercept. The current implementation, with its non-task-wise budget, also breaks when an intermediate operator uses `select!` (or something similar where you read from whichever stream happens to be ready), since this can obscure the `Pending` result from a stream. There's no way to guarantee that `Pending` bubbles all the way up.

> I believe no major difference for it? Please correct me if I am wrong.

The point of contention was where you put these yield points: do you instrument all leaf nodes, or do you instrument the consumers that may refuse to yield? To make the system robust, I really think you need to do this in the consumers. It's also beneficial for locality of reasoning: you can look at the implementation of an operator and assess that it's correct from a cooperative scheduling point of view without having to look at any other code. The objection was that there are many, many operators out there in the wild downstream of DataFusion. That's one I do not have an answer for.
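Putting the yield point in the consumer can be sketched roughly as below. This is a minimal sketch under stated assumptions, not DataFusion's implementation: `BatchStream` is a local stand-in for `futures::Stream` so the code compiles with `std` only, and `AlwaysReady`, `SumWithBudget`, `budget_per_poll` and `drive` are hypothetical names. The budget is a simple per-operator counter (a simplification; a real implementation might draw from a task-wide budget instead). The point is that the operator yields after a bounded amount of work whether or not its child ever returns `Pending`, which can be checked by reading this operator alone.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures::Stream` (assumption: keeps the sketch std-only).
trait BatchStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
}

// Hypothetical source with no yield points: it is always Ready.
struct AlwaysReady {
    remaining: u64,
}

impl BatchStream for AlwaysReady {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        self.remaining -= 1;
        Poll::Ready(Some(1))
    }
}

// Hypothetical pipeline-blocking consumer (think aggregation) that owns its budget.
struct SumWithBudget<S> {
    input: S,
    sum: u64,
    budget_per_poll: usize,
}

impl<S: BatchStream> SumWithBudget<S> {
    fn poll_sum(&mut self, cx: &mut Context<'_>) -> Poll<u64> {
        for _ in 0..self.budget_per_poll {
            match self.input.poll_next(cx) {
                Poll::Ready(Some(v)) => self.sum += v,
                Poll::Ready(None) => return Poll::Ready(self.sum),
                // The child yielded on its own; just propagate it.
                Poll::Pending => return Poll::Pending,
            }
        }
        // Budget used up: schedule another poll, then hand the thread back.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Waker that does nothing; enough for the manual driver below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls `op` until it completes, counting how often it returned Pending.
fn drive<S: BatchStream>(op: &mut SumWithBudget<S>) -> (usize, u64) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut yields = 0;
    loop {
        match op.poll_sum(&mut cx) {
            Poll::Ready(sum) => return (yields, sum),
            Poll::Pending => yields += 1,
        }
    }
}
```

With 10 always-ready items and a budget of 4, `drive` reports two yields before returning a sum of 10. The driver simply re-polls on `Pending`; a real runtime would instead reschedule the task through the waker that `poll_sum` woke.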
How many people are building custom pipeline-blocking operators? It's important to note that you would only need to take action in operators where you can see from the implementation that they may not return _any_ value, either `Ready` or `Pending`, relatively quickly. That's basically anything that loops over input streams an unbounded number of times.

- Project (or any other simple transformation operator) doesn't need to do anything, since it takes one record batch in and immediately emits another one.
- Table scans shouldn't either. They'll yield naturally if their input is not ready, and otherwise they'll return a `RecordBatch`.
- Filter, in theory, should not need to do anything; the exception is when it drops lots of batches entirely.
- Joins: it depends on the implementation. A build/probe-style implementation should probably consume budget during the build phase, not during the probe phase.
- Aggregation and sorting do need to consume budget, since those can block for an extended period of time.
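The Filter exception from the list above can be sketched the same way. Again this is a minimal sketch, assuming a local `BatchStream` trait in place of `futures::Stream` and a `Vec<i64>` in place of a `RecordBatch`; `VecSource`, `PositiveFilter`, `max_dropped_per_poll` and `drive` are hypothetical. The common path returns each non-empty batch immediately without any budget bookkeeping; only a run of fully dropped batches triggers a self-wake and `Pending`.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures::Stream`; a Vec<i64> plays the role of a RecordBatch.
trait BatchStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<i64>>>;
}

// Hypothetical always-ready source that hands out the given batches in order.
struct VecSource {
    batches: Vec<Vec<i64>>,
}

impl BatchStream for VecSource {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Vec<i64>>> {
        if self.batches.is_empty() {
            Poll::Ready(None)
        } else {
            Poll::Ready(Some(self.batches.remove(0)))
        }
    }
}

// Hypothetical filter: keeps positive values and drops batches that end up empty.
struct PositiveFilter<S> {
    input: S,
    max_dropped_per_poll: usize,
}

impl<S: BatchStream> BatchStream for PositiveFilter<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<i64>>> {
        let mut dropped = 0;
        loop {
            match self.input.poll_next(cx) {
                Poll::Ready(Some(batch)) => {
                    let kept: Vec<i64> = batch.into_iter().filter(|v| *v > 0).collect();
                    if !kept.is_empty() {
                        // Common case: one batch in, one batch out; no yield point needed.
                        return Poll::Ready(Some(kept));
                    }
                    dropped += 1;
                    if dropped >= self.max_dropped_per_poll {
                        // Exceptional case: many batches dropped in a row, so yield.
                        cx.waker().wake_by_ref();
                        return Poll::Pending;
                    }
                }
                // End of input, or the child returned Pending itself.
                other => return other,
            }
        }
    }
}

// Waker that does nothing; enough for the manual driver below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls `s` to completion, collecting output batches and counting Pending results.
fn drive<S: BatchStream>(s: &mut S) -> (usize, Vec<Vec<i64>>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut yields, mut out) = (0, Vec::new());
    loop {
        match s.poll_next(&mut cx) {
            Poll::Ready(Some(b)) => out.push(b),
            Poll::Ready(None) => return (yields, out),
            Poll::Pending => yields += 1,
        }
    }
}
```

For input batches `[-1]`, `[-2]`, `[-3]`, `[4, -5]`, `[-6]` and a limit of 2, the filter yields once after dropping the first two batches and emits the single batch `[4]`. When every batch keeps at least one row, it never yields on its own, matching the "Filter in theory should not do anything" point.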