pepijnve opened a new pull request, #16301: URL: https://github.com/apache/datafusion/pull/16301
## Which issue does this PR close?

- Closes #16193
- Closes #15314
- Closes #14036

This PR is an alternative 'operator intrusive' solution based on the work initially done in #16196. It should not be considered for merging until the dust has settled on #16196, but feedback would already be appreciated.

## Rationale for this change

[Cancelling tasks in Tokio](https://docs.rs/tokio/latest/tokio/task/index.html#cancellation) is a cooperative endeavor. The documentation on [cooperative scheduling](https://docs.rs/tokio/latest/tokio/task/coop/index.html#cooperative-scheduling) describes a problematic situation: when a task polls a stream in a loop and that stream happens to be always ready, the task effectively becomes a busy loop and blocks the Tokio executor thread for an extended period of time. This prevents the task from being cancelled and also prevents other tasks from being scheduled.

Since DataFusion queries are executed as Tokio tasks, the above applies to queries as well. The most common examples of this type of code in DataFusion are pipeline-breaking operators that try to drain their input completely before emitting results themselves. There are other examples that may loop for extended periods of time, however: filters that reject many full batches, the build phase of join operators, sort-merge joins that take a long time finding inner join matches, etc. This style of looping in operator implementations causes issues when the operator in question polls from an always-ready input stream, since the operator then enters a busy loop. This in turn prevents the query from being cancelled. Even if the task is aborted via its `JoinHandle`, the operator will keep on running until completion.

Note that (in my opinion at least) the always-ready input stream is not the issue. Viewed in isolation it is a perfectly fine implementation that always returns control in a timely fashion.
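To make this concrete, here is a toy sketch (plain std Rust, not DataFusion's actual stream types; all names are illustrative) in which the input is well behaved but the draining consumer monopolizes the thread: the drain loop runs to completion without ever producing a `Pending`, so a Tokio-style scheduler never gets a chance to cancel or reschedule the task.

```rust
use std::task::Poll;

// Toy stand-in for an always-ready input stream. In isolation this is
// perfectly fine: every call returns control promptly.
struct AlwaysReadyInput {
    remaining: usize,
}

impl AlwaysReadyInput {
    fn poll_next(&mut self) -> Poll<Option<u64>> {
        if self.remaining == 0 {
            Poll::Ready(None)
        } else {
            self.remaining -= 1;
            Poll::Ready(Some(1))
        }
    }
}

// A pipeline-breaking "operator" that drains its input completely
// before emitting. Because the input is always Ready, the loop never
// returns Pending: there are no yield points, hence no opportunity
// for cooperative cancellation.
fn drain_then_emit(input: &mut AlwaysReadyInput) -> (u64, usize) {
    let mut sum = 0;
    let mut pending_count = 0; // times we yielded to the runtime: stays 0
    loop {
        match input.poll_next() {
            Poll::Ready(Some(v)) => sum += v,
            Poll::Ready(None) => return (sum, pending_count),
            Poll::Pending => pending_count += 1, // never reached here
        }
    }
}
```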
The problem is the consumer looping in an uncontrolled fashion and not returning control. To give the Tokio scheduler time to do its work, operators should ensure they yield to the runtime every now and then. Unfortunately, yielding to the runtime means the `poll_next` call of each stream in a chain needs to return and then be invoked again. This is not a zero-cost operation, so we should be careful not to yield too often. Additionally, the overhead of checking whether we should yield, and of unwinding and rebuilding the call chain, should be kept to a minimum.

### Possible implementations

#### Use [`tokio::task::coop::consume_budget`](https://docs.rs/tokio/latest/tokio/task/coop/fn.consume_budget.html)

Tokio's `consume_budget` provides a runtime-managed cooperative scheduling budget. Operators could use this to insert yield points where necessary. The benefit of this approach is that the Tokio runtime manages the scheduling budget, so there is very little work to do in each operator. The downside may be (but I have not measured this yet) that the task budget is stored in thread-local storage, which may make accessing it slower than desired. `consume_budget` is also an async function, making its use in state-machine-style stream implementations rather cumbersome.

#### Implement something similar to `coop::consume_budget`

Provide a mechanism similar in spirit to `coop::consume_budget`, but tailored to the needs of this project. This PR implements that approach; details on the implementation are provided below. This is the suggestion made in the current documentation at https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#cancellation--aborting-execution, and there is ad hoc prior art for it in [RepartitionExec](https://github.com/pepijnve/datafusion/blob/cancel_safety/datafusion/physical-plan/src/repartition/mod.rs#L980). The idea would be to make following the guideline as convenient as possible.
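For intuition, the cooperative budget idea behind both options can be modeled in plain std Rust. This mirrors the spirit of Tokio's thread-local task budget, not its actual internals; the value 128 matches Tokio's documented default budget, and the synchronous function shape is an illustrative simplification of the real async `consume_budget`.

```rust
use std::cell::Cell;
use std::task::Poll;

const DEFAULT_BUDGET: u32 = 128; // Tokio's documented default task budget

// The budget lives in thread-local storage, which is the potential
// cost mentioned above: every check goes through a TLS access.
thread_local! {
    static BUDGET: Cell<u32> = Cell::new(DEFAULT_BUDGET);
}

// Synchronous analogue of `consume_budget`: Ready while budget
// remains, Pending once it is exhausted. A real implementation would
// also arrange for the task to be woken again.
fn consume_budget() -> Poll<()> {
    BUDGET.with(|b| {
        let remaining = b.get();
        if remaining == 0 {
            Poll::Pending // caller should return Pending up the chain
        } else {
            b.set(remaining - 1);
            Poll::Ready(())
        }
    })
}

// The runtime resets the budget each time the task is rescheduled.
fn reset_budget() {
    BUDGET.with(|b| b.set(DEFAULT_BUDGET));
}
```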
#### Inject yield points externally

PR #16196 takes the approach of injecting yield operators externally using a physical optimizer rule. The rule uses the `EmissionType` of operators to determine where yield points are required. When it determines an operator requires yield points, it ensures all leaf nodes under the operator in question periodically return a `Pending` result. This is done either by asking the leaf node to do so or by wrapping the leaf node in an operator that creates a wrapper `Stream`.

This is a nice solution because it does not necessarily require cooperation from operator implementations, but there are downsides as well:

- Describing precisely when yield points are required in a declarative fashion is not trivial. `EmissionType::Final` is a good first heuristic, but for some operators, like filters and the join build phase, it is not sufficient.
- Determining where yield points are required based on the static `ExecutionPlan` tree alone is not always sufficient. As an example, a grouped aggregation that spills large amounts of data to disk will emit data from a stream that is created after the original input has been drained. This takes the original yielding wrapper stream out of the picture. If the new stream is then consumed by another 'final' operator, the query becomes uncancellable again. This could be solved by ensuring operators also wrap streams they produce themselves, but that requires operator cooperation again, which somewhat negates the benefits of this approach.
- A possible downside to injecting yields at the leaves (but I have not measured this yet) is that, in terms of stack unwinding, this is the worst-case position from which to yield. The call stack is by definition deepest at the leaf streams, so yielding from there incurs the highest possible cost. Yielding closer to the root could mitigate this.

## What changes are included in this PR?

A new `PollBudget` support type is introduced.
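Such a per-operator poll budget might look roughly like the following std-only sketch. This is an illustrative guess at the mechanism, not the PR's actual code: the real version would implement `Future`, take a `&mut Context`, and call `cx.waker().wake_by_ref()` before returning `Pending` so the task gets rescheduled.

```rust
use std::task::Poll;

// Guessed shape of a per-`poll_next` budget. `None` means unlimited:
// the operator never forces a yield.
struct ConsumeBudget {
    remaining: Option<u8>,
}

impl ConsumeBudget {
    fn new(budget: Option<u8>) -> Self {
        Self { remaining: budget }
    }

    // Ready(()) while the budget allows another loop iteration,
    // Pending once it is depleted.
    fn poll(&mut self) -> Poll<()> {
        match self.remaining.as_mut() {
            None => Poll::Ready(()),
            Some(0) => Poll::Pending,
            Some(n) => {
                *n -= 1;
                Poll::Ready(())
            }
        }
    }
}

// Usage inside a hand-written state machine: guard each loop
// iteration and propagate Pending, as the `ready!` macro would.
// A Vec stands in for the operator's input stream here.
fn drain_with_budget(input: &mut Vec<u64>, budget: Option<u8>) -> Poll<u64> {
    let mut consume_budget = ConsumeBudget::new(budget);
    let mut sum = 0;
    while let Some(v) = input.pop() {
        if consume_budget.poll().is_pending() {
            input.push(v); // put the item back; resume on the next poll
            return Poll::Pending;
        }
        sum += v;
    }
    Poll::Ready(sum)
}
```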
`PollBudget` is a type-safe wrapper around an `Option<u8>` that represents the number of times a single operator is allowed to poll its input (or loop in general) without itself returning a `Poll` value. Operators that manually implement `Stream` state machines make use of this type by creating a `ConsumeBudget` future on entry to `poll_next`. When polled, this future returns `Ready(())` as many times as the configured poll budget allows; when the budget is depleted it returns `Pending`. In the `Stream::poll_next` implementations, `ready!(consume_budget.poll(cx))` has been inserted at the points where the implementation would loop.

An (admittedly very amateur) analysis of the optimized compiler output (Godbolt permalink in the original PR) shows that this is a fairly efficient implementation strategy. On entry to `poll_next` the initial counter value is loaded, and the child poll call is then invoked. In this example, when the child returns pending or an error, this is immediately returned without any extra work. Only in the case where the stream would loop is the counter decremented and tested. All combined, this makes for a fairly inexpensive yield point.

The construct described above is difficult to integrate with `Stream` implementations that are based on async functions. As an approximation, those instead use a wrapper `YieldStream` around their input streams. This `YieldStream` counts the number of times `poll_next` is called and returns `Pending` when the budget is depleted. When `Pending` is returned, or the wrapped stream itself returns `Pending` or an error, the budget is reset. This implementation is a bit less precise and less optimal than the state machine variant. Ideally the budget counter would be reset whenever the stream making use of the `YieldStream` returns any value, as is the case in the state machine variant, but I have not figured out how this could be implemented yet.

## Are these changes tested?

Existing tests pass. Some additional tests for yielding have been added, but not all affected operators are covered yet.

## Are there any user-facing changes?

There are no breaking API changes. This PR does introduce a new soft requirement for downstream users who implement custom operators: in order to support cooperative cancellation, operators that loop for an extended period of time when their input is always ready should make use of the cooperative yielding support.
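The `YieldStream` behavior described above (count polls, return `Pending` and reset once the budget runs out) can be modeled with plain std types. This is an illustrative sketch, not the PR's actual code: a real implementation would wrap a `SendableRecordBatchStream`, also reset when the inner stream returns `Pending` or an error, and wake the task before returning `Pending`.

```rust
use std::task::Poll;

// Illustrative model of a yielding wrapper. An Iterator stands in for
// an always-ready inner stream, so the Pending/error reset paths of
// the real wrapper do not appear here.
struct YieldWrapper<I> {
    inner: I,
    budget: u32,
    remaining: u32,
}

impl<I: Iterator> YieldWrapper<I> {
    fn new(inner: I, budget: u32) -> Self {
        Self { inner, budget, remaining: budget }
    }

    fn poll_next(&mut self) -> Poll<Option<I::Item>> {
        if self.remaining == 0 {
            // Budget depleted: force a Pending so the consumer's loop
            // unwinds and the scheduler can run, then reset the budget.
            self.remaining = self.budget;
            return Poll::Pending;
        }
        self.remaining -= 1;
        Poll::Ready(self.inner.next())
    }
}
```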
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org