kosiew commented on code in PR #20922:
URL: https://github.com/apache/datafusion/pull/20922#discussion_r2964937755


##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -126,49 +131,56 @@ impl BatchBuilder {
         &self.schema
     }
 
-    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
-    ///
-    /// Will then drop any batches for which all rows have been yielded to the output
-    ///
-    /// Returns `None` if no pending rows
-    pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
-        if self.is_empty() {
-            return Ok(None);
-        }
-
-        let columns = (0..self.schema.fields.len())
+    /// Try to interleave all columns using the given index slice.
+    fn try_interleave_columns(
+        &self,
+        indices: &[(usize, usize)],
+    ) -> Result<Vec<ArrayRef>> {
+        (0..self.schema.fields.len())
             .map(|column_idx| {
                 let arrays: Vec<_> = self
                     .batches
                     .iter()
                     .map(|(_, batch)| batch.column(column_idx).as_ref())
                     .collect();
-                Ok(interleave(&arrays, &self.indices)?)
+                recover_offset_overflow_from_panic(|| interleave(&arrays, indices))
             })
-            .collect::<Result<Vec<_>>>()?;
-
-        self.indices.clear();
-
-        // New cursors are only created once the previous cursor for the stream
-        // is finished. This means all remaining rows from all but the last batch
-        // for each stream have been yielded to the newly created record batch
-        //
-        // We can therefore drop all but the last batch for each stream
-        let mut batch_idx = 0;
-        let mut retained = 0;
-        self.batches.retain(|(stream_idx, batch)| {
-            let stream_cursor = &mut self.cursors[*stream_idx];
-            let retain = stream_cursor.batch_idx == batch_idx;
-            batch_idx += 1;
-
-            if retain {
-                stream_cursor.batch_idx = retained;
-                retained += 1;
-            } else {
-                self.batches_mem_used -= get_record_batch_memory_size(batch);
-            }
-            retain
-        });
+            .collect::<Result<Vec<_>>>()
+    }
+
+    /// Builds a record batch from the first `rows_to_emit` buffered rows.
+    fn finish_record_batch(
+        &mut self,
+        rows_to_emit: usize,
+        columns: Vec<ArrayRef>,
+    ) -> Result<RecordBatch> {
+        // Remove consumed indices, keeping any remaining for the next call.
+        self.indices.drain(..rows_to_emit);
+
+        // Only clean up fully-consumed batches when all indices are drained,

Review Comment:
   Nice change. One thing that stood out to me here: now that 
`build_record_batch()` can emit a prefix and leave the remainder buffered, this 
branch seems to keep every fully-consumed input batch alive until 
`self.indices` is empty.
   
   That seems functionally correct, but it also means overflow cases could 
retain quite a bit of memory across several follow-up polls — especially for 
`FETCH`-limited queries where we stop pulling new input and just drain 
leftovers.
   
   Would it make sense to either release batches that are no longer referenced 
by the remaining indices, or at least leave a quick comment here calling out 
that this retention is intentional? I think that would help future readers 
understand the tradeoff.
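
   To make the first option concrete, here is a minimal, self-contained sketch of what "release batches no longer referenced by the remaining indices" could look like. It uses stand-in types (`Vec<Vec<i64>>` for the buffered batches, `(batch_idx, row_idx)` pairs for the pending indices) rather than DataFusion's actual `BatchBuilder` fields, and it omits the cursor bookkeeping the real code would also need, so treat it as an illustration of the compaction idea only:

   ```rust
   /// Hypothetical sketch: drop buffered batches that the remaining
   /// (batch_idx, row_idx) indices no longer reference, remapping the
   /// surviving batch positions so the leftover indices stay valid.
   fn prune_unreferenced_batches(
       batches: &mut Vec<Vec<i64>>,       // stand-in for buffered RecordBatches
       indices: &mut Vec<(usize, usize)>, // pending (batch_idx, row_idx) pairs
   ) {
       // Mark which batches are still referenced by a pending index.
       let mut referenced = vec![false; batches.len()];
       for &(batch_idx, _) in indices.iter() {
           referenced[batch_idx] = true;
       }

       // Compute the new position of each retained batch.
       let mut remap = vec![usize::MAX; batches.len()];
       let mut kept = 0;
       for (old_idx, is_ref) in referenced.iter().enumerate() {
           if *is_ref {
               remap[old_idx] = kept;
               kept += 1;
           }
       }

       // Drop unreferenced batches, freeing their memory immediately.
       let mut old_idx = 0;
       batches.retain(|_| {
           let keep = referenced[old_idx];
           old_idx += 1;
           keep
       });

       // Rewrite the pending indices against the compacted batch list.
       for (batch_idx, _) in indices.iter_mut() {
           *batch_idx = remap[*batch_idx];
       }
   }
   ```

   The tradeoff is an extra `O(batches + indices)` pass per emitted prefix, versus holding fully-consumed batches in memory until the drain completes.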



##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -200,3 +229,143 @@ pub(crate) fn try_grow_reservation_to_at_least(
     }
     Ok(())
 }
+
+/// Returns true if the error is an Arrow offset overflow.
+fn is_offset_overflow(e: &DataFusionError) -> bool {
+    matches!(
+        e,
+        DataFusionError::ArrowError(boxed, _)
+            if matches!(boxed.as_ref(), ArrowError::OffsetOverflowError(_))
+    )
+}
+
+fn offset_overflow_error() -> DataFusionError {
+    DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)), None)
+}
+
+fn recover_offset_overflow_from_panic<T, F>(f: F) -> Result<T>

Review Comment:
   The retry behavior looks good, but right now it seems like it’s only covered 
through synthetic helper failures.
   
   Since the production path depends on matching Arrow’s panic payload pretty 
closely, I think it’d be great to add one higher-level regression test closer 
to `BatchBuilder::build_record_batch()` or `SortPreservingMergeStream` that 
exercises the retry/drain flow end-to-end through an injectable interleave hook.
   
   That would make it a lot easier to catch future Arrow-side panic-message 
changes — or refactors in this file — before they slip through.
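
   For reference, the panic-recovery shape such a test would exercise can be sketched in isolation. This is not DataFusion's `recover_offset_overflow_from_panic` (names and error types here are placeholders); it just shows the `catch_unwind` pattern plus an injectable interleave hook that a regression test could drive:

   ```rust
   use std::panic::{catch_unwind, AssertUnwindSafe};

   /// Hypothetical sketch: run an interleave-like closure and convert a
   /// panic into a recoverable error, so the caller can retry with a
   /// smaller row prefix instead of crashing the stream.
   fn recover_from_panic<T>(f: impl FnOnce() -> T) -> Result<T, String> {
       catch_unwind(AssertUnwindSafe(f)).map_err(|payload| {
           // Downcast the panic payload to a message, the way an
           // offset-overflow check would inspect it.
           payload
               .downcast_ref::<&str>()
               .map(|s| s.to_string())
               .or_else(|| payload.downcast_ref::<String>().cloned())
               .unwrap_or_else(|| "unknown panic".to_string())
       })
   }

   /// Placeholder for an injectable interleave hook: tests can supply a
   /// closure that panics for large inputs to simulate Arrow's overflow.
   fn interleave_with_hook(
       rows: usize,
       hook: impl Fn(usize) -> Vec<u64>,
   ) -> Result<Vec<u64>, String> {
       recover_from_panic(|| hook(rows))
   }
   ```

   A test along these lines pins down the recovery path itself, independent of the exact panic message Arrow emits today.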



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

