alamb commented on code in PR #15562:
URL: https://github.com/apache/datafusion/pull/15562#discussion_r2027641047


##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -241,10 +239,13 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
                     _ => {
                         // If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
                         // we remove this partition from the queue so it is not polled again.
-                        self.uninitiated_partitions.retain(|idx| *idx != i);
+                        self.uninitiated_partitions.pop_front();
                     }
                 }
             }
+
+            // Claim the memory for the uninitiated partitions

Review Comment:
   It might make more sense to simply reset the length of the fixed-size queue 
so that it is full again and the partitions are polled again next time.
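
A minimal sketch of what "resetting the length" could look like, assuming a circular buffer whose backing `Vec` is never shrunk. The type `ResettableQueue` and its methods `new`, `pop_front`, `reset`, and `len` are hypothetical names for illustration, not the API in this PR:

```rust
/// Hypothetical stand-in for the PR's `FixedSizeQueue`.
/// The backing `values` vector is never modified after construction.
#[derive(Debug)]
struct ResettableQueue {
    values: Vec<usize>,
    /// Index of the current front element in `values`.
    start: usize,
    /// Number of elements that are logically still in the queue.
    len: usize,
}

impl ResettableQueue {
    fn new(values: Vec<usize>) -> Self {
        let len = values.len();
        Self { values, start: 0, len }
    }

    /// Logically remove the front element; the backing vector is untouched.
    fn pop_front(&mut self) -> Option<usize> {
        if self.len == 0 {
            return None;
        }
        let value = self.values[self.start];
        self.start = (self.start + 1) % self.values.len();
        self.len -= 1;
        Some(value)
    }

    /// Make the queue full again so that every partition is visited
    /// on the next pass (assumption: this is the intended "reset").
    fn reset(&mut self) {
        self.start = 0;
        self.len = self.values.len();
    }

    fn len(&self) -> usize {
        self.len
    }
}
```

Because `values` is kept intact, a reset of this kind only touches two integers and needs no reallocation.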



##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -533,3 +534,134 @@ impl<C: CursorValues + Unpin> RecordBatchStream for SortPreservingMergeStream<C>
         Arc::clone(self.in_progress.schema())
     }
 }
+
+/// Fixed size queue implemented as a circular buffer
+/// The underlying `values` are immutable, so removing elements will not drop them and not change the
+/// capacity/length of the actual vector
+#[derive(Debug, Default)]
+struct FixedSizeQueue {
+    values: Vec<usize>,
+    start: usize,
+    end: usize,
+    len: usize,
+}
+
+impl From<Vec<usize>> for FixedSizeQueue {
+    fn from(values: Vec<usize>) -> Self {
+        let len = values.len();
+
+        Self {
+            values,
+            start: 0,
+            end: len.saturating_sub(1),
+            len,
+        }
+    }
+}
+
+impl FixedSizeQueue {
+    /// Get the value at the top of the queue
+    ///
+    /// # Implementation
+    /// return the value at [`Self::start`]
+    ///
+    /// ## Example
+    ///
+    /// ```plain
+    /// index: | 0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9  |
+    ///        | -------------------------------------------------------- |
+    /// value: | 0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9  |

Review Comment:
   I would find these examples easier to follow if `index` and `value` didn't 
have the same values. Maybe something like:
   
   ```suggestion
       /// value: | 9  |  8  |  7  |  6  |  5  |  4  |  3  |  2  |  1  |  0  |
   ```



##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -241,10 +239,13 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
                     _ => {
                         // If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
                         // we remove this partition from the queue so it is not polled again.
-                        self.uninitiated_partitions.retain(|idx| *idx != i);
+                        self.uninitiated_partitions.pop_front();
                     }
                 }
             }
+
+            // Claim the memory for the uninitiated partitions

Review Comment:
   As I understand it, this effectively resets the state of 
`uninitiated_partitions` to an empty FixedSizeQueue.
   
   Doesn't that mean that, after the first time the loser tree is empty, it will 
never enter the loop above again to start polling?
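
The concern can be illustrated with a small sketch. It uses `std::collections::VecDeque` as a stand-in for the queue, and `poll_round` and its `ready` flags are hypothetical helpers, not code from the PR. Once the queue has been replaced by an empty one, a loop driven by its contents has nothing left to visit:

```rust
use std::collections::VecDeque;

/// Hypothetical polling pass: visits each queued partition once and
/// returns how many partitions were polled. `ready[i]` models whether
/// partition `i` returned `Poll::Ready` (assumption for illustration).
fn poll_round(uninitiated: &mut VecDeque<usize>, ready: &[bool]) -> usize {
    let mut polled = 0;
    // The bound is computed once, so re-queued partitions are not
    // visited a second time within the same pass.
    for _ in 0..uninitiated.len() {
        if let Some(i) = uninitiated.pop_front() {
            polled += 1;
            if !ready[i] {
                // Still pending: keep the partition for a later pass.
                uninitiated.push_back(i);
            }
        }
    }
    polled
}
```

With three partitions of which one is pending, the first pass polls all three and leaves one queued. If the queue is then replaced by an empty one, the next pass polls nothing, which is the behavior being questioned here.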


