2010YOUY01 commented on code in PR #15355:
URL: https://github.com/apache/datafusion/pull/15355#discussion_r2009639272


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -379,46 +382,64 @@ impl ExternalSorter {
 
     /// How many bytes have been spilled to disk?
     fn spilled_bytes(&self) -> usize {
-        self.metrics.spilled_bytes.value()
+        self.metrics.spill_metrics.spilled_bytes.value()
     }
 
     /// How many rows have been spilled to disk?
     fn spilled_rows(&self) -> usize {
-        self.metrics.spilled_rows.value()
+        self.metrics.spill_metrics.spilled_rows.value()
     }
 
     /// How many spill files have been created?
     fn spill_count(&self) -> usize {
-        self.metrics.spill_count.value()
+        self.metrics.spill_metrics.spill_file_count.value()
     }
 
-    /// Writes any `in_memory_batches` to a spill file and clears
-    /// the batches. The contents of the spill file are sorted.
-    ///
-    /// Returns the amount of memory freed.
-    async fn spill(&mut self) -> Result<usize> {
+    /// When calling, all `in_mem_batches` must be sorted, and then all of them will
+    /// be appended to the in-progress spill file.
+    async fn spill_append(&mut self) -> Result<()> {
         // we could always get a chance to free some memory as long as we are holding some
         if self.in_mem_batches.is_empty() {
-            return Ok(0);
+            return Ok(());
+        }
+
+        // Lazily initialize the in-progress spill file
+        if self.in_progress_spill_file.is_none() {
+            self.in_progress_spill_file =
+                Some(self.spill_manager.create_in_progress_file("Sorting")?);
         }
 
         self.organize_stringview_arrays()?;
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
-        let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
-        let (spilled_rows, spilled_bytes) = spill_record_batches(
-            &batches,
-            spill_file.path().into(),
-            Arc::clone(&self.schema),
-        )?;
-        let used = self.reservation.free();
-        self.metrics.spill_count.add(1);
-        self.metrics.spilled_bytes.add(spilled_bytes);
-        self.metrics.spilled_rows.add(spilled_rows);
-        self.spills.push(spill_file);
-        Ok(used)
+        self.reservation.free();
+
+        let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
+            internal_datafusion_err!("In-progress spill file should be initialized")
+        })?;
+
+        for batch in batches {
+            in_progress_file.append_batch(&batch)?;
+        }
+
+        Ok(())
+    }
+
+    /// Finishes the in-progress spill file and moves it to the finished spill files.
+    async fn spill_finish(&mut self) -> Result<()> {
+        let mut in_progress_file =

Review Comment:
   https://github.com/apache/datafusion/issues/15372



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -379,46 +382,64 @@ impl ExternalSorter {
 
     /// How many bytes have been spilled to disk?
     fn spilled_bytes(&self) -> usize {
-        self.metrics.spilled_bytes.value()
+        self.metrics.spill_metrics.spilled_bytes.value()
     }
 
     /// How many rows have been spilled to disk?
     fn spilled_rows(&self) -> usize {
-        self.metrics.spilled_rows.value()
+        self.metrics.spill_metrics.spilled_rows.value()
     }
 
     /// How many spill files have been created?
     fn spill_count(&self) -> usize {
-        self.metrics.spill_count.value()
+        self.metrics.spill_metrics.spill_file_count.value()
     }
 
-    /// Writes any `in_memory_batches` to a spill file and clears
-    /// the batches. The contents of the spill file are sorted.
-    ///
-    /// Returns the amount of memory freed.
-    async fn spill(&mut self) -> Result<usize> {
+    /// When calling, all `in_mem_batches` must be sorted, and then all of them will
+    /// be appended to the in-progress spill file.
+    async fn spill_append(&mut self) -> Result<()> {
         // we could always get a chance to free some memory as long as we are holding some
         if self.in_mem_batches.is_empty() {
-            return Ok(0);
+            return Ok(());
+        }
+
+        // Lazily initialize the in-progress spill file
+        if self.in_progress_spill_file.is_none() {
+            self.in_progress_spill_file =
+                Some(self.spill_manager.create_in_progress_file("Sorting")?);
         }
 
         self.organize_stringview_arrays()?;
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
-        let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
-        let (spilled_rows, spilled_bytes) = spill_record_batches(
-            &batches,
-            spill_file.path().into(),
-            Arc::clone(&self.schema),
-        )?;
-        let used = self.reservation.free();
-        self.metrics.spill_count.add(1);
-        self.metrics.spilled_bytes.add(spilled_bytes);
-        self.metrics.spilled_rows.add(spilled_rows);
-        self.spills.push(spill_file);
-        Ok(used)
+        self.reservation.free();
+
+        let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
+            internal_datafusion_err!("In-progress spill file should be initialized")
+        })?;
+
+        for batch in batches {
+            in_progress_file.append_batch(&batch)?;
+        }
+
+        Ok(())
+    }
+
+    /// Finishes the in-progress spill file and moves it to the finished spill files.
+    async fn spill_finish(&mut self) -> Result<()> {
+        let mut in_progress_file =

Review Comment:
   Filed https://github.com/apache/datafusion/issues/15372



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to