This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new 9a67de58c0 [branch-52] Fix Arrow Spill Underrun (#20159) (#20684)
9a67de58c0 is described below
commit 9a67de58c027e6057aa37327ae4d0192d5c45fc5
Author: Haresh Khanna <[email protected]>
AuthorDate: Wed Mar 4 20:13:36 2026 +0000
[branch-52] Fix Arrow Spill Underrun (#20159) (#20684)
## Which issue does this PR close?
- Related to #20681
- Backport of https://github.com/apache/datafusion/pull/20159
## Rationale for this change
This adjusts the way that the spill channel works. Currently we have a
spill writer & reader pairing which uses a mutex to coordinate when a
file is ready to be read.
What happens is that, because we were using a `spawn_buffered` call, the
read task would race ahead trying to read a file which is yet to be
written out completely.
Alongside this, we need to flush each write to the file, as there is a
chance that another thread may see stale data.
## What changes are included in this PR?
Adds a flush on write, and converts the read task to not buffer reads.
## Are these changes tested?
I haven't written a test, but I have been running the example in the
attached issue. While it now fails with allocation errors, the original
error goes away.
## Are there any user-facing changes?
Nope
Co-authored-by: Peter L <[email protected]>
---
.../physical-plan/src/spill/in_progress_spill_file.rs | 7 +++++++
datafusion/physical-plan/src/spill/mod.rs | 5 +++++
datafusion/physical-plan/src/spill/spill_manager.rs | 13 +++++++++++++
datafusion/physical-plan/src/spill/spill_pool.rs | 8 +++++++-
4 files changed, 32 insertions(+), 1 deletion(-)
diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
index d2acf4993b..b9ff6b2f3b 100644
--- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
+++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
@@ -88,6 +88,13 @@ impl InProgressSpillFile {
Ok(())
}
+ pub fn flush(&mut self) -> Result<()> {
+ if let Some(writer) = &mut self.writer {
+ writer.flush()?;
+ }
+ Ok(())
+ }
+
/// Returns a reference to the in-progress file, if it exists.
/// This can be used to get the file path for creating readers before the
file is finished.
pub fn file(&self) -> Option<&RefCountedTempFile> {
diff --git a/datafusion/physical-plan/src/spill/mod.rs
b/datafusion/physical-plan/src/spill/mod.rs
index 78dea99ac8..3c4ee065c3 100644
--- a/datafusion/physical-plan/src/spill/mod.rs
+++ b/datafusion/physical-plan/src/spill/mod.rs
@@ -310,6 +310,11 @@ impl IPCStreamWriter {
Ok((delta_num_rows, delta_num_bytes))
}
+ pub fn flush(&mut self) -> Result<()> {
+ self.writer.flush()?;
+ Ok(())
+ }
+
/// Finish the writer
pub fn finish(&mut self) -> Result<()> {
self.writer.finish().map_err(Into::into)
diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs
b/datafusion/physical-plan/src/spill/spill_manager.rs
index 89b0276206..6d931112ad 100644
--- a/datafusion/physical-plan/src/spill/spill_manager.rs
+++ b/datafusion/physical-plan/src/spill/spill_manager.rs
@@ -188,6 +188,19 @@ impl SpillManager {
Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
}
+
+ /// Same as `read_spill_as_stream`, but without buffering.
+ pub fn read_spill_as_stream_unbuffered(
+ &self,
+ spill_file_path: RefCountedTempFile,
+ max_record_batch_memory: Option<usize>,
+ ) -> Result<SendableRecordBatchStream> {
+ Ok(Box::pin(cooperative(SpillReaderStream::new(
+ Arc::clone(&self.schema),
+ spill_file_path,
+ max_record_batch_memory,
+ ))))
+ }
}
pub(crate) trait GetSlicedSize {
diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs
b/datafusion/physical-plan/src/spill/spill_pool.rs
index e3b547b573..e8eea360da 100644
--- a/datafusion/physical-plan/src/spill/spill_pool.rs
+++ b/datafusion/physical-plan/src/spill/spill_pool.rs
@@ -194,6 +194,8 @@ impl SpillPoolWriter {
// Append the batch
if let Some(ref mut writer) = file_shared.writer {
writer.append_batch(batch)?;
+ // make sure we flush the writer for readers
+ writer.flush()?;
file_shared.batches_written += 1;
file_shared.estimated_size += batch_size;
}
@@ -535,7 +537,11 @@ impl Stream for SpillFile {
// Step 2: Lazy-create reader stream if needed
if self.reader.is_none() && should_read {
if let Some(file) = file {
- match self.spill_manager.read_spill_as_stream(file, None) {
+ // we want this unbuffered because files are actively being
written to
+ match self
+ .spill_manager
+ .read_spill_as_stream_unbuffered(file, None)
+ {
Ok(stream) => {
self.reader = Some(SpillFileReader {
stream,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]