2010YOUY01 commented on code in PR #15355:
URL: https://github.com/apache/datafusion/pull/15355#discussion_r2009638919


##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -223,25 +229,182 @@ impl IPCStreamWriter {
     }
 }
 
+/// The `SpillManager` is responsible for the following tasks:
+/// - Reading and writing `RecordBatch`es to raw files based on the provided 
configurations.
+/// - Updating the associated metrics.
+///
+/// Note: The caller (external operators such as `SortExec`) is responsible 
for interpreting the spilled files.
+/// For example, all records within the same spill file are ordered according 
to a specific order.
+#[derive(Debug, Clone)]
+pub(crate) struct SpillManager {
+    env: Arc<RuntimeEnv>,
+    metrics: SpillMetrics,
+    schema: SchemaRef,
+    /// Number of batches to buffer in memory during disk reads
+    batch_read_buffer_capacity: usize,
+    // TODO: Add general-purpose compression options
+}
+
+impl SpillManager {
+    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) 
-> Self {
+        Self {
+            env,
+            metrics,
+            schema,
+            batch_read_buffer_capacity: 2,
+        }
+    }
+
+    /// Creates a temporary file for in-progress operations, returning an error
+    /// if file creation fails. The file can be used to append batches
+    /// incrementally and then finish the file when done.
+    pub fn create_in_progress_file(
+        &self,
+        request_msg: &str,
+    ) -> Result<InProgressSpillFile> {
+        let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
+        Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
+    }
+
+    /// Spill input `batches` into a single file in an atomic operation. If it 
is
+    /// intended to incrementally write in-memory batches into the same spill 
file,
+    /// use [`Self::create_in_progress_file`] instead.
+    /// None is returned if no batches are spilled.
+    #[allow(dead_code)] // TODO: remove after change SPM to use SpillManager
+    pub fn spill_record_batch_and_finish(
+        &self,
+        batches: &[RecordBatch],
+        request_msg: &str,
+    ) -> Result<Option<RefCountedTempFile>> {
+        let mut in_progress_file = self.create_in_progress_file(request_msg)?;
+
+        for batch in batches {
+            in_progress_file.append_batch(batch)?;
+        }
+
+        in_progress_file.finish()
+    }
+
+    /// Refer to the documentation for 
[`Self::spill_record_batch_and_finish`]. This method
+    /// additionally splits the `RecordBatch` into smaller batches of at most 
`row_limit` rows before spilling them.
+    #[allow(dead_code)] // TODO: remove after change aggregate to use 
SpillManager
+    pub fn spill_record_batch_by_size(
+        &self,
+        batch: &RecordBatch,
+        request_description: &str,
+        row_limit: usize,
+    ) -> Result<Option<RefCountedTempFile>> {
+        let total_rows = batch.num_rows();
+        let mut batches = Vec::new();
+        let mut offset = 0;
+
+        // It's ok to calculate all slices first, because slicing is zero-copy.
+        while offset < total_rows {
+            let length = std::cmp::min(total_rows - offset, row_limit);
+            let sliced_batch = batch.slice(offset, length);
+            batches.push(sliced_batch);
+            offset += length;
+        }
+
+        // Spill the sliced batches to disk
+        self.spill_record_batch_and_finish(&batches, request_description)
+    }
+
+    /// Reads a spill file as a stream. The file must be created by the 
current `SpillManager`.
+    pub fn read_spill_as_stream(
+        &self,
+        spill_file_path: RefCountedTempFile,
+    ) -> Result<SendableRecordBatchStream> {
+        let mut builder = RecordBatchReceiverStream::builder(
+            Arc::clone(&self.schema),
+            self.batch_read_buffer_capacity,
+        );
+        let sender = builder.tx();
+
+        builder.spawn_blocking(move || read_spill(sender, 
spill_file_path.path()));
+
+        Ok(builder.build())
+    }
+}
+
+pub(crate) struct InProgressSpillFile {

Review Comment:
   Addressed in 
[bf4ab62](https://github.com/apache/datafusion/pull/15355/commits/bf4ab62ceb5333e1180b20f68a87f467ad047a82)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to