This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 07ddfd7a22 Fix sort merge interleave overflow (#20922)
07ddfd7a22 is described below
commit 07ddfd7a22bf17a3cdcb188509ecc7b8dceab259
Author: xudong.w <[email protected]>
AuthorDate: Fri Mar 27 12:01:32 2026 +0800
Fix sort merge interleave overflow (#20922)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #.
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
When `SortPreservingMergeStream` merges batches containing large
string/binary columns whose combined offsets exceed `i32::MAX`, Arrow's
`interleave` kernel panics via `.expect("overflow")`. This PR catches that
specific panic and retries with progressively fewer rows, producing smaller
output batches whose offsets fit within the `i32` limit; any leftover
buffered rows are emitted on subsequent polls.
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Yes — unit tests are added in the `#[cfg(test)]` modules of
`sorts/builder.rs` (retry/panic-recovery helpers), `sorts/merge.rs`
(draining buffered rows after `done`), and `sorts/sort_preserving_merge.rs`
(no data yielded after a stream error).
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
---
datafusion/physical-plan/src/sorts/builder.rs | 258 +++++++++++++++++----
datafusion/physical-plan/src/sorts/merge.rs | 123 +++++++++-
.../src/sorts/sort_preserving_merge.rs | 56 +++++
3 files changed, 393 insertions(+), 44 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/builder.rs
b/datafusion/physical-plan/src/sorts/builder.rs
index a462b83205..73386212a2 100644
--- a/datafusion/physical-plan/src/sorts/builder.rs
+++ b/datafusion/physical-plan/src/sorts/builder.rs
@@ -16,11 +16,16 @@
// under the License.
use crate::spill::get_record_batch_memory_size;
+use arrow::array::ArrayRef;
use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
use datafusion_execution::memory_pool::MemoryReservation;
+use log::warn;
+use std::any::Any;
+use std::panic::{AssertUnwindSafe, catch_unwind};
use std::sync::Arc;
#[derive(Debug, Copy, Clone, Default)]
@@ -126,49 +131,60 @@ impl BatchBuilder {
&self.schema
}
- /// Drains the in_progress row indexes, and builds a new RecordBatch from
them
- ///
- /// Will then drop any batches for which all rows have been yielded to the
output
- ///
- /// Returns `None` if no pending rows
- pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
- if self.is_empty() {
- return Ok(None);
- }
-
- let columns = (0..self.schema.fields.len())
+ /// Try to interleave all columns using the given index slice.
+ fn try_interleave_columns(
+ &self,
+ indices: &[(usize, usize)],
+ ) -> Result<Vec<ArrayRef>> {
+ (0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self
.batches
.iter()
.map(|(_, batch)| batch.column(column_idx).as_ref())
.collect();
- Ok(interleave(&arrays, &self.indices)?)
+ recover_offset_overflow_from_panic(|| interleave(&arrays,
indices))
})
- .collect::<Result<Vec<_>>>()?;
-
- self.indices.clear();
-
- // New cursors are only created once the previous cursor for the stream
- // is finished. This means all remaining rows from all but the last
batch
- // for each stream have been yielded to the newly created record batch
- //
- // We can therefore drop all but the last batch for each stream
- let mut batch_idx = 0;
- let mut retained = 0;
- self.batches.retain(|(stream_idx, batch)| {
- let stream_cursor = &mut self.cursors[*stream_idx];
- let retain = stream_cursor.batch_idx == batch_idx;
- batch_idx += 1;
-
- if retain {
- stream_cursor.batch_idx = retained;
- retained += 1;
- } else {
- self.batches_mem_used -= get_record_batch_memory_size(batch);
- }
- retain
- });
+ .collect::<Result<Vec<_>>>()
+ }
+
+ /// Builds a record batch from the first `rows_to_emit` buffered rows.
+ fn finish_record_batch(
+ &mut self,
+ rows_to_emit: usize,
+ columns: Vec<ArrayRef>,
+ ) -> Result<RecordBatch> {
+ // Remove consumed indices, keeping any remaining for the next call.
+ self.indices.drain(..rows_to_emit);
+
+ // Only clean up fully-consumed batches when all indices are drained,
+ // because remaining indices may still reference earlier batches.
+ // In the overflow/partial-emit case this may retain some extra memory
+ // across a few drain polls, but avoids costly index scanning on the
+ // hot path. The retention is bounded and short-lived since leftover
+ // rows are drained over subsequent polls.
+ if self.indices.is_empty() {
+ // New cursors are only created once the previous cursor for the
stream
+ // is finished. This means all remaining rows from all but the
last batch
+ // for each stream have been yielded to the newly created record
batch
+ //
+ // We can therefore drop all but the last batch for each stream
+ let mut batch_idx = 0;
+ let mut retained = 0;
+ self.batches.retain(|(stream_idx, batch)| {
+ let stream_cursor = &mut self.cursors[*stream_idx];
+ let retain = stream_cursor.batch_idx == batch_idx;
+ batch_idx += 1;
+
+ if retain {
+ stream_cursor.batch_idx = retained;
+ retained += 1;
+ } else {
+ self.batches_mem_used -=
get_record_batch_memory_size(batch);
+ }
+ retain
+ });
+ }
// Release excess memory back to the pool, but never shrink below
// initial_reservation to maintain the anti-starvation guarantee
@@ -178,10 +194,27 @@ impl BatchBuilder {
self.reservation.shrink(self.reservation.size() - target);
}
- Ok(Some(RecordBatch::try_new(
- Arc::clone(&self.schema),
- columns,
- )?))
+ RecordBatch::try_new(Arc::clone(&self.schema),
columns).map_err(Into::into)
+ }
+
+ /// Drains the in_progress row indexes, and builds a new RecordBatch from
them
+ ///
+ /// Will then drop any batches for which all rows have been yielded to the
output.
+ /// If an offset overflow occurs (e.g. string/list offsets exceed
i32::MAX),
+ /// retries with progressively fewer rows until it succeeds.
+ ///
+ /// Returns `None` if no pending rows
+ pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
+ if self.is_empty() {
+ return Ok(None);
+ }
+
+ let (rows_to_emit, columns) =
+ retry_interleave(self.indices.len(), self.indices.len(),
|rows_to_emit| {
+ self.try_interleave_columns(&self.indices[..rows_to_emit])
+ })?;
+
+ Ok(Some(self.finish_record_batch(rows_to_emit, columns)?))
}
}
@@ -200,3 +233,146 @@ pub(crate) fn try_grow_reservation_to_at_least(
}
Ok(())
}
+
+/// Returns true if the error is an Arrow offset overflow.
+fn is_offset_overflow(e: &DataFusionError) -> bool {
+ matches!(
+ e,
+ DataFusionError::ArrowError(boxed, _)
+ if matches!(boxed.as_ref(), ArrowError::OffsetOverflowError(_))
+ )
+}
+
+fn offset_overflow_error() -> DataFusionError {
+ DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)),
None)
+}
+
+fn recover_offset_overflow_from_panic<T, F>(f: F) -> Result<T>
+where
+ F: FnOnce() -> std::result::Result<T, ArrowError>,
+{
+ // Arrow's interleave can panic on i32 offset overflow with
+ // `.expect("overflow")` / `.expect("offset overflow")`.
+ // Catch only those specific panics so the caller can retry
+ // with fewer rows while unrelated defects still unwind.
+ //
+ // TODO: remove once arrow-rs#9549 lands — interleave will return
+ // OffsetOverflowError directly instead of panicking.
+ match catch_unwind(AssertUnwindSafe(f)) {
+ Ok(result) => Ok(result?),
+ Err(panic_payload) => {
+ if is_arrow_offset_overflow_panic(panic_payload.as_ref()) {
+ Err(offset_overflow_error())
+ } else {
+ std::panic::resume_unwind(panic_payload);
+ }
+ }
+ }
+}
+
+fn retry_interleave<T, F>(
+ mut rows_to_emit: usize,
+ total_rows: usize,
+ mut interleave: F,
+) -> Result<(usize, T)>
+where
+ F: FnMut(usize) -> Result<T>,
+{
+ loop {
+ match interleave(rows_to_emit) {
+ Ok(value) => return Ok((rows_to_emit, value)),
+ Err(e) if is_offset_overflow(&e) => {
+ rows_to_emit /= 2;
+ if rows_to_emit == 0 {
+ return Err(e);
+ }
+ warn!(
+ "Interleave offset overflow with {total_rows} rows,
retrying with {rows_to_emit}"
+ );
+ }
+ Err(e) => return Err(e),
+ }
+ }
+}
+
+fn panic_message(payload: &(dyn Any + Send)) -> Option<&str> {
+ if let Some(msg) = payload.downcast_ref::<&str>() {
+ return Some(msg);
+ }
+ if let Some(msg) = payload.downcast_ref::<String>() {
+ return Some(msg.as_str());
+ }
+ None
+}
+
+/// Returns true if a caught panic payload matches the Arrow offset overflows
+/// raised by interleave's offset builders.
+fn is_arrow_offset_overflow_panic(payload: &(dyn Any + Send)) -> bool {
+ matches!(panic_message(payload), Some("overflow" | "offset overflow"))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::error::ArrowError;
+
+ #[test]
+ fn test_retry_interleave_halves_rows_until_success() {
+ let mut attempts = Vec::new();
+
+ let (rows_to_emit, result) = retry_interleave(4, 4, |rows_to_emit| {
+ attempts.push(rows_to_emit);
+ if rows_to_emit > 1 {
+ Err(offset_overflow_error())
+ } else {
+ Ok("ok")
+ }
+ })
+ .unwrap();
+
+ assert_eq!(rows_to_emit, 1);
+ assert_eq!(result, "ok");
+ assert_eq!(attempts, vec![4, 2, 1]);
+ }
+
+ #[test]
+ fn test_recover_offset_overflow_from_panic() {
+ let error = recover_offset_overflow_from_panic(
+ || -> std::result::Result<(), ArrowError> { panic!("offset
overflow") },
+ )
+ .unwrap_err();
+
+ assert!(is_offset_overflow(&error));
+ }
+
+ #[test]
+ fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics() {
+ let panic_payload = catch_unwind(AssertUnwindSafe(|| {
+ let _ = recover_offset_overflow_from_panic(
+ || -> std::result::Result<(), ArrowError> { panic!("capacity
overflow") },
+ );
+ }));
+
+ assert!(panic_payload.is_err());
+ }
+
+ #[test]
+ fn test_is_arrow_offset_overflow_panic() {
+ let overflow = Box::new("overflow") as Box<dyn Any + Send>;
+ assert!(is_arrow_offset_overflow_panic(overflow.as_ref()));
+
+ let offset_overflow =
+ Box::new(String::from("offset overflow")) as Box<dyn Any + Send>;
+ assert!(is_arrow_offset_overflow_panic(offset_overflow.as_ref()));
+
+ let capacity_overflow = Box::new("capacity overflow") as Box<dyn Any +
Send>;
+ assert!(!is_arrow_offset_overflow_panic(capacity_overflow.as_ref()));
+
+ let arithmetic_overflow =
+ Box::new(String::from("attempt to multiply with overflow"))
+ as Box<dyn Any + Send>;
+ assert!(!is_arrow_offset_overflow_panic(
+ arithmetic_overflow.as_ref()
+ ));
+ }
+}
diff --git a/datafusion/physical-plan/src/sorts/merge.rs
b/datafusion/physical-plan/src/sorts/merge.rs
index 272816251d..c29933535a 100644
--- a/datafusion/physical-plan/src/sorts/merge.rs
+++ b/datafusion/physical-plan/src/sorts/merge.rs
@@ -53,6 +53,14 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues>
{
/// `fetch` limit.
done: bool,
+ /// Whether buffered rows should be drained after `done` is set.
+ ///
+ /// This is enabled when we stop because the `fetch` limit has been
+ /// reached, allowing partial batches left over after overflow handling to
+ /// be emitted on subsequent polls. It remains disabled for terminal
+ /// errors so the stream does not yield data after returning `Err`.
+ drain_in_progress_on_done: bool,
+
/// A loser tree that always produces the minimum cursor
///
/// Node 0 stores the top winner, Nodes 1..num_streams store
@@ -164,6 +172,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
streams,
metrics,
done: false,
+ drain_in_progress_on_done: false,
cursors: (0..stream_count).map(|_| None).collect(),
prev_cursors: (0..stream_count).map(|_| None).collect(),
round_robin_tie_breaker_mode: false,
@@ -203,11 +212,28 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
}
}
+ fn emit_in_progress_batch(&mut self) -> Result<Option<RecordBatch>> {
+ let rows_before = self.in_progress.len();
+ let result = self.in_progress.build_record_batch();
+ self.produced += rows_before - self.in_progress.len();
+ result
+ }
+
fn poll_next_inner(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
if self.done {
+ // When `build_record_batch()` hits an i32 offset overflow (e.g.
+ // combined string offsets exceed 2 GB), it emits a partial batch
+ // and keeps the remaining rows in `self.in_progress.indices`.
+ // Drain those leftover rows before terminating the stream,
+ // otherwise they would be silently dropped.
+ // Repeated overflows are fine — each poll emits another partial
+ // batch until `in_progress` is fully drained.
+ if self.drain_in_progress_on_done && !self.in_progress.is_empty() {
+ return Poll::Ready(self.emit_in_progress_batch().transpose());
+ }
return Poll::Ready(None);
}
// Once all partitions have set their corresponding cursors for the
loser tree,
@@ -283,14 +309,13 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
// stop sorting if fetch has been reached
if self.fetch_reached() {
self.done = true;
+ self.drain_in_progress_on_done = true;
} else if self.in_progress.len() < self.batch_size {
continue;
}
}
- self.produced += self.in_progress.len();
-
- return
Poll::Ready(self.in_progress.build_record_batch().transpose());
+ return Poll::Ready(self.emit_in_progress_batch().transpose());
}
}
@@ -542,3 +567,95 @@ impl<C: CursorValues + Unpin> RecordBatchStream for
SortPreservingMergeStream<C>
Arc::clone(self.in_progress.schema())
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metrics::ExecutionPlanMetricsSet;
+ use crate::sorts::stream::PartitionedStream;
+ use arrow::array::Int32Array;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use datafusion_execution::memory_pool::{
+ MemoryConsumer, MemoryPool, UnboundedMemoryPool,
+ };
+ use futures::task::noop_waker_ref;
+ use std::cmp::Ordering;
+
+ #[derive(Debug)]
+ struct EmptyPartitionedStream;
+
+ impl PartitionedStream for EmptyPartitionedStream {
+ type Output = Result<(DummyValues, RecordBatch)>;
+
+ fn partitions(&self) -> usize {
+ 1
+ }
+
+ fn poll_next(
+ &mut self,
+ _cx: &mut Context<'_>,
+ _stream_idx: usize,
+ ) -> Poll<Option<Self::Output>> {
+ Poll::Ready(None)
+ }
+ }
+
+ #[derive(Debug)]
+ struct DummyValues;
+
+ impl CursorValues for DummyValues {
+ fn len(&self) -> usize {
+ 0
+ }
+
+ fn eq(_l: &Self, _l_idx: usize, _r: &Self, _r_idx: usize) -> bool {
+ unreachable!("done-path test should not compare cursors")
+ }
+
+ fn eq_to_previous(_cursor: &Self, _idx: usize) -> bool {
+ unreachable!("done-path test should not compare cursors")
+ }
+
+ fn compare(_l: &Self, _l_idx: usize, _r: &Self, _r_idx: usize) ->
Ordering {
+ unreachable!("done-path test should not compare cursors")
+ }
+ }
+
+ #[test]
+ fn test_done_drains_buffered_rows() {
+ let schema = Arc::new(Schema::new(vec![Field::new("i",
DataType::Int32, false)]));
+ let pool: Arc<dyn MemoryPool> =
Arc::new(UnboundedMemoryPool::default());
+ let reservation = MemoryConsumer::new("test").register(&pool);
+ let metrics = ExecutionPlanMetricsSet::new();
+
+ let mut stream = SortPreservingMergeStream::<DummyValues>::new(
+ Box::new(EmptyPartitionedStream),
+ Arc::clone(&schema),
+ BaselineMetrics::new(&metrics, 0),
+ 16,
+ Some(1),
+ reservation,
+ true,
+ );
+
+ let batch =
+ RecordBatch::try_new(schema,
vec![Arc::new(Int32Array::from(vec![1]))])
+ .unwrap();
+ stream.in_progress.push_batch(0, batch).unwrap();
+ stream.in_progress.push_row(0);
+ stream.done = true;
+ stream.drain_in_progress_on_done = true;
+
+ let waker = noop_waker_ref();
+ let mut cx = Context::from_waker(waker);
+
+ match stream.poll_next_inner(&mut cx) {
+ Poll::Ready(Some(Ok(batch))) => assert_eq!(batch.num_rows(), 1),
+ other => {
+ panic!("expected buffered rows to be drained after done, got
{other:?}")
+ }
+ }
+ assert!(stream.in_progress.is_empty());
+ assert!(matches!(stream.poll_next_inner(&mut cx), Poll::Ready(None)));
+ }
+}
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index b1ee5b4d5e..1e60c391f5 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -491,6 +491,7 @@ mod tests {
.with_session_config(config);
Ok(Arc::new(task_ctx))
}
+
// The number in the function is highly related to the memory limit we are
testing,
// any change of the constant should be aware of
fn generate_spm_for_round_robin_tie_breaker(
@@ -1539,4 +1540,59 @@ mod tests {
Err(_) => exec_err!("SortPreservingMerge caused a deadlock"),
}
}
+
+ #[tokio::test]
+ async fn test_sort_merge_stops_after_error_with_buffered_rows() ->
Result<()> {
+ let task_ctx = Arc::new(TaskContext::default());
+ let schema = Arc::new(Schema::new(vec![Field::new("i",
DataType::Int32, false)]));
+ let sort: LexOrdering =
[PhysicalSortExpr::new_default(Arc::new(Column::new(
+ "i", 0,
+ ))
+ as Arc<dyn PhysicalExpr>)]
+ .into();
+
+ let mut stream0 =
RecordBatchReceiverStream::builder(Arc::clone(&schema), 2);
+ let tx0 = stream0.tx();
+ let schema0 = Arc::clone(&schema);
+ stream0.spawn(async move {
+ let batch =
+ RecordBatch::try_new(schema0,
vec![Arc::new(Int32Array::from(vec![1]))])?;
+ tx0.send(Ok(batch)).await.unwrap();
+ tx0.send(exec_err!("stream failure")).await.unwrap();
+ Ok(())
+ });
+
+ let mut stream1 =
RecordBatchReceiverStream::builder(Arc::clone(&schema), 1);
+ let tx1 = stream1.tx();
+ let schema1 = Arc::clone(&schema);
+ stream1.spawn(async move {
+ let batch =
+ RecordBatch::try_new(schema1,
vec![Arc::new(Int32Array::from(vec![2]))])?;
+ tx1.send(Ok(batch)).await.unwrap();
+ Ok(())
+ });
+
+ let metrics = ExecutionPlanMetricsSet::new();
+ let reservation =
+
MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool);
+
+ let mut merge_stream = StreamingMergeBuilder::new()
+ .with_streams(vec![stream0.build(), stream1.build()])
+ .with_schema(Arc::clone(&schema))
+ .with_expressions(&sort)
+ .with_metrics(BaselineMetrics::new(&metrics, 0))
+ .with_batch_size(task_ctx.session_config().batch_size())
+ .with_fetch(None)
+ .with_reservation(reservation)
+ .build()?;
+
+ let first = merge_stream.next().await.unwrap();
+ assert!(first.is_err(), "expected merge stream to surface the error");
+ assert!(
+ merge_stream.next().await.is_none(),
+ "merge stream yielded data after returning an error"
+ );
+
+ Ok(())
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]