alamb commented on code in PR #15654:
URL: https://github.com/apache/datafusion/pull/15654#discussion_r2036219770


##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -24,27 +24,156 @@ use std::fs::File;
 use std::io::BufReader;
 use std::path::{Path, PathBuf};
 use std::ptr::NonNull;
+use std::sync::Arc;
 
 use arrow::array::ArrayData;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
 use arrow::record_batch::RecordBatch;
-use tokio::sync::mpsc::Sender;
-
-use datafusion_common::{exec_datafusion_err, HashSet, Result};
-
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
-    let file = BufReader::new(File::open(path)?);
-    // SAFETY: DataFusion's spill writer strictly follows Arrow IPC 
specifications
-    // with validated schemas and buffers. Skip redundant validation during 
read
-    // to speedup read operation. This is safe for DataFusion as input 
guaranteed to be correct when written.
-    let reader = unsafe { StreamReader::try_new(file, 
None)?.with_skip_validation(true) };
-    for batch in reader {
-        sender
-            .blocking_send(batch.map_err(Into::into))
-            .map_err(|e| exec_datafusion_err!("{e}"))?;
+
+use datafusion_common::{exec_datafusion_err, DataFusionError, HashSet, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::RecordBatchStream;
+use futures::{FutureExt as _, Stream};
+
+/// Stream that reads spill files from disk where each batch is read in a 
spawned blocking task
+/// It will read one batch at a time and will not do any buffering, to buffer 
data use [`spawn_buffered`]
+struct SpillReaderStream {
+    schema: SchemaRef,
+    state: SpillReaderStreamState,
+}
+
+/// When we poll for the next batch, we will get back both the batch and the 
reader,
+/// so we can call `next` again.
+type NextRecordBatchResult = Result<(StreamReader<BufReader<File>>, 
Option<RecordBatch>)>;
+
+enum SpillReaderStreamState {
+    /// Initial state: the stream was not initialized yet
+    /// and the file was not opened
+    Uninitialized(RefCountedTempFile),
+
+    /// A read is in progress in a spawned blocking task for which we hold the 
handle.
+    ReadInProgress(SpawnedTask<NextRecordBatchResult>),
+
+    /// A read has finished and we wait for being polled again in order to 
start reading the next batch.
+    Waiting(StreamReader<BufReader<File>>),
+
+    /// The stream has finished, successfully or not.
+    Done,
+}
+
+impl SpillReaderStream {
+    fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
+        Self {
+            schema,
+            state: SpillReaderStreamState::Uninitialized(spill_file),
+        }
+    }
+
+    fn poll_next_inner(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Result<RecordBatch>>> {
+        match &mut self.state {
+            SpillReaderStreamState::Uninitialized(_) => {
+                // Temporarily replace with `Done` to be able to pass the file 
to the task.

Review Comment:
   That is fair -- I don't have a strong preference about which pattern to use, 
I was just mentioning an alternate pattern as a possibility



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to