pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137478983
##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -216,36 +212,50 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
// Once all partitions have set their corresponding cursors for the loser tree,
// we skip the following block. Until then, this function may be called multiple
// times and can return Poll::Pending if any partition returns Poll::Pending.
+
if self.loser_tree.is_empty() {
- while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+ // Manual indexing since we're iterating over the vector and shrinking it in the loop
+ let mut idx = 0;
+ while idx < self.uninitiated_partitions.len() {
Review Comment:
I think you came to the same conclusion in the meantime, but I had typed
this already. Just FYI for anyone following along.
`SortPreservingMergeStream` has one `CursorStream` with `n` partitions.
It also has a single `uninitiated_partitions: Vec`, not one per partition.
This vec initially contains the indices of all partitions, so
`[0, 1, 2, ..., n - 1]`.
`SortPreservingMergeStream::poll_next` has two states: waiting for all
partitions to become ready ("wait" for short), and merge. The transition from
wait to merge happens once every partition has been polled and has returned
ready exactly once. Additionally, you want to poll the partitions round-robin.
So what the wait state does is iterate once over every partition index still
in `uninitiated_partitions`. That gives you the round-robin property. For each
index it calls `poll_next` for that partition. If that returns ready, the
partition index is removed from `uninitiated_partitions`; if it returns
pending, the index is retained. Then it moves on to the next partition. At the
end of the loop you check whether `uninitiated_partitions` is empty. If so, go
to the merge state; if not, stay in wait and return pending.
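
For anyone who prefers code, here is a minimal synchronous sketch of a single
pass through the wait state as described above. It is not the code from the
PR: `poll_wait_state` and the `poll_partition` closure are hypothetical names
standing in for the real stream polling, and using `swap_remove` to drop a
ready index is my assumption (`Vec::remove` would keep the original order at
O(n) cost per removal).

```rust
use std::task::Poll;

/// One pass of the "wait" state, modeled without a real `Context` or stream.
/// `poll_partition` is a hypothetical stand-in for polling one partition.
fn poll_wait_state<F>(uninitiated: &mut Vec<usize>, mut poll_partition: F) -> Poll<()>
where
    F: FnMut(usize) -> Poll<()>,
{
    // Manual indexing because the vector shrinks while we walk it.
    let mut idx = 0;
    while idx < uninitiated.len() {
        let partition_idx = uninitiated[idx];
        match poll_partition(partition_idx) {
            Poll::Ready(()) => {
                // Ready: drop the index. The last element moves into slot
                // `idx`, so `idx` is not advanced and that element is
                // visited next.
                uninitiated.swap_remove(idx);
            }
            Poll::Pending => {
                // Pending: keep the index and move on to the next partition.
                idx += 1;
            }
        }
    }

    // Every remaining partition has now been polled exactly once.
    if uninitiated.is_empty() {
        Poll::Ready(()) // transition to the merge state
    } else {
        Poll::Pending // stay in the wait state
    }
}
```

Each call polls every remaining partition exactly once, so no partition is
starved by an earlier one that keeps returning pending. With `swap_remove`
the visiting order within a pass can differ from the original index order,
but the once-per-pass property still holds.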
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]