alamb commented on code in PR #11966:
URL: https://github.com/apache/datafusion/pull/11966#discussion_r1715352613


##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +393,60 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {

Review Comment:
   Could you please restore the documentation explaining what this does? Maybe 
saying to look at the documentation on `CoalescerState` to figure out what 
should be done on different return values



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +393,60 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
+        let batch = gc_string_view_batch(&batch);

Review Comment:
   I think the previous semantics of checking limit before calling 
`gc_string_view_batch` make more sense than gc'ing the batch if the batch will 
be ignored



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +393,60 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
+        let batch = gc_string_view_batch(&batch);
 
-        // past limit
-        if self.limit_reached() {
-            return Ok(None);
+        if self.limit_reached(&batch) {
+            CoalescerState::LimitReached
+        } else if self.target_reached(batch) {
+            CoalescerState::TargetReached
+        } else {
+            CoalescerState::Continue
         }
+    }
 
-        let batch = gc_string_view_batch(&batch);
-
-        // Handle fetch limit:
-        if let Some(fetch) = self.fetch {
-            if self.total_rows + batch.num_rows() >= fetch {
-                // We have reached the fetch limit.
+    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+        match self.fetch {
+            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+                // Limit is reached
                 let remaining_rows = fetch - self.total_rows;
                 debug_assert!(remaining_rows > 0);
-                self.total_rows = fetch;
-                // Trim the batch and add to buffered batches:
+
                 let batch = batch.slice(0, remaining_rows);
                 self.buffered_rows += batch.num_rows();
+                self.total_rows = fetch;
                 self.buffer.push(batch);
-                // Combine buffered batches:
-                let batch = concat_batches(&self.schema, &self.buffer)?;
-                // Reset the buffer state and return final batch:
-                self.buffer.clear();
-                self.buffered_rows = 0;
-                return Ok(Some(batch));
+                true
             }
+            _ => false,
         }
-        self.total_rows += batch.num_rows();
-
-        // batch itself is already big enough and we have no buffered rows so
-        // return it directly
-        if batch.num_rows() >= self.target_batch_size && 
self.buffer.is_empty() {
-            return Ok(Some(batch));
-        }
-        // add to the buffered batches
-        self.buffered_rows += batch.num_rows();
-        self.buffer.push(batch);
-        // check to see if we have enough batches yet
-        let batch = if self.buffered_rows >= self.target_batch_size {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Some(batch)
-        } else {
-            None
-        };
-        Ok(batch)
     }
 
