ozankabak commented on code in PR #11966:
URL: https://github.com/apache/datafusion/pull/11966#discussion_r1715846903


##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +419,73 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
-
-        // past limit
-        if self.limit_reached() {
-            return Ok(None);
-        }
-
+    /// Given a batch, it updates the buffer of [`BatchCoalescer`]. It returns
+    /// a variant of [`CoalescerState`] indicating the final state of the 
buffer.
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
         let batch = gc_string_view_batch(&batch);
+        if self.limit_reached(&batch) {
+            CoalescerState::LimitReached
+        } else if self.target_reached(batch) {
+            CoalescerState::TargetReached
+        } else {
+            CoalescerState::Continue
+        }
+    }
 
-        // Handle fetch limit:
-        if let Some(fetch) = self.fetch {
-            if self.total_rows + batch.num_rows() >= fetch {
-                // We have reached the fetch limit.
+    /// After getting the `batch`, the function checks if the buffer can reach 
limit.
+    /// If it is so, it slices the received batch as needed, and updates the 
buffer with it,
+    /// and finally returns `true`. Otherwise; the function does nothing and 
returns false.
+    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+        match self.fetch {
+            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+                // Limit is reached
                 let remaining_rows = fetch - self.total_rows;
                 debug_assert!(remaining_rows > 0);
-                self.total_rows = fetch;
-                // Trim the batch and add to buffered batches:
+
                 let batch = batch.slice(0, remaining_rows);
                 self.buffered_rows += batch.num_rows();
+                self.total_rows = fetch;
                 self.buffer.push(batch);
-                // Combine buffered batches:
-                let batch = concat_batches(&self.schema, &self.buffer)?;
-                // Reset the buffer state and return final batch:
-                self.buffer.clear();
-                self.buffered_rows = 0;
-                return Ok(Some(batch));
+                true
             }
+            _ => false,
         }
-        self.total_rows += batch.num_rows();
-
-        // batch itself is already big enough and we have no buffered rows so
-        // return it directly
-        if batch.num_rows() >= self.target_batch_size && 
self.buffer.is_empty() {
-            return Ok(Some(batch));
-        }
-        // add to the buffered batches
-        self.buffered_rows += batch.num_rows();
-        self.buffer.push(batch);
-        // check to see if we have enough batches yet
-        let batch = if self.buffered_rows >= self.target_batch_size {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Some(batch)
-        } else {
-            None
-        };
-        Ok(batch)
     }
 
