alamb commented on code in PR #15562:
URL: https://github.com/apache/datafusion/pull/15562#discussion_r2027641047
##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -241,10 +239,13 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
                     _ => {
                         // If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
                         // we remove this partition from the queue so it is not polled again.
-                        self.uninitiated_partitions.retain(|idx| *idx != i);
+                        self.uninitiated_partitions.pop_front();
                     }
                 }
             }
+
+            // Claim the memory for the uninitiated partitions

Review Comment:
   It might make more sense to simply reset the length of the fixed-size queue so that it is full again and the partitions are polled again next time.

##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -533,3 +534,134 @@ impl<C: CursorValues + Unpin> RecordBatchStream for SortPreservingMergeStream<C>
         Arc::clone(self.in_progress.schema())
     }
 }
+
+/// Fixed size queue implemented as a circular buffer
+/// The underlying `values` are immutable, so removing elements will not drop them and not change the
+/// capacity/length of the actual vector
+#[derive(Debug, Default)]
+struct FixedSizeQueue {
+    values: Vec<usize>,
+    start: usize,
+    end: usize,
+    len: usize,
+}
+
+impl From<Vec<usize>> for FixedSizeQueue {
+    fn from(values: Vec<usize>) -> Self {
+        let len = values.len();
+
+        Self {
+            values,
+            start: 0,
+            end: len.saturating_sub(1),
+            len,
+        }
+    }
+}
+
+impl FixedSizeQueue {
+    /// Get the value at the top of the queue
+    ///
+    /// # Implementation
+    /// return the value at [`Self::start`]
+    ///
+    /// ## Example
+    ///
+    /// ```plain
+    /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
+    ///        | -------------------------------------------------------- |
+    /// value: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |

Review Comment:
   I would find these examples easier to follow if `index` and `value` didn't have the same values.

   Maybe something like
   ```suggestion
       /// value: | 9 | 8 | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
   ```

##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -241,10 +239,13 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
                     _ => {
                         // If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
                         // we remove this partition from the queue so it is not polled again.
-                        self.uninitiated_partitions.retain(|idx| *idx != i);
+                        self.uninitiated_partitions.pop_front();
                     }
                 }
             }
+
+            // Claim the memory for the uninitiated partitions

Review Comment:
   As I understand it, this effectively resets the state of `uninitiated_partitions` to an empty `FixedSizeQueue`.

   Doesn't that mean that after the first time the loser tree is empty, it will never enter the loop above again to start polling?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
For additional commands, e-mail: github-h...@datafusion.apache.org
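
For illustration only, the "reset the length so the queue is full again" idea from the first review comment could look roughly like the sketch below. It is not the PR's code: the `reset` and `peek_front` names are hypothetical, the `end` field is left out for brevity, and the sketch assumes `values` is never modified after construction, so that restoring `start` and `len` is enough to make every partition index visible again.

```rust
/// Simplified, hypothetical stand-in for the PR's `FixedSizeQueue`.
/// `values` is never mutated; only `start` and `len` change.
#[derive(Debug, Default)]
struct FixedSizeQueue {
    values: Vec<usize>,
    start: usize,
    len: usize,
}

impl From<Vec<usize>> for FixedSizeQueue {
    fn from(values: Vec<usize>) -> Self {
        let len = values.len();
        Self { values, start: 0, len }
    }
}

impl FixedSizeQueue {
    /// Number of elements that are still logically in the queue.
    fn len(&self) -> usize {
        self.len
    }

    fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Value at the front of the queue, if any (hypothetical helper).
    fn peek_front(&self) -> Option<usize> {
        if self.len == 0 {
            None
        } else {
            Some(self.values[self.start])
        }
    }

    /// Logically remove the front element. The backing vector is untouched,
    /// so nothing is dropped and its capacity/length stay the same.
    fn pop_front(&mut self) -> Option<usize> {
        let front = self.peek_front()?;
        // `len > 0` here, so `values` is non-empty and the modulo is safe.
        self.start = (self.start + 1) % self.values.len();
        self.len -= 1;
        Some(front)
    }

    /// Hypothetical `reset`: make the queue full again so that every
    /// partition index is handed out once more on the next pass.
    fn reset(&mut self) {
        self.start = 0;
        self.len = self.values.len();
    }
}
```

With something along these lines, the polling loop could call `reset()` instead of replacing `uninitiated_partitions` with an empty queue, which would keep the loop reachable on later passes. Whether that fits the memory-reclaiming intent of the PR is a separate question for the author.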