rluvaton commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2030454805


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -535,56 +457,262 @@ impl ExternalSorter {
         // reserved again for the next spill.
         self.merge_reservation.free();
 
-        let mut sorted_stream =
+        let sorted_stream =
             self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
+        debug!("SPM stream is constructed");
+
         // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` 
is taken
         // to construct a globally sorted stream.
         if !self.in_mem_batches.is_empty() {
             return internal_err!(
                 "in_mem_batches should be empty after constructing sorted 
stream"
             );
         }
-        // 'global' here refers to all buffered batches when the memory limit 
is
-        // reached. This variable will buffer the sorted batches after
-        // sort-preserving merge and incrementally append to spill files.
-        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
 
+        let spill_file = self.write_stream_to_spill_file(sorted_stream).await?;
+        self.finished_spill_files.push(spill_file);
+
+        // Reserve headroom for next sort/merge
+        self.reserve_memory_for_merge()?;
+
+        Ok(())
+    }
+
+    /// Create a new spill file, and write all batches from the stream to the 
file.
+    ///
+    /// Note: After the spill is done, the memory reservation will be freed to 
0,
+    /// because `sorted_stream` holds all buffered batches.
+    async fn write_stream_to_spill_file(
+        &mut self,
+        mut sorted_stream: SendableRecordBatchStream,
+    ) -> Result<RefCountedTempFile> {
+        // Release the memory reserved for merge back to the pool so there is 
some
+        // left when the executed stream requests an allocation (the stream 
being
+        // written is now a SortPreservingMergeStream, which requires memory).
+        // At the end of this function, memory will be reserved again for the 
next spill.
+        self.merge_reservation.free();
+
+        let mut in_progress_spill_file =
+            self.spill_manager.create_in_progress_file("Sorting")?;
+
+        // Incrementally append globally sorted batches to the spill file, 
because
+        // there might not be enough memory to materialize all batches at once.
         while let Some(batch) = sorted_stream.next().await {
-            let batch = batch?;
-            let sorted_size = get_reserved_byte_for_record_batch(&batch);
-            if self.reservation.try_grow(sorted_size).is_err() {
-                // Although the reservation is not enough, the batch is
-                // already in memory, so it's okay to combine it with 
previously
-                // sorted batches, and spill together.
-                globally_sorted_batches.push(batch);
-                self.consume_and_spill_append(&mut globally_sorted_batches)
-                    .await?; // reservation is freed in spill()
-            } else {
-                globally_sorted_batches.push(batch);
-            }
+            let mut batch = vec![batch?];
+            Self::organize_stringview_arrays(&mut batch)?;
+            in_progress_spill_file.append_batch(&batch[0])?;
         }
 
         // Drop early to free up memory reserved by the sorted stream, 
otherwise the
         // upcoming `self.reserve_memory_for_merge()` may fail due to 
insufficient memory.
         drop(sorted_stream);
 
-        self.consume_and_spill_append(&mut globally_sorted_batches)
-            .await?;
-        self.spill_finish().await?;
+        // Reserve headroom for next sort/merge
+        self.reserve_memory_for_merge()?;
 
-        // Sanity check after spilling
-        let buffers_cleared_property =
-            self.in_mem_batches.is_empty() && 
globally_sorted_batches.is_empty();
-        if !buffers_cleared_property {
-            return internal_err!(
-                "in_mem_batches and globally_sorted_batches should be cleared 
before"
-            );
+        let spill_file = in_progress_spill_file.finish()?.ok_or_else(|| {
+            internal_datafusion_err!("Writing stream with 0 batch is not 
allowed")
+        })?;
+
+        Ok(spill_file)
+    }
+
+    /// Sort-preserving merges the spilled files into a single file.
+    ///
+    /// All of the input spill files are sorted by sort keys within each file, 
and the
+    /// returned file is also sorted by sort keys.
+    ///
+    /// This method consumes the input spill files, and returns a new compacted
+    /// spill file. After returning, the input files will be cleaned up 
(deleted).
+    ///
+    /// # Example:
+    /// Input spill files:
+    ///     SpillFile1 (sorted by SortKeys):
+    ///         [batch1(100 rows)], [batch2(100 rows)]
+    ///     SpillFile2 (sorted by SortKeys):
+    ///         [batch1(100 rows)]
+    ///
+    /// After merging, it returns a new spill file:
+    ///     returns MergedSpillFile (sorted by SortKeys):
+    ///         [batch1(100 rows)], [batch2(100 rows)]
+    async fn consume_and_merge_spill_files(
+        &mut self,
+        input_spill_files: Vec<RefCountedTempFile>,
+    ) -> Result<RefCountedTempFile> {
+        // ==== Convert each spill file into a stream ====
+        let partially_sorted_streams = input_spill_files
+            .into_iter()
+            .map(|spill_file| {
+                if !spill_file.path().exists() {
+                    return internal_err!(
+                        "Spill file {:?} does not exist",
+                        spill_file.path()
+                    );
+                }
+
+                self.spill_manager.read_spill_as_stream(spill_file)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let sort_exprs: LexOrdering = self.expr.iter().cloned().collect();
+
+        // ==== Doing sort-preserving merge on input partially sorted streams 
====
+        let spm_stream = StreamingMergeBuilder::new()
+            .with_streams(partially_sorted_streams)
+            .with_schema(Arc::clone(&self.schema))
+            .with_expressions(sort_exprs.as_ref())
+            .with_metrics(self.metrics.baseline.clone())
+            .with_batch_size(self.batch_size)
+            .with_fetch(None)
+            .with_reservation(self.merge_reservation.new_empty())
+            .build()?;
+
+        debug!("Combining spilled files");
+
+        // ==== Write to a single merged spill file ====
+        let merged_spill_file = 
self.write_stream_to_spill_file(spm_stream).await?;
+
+        Ok(merged_spill_file)
+    }
+
+    /// Maximum number of spill files to merge in a single pass
+    const MAX_SPILL_MERGE_DEGREE: usize = 8;

Review Comment:
   I think this should be configurable, based on the number of available tokio 
blocking tasks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to