-    /// Finish the coalescing process, returning all buffered data as a final,
-    /// single batch, if any
-    fn finish(&mut self) -> Result<Option<RecordBatch>> {
-        if self.buffer.is_empty() {
-            Ok(None)
+    fn target_reached(&mut self, batch: RecordBatch) -> bool {
+        if batch.num_rows() == 0 {
+            false
         } else {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Ok(Some(batch))
+            self.total_rows += batch.num_rows();
+            self.buffered_rows += batch.num_rows();
+            self.buffer.push(batch);
+            self.buffered_rows >= self.target_batch_size
         }
     }
 
-    /// returns true if there is a limit and it has been reached
-    pub fn limit_reached(&self) -> bool {
-        if let Some(fetch) = self.fetch {
-            self.total_rows >= fetch
-        } else {
-            false
-        }
+    fn finish(&mut self) -> Result<RecordBatch> {

Review Comment:
   is there a reason to remove the documentation of this method?
   
   It seems like the semantics might have changed too -- `finish` now is called 
to produce the currently buffered batches, not just when no more input is 
coming. 
   
   Maybe renaming it to `finish_batch` or `make_batch` or something similar 
would make the semantics clearer



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +393,60 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
+        let batch = gc_string_view_batch(&batch);
 
-        // past limit
-        if self.limit_reached() {
-            return Ok(None);
+        if self.limit_reached(&batch) {
+            CoalescerState::LimitReached
+        } else if self.target_reached(batch) {
+            CoalescerState::TargetReached
+        } else {
+            CoalescerState::Continue
         }
+    }
 
-        let batch = gc_string_view_batch(&batch);
-
-        // Handle fetch limit:
-        if let Some(fetch) = self.fetch {
-            if self.total_rows + batch.num_rows() >= fetch {
-                // We have reached the fetch limit.
+    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+        match self.fetch {
+            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+                // Limit is reached
                 let remaining_rows = fetch - self.total_rows;
                 debug_assert!(remaining_rows > 0);
-                self.total_rows = fetch;
-                // Trim the batch and add to buffered batches:
+
                 let batch = batch.slice(0, remaining_rows);
                 self.buffered_rows += batch.num_rows();
+                self.total_rows = fetch;
                 self.buffer.push(batch);
-                // Combine buffered batches:
-                let batch = concat_batches(&self.schema, &self.buffer)?;
-                // Reset the buffer state and return final batch:
-                self.buffer.clear();
-                self.buffered_rows = 0;
-                return Ok(Some(batch));
+                true
             }
+            _ => false,
         }
-        self.total_rows += batch.num_rows();
-
-        // batch itself is already big enough and we have no buffered rows so
-        // return it directly
-        if batch.num_rows() >= self.target_batch_size && 
self.buffer.is_empty() {
-            return Ok(Some(batch));
-        }
-        // add to the buffered batches
-        self.buffered_rows += batch.num_rows();
-        self.buffer.push(batch);
-        // check to see if we have enough batches yet
-        let batch = if self.buffered_rows >= self.target_batch_size {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Some(batch)
-        } else {
-            None
-        };
-        Ok(batch)
     }
 
-    /// Finish the coalescing process, returning all buffered data as a final,
-    /// single batch, if any
-    fn finish(&mut self) -> Result<Option<RecordBatch>> {
-        if self.buffer.is_empty() {
-            Ok(None)
+    fn target_reached(&mut self, batch: RecordBatch) -> bool {
+        if batch.num_rows() == 0 {
+            false
         } else {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Ok(Some(batch))
+            self.total_rows += batch.num_rows();
+            self.buffered_rows += batch.num_rows();
+            self.buffer.push(batch);
+            self.buffered_rows >= self.target_batch_size
         }
     }
 
-    /// returns true if there is a limit and it has been reached
-    pub fn limit_reached(&self) -> bool {
-        if let Some(fetch) = self.fetch {
-            self.total_rows >= fetch
-        } else {
-            false
-        }
+    fn finish(&mut self) -> Result<RecordBatch> {
+        let batch = concat_batches(&self.schema, &self.buffer)?;
+        self.buffer.clear();
+        self.buffered_rows = 0;
+        Ok(batch)
     }
 }
 
+enum CoalescerState {

Review Comment:
   Could we possibly add some documentation on this enum explaining what it 
is used for (specifically, I think documenting what the caller should do for 
the different variants of `Continue` / `LimitReached` / `TargetReached` would 
be helpful)



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -259,45 +261,72 @@ impl Stream for CoalesceBatchesStream {
     }
 }
 
+/// Enumeration of possible states for `CoalesceBatchesStream`.
+/// It represents different stages in the lifecycle of a stream of record 
batches.
+#[derive(Debug, Clone, Eq, PartialEq)]
+enum CoalesceBatchesStreamState {
+    // State to pull a new batch from the input stream.
+    Pull,
+    // State to return a buffered batch.
+    ReturnBuffer,
+    // State indicating that the stream is exhausted.
+    Exhausted,
+}
+
 impl CoalesceBatchesStream {
     fn poll_next_inner(
         self: &mut Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Result<RecordBatch>>> {
-        // Get a clone (uses same underlying atomic) as self gets borrowed 
below
         let cloned_time = self.baseline_metrics.elapsed_compute().clone();
-
-        if self.is_closed {
-            return Poll::Ready(None);
-        }
         loop {
-            let input_batch = self.input.poll_next_unpin(cx);
-            // records time on drop
-            let _timer = cloned_time.timer();
-            match ready!(input_batch) {
-                Some(result) => {
-                    let Ok(input_batch) = result else {
-                        return Poll::Ready(Some(result)); // pass back error
+            match self.inner_state.clone() {
+                CoalesceBatchesStreamState::Pull => {
+                    // Attempt to pull the next batch from the input stream.
+                    let input_batch = match self.input.poll_next_unpin(cx) {
+                        Poll::Ready(t) => t,
+                        Poll::Pending => {
+                            // If the batch is not ready, do not alter the 
current state and return `Poll::Pending`.
+                            return Poll::Pending;
+                        }
                     };

Review Comment:
   You can use the `ready!` macro here to simplify the code: 
https://doc.rust-lang.org/stable/std/task/macro.ready.html
   
   Something like
   ```suggestion
                       let input_batch = ready!(self.input.poll_next_unpin(cx));
   ```
   
   



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +393,60 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
+        let batch = gc_string_view_batch(&batch);
 
-        // past limit
-        if self.limit_reached() {
-            return Ok(None);
+        if self.limit_reached(&batch) {
+            CoalescerState::LimitReached
+        } else if self.target_reached(batch) {
+            CoalescerState::TargetReached
+        } else {
+            CoalescerState::Continue
         }
+    }
 
-        let batch = gc_string_view_batch(&batch);
-
-        // Handle fetch limit:
-        if let Some(fetch) = self.fetch {
-            if self.total_rows + batch.num_rows() >= fetch {
-                // We have reached the fetch limit.
+    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+        match self.fetch {
+            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+                // Limit is reached
                 let remaining_rows = fetch - self.total_rows;
                 debug_assert!(remaining_rows > 0);
-                self.total_rows = fetch;
-                // Trim the batch and add to buffered batches:
+
                 let batch = batch.slice(0, remaining_rows);
                 self.buffered_rows += batch.num_rows();
+                self.total_rows = fetch;
                 self.buffer.push(batch);
-                // Combine buffered batches:
-                let batch = concat_batches(&self.schema, &self.buffer)?;
-                // Reset the buffer state and return final batch:
-                self.buffer.clear();
-                self.buffered_rows = 0;
-                return Ok(Some(batch));
+                true
             }
+            _ => false,
         }
-        self.total_rows += batch.num_rows();
-
-        // batch itself is already big enough and we have no buffered rows so
-        // return it directly
-        if batch.num_rows() >= self.target_batch_size && 
self.buffer.is_empty() {
-            return Ok(Some(batch));
-        }
-        // add to the buffered batches
-        self.buffered_rows += batch.num_rows();
-        self.buffer.push(batch);
-        // check to see if we have enough batches yet
-        let batch = if self.buffered_rows >= self.target_batch_size {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Some(batch)
-        } else {
-            None
-        };
-        Ok(batch)
     }
 
-    /// Finish the coalescing process, returning all buffered data as a final,
-    /// single batch, if any
-    fn finish(&mut self) -> Result<Option<RecordBatch>> {
-        if self.buffer.is_empty() {
-            Ok(None)
+    fn target_reached(&mut self, batch: RecordBatch) -> bool {

Review Comment:
   Could we also document this method too? Specifically that it doesn't just 
check that the target is reached but also buffers the batch if needed



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -259,45 +261,72 @@ impl Stream for CoalesceBatchesStream {
     }
 }
 
+/// Enumeration of possible states for `CoalesceBatchesStream`.
+/// It represents different stages in the lifecycle of a stream of record 
batches.
+#[derive(Debug, Clone, Eq, PartialEq)]

Review Comment:
   I think you could make this `Copy` as well, which would simplify the code 
below (no need to call `clone`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to