alamb commented on code in PR #11966:
URL: https://github.com/apache/datafusion/pull/11966#discussion_r1715352613
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +393,60 @@ impl BatchCoalescer {
Arc::clone(&self.schema)
}
- /// Add a batch, returning a batch if the target batch size or limit is
reached
- fn push_batch(&mut self, batch: RecordBatch) ->
Result<Option<RecordBatch>> {
- // discard empty batches
- if batch.num_rows() == 0 {
- return Ok(None);
- }
+ fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
Review Comment:
Could you please restore the documentation explaining what this does? Maybe
also point readers to the documentation on `CoalescerState` to figure out what
should be done on the different return values.
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +393,60 @@ impl BatchCoalescer {
Arc::clone(&self.schema)
}
- /// Add a batch, returning a batch if the target batch size or limit is
reached
- fn push_batch(&mut self, batch: RecordBatch) ->
Result<Option<RecordBatch>> {
- // discard empty batches
- if batch.num_rows() == 0 {
- return Ok(None);
- }
+ fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
+ let batch = gc_string_view_batch(&batch);
Review Comment:
I think the previous semantics of checking limit before calling
`gc_string_view_batch` make more sense than gc'ing the batch if the batch will
be ignored
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +393,60 @@ impl BatchCoalescer {
Arc::clone(&self.schema)
}
- /// Add a batch, returning a batch if the target batch size or limit is
reached
- fn push_batch(&mut self, batch: RecordBatch) ->
Result<Option<RecordBatch>> {
- // discard empty batches
- if batch.num_rows() == 0 {
- return Ok(None);
- }
+ fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
+ let batch = gc_string_view_batch(&batch);
- // past limit
- if self.limit_reached() {
- return Ok(None);
+ if self.limit_reached(&batch) {
+ CoalescerState::LimitReached
+ } else if self.target_reached(batch) {
+ CoalescerState::TargetReached
+ } else {
+ CoalescerState::Continue
}
+ }
- let batch = gc_string_view_batch(&batch);
-
- // Handle fetch limit:
- if let Some(fetch) = self.fetch {
- if self.total_rows + batch.num_rows() >= fetch {
- // We have reached the fetch limit.
+ fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+ match self.fetch {
+ Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+ // Limit is reached
let remaining_rows = fetch - self.total_rows;
debug_assert!(remaining_rows > 0);
- self.total_rows = fetch;
- // Trim the batch and add to buffered batches:
+
let batch = batch.slice(0, remaining_rows);
self.buffered_rows += batch.num_rows();
+ self.total_rows = fetch;
self.buffer.push(batch);
- // Combine buffered batches:
- let batch = concat_batches(&self.schema, &self.buffer)?;
- // Reset the buffer state and return final batch:
- self.buffer.clear();
- self.buffered_rows = 0;
- return Ok(Some(batch));
+ true
}
+ _ => false,
}
- self.total_rows += batch.num_rows();
-
- // batch itself is already big enough and we have no buffered rows so
- // return it directly
- if batch.num_rows() >= self.target_batch_size &&
self.buffer.is_empty() {
- return Ok(Some(batch));
- }
- // add to the buffered batches
- self.buffered_rows += batch.num_rows();
- self.buffer.push(batch);
- // check to see if we have enough batches yet
- let batch = if self.buffered_rows >= self.target_batch_size {
- // combine the batches and return
- let batch = concat_batches(&self.schema, &self.buffer)?;
- // reset buffer state
- self.buffer.clear();
- self.buffered_rows = 0;
- // return batch
- Some(batch)
- } else {
- None
- };
- Ok(batch)
}
- /// Finish the coalescing process, returning all buffered data as a final,
- /// single batch, if any
- fn finish(&mut self) -> Result<Option<RecordBatch>> {
- if self.buffer.is_empty() {
- Ok(None)
+ fn target_reached(&mut self, batch: RecordBatch) -> bool {
+ if batch.num_rows() == 0 {
+ false
} else {
- // combine the batches and return
- let batch = concat_batches(&self.schema, &self.buffer)?;
- // reset buffer state
- self.buffer.clear();
- self.buffered_rows = 0;
- // return batch
- Ok(Some(batch))
+ self.total_rows += batch.num_rows();
+ self.buffered_rows += batch.num_rows();
+ self.buffer.push(batch);
+ self.buffered_rows >= self.target_batch_size
}
}
- /// returns true if there is a limit and it has been reached
- pub fn limit_reached(&self) -> bool {
- if let Some(fetch) = self.fetch {
- self.total_rows >= fetch
- } else {
- false
- }
+ fn finish(&mut self) -> Result<RecordBatch> {
Review Comment:
is there a reason to remove the documentation of this method?
It seems like the semantics might have changed too -- `finish` now is called
to produce the currently buffered batches, not just when no more input is
coming.
Maybe renaming it to `finish_batch` or `make_batch` or something similar
would make the semantics clearer
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +393,60 @@ impl BatchCoalescer {
Arc::clone(&self.schema)
}
- /// Add a batch, returning a batch if the target batch size or limit is
reached
- fn push_batch(&mut self, batch: RecordBatch) ->
Result<Option<RecordBatch>> {
- // discard empty batches
- if batch.num_rows() == 0 {
- return Ok(None);
- }
+ fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
+ let batch = gc_string_view_batch(&batch);
- // past limit
- if self.limit_reached() {
- return Ok(None);
+ if self.limit_reached(&batch) {
+ CoalescerState::LimitReached
+ } else if self.target_reached(batch) {
+ CoalescerState::TargetReached
+ } else {
+ CoalescerState::Continue
}
+ }
- let batch = gc_string_view_batch(&batch);
-
- // Handle fetch limit:
- if let Some(fetch) = self.fetch {
- if self.total_rows + batch.num_rows() >= fetch {
- // We have reached the fetch limit.
+ fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+ match self.fetch {
+ Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+ // Limit is reached
let remaining_rows = fetch - self.total_rows;
debug_assert!(remaining_rows > 0);
- self.total_rows = fetch;
- // Trim the batch and add to buffered batches:
+
let batch = batch.slice(0, remaining_rows);
self.buffered_rows += batch.num_rows();
+ self.total_rows = fetch;
self.buffer.push(batch);
- // Combine buffered batches:
- let batch = concat_batches(&self.schema, &self.buffer)?;
- // Reset the buffer state and return final batch:
- self.buffer.clear();
- self.buffered_rows = 0;
- return Ok(Some(batch));
+ true
}
+ _ => false,
}
- self.total_rows += batch.num_rows();
-
- // batch itself is already big enough and we have no buffered rows so
- // return it directly
- if batch.num_rows() >= self.target_batch_size &&
self.buffer.is_empty() {
- return Ok(Some(batch));
- }
- // add to the buffered batches
- self.buffered_rows += batch.num_rows();
- self.buffer.push(batch);
- // check to see if we have enough batches yet
- let batch = if self.buffered_rows >= self.target_batch_size {
- // combine the batches and return
- let batch = concat_batches(&self.schema, &self.buffer)?;
- // reset buffer state
- self.buffer.clear();
- self.buffered_rows = 0;
- // return batch
- Some(batch)
- } else {
- None
- };
- Ok(batch)
}
- /// Finish the coalescing process, returning all buffered data as a final,
- /// single batch, if any
- fn finish(&mut self) -> Result<Option<RecordBatch>> {
- if self.buffer.is_empty() {
- Ok(None)
+ fn target_reached(&mut self, batch: RecordBatch) -> bool {
+ if batch.num_rows() == 0 {
+ false
} else {
- // combine the batches and return
- let batch = concat_batches(&self.schema, &self.buffer)?;
- // reset buffer state
- self.buffer.clear();
- self.buffered_rows = 0;
- // return batch
- Ok(Some(batch))
+ self.total_rows += batch.num_rows();
+ self.buffered_rows += batch.num_rows();
+ self.buffer.push(batch);
+ self.buffered_rows >= self.target_batch_size
}
}
- /// returns true if there is a limit and it has been reached
- pub fn limit_reached(&self) -> bool {
- if let Some(fetch) = self.fetch {
- self.total_rows >= fetch
- } else {
- false
- }
+ fn finish(&mut self) -> Result<RecordBatch> {
+ let batch = concat_batches(&self.schema, &self.buffer)?;
+ self.buffer.clear();
+ self.buffered_rows = 0;
+ Ok(batch)
}
}
+enum CoalescerState {
Review Comment:
Could we possibly add some documentation on this struct explaining what it
is used for (specifically, I think documenting what the caller should do when
the different variants of `Continue` / `LimitReached` / `TargetReached` are
returned would be helpful)
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -259,45 +261,72 @@ impl Stream for CoalesceBatchesStream {
}
}
+/// Enumeration of possible states for `CoalesceBatchesStream`.
+/// It represents different stages in the lifecycle of a stream of record
batches.
+#[derive(Debug, Clone, Eq, PartialEq)]
+enum CoalesceBatchesStreamState {
+ // State to pull a new batch from the input stream.
+ Pull,
+ // State to return a buffered batch.
+ ReturnBuffer,
+ // State indicating that the stream is exhausted.
+ Exhausted,
+}
+
impl CoalesceBatchesStream {
fn poll_next_inner(
self: &mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
- // Get a clone (uses same underlying atomic) as self gets borrowed
below
let cloned_time = self.baseline_metrics.elapsed_compute().clone();
-
- if self.is_closed {
- return Poll::Ready(None);
- }
loop {
- let input_batch = self.input.poll_next_unpin(cx);
- // records time on drop
- let _timer = cloned_time.timer();
- match ready!(input_batch) {
- Some(result) => {
- let Ok(input_batch) = result else {
- return Poll::Ready(Some(result)); // pass back error
+ match self.inner_state.clone() {
+ CoalesceBatchesStreamState::Pull => {
+ // Attempt to pull the next batch from the input stream.
+ let input_batch = match self.input.poll_next_unpin(cx) {
+ Poll::Ready(t) => t,
+ Poll::Pending => {
+ // If the batch is not ready, do not alter the
current state and return `Poll::Pending`.
+ return Poll::Pending;
+ }
};
Review Comment:
You can use the `ready!` macro here to simplify the code:
https://doc.rust-lang.org/stable/std/task/macro.ready.html
Something like
```suggestion
let input_batch = ready!(self.input.poll_next_unpin(cx));
```
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +393,60 @@ impl BatchCoalescer {
Arc::clone(&self.schema)
}
- /// Add a batch, returning a batch if the target batch size or limit is
reached
- fn push_batch(&mut self, batch: RecordBatch) ->
Result<Option<RecordBatch>> {
- // discard empty batches
- if batch.num_rows() == 0 {
- return Ok(None);
- }
+ fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
+ let batch = gc_string_view_batch(&batch);
- // past limit
- if self.limit_reached() {
- return Ok(None);
+ if self.limit_reached(&batch) {
+ CoalescerState::LimitReached
+ } else if self.target_reached(batch) {
+ CoalescerState::TargetReached
+ } else {
+ CoalescerState::Continue
}
+ }
- let batch = gc_string_view_batch(&batch);
-
- // Handle fetch limit:
- if let Some(fetch) = self.fetch {
- if self.total_rows + batch.num_rows() >= fetch {
- // We have reached the fetch limit.
+ fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+ match self.fetch {
+ Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+ // Limit is reached
let remaining_rows = fetch - self.total_rows;
debug_assert!(remaining_rows > 0);
- self.total_rows = fetch;
- // Trim the batch and add to buffered batches:
+
let batch = batch.slice(0, remaining_rows);
self.buffered_rows += batch.num_rows();
+ self.total_rows = fetch;
self.buffer.push(batch);
- // Combine buffered batches:
- let batch = concat_batches(&self.schema, &self.buffer)?;
- // Reset the buffer state and return final batch:
- self.buffer.clear();
- self.buffered_rows = 0;
- return Ok(Some(batch));
+ true
}
+ _ => false,
}
- self.total_rows += batch.num_rows();
-
- // batch itself is already big enough and we have no buffered rows so
- // return it directly
- if batch.num_rows() >= self.target_batch_size &&
self.buffer.is_empty() {
- return Ok(Some(batch));
- }
- // add to the buffered batches
- self.buffered_rows += batch.num_rows();
- self.buffer.push(batch);
- // check to see if we have enough batches yet
- let batch = if self.buffered_rows >= self.target_batch_size {
- // combine the batches and return
- let batch = concat_batches(&self.schema, &self.buffer)?;
- // reset buffer state
- self.buffer.clear();
- self.buffered_rows = 0;
- // return batch
- Some(batch)
- } else {
- None
- };
- Ok(batch)
}
- /// Finish the coalescing process, returning all buffered data as a final,
- /// single batch, if any
- fn finish(&mut self) -> Result<Option<RecordBatch>> {
- if self.buffer.is_empty() {
- Ok(None)
+ fn target_reached(&mut self, batch: RecordBatch) -> bool {
Review Comment:
Could we also document this method too? Specifically that it doesn't just
check that the target is reached but also buffers the batch if needed
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -259,45 +261,72 @@ impl Stream for CoalesceBatchesStream {
}
}
+/// Enumeration of possible states for `CoalesceBatchesStream`.
+/// It represents different stages in the lifecycle of a stream of record
batches.
+#[derive(Debug, Clone, Eq, PartialEq)]
Review Comment:
I think you could make this as `Copy` as well which would simplify the code
below (no need to call `clone`)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]