-    /// Finish the coalescing process, returning all buffered data as a final,
-    /// single batch, if any
-    fn finish(&mut self) -> Result<Option<RecordBatch>> {
-        if self.buffer.is_empty() {
-            Ok(None)
+    /// Updates the buffer with the given batch. If the target batch count is 
reached,
+    /// the function returns true. Otherwise, it returns false.
+    fn target_reached(&mut self, batch: RecordBatch) -> bool {
+        if batch.num_rows() == 0 {
+            false
         } else {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Ok(Some(batch))
+            self.total_rows += batch.num_rows();
+            self.buffered_rows += batch.num_rows();
+            self.buffer.push(batch);
+            self.buffered_rows >= self.target_batch_size
         }
     }
 
-    /// returns true if there is a limit and it has been reached
-    pub fn limit_reached(&self) -> bool {
-        if let Some(fetch) = self.fetch {
-            self.total_rows >= fetch
-        } else {
-            false
-        }
+    /// Concatenates and returns the all buffered batches. It must be noticed 
that
+    /// the buffer is cleared during this operations.
+    fn finish_batch(&mut self) -> Result<RecordBatch> {
+        let batch = concat_batches(&self.schema, &self.buffer)?;
+        self.buffer.clear();
+        self.buffered_rows = 0;
+        Ok(batch)
     }
 }
 
+/// This variants are used to indicate the final status of the buffer of 
[`BatchCoalescer`]
+/// after a [`BatchCoalescer::push_batch()`] operation.
+enum CoalescerState {
+    /// Neither the limit nor the target batch is reached.
+    Continue,
+    /// The required row count for downstream operators to generate a result 
is reached.

Review Comment:
   ```suggestion
    /// A sufficient row count to produce a complete query result has been 
reached.
   ```



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +419,73 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
-
-        // past limit
-        if self.limit_reached() {
-            return Ok(None);
-        }
-
+    /// Given a batch, it updates the buffer of [`BatchCoalescer`]. It returns
+    /// a variant of [`CoalescerState`] indicating the final state of the 
buffer.
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
         let batch = gc_string_view_batch(&batch);
+        if self.limit_reached(&batch) {
+            CoalescerState::LimitReached
+        } else if self.target_reached(batch) {
+            CoalescerState::TargetReached
+        } else {
+            CoalescerState::Continue
+        }
+    }
 
-        // Handle fetch limit:
-        if let Some(fetch) = self.fetch {
-            if self.total_rows + batch.num_rows() >= fetch {
-                // We have reached the fetch limit.
+    /// After getting the `batch`, the function checks if the buffer can reach 
limit.
+    /// If it is so, it slices the received batch as needed, and updates the 
buffer with it,
+    /// and finally returns `true`. Otherwise; the function does nothing and 
returns false.
+    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+        match self.fetch {
+            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+                // Limit is reached
                 let remaining_rows = fetch - self.total_rows;
                 debug_assert!(remaining_rows > 0);
-                self.total_rows = fetch;
-                // Trim the batch and add to buffered batches:
+
                 let batch = batch.slice(0, remaining_rows);
                 self.buffered_rows += batch.num_rows();
+                self.total_rows = fetch;
                 self.buffer.push(batch);
-                // Combine buffered batches:
-                let batch = concat_batches(&self.schema, &self.buffer)?;
-                // Reset the buffer state and return final batch:
-                self.buffer.clear();
-                self.buffered_rows = 0;
-                return Ok(Some(batch));
+                true
             }
+            _ => false,
         }
-        self.total_rows += batch.num_rows();
-
-        // batch itself is already big enough and we have no buffered rows so
-        // return it directly
-        if batch.num_rows() >= self.target_batch_size && 
self.buffer.is_empty() {
-            return Ok(Some(batch));
-        }
-        // add to the buffered batches
-        self.buffered_rows += batch.num_rows();
-        self.buffer.push(batch);
-        // check to see if we have enough batches yet
-        let batch = if self.buffered_rows >= self.target_batch_size {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Some(batch)
-        } else {
-            None
-        };
-        Ok(batch)
     }
 
-    /// Finish the coalescing process, returning all buffered data as a final,
-    /// single batch, if any
-    fn finish(&mut self) -> Result<Option<RecordBatch>> {
-        if self.buffer.is_empty() {
-            Ok(None)
+    /// Updates the buffer with the given batch. If the target batch count is 
reached,
+    /// the function returns true. Otherwise, it returns false.
+    fn target_reached(&mut self, batch: RecordBatch) -> bool {
+        if batch.num_rows() == 0 {
+            false
         } else {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Ok(Some(batch))
+            self.total_rows += batch.num_rows();
+            self.buffered_rows += batch.num_rows();
+            self.buffer.push(batch);
+            self.buffered_rows >= self.target_batch_size
         }
     }
 
-    /// returns true if there is a limit and it has been reached
-    pub fn limit_reached(&self) -> bool {
-        if let Some(fetch) = self.fetch {
-            self.total_rows >= fetch
-        } else {
-            false
-        }
+    /// Concatenates and returns the all buffered batches. It must be noticed 
that
+    /// the buffer is cleared during this operations.
+    fn finish_batch(&mut self) -> Result<RecordBatch> {
+        let batch = concat_batches(&self.schema, &self.buffer)?;
+        self.buffer.clear();
+        self.buffered_rows = 0;
+        Ok(batch)
     }
 }
 
+/// This variants are used to indicate the final status of the buffer of 
[`BatchCoalescer`]
+/// after a [`BatchCoalescer::push_batch()`] operation.

Review Comment:
   ```suggestion
   /// This enumeration acts as a status indicator for the [`BatchCoalescer`] 
after a
   /// [`BatchCoalescer::push_batch()`] operation.
   ```



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +419,73 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
-
-        // past limit
-        if self.limit_reached() {
-            return Ok(None);
-        }
-
+    /// Given a batch, it updates the buffer of [`BatchCoalescer`]. It returns
+    /// a variant of [`CoalescerState`] indicating the final state of the 
buffer.
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
         let batch = gc_string_view_batch(&batch);
+        if self.limit_reached(&batch) {
+            CoalescerState::LimitReached
+        } else if self.target_reached(batch) {
+            CoalescerState::TargetReached
+        } else {
+            CoalescerState::Continue
+        }
+    }
 
-        // Handle fetch limit:
-        if let Some(fetch) = self.fetch {
-            if self.total_rows + batch.num_rows() >= fetch {
-                // We have reached the fetch limit.
+    /// After getting the `batch`, the function checks if the buffer can reach 
limit.
+    /// If it is so, it slices the received batch as needed, and updates the 
buffer with it,
+    /// and finally returns `true`. Otherwise; the function does nothing and 
returns false.
+    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+        match self.fetch {
+            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+                // Limit is reached
                 let remaining_rows = fetch - self.total_rows;
                 debug_assert!(remaining_rows > 0);
-                self.total_rows = fetch;
-                // Trim the batch and add to buffered batches:
+
                 let batch = batch.slice(0, remaining_rows);
                 self.buffered_rows += batch.num_rows();
+                self.total_rows = fetch;
                 self.buffer.push(batch);
-                // Combine buffered batches:
-                let batch = concat_batches(&self.schema, &self.buffer)?;
-                // Reset the buffer state and return final batch:
-                self.buffer.clear();
-                self.buffered_rows = 0;
-                return Ok(Some(batch));
+                true
             }
+            _ => false,
         }
-        self.total_rows += batch.num_rows();
-
-        // batch itself is already big enough and we have no buffered rows so
-        // return it directly
-        if batch.num_rows() >= self.target_batch_size && 
self.buffer.is_empty() {
-            return Ok(Some(batch));
-        }
-        // add to the buffered batches
-        self.buffered_rows += batch.num_rows();
-        self.buffer.push(batch);
-        // check to see if we have enough batches yet
-        let batch = if self.buffered_rows >= self.target_batch_size {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Some(batch)
-        } else {
-            None
-        };
-        Ok(batch)
     }
 
-    /// Finish the coalescing process, returning all buffered data as a final,
-    /// single batch, if any
-    fn finish(&mut self) -> Result<Option<RecordBatch>> {
-        if self.buffer.is_empty() {
-            Ok(None)
+    /// Updates the buffer with the given batch. If the target batch count is 
reached,
+    /// the function returns true. Otherwise, it returns false.
+    fn target_reached(&mut self, batch: RecordBatch) -> bool {
+        if batch.num_rows() == 0 {
+            false
         } else {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Ok(Some(batch))
+            self.total_rows += batch.num_rows();
+            self.buffered_rows += batch.num_rows();
+            self.buffer.push(batch);
+            self.buffered_rows >= self.target_batch_size
         }
     }
 
-    /// returns true if there is a limit and it has been reached
-    pub fn limit_reached(&self) -> bool {
-        if let Some(fetch) = self.fetch {
-            self.total_rows >= fetch
-        } else {
-            false
-        }
+    /// Concatenates and returns the all buffered batches. It must be noticed 
that
+    /// the buffer is cleared during this operations.
+    fn finish_batch(&mut self) -> Result<RecordBatch> {
+        let batch = concat_batches(&self.schema, &self.buffer)?;
+        self.buffer.clear();
+        self.buffered_rows = 0;
+        Ok(batch)
     }
 }
 
+/// This variants are used to indicate the final status of the buffer of 
[`BatchCoalescer`]
+/// after a [`BatchCoalescer::push_batch()`] operation.
+enum CoalescerState {
+    /// Neither the limit nor the target batch is reached.
+    Continue,
+    /// The required row count for downstream operators to generate a result 
is reached.
+    LimitReached,
+    /// Specified minimum number of rows which a batch should have is reached.

Review Comment:
   ```suggestion
       /// The specified minimum number of rows a batch should have is reached.
   ```



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +419,73 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
-
-        // past limit
-        if self.limit_reached() {
-            return Ok(None);
-        }
-
+    /// Given a batch, it updates the buffer of [`BatchCoalescer`]. It returns
+    /// a variant of [`CoalescerState`] indicating the final state of the 
buffer.
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
         let batch = gc_string_view_batch(&batch);
+        if self.limit_reached(&batch) {
+            CoalescerState::LimitReached
+        } else if self.target_reached(batch) {
+            CoalescerState::TargetReached
+        } else {
+            CoalescerState::Continue
+        }
+    }
 
-        // Handle fetch limit:
-        if let Some(fetch) = self.fetch {
-            if self.total_rows + batch.num_rows() >= fetch {
-                // We have reached the fetch limit.
+    /// After getting the `batch`, the function checks if the buffer can reach 
limit.
+    /// If it is so, it slices the received batch as needed, and updates the 
buffer with it,
+    /// and finally returns `true`. Otherwise; the function does nothing and 
returns false.
+    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+        match self.fetch {
+            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+                // Limit is reached
                 let remaining_rows = fetch - self.total_rows;
                 debug_assert!(remaining_rows > 0);
-                self.total_rows = fetch;
-                // Trim the batch and add to buffered batches:
+
                 let batch = batch.slice(0, remaining_rows);
                 self.buffered_rows += batch.num_rows();
+                self.total_rows = fetch;
                 self.buffer.push(batch);
-                // Combine buffered batches:
-                let batch = concat_batches(&self.schema, &self.buffer)?;
-                // Reset the buffer state and return final batch:
-                self.buffer.clear();
-                self.buffered_rows = 0;
-                return Ok(Some(batch));
+                true
             }
+            _ => false,
         }
-        self.total_rows += batch.num_rows();
-
-        // batch itself is already big enough and we have no buffered rows so
-        // return it directly
-        if batch.num_rows() >= self.target_batch_size && 
self.buffer.is_empty() {
-            return Ok(Some(batch));
-        }
-        // add to the buffered batches
-        self.buffered_rows += batch.num_rows();
-        self.buffer.push(batch);
-        // check to see if we have enough batches yet
-        let batch = if self.buffered_rows >= self.target_batch_size {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Some(batch)
-        } else {
-            None
-        };
-        Ok(batch)
     }
 
-    /// Finish the coalescing process, returning all buffered data as a final,
-    /// single batch, if any
-    fn finish(&mut self) -> Result<Option<RecordBatch>> {
-        if self.buffer.is_empty() {
-            Ok(None)
+    /// Updates the buffer with the given batch. If the target batch count is 
reached,
+    /// the function returns true. Otherwise, it returns false.
+    fn target_reached(&mut self, batch: RecordBatch) -> bool {
+        if batch.num_rows() == 0 {
+            false
         } else {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Ok(Some(batch))
+            self.total_rows += batch.num_rows();
+            self.buffered_rows += batch.num_rows();
+            self.buffer.push(batch);
+            self.buffered_rows >= self.target_batch_size
         }
     }
 
-    /// returns true if there is a limit and it has been reached
-    pub fn limit_reached(&self) -> bool {
-        if let Some(fetch) = self.fetch {
-            self.total_rows >= fetch
-        } else {
-            false
-        }
+    /// Concatenates and returns the all buffered batches. It must be noticed 
that
+    /// the buffer is cleared during this operations.

Review Comment:
   ```suggestion
       /// Concatenates and returns all buffered batches, and clears the buffer.
   ```



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +419,73 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
-
-        // past limit
-        if self.limit_reached() {
-            return Ok(None);
-        }
-
+    /// Given a batch, it updates the buffer of [`BatchCoalescer`]. It returns
+    /// a variant of [`CoalescerState`] indicating the final state of the 
buffer.
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
         let batch = gc_string_view_batch(&batch);
+        if self.limit_reached(&batch) {
+            CoalescerState::LimitReached
+        } else if self.target_reached(batch) {
+            CoalescerState::TargetReached
+        } else {
+            CoalescerState::Continue
+        }
+    }
 
-        // Handle fetch limit:
-        if let Some(fetch) = self.fetch {
-            if self.total_rows + batch.num_rows() >= fetch {
-                // We have reached the fetch limit.
+    /// After getting the `batch`, the function checks if the buffer can reach 
limit.
+    /// If it is so, it slices the received batch as needed, and updates the 
buffer with it,
+    /// and finally returns `true`. Otherwise; the function does nothing and 
returns false.
+    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+        match self.fetch {
+            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+                // Limit is reached
                 let remaining_rows = fetch - self.total_rows;
                 debug_assert!(remaining_rows > 0);
-                self.total_rows = fetch;
-                // Trim the batch and add to buffered batches:
+
                 let batch = batch.slice(0, remaining_rows);
                 self.buffered_rows += batch.num_rows();
+                self.total_rows = fetch;
                 self.buffer.push(batch);
-                // Combine buffered batches:
-                let batch = concat_batches(&self.schema, &self.buffer)?;
-                // Reset the buffer state and return final batch:
-                self.buffer.clear();
-                self.buffered_rows = 0;
-                return Ok(Some(batch));
+                true
             }
+            _ => false,
         }
-        self.total_rows += batch.num_rows();
-
-        // batch itself is already big enough and we have no buffered rows so
-        // return it directly
-        if batch.num_rows() >= self.target_batch_size && 
self.buffer.is_empty() {
-            return Ok(Some(batch));
-        }
-        // add to the buffered batches
-        self.buffered_rows += batch.num_rows();
-        self.buffer.push(batch);
-        // check to see if we have enough batches yet
-        let batch = if self.buffered_rows >= self.target_batch_size {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Some(batch)
-        } else {
-            None
-        };
-        Ok(batch)
     }
 
-    /// Finish the coalescing process, returning all buffered data as a final,
-    /// single batch, if any
-    fn finish(&mut self) -> Result<Option<RecordBatch>> {
-        if self.buffer.is_empty() {
-            Ok(None)
+    /// Updates the buffer with the given batch. If the target batch count is 
reached,
+    /// the function returns true. Otherwise, it returns false.
+    fn target_reached(&mut self, batch: RecordBatch) -> bool {
+        if batch.num_rows() == 0 {
+            false
         } else {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Ok(Some(batch))
+            self.total_rows += batch.num_rows();
+            self.buffered_rows += batch.num_rows();
+            self.buffer.push(batch);
+            self.buffered_rows >= self.target_batch_size
         }
     }
 
-    /// returns true if there is a limit and it has been reached
-    pub fn limit_reached(&self) -> bool {
-        if let Some(fetch) = self.fetch {
-            self.total_rows >= fetch
-        } else {
-            false
-        }
+    /// Concatenates and returns the all buffered batches. It must be noticed 
that
+    /// the buffer is cleared during this operations.
+    fn finish_batch(&mut self) -> Result<RecordBatch> {
+        let batch = concat_batches(&self.schema, &self.buffer)?;
+        self.buffer.clear();
+        self.buffered_rows = 0;
+        Ok(batch)
     }
 }
 
+/// This variants are used to indicate the final status of the buffer of 
[`BatchCoalescer`]
+/// after a [`BatchCoalescer::push_batch()`] operation.
+enum CoalescerState {
+    /// Neither the limit nor the target batch is reached.

Review Comment:
   ```suggestion
       /// Neither the limit nor the target batch size is reached.
   ```



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +419,73 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
-
-        // past limit
-        if self.limit_reached() {
-            return Ok(None);
-        }
-
+    /// Given a batch, it updates the buffer of [`BatchCoalescer`]. It returns
+    /// a variant of [`CoalescerState`] indicating the final state of the 
buffer.
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
         let batch = gc_string_view_batch(&batch);
+        if self.limit_reached(&batch) {
+            CoalescerState::LimitReached
+        } else if self.target_reached(batch) {
+            CoalescerState::TargetReached
+        } else {
+            CoalescerState::Continue
+        }
+    }
 
-        // Handle fetch limit:
-        if let Some(fetch) = self.fetch {
-            if self.total_rows + batch.num_rows() >= fetch {
-                // We have reached the fetch limit.
+    /// After getting the `batch`, the function checks if the buffer can reach 
limit.
+    /// If it is so, it slices the received batch as needed, and updates the 
buffer with it,
+    /// and finally returns `true`. Otherwise; the function does nothing and 
returns false.

Review Comment:
   ```suggestion
       /// The function checks if the buffer can reach the specified limit 
after getting `batch`.
    /// If it does, it slices the received batch as needed, updates the 
buffer with it, and
    /// finally returns `true`. Otherwise, the function does nothing and 
returns `false`.
   ```



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -364,90 +419,73 @@ impl BatchCoalescer {
         Arc::clone(&self.schema)
     }
 
-    /// Add a batch, returning a batch if the target batch size or limit is 
reached
-    fn push_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
-        // discard empty batches
-        if batch.num_rows() == 0 {
-            return Ok(None);
-        }
-
-        // past limit
-        if self.limit_reached() {
-            return Ok(None);
-        }
-
+    /// Given a batch, it updates the buffer of [`BatchCoalescer`]. It returns
+    /// a variant of [`CoalescerState`] indicating the final state of the 
buffer.
+    fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
         let batch = gc_string_view_batch(&batch);
+        if self.limit_reached(&batch) {
+            CoalescerState::LimitReached
+        } else if self.target_reached(batch) {
+            CoalescerState::TargetReached
+        } else {
+            CoalescerState::Continue
+        }
+    }
 
-        // Handle fetch limit:
-        if let Some(fetch) = self.fetch {
-            if self.total_rows + batch.num_rows() >= fetch {
-                // We have reached the fetch limit.
+    /// After getting the `batch`, the function checks if the buffer can reach 
limit.
+    /// If it is so, it slices the received batch as needed, and updates the 
buffer with it,
+    /// and finally returns `true`. Otherwise; the function does nothing and 
returns false.
+    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
+        match self.fetch {
+            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+                // Limit is reached
                 let remaining_rows = fetch - self.total_rows;
                 debug_assert!(remaining_rows > 0);
-                self.total_rows = fetch;
-                // Trim the batch and add to buffered batches:
+
                 let batch = batch.slice(0, remaining_rows);
                 self.buffered_rows += batch.num_rows();
+                self.total_rows = fetch;
                 self.buffer.push(batch);
-                // Combine buffered batches:
-                let batch = concat_batches(&self.schema, &self.buffer)?;
-                // Reset the buffer state and return final batch:
-                self.buffer.clear();
-                self.buffered_rows = 0;
-                return Ok(Some(batch));
+                true
             }
+            _ => false,
         }
-        self.total_rows += batch.num_rows();
-
-        // batch itself is already big enough and we have no buffered rows so
-        // return it directly
-        if batch.num_rows() >= self.target_batch_size && 
self.buffer.is_empty() {
-            return Ok(Some(batch));
-        }
-        // add to the buffered batches
-        self.buffered_rows += batch.num_rows();
-        self.buffer.push(batch);
-        // check to see if we have enough batches yet
-        let batch = if self.buffered_rows >= self.target_batch_size {
-            // combine the batches and return
-            let batch = concat_batches(&self.schema, &self.buffer)?;
-            // reset buffer state
-            self.buffer.clear();
-            self.buffered_rows = 0;
-            // return batch
-            Some(batch)
-        } else {
-            None
-        };
-        Ok(batch)
     }
 
-    /// Finish the coalescing process, returning all buffered data as a final,
-    /// single batch, if any
-    fn finish(&mut self) -> Result<Option<RecordBatch>> {
-        if self.buffer.is_empty() {
-            Ok(None)
+    /// Updates the buffer with the given batch. If the target batch count is 
reached,
+    /// the function returns true. Otherwise, it returns false.

Review Comment:
   ```suggestion
       /// Updates the buffer with the given batch. If the target batch size is 
reached,
       /// the function returns `true`. Otherwise, it returns `false`.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to