pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137478983


##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -216,36 +212,50 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
         // Once all partitions have set their corresponding cursors for the loser tree,
         // we skip the following block. Until then, this function may be called multiple
         // times and can return Poll::Pending if any partition returns Poll::Pending.
+
         if self.loser_tree.is_empty() {
-            while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+            // Manual indexing since we're iterating over the vector and shrinking it in the loop
+            let mut idx = 0;
+            while idx < self.uninitiated_partitions.len() {

Review Comment:
   I think you came to the same conclusion in the meantime, but I had typed 
this already. Just FYI for anyone following along.
   
   `SortPreservingMergeStream` has one `CursorStream` with `n` partitions. 
It also has a single `uninitiated_partitions: Vec`, not one per partition. 
This vec contains the indices of the partitions that have not been 
initialized yet, so it starts out as `[0, 1, 2, ..., n - 1]`.
   
   `SortPreservingMergeStream::poll_next` has two states: waiting for all 
partitions to become ready ("wait" for short), and merge. The transition 
from wait to merge happens once every partition has been polled and has 
returned ready exactly once. Additionally, you want to ensure that you poll 
the partitions round-robin.
   
   So what the wait state does is iterate over every remaining partition index 
in `uninitiated_partitions` once. That gives you the round-robin property. It 
calls `poll_next` for the partition in question and, if that returns ready, 
removes the partition index from `uninitiated_partitions`. If it returns 
pending, the index is retained. Then we move on to the next partition. At the 
end of the loop you check whether `uninitiated_partitions` is empty. If so, go 
to the merge state; if not, stay in wait and return pending.
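
   To make the wait state concrete, here is a minimal standalone sketch of one 
pass over `uninitiated_partitions`. It is not the code from this PR: 
`poll_wait_state` and the `poll_partition` callback are hypothetical names 
that stand in for polling a real partition stream, and the sketch uses 
`Vec::remove` purely to keep the remaining indices in order.

```rust
use std::task::Poll;

/// One pass of the "wait" state (hypothetical helper, not the DataFusion API).
///
/// `uninitiated` holds the indices of partitions that have not yet returned
/// ready. `poll_partition` is an assumed stand-in for polling one partition.
fn poll_wait_state(
    uninitiated: &mut Vec<usize>,
    mut poll_partition: impl FnMut(usize) -> Poll<()>,
) -> Poll<()> {
    // Manual indexing, because the vector shrinks while we walk it.
    let mut idx = 0;
    while idx < uninitiated.len() {
        let partition = uninitiated[idx];
        match poll_partition(partition) {
            Poll::Ready(()) => {
                // Ready: drop the index. The next element shifts into `idx`,
                // so `idx` must not be advanced here.
                uninitiated.remove(idx);
            }
            Poll::Pending => {
                // Pending: keep the index and move on to the next partition.
                idx += 1;
            }
        }
    }

    // Every remaining partition was polled exactly once in this pass.
    if uninitiated.is_empty() {
        Poll::Ready(()) // all partitions initialized: switch to merge
    } else {
        Poll::Pending // stay in wait
    }
}
```

   Each call polls every remaining partition exactly once, so no pending 
partition can starve the ones behind it. If the order of the remaining indices 
does not matter, `Vec::swap_remove` would avoid shifting elements on removal.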



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

