pepijnve opened a new pull request, #16301:
URL: https://github.com/apache/datafusion/pull/16301

   ## Which issue does this PR close?
   
   - Closes #16193
   - Closes #15314
   - Closes #14036
   
   This PR is an alternative 'operator intrusive' solution based on the work initially done in #16196. It should not be considered for merging until the dust has settled on #16196, but early feedback is appreciated.
   
   ## Rationale for this change
   
   [Cancelling tasks in Tokio](https://docs.rs/tokio/latest/tokio/task/index.html#cancellation) is a cooperative endeavor. The documentation on [cooperative scheduling](https://docs.rs/tokio/latest/tokio/task/coop/index.html#cooperative-scheduling) describes a problematic situation: when a task polls a stream in a loop and that stream happens to be always ready, the task effectively turns into a busy loop and blocks the Tokio executor thread for an extended period of time. This prevents the task from being cancelled and also prevents other tasks from being scheduled.
   
   Since DataFusion queries are executed as Tokio tasks, the above applies to queries as well. The most common examples of this kind of code in DataFusion are pipeline-breaking operators that try to drain their input completely before emitting results themselves. There are, however, other cases that may loop for extended periods of time: filters that reject many full batches, the build phase of join operators, a sort-merge join that takes a long time finding inner join matches, etc.
   
   This style of looping in operator implementations causes issues when the operator in question polls from an always-ready input stream, since the operator then ends up in a busy loop. This in turn prevents the query from being cancelled. Even if the task is aborted via its `JoinHandle`, the operator will keep running to completion.
   Note that (in my opinion at least) the always-ready input stream is not the issue. Viewed in isolation it is a perfectly fine implementation that always returns control in a timely fashion. The problem is the consumer looping in an uncontrolled fashion and not returning control, as illustrated by the sketch below.
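   To make the failure mode concrete, here is a minimal, self-contained sketch of the problematic pattern (item types are simplified to `usize` and no DataFusion types are involved); if `input` is always ready, the loop in `poll_next` never hands control back to the runtime:

```rust
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use futures::{Stream, StreamExt};

/// A pipeline-breaking stream that drains its entire input before emitting
/// a single result (here simply a count of the items it saw).
struct DrainingStream<S> {
    input: S,
    items_seen: usize,
}

impl<S: Stream<Item = usize> + Unpin> Stream for DrainingStream<S> {
    type Item = usize;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<usize>> {
        let this = self.get_mut();
        // If `input` is always `Ready`, this loop never returns control to
        // the Tokio executor, so the task cannot be cancelled or rescheduled.
        loop {
            match ready!(this.input.poll_next_unpin(cx)) {
                Some(_) => this.items_seen += 1,
                // Input exhausted: emit the final result.
                None => return Poll::Ready(Some(this.items_seen)),
            }
        }
    }
}
```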
   
   In order to give the Tokio scheduler time to do its work, operators should ensure they yield to the runtime every now and then. Unfortunately, yielding to the runtime means the `poll_next` call of each stream in the chain needs to return and then be invoked again. This is not a zero-cost operation, so we should be careful not to yield too often. Additionally, the overhead of checking whether we should yield, and of unwinding and rebuilding the call chain, should be kept to a minimum.
   
   ### Possible implementations
   
   #### Use 
[`tokio::task::coop::consume_budget`](https://docs.rs/tokio/latest/tokio/task/coop/fn.consume_budget.html)
   
   Tokio's `consume_budget` provides a runtime-managed cooperative scheduling budget. Operators could use this to insert yield points where necessary. The benefit of this approach is that the Tokio runtime manages the scheduling budget, so there is very little work to do in each operator.
   The downside may be (but I have not measured this yet) that the task budget is stored in thread-local storage, which may make accessing it slower than desired. `consume_budget` is also an async function, making its use in state-machine-style stream implementations rather cumbersome. A rough sketch of this approach is shown below.
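   For illustration, a rough sketch of what this could look like in an async-style operator (the `drain_with_budget` helper and item types are made up for this example; the exact module path and feature gating of `consume_budget` depend on the Tokio version):

```rust
use futures::{stream, Stream, StreamExt};

/// Drain `input`, periodically consuming Tokio's cooperative budget so the
/// task yields back to the scheduler when the budget runs out.
fn drain_with_budget<S>(mut input: S) -> impl Stream<Item = usize>
where
    S: Stream<Item = usize> + Unpin,
{
    stream::once(async move {
        let mut items_seen = 0;
        while let Some(_item) = input.next().await {
            items_seen += 1;
            // Runtime-managed yield point: returns `Pending` once when the
            // task's cooperative budget is depleted.
            tokio::task::coop::consume_budget().await;
        }
        items_seen
    })
}
```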
   
   #### Implement something similar to `coop::consume_budget`
   
   Provide a mechanism similar in spirit to `coop::consume_budget`, but tailored to the needs of this project. This PR implements that approach; details on the implementation are provided below.
   
   This is the suggestion made in the current documentation at 
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#cancellation--aborting-execution
 and there is ad hoc prior art for this in 
[RepartitionExec](https://github.com/pepijnve/datafusion/blob/cancel_safety/datafusion/physical-plan/src/repartition/mod.rs#L980).
 The idea would be to make following the guideline as convenient as possible.
   
   #### Inject yield points externally
   
   PR #16196 takes the approach of injecting yield operators externally using a 
physical optimizer rule. The rule uses the `EmissionType` of operators to 
determine where yield points are required. When it determines an operator 
requires yield points it ensures all leaf nodes under the operator in question 
periodically return a `Pending` result. This is done either by asking the leaf 
node to do so or by wrapping the leaf node in an operator that creates a 
wrapper `Stream`.
   
   This is a nice solution because it does not necessarily require cooperation 
from operator implementations, but there are downsides as well.
   
   Describing precisely, in a declarative fashion, when yield points are required is not trivial. `EmissionType::Final` is a good first heuristic, but it is not sufficient for operators such as filters or the build phase of joins.
   
   Determining where yield points are required based on the static `ExecutionPlan` tree alone is not always sufficient either. As an example, a grouped aggregation that spills large amounts of data to disk will emit data from a stream that is created after the original input has been drained. This takes the original yielding wrapper stream out of the picture. If the new stream is then consumed by another 'final' operator, the query becomes uncancellable again. This could be solved by ensuring operators also wrap the streams they produce themselves, but that requires operator cooperation again, which somewhat negates the benefits of this approach.
   
   A possible downside to injecting yield points at the leaves (though I have not measured this yet) is that, in terms of stack unwinding, this is the worst-case place from which to yield. The call stack is by definition deepest at the leaf streams, so yielding from there incurs the highest possible cost. Yielding closer to the root could mitigate this.
   
   ## What changes are included in this PR?
   
   A new `PollBudget` support type is introduced. This is a type-safe wrapper around an `Option<u8>` that represents the number of times a single operator is allowed to poll its input (or loop in general) without itself returning a `Poll` value.
   
   Operators that manually implement `Stream` state machines make use of this type by creating a `ConsumeBudget` future on entry to `poll_next`. When polled, this future returns `Ready(())` as many times as the configured poll budget allows; when the budget is depleted it returns `Pending`. In the `Stream::poll_next` implementations, `ready!(consume_budget.poll(cx))` has been inserted at the points where the implementation would loop. A hedged sketch of this pattern is shown below.
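   As a rough illustration only (the actual `PollBudget`/`ConsumeBudget` types in this PR may differ in their API, default budget, and exact placement of the yield point), the draining stream sketched earlier could be amended like this:

```rust
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use futures::{Stream, StreamExt};

/// Allows a bounded number of "keep looping" decisions per `poll_next` call;
/// once depleted it returns `Pending` and schedules the task to be woken again.
struct ConsumeBudget {
    remaining: Option<u8>, // `None` disables the budget entirely
}

impl ConsumeBudget {
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        match self.remaining.as_mut() {
            None => Poll::Ready(()),
            Some(0) => {
                // Budget depleted: make sure we get polled again, then yield.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Some(n) => {
                *n -= 1;
                Poll::Ready(())
            }
        }
    }
}

/// Same draining stream as sketched earlier, now with a yield point.
struct DrainingStream<S> {
    input: S,
    items_seen: usize,
}

impl<S: Stream<Item = usize> + Unpin> Stream for DrainingStream<S> {
    type Item = usize;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<usize>> {
        let this = self.get_mut();
        // A fresh budget is created on every entry to `poll_next`.
        let mut budget = ConsumeBudget { remaining: Some(8) };
        loop {
            match ready!(this.input.poll_next_unpin(cx)) {
                Some(_) => {
                    this.items_seen += 1;
                    // Yield point: only reached when we are about to loop again.
                    ready!(budget.poll(cx));
                }
                None => return Poll::Ready(Some(this.items_seen)),
            }
        }
    }
}
```

   Because the budget is only touched when the loop would continue, the common `Pending` and error paths stay as cheap as before.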
   
   An (admittedly very amateur) analysis of the optimized [compiler 
output](https://godbolt.org/#z:OYLghAFBqd5QCxAYwPYBMCmBRdBLAF1QCcAaPECAMzwBtMA7AQwFtMQByARg9KtQYEAysib0QXACx8BBAKoBnTAAUAHpwAMvAFYTStJg1DEArgoKkl9ZATwDKjdAGFUtEywYgArBtKOAMngMmABy7gBGmMQSGgBspAAOqAqEdgwubh7evkkptgKBwWEskdFccZaY1vkMQgRMxAQZ7p4%2BldVpdQ0EhaERUTHx5t3NWW3Djb3FpYMAlJaoJsTI7BxmmADU5uggIFQmBEvsIABiB0cApADMAEIXGgCC61sEOyAJQbvKQdd3j8/bXYKACeDGQuwey1%2B9yeSheb3qCgA1l9XLRdhcAOw3ZSOILAUgbABKmCY6GBWIAItD/nDASBESiQFibsRSeTCS5BJhVBYNso0VSabDNvSosQSLtsMQJcRhTDnvtDmyFLszsqcLzhYrziqMdi6mzWITDaSWNgtZjqbcYbaAExXC5eG5YYh4ABumAgLgSwM5tAEmFmTupjwSJnCL1MNn5aJuJnQwEwBA2LJhGwzG3CCaTBBAGwA8gkatcnCYABzXbCkW1W22PPAsBK0WO0WjxxPJ1PY9OZ8ORqgMDZoBgKdyYAD62c7BAgFztsSsVFmGwAtFWNlyx2wO7nu38Hpmj5uBNvMLuu2nHsebxs2SwmEF8fmlwA6ae5mvX29C78Z38PABML9lGJgxlu44XimV6Hpm96Pgwz6FsWaSlhWVZfoBdaPDCjbNhs6pHBs/DECeo6QTml49n%2BGwEMCCSbAWBzhtBVyUhsEDBjaOE0YOGxJG2EAsAcWxVFQ%2BbfAwpbzrEwkpkIYkYcO6ipguclkQQPIEKWYAcBOVYruuVzYK2tClgptDiSATEECxG4wbeGwPgQyAIKJ
lmvvBT5GPuvaORmISBqmbEbiSZLAlAszzH5/lCKgbAQF5iFGCu1zWiZDn%2BTeeBUHemAPt5wDBda7EaL5NFZTeyCqK%2BADuTBIlEnF1Q1k7hMCE5slQnHCpV/m4gw%2BBGDFfVUhsVRwplfU3m%2BSX4sVGxxQlc0%2BauGxcFxB7TbeYXkpFwYVZVAGjdhsE3sdmZAadMLmNGKYAJp4FU6Cmqw5VnRsQTBNE/I/FcTg3KgqiluSQ6vSwpYAJKaSwC0kmOtDaf9nGEoDwP/aDGzSrKVa48Zqk3Itjh49WMUfsm%2BYVphR4raAGxU7WoaAQ2TYto9z3g%2B9R4gXxwS1RAX0DL9Un/WjIOgotBBGhD/3Q/lcOYAjSNOCjGxixjEvYyQJP2XahMKYNJOEuTeb0%2BWhkbhZuVTceVtc5VgtkCNt4m9TlW0/mvjO/%2Bp1HldTO4azkvS8RJAbOztAvVLZr2xmdEMRscuw2lxKKyYiOlqr6tOJjWtysZeMHjFfECbQE7BLyc6HRm6lLhJf1ODJ6lWxh3vKfmTciVymlav9un6cZbsZhb%2BMCm2pZFiW/1W7sSck7Hx45e5VCeflCHzWlKdlTbjmzWvhULW%2BJu9ZV1UtY1xDNfVjVTh1XU9dxH2OWyypDgNQ3ACf53XdXTlMC5bk3yO1fKXcuWkJwmAYB8BgEBqqpWok/W8RpyQbAAAKp3ChAJaXoJxRWKvZBB20Mx7wKslIqRl2JcC/tNZBFJf5%2B19pVVABAEBRHwfjHelUSHrx8inI%2BlEkZbSIcw1hcp6GXUYd/JmDCA4/weLdMCKZIaCFMCkT0nMbZBBYvXEWAMgbizBtHVgUMYYKyVpneYat9EayHHnHWHC9ZE0NgXQeMVQEmwknGARmEgIs3wpzEiicVFmA9JgDRhDMzx02EnMx6dlZZ2sTnTWMptYuOwPKXiQ5QEV1nG3WuYkXyvF2NAr4DdO7yUUq43%2B1U
O5qS7rILSOk9KtxoiPEyY8zL/UnqhaeBSQBzzSQvI89AUwsLwAoBa5SNgAColwZMQRmEZTkRIjjPFOARC0xkKBAWidZM5XyrPHHs3MD8i6/wDKgBIQzbxLJVHEhazlXJ5XCmASAWzXxaIODstsYDeQQKgUEWBqg8GcKytgiA4R/6uVSiFDhESiGZkeW5SFADrkIozAWJEEKYXpTReimm7JgSvNgaeI5Jtvll0gdAoFUVqH4qqrIIIJhMB0vpRdelmYJzsIyvCjlx5MX7Tbgi9l6KRXCvEVIoe00uWbzhUI/FArOIHQWf5MVP46yPz6i/JYb80S7F2hFcFdzEa0s1eq6REjZEBxwg6J08ZRxMCoF6BgqAJwPiMPQYMXgmY8yHAoAAjiYBozr3D5jwFcO0bTPoRoXgwdwMyNhxohjhK0HB5jog4F4XgngOBaFIKgTgRIzApgUIsZYmx5xXB4KQPMua03zBRF4WIr5JAaA0OWTEmJyxXEkLETEkhJCYn0JwSQ2bNC8ALRwXgqpfC1q0PMOAsAkBoCbHQKI5BKAro%2BPQaIxAuAdt8DQRGURVQQvHaQcIQQGjAk4NWy9zBiDAgLOEbQmAbC3t4CutgggCwMFoDeutpAsDZmAE4MQtBVTcF4Fgd1wBxCAfwGyGwoTIN5p5G%2Bg4qxq1BE0hmvNtA8DhGINelwWBz1S0bB%2B0gnpiDhGSJgSka8PXPjrfMKgBhgAKAAGpPVqkWRgVH%2BCCBEGIdgUgZCCEUCodQgHdB2n0IYYwxb9CEdVLAZgbA9i6kVtRgYuaI08CivmlCp5OCrgLBOmjbosBqagBp9gmB8A1Go2IZlnA7QaDtKOozVg301AcINUYnguB%2BEGlMfoZREjJFSAIILehcgxYYOFkoAwQu%2BeQwILojQ4tpaqH5zo9RJhBD6ClyLEwmiuBaHocryWZgbQWEsFYEh02cCzaQHNebJ13mLcgdar4O2vjKhAXAhAw6
Vo2rwOd9bSAokkFcV8XgACci2uB2jtFwWIXgI1eC28Ojgo72vnsndOkAs7x1GYzXaMdgHjs1vO/MGjKR7CSCAA%3D)
 shows that this is a fairly efficient implementation strategy. On entry to `poll_next` the initial counter value is loaded and the child poll call is invoked. When the child returns `Pending` or an error, that result is returned immediately without any extra work. Only in the case where the stream would loop is the counter decremented and tested. All combined, this makes for a fairly inexpensive yield point.
   
   The construct described above is difficult to integrate with `Stream` implementations that are written as async functions. As an approximation, these instead use a wrapper `YieldStream` around their input streams. This `YieldStream` counts the number of times `poll_next` is called and returns `Pending` when the budget is depleted. When `Pending` is returned, or when the wrapped stream itself returns `Pending` or an error, the budget is reset. A simplified sketch of this wrapper is shown below.
   This implementation is a bit less precise and less efficient than the state-machine variant. Ideally the budget counter would be reset whenever the stream making use of the `YieldStream` returns any value, as is the case in the state-machine variant, but I have not figured out how to implement that yet.
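   For illustration, a simplified sketch of such a wrapper (error handling is omitted because the item type is kept generic here, and the real `YieldStream` may reset its budget differently):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};

/// Wraps an input stream and forces a `Pending` after `budget` consecutive
/// ready polls, resetting the counter whenever the stream yields on its own.
struct YieldStream<S> {
    inner: S,
    budget: u8,
    remaining: u8,
}

impl<S: Stream + Unpin> Stream for YieldStream<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let this = self.get_mut();
        if this.remaining == 0 {
            // Budget depleted: reset it, schedule a wake-up, and yield.
            this.remaining = this.budget;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match this.inner.poll_next_unpin(cx) {
            Poll::Pending => {
                // The wrapped stream yielded on its own; reset the budget.
                this.remaining = this.budget;
                Poll::Pending
            }
            ready => {
                this.remaining -= 1;
                ready
            }
        }
    }
}
```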
   
   ## Are these changes tested?
   
   Existing tests pass. Some additional tests for yielding have been added, but not all affected operators are covered yet.
   
   ## Are there any user-facing changes?
   
   There are no breaking API changes. This PR does introduce a new soft requirement for downstream users who implement custom operators: in order to support cooperative cancellation, operators that loop for an extended period of time while their input is always ready should make use of the new cooperative yielding support.

