alamb commented on code in PR #16249:
URL: https://github.com/apache/datafusion/pull/16249#discussion_r2126736135
##########
datafusion/physical-plan/src/coalesce/mod.rs:
##########
@@ -488,110 +329,6 @@ mod tests {
             .unwrap()
     }

-    #[test]

Review Comment:
   all moved upstream



##########
datafusion/physical-plan/src/coalesce/mod.rs:
##########
@@ -98,197 +49,93 @@ impl BatchCoalescer {
         fetch: Option<usize>,
     ) -> Self {
         Self {
-            schema,
-            target_batch_size,
+            inner: BatchCoalescer::new(schema, target_batch_size),
             total_rows: 0,
-            buffer: vec![],
-            buffered_rows: 0,
             fetch,
+            finished: false,
         }
     }

     /// Return the schema of the output batches
     pub fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
+        self.inner.schema()
     }

-    /// Push next batch, and returns [`CoalescerState`] indicating the current
-    /// state of the buffer.
-    pub fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
-        let batch = gc_string_view_batch(&batch);
-        if self.limit_reached(&batch) {
-            CoalescerState::LimitReached
-        } else if self.target_reached(batch) {
-            CoalescerState::TargetReached
-        } else {
-            CoalescerState::Continue
+    /// Push next batch, and returns [`true`] indicating if the limit is hit
+    ///
+    /// If the limit is reached, the caller must call [`Self::finish()`] to
+    /// complete the buffered results as a batch and finish the query.
+    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<bool> {
+        if self.finished {
+            return internal_err!(
+                "LimitedBatchCoalescer: cannot push batch after finish"
+            );
         }
-    }

-    /// Return true if the there is no data buffered
-    pub fn is_empty(&self) -> bool {
-        self.buffer.is_empty()
-    }
+        // if we are at the limit, return LimitReached
+        if let Some(fetch) = self.fetch {
+            // limit previously reached
+            if self.total_rows >= fetch {
+                return Ok(true);
+            }

-    /// Checks if the buffer will reach the specified limit after getting
-    /// `batch`.
-    ///
-    /// If fetch would be exceeded, slices the received batch, updates the
-    /// buffer with it, and returns `true`.
-    ///
-    /// Otherwise: does nothing and returns `false`.
-    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
-        match self.fetch {
-            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+            // limit now reached
+            if self.total_rows + batch.num_rows() >= fetch {
                 // Limit is reached
                 let remaining_rows = fetch - self.total_rows;
                 debug_assert!(remaining_rows > 0);
-                let batch = batch.slice(0, remaining_rows);
-                self.buffered_rows += batch.num_rows();
-                self.total_rows = fetch;
-                self.buffer.push(batch);
-                true
+                let batch_head = batch.slice(0, remaining_rows);
+                self.total_rows += batch_head.num_rows();
+                self.inner.push_batch(batch_head)?;
+                return Ok(true);
             }
-            _ => false,
         }
-    }

-    /// Updates the buffer with the given batch.
-    ///
-    /// If the target batch size is reached, returns `true`. Otherwise, returns
-    /// `false`.
-    fn target_reached(&mut self, batch: RecordBatch) -> bool {
-        if batch.num_rows() == 0 {
-            false
-        } else {
-            self.total_rows += batch.num_rows();
-            self.buffered_rows += batch.num_rows();
-            self.buffer.push(batch);
-            self.buffered_rows >= self.target_batch_size
-        }
+        self.total_rows += batch.num_rows();
+        self.inner.push_batch(batch)?;
+
+        Ok(false) // not at limit
     }

-    /// Concatenates and returns all buffered batches, and clears the buffer.
-    pub fn finish_batch(&mut self) -> datafusion_common::Result<RecordBatch> {
-        let batch = concat_batches(&self.schema, &self.buffer)?;
-        self.buffer.clear();
-        self.buffered_rows = 0;
-        Ok(batch)
+    /// Return true if there is no data buffered
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
    }
-}

-/// Indicates the state of the [`BatchCoalescer`] buffer after the
-/// [`BatchCoalescer::push_batch()`] operation.
-///
-/// The caller should take different actions, depending on the variant returned.
-pub enum CoalescerState {

Review Comment:
   the buffering is all managed upstream now, so there is no need to expose buffering details to the user of the coalescer



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -252,12 +251,11 @@ struct CoalesceBatchesStream {
     /// The input plan
     input: SendableRecordBatchStream,
     /// Buffer for combining batches
-    coalescer: BatchCoalescer,
+    coalescer: LimitedBatchCoalescer,
     /// Execution metrics
     baseline_metrics: BaselineMetrics,
-    /// The current inner state of the stream. This state dictates the current
-    /// action or operation to be performed in the streaming process.
-    inner_state: CoalesceBatchesStreamState,

Review Comment:
   Since the Coalescer can now buffer batches internally, this enum can be reduced to "complete" or not



##########
datafusion/physical-plan/src/coalesce/mod.rs:
##########
@@ -98,197 +49,93 @@ impl BatchCoalescer {
         fetch: Option<usize>,
     ) -> Self {
         Self {
-            schema,
-            target_batch_size,
+            inner: BatchCoalescer::new(schema, target_batch_size),

Review Comment:
   this is the key change here -- move all the buffer management upstream into arrow
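
Editor's note: to make the reworked control flow concrete, here is a rough sketch of how a caller could drive `LimitedBatchCoalescer` after this change. It is based only on what is visible in the hunks above (`push_batch` returning `Result<bool>`, `is_empty`, and the `finish()` mentioned in the doc comment). The `next_completed_batch()` accessor and the `Result<()>` return of `finish()` are assumptions about how the wrapper delegates to the upstream arrow `BatchCoalescer`; they are not shown in this diff and may differ in the actual PR.

    use arrow::array::RecordBatch;
    use datafusion_common::Result;

    /// Hypothetical driver loop: push input batches until the fetch limit is
    /// hit or the input is exhausted, draining completed target-size batches
    /// as they become available, then flush the remaining buffered rows.
    fn coalesce_all(
        coalescer: &mut LimitedBatchCoalescer,
        input: impl IntoIterator<Item = RecordBatch>,
    ) -> Result<Vec<RecordBatch>> {
        let mut output = vec![];
        for batch in input {
            // `true` means the fetch limit was reached with this batch
            let limit_reached = coalescer.push_batch(batch)?;
            // assumed accessor: drain any batches the upstream arrow
            // coalescer has already completed to the target size
            while let Some(completed) = coalescer.next_completed_batch() {
                output.push(completed);
            }
            if limit_reached {
                break;
            }
        }
        // flush whatever is still buffered as a final (possibly smaller) batch
        coalescer.finish()?;
        while let Some(completed) = coalescer.next_completed_batch() {
            output.push(completed);
        }
        Ok(output)
    }

The point of the sketch is the division of labor the review comments describe: the wrapper only tracks `total_rows` against `fetch`, while all batch buffering and concatenation happens inside the upstream arrow coalescer.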
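
Editor's note on the `CoalesceBatchesStreamState` comment: once the coalescer owns all buffering, the stream only has to remember whether it has finished. A minimal illustration of "reduced to 'complete' or not" is below; the names are illustrative and the PR may equally well use a plain `bool` field on the stream instead of an enum.

    /// Illustrative two-state replacement for the old stream state: with
    /// buffering handled inside the coalescer, the stream only tracks
    /// whether the input is exhausted (or the fetch limit was hit).
    enum StreamState {
        /// Still pulling batches from the input stream
        Running,
        /// Input exhausted or limit reached; final buffered batch emitted
        Completed,
    }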