qstommyshu commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2036013722


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -535,56 +457,262 @@ impl ExternalSorter {
         // reserved again for the next spill.
         self.merge_reservation.free();
 
-        let mut sorted_stream =
+        let sorted_stream =
             self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
+        debug!("SPM stream is constructed");
+
         // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken
         // to construct a globally sorted stream.
         if !self.in_mem_batches.is_empty() {
             return internal_err!(
                 "in_mem_batches should be empty after constructing sorted 
stream"
             );
         }
-        // 'global' here refers to all buffered batches when the memory limit is
-        // reached. This variable will buffer the sorted batches after
-        // sort-preserving merge and incrementally append to spill files.
-        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
 
+        let spill_file = self.write_stream_to_spill_file(sorted_stream).await?;
+        self.finished_spill_files.push(spill_file);
+
+        // Reserve headroom for next sort/merge
+        self.reserve_memory_for_merge()?;
+
+        Ok(())
+    }
+
+    /// Create a new spill file, and write all batches from the stream to the file.
+    ///
+    /// Note: After the spill is done, the memory reservation will be freed to 0,
+    /// because `sorted_stream` holds all buffered batches.
+    async fn write_stream_to_spill_file(
+        &mut self,
+        mut sorted_stream: SendableRecordBatchStream,
+    ) -> Result<RefCountedTempFile> {
+        // Release the memory reserved for merge back to the pool so there is some
+        // left when the executed stream requests an allocation (currently the stream
+        // to write is a SortPreservingMergeStream, which requires memory).
+        // At the end of this function, memory will be reserved again for the next spill.
+        self.merge_reservation.free();
+
+        let mut in_progress_spill_file =
+            self.spill_manager.create_in_progress_file("Sorting")?;
+
+        // Incrementally append globally sorted batches to the spill file, because
+        // there might not be enough memory to materialize all batches at once.
         while let Some(batch) = sorted_stream.next().await {
-            let batch = batch?;
-            let sorted_size = get_reserved_byte_for_record_batch(&batch);
-            if self.reservation.try_grow(sorted_size).is_err() {
-                // Although the reservation is not enough, the batch is
-                // already in memory, so it's okay to combine it with previously
-                // sorted batches, and spill together.
-                globally_sorted_batches.push(batch);
-                self.consume_and_spill_append(&mut globally_sorted_batches)
-                    .await?; // reservation is freed in spill()
-            } else {
-                globally_sorted_batches.push(batch);
-            }
+            let mut batch = vec![batch?];
+            Self::organize_stringview_arrays(&mut batch)?;
+            in_progress_spill_file.append_batch(&batch[0])?;
         }
 
         // Drop early to free up memory reserved by the sorted stream, otherwise the
         // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
         drop(sorted_stream);
 
-        self.consume_and_spill_append(&mut globally_sorted_batches)
-            .await?;
-        self.spill_finish().await?;
+        // Reserve headroom for next sort/merge
+        self.reserve_memory_for_merge()?;
 
-        // Sanity check after spilling
-        let buffers_cleared_property =
-            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
-        if !buffers_cleared_property {
-            return internal_err!(
-                "in_mem_batches and globally_sorted_batches should be cleared 
before"
-            );
+        let spill_file = in_progress_spill_file.finish()?.ok_or_else(|| {
+            internal_datafusion_err!("Writing a stream with 0 batches is not allowed")
+        })?;
+
+        Ok(spill_file)
+    }
+
+    /// Sort-preserving merges the spilled files into a single file.
+    ///
+    /// All of the input spill files are sorted by sort keys within each file, and the
+    /// returned file is also sorted by sort keys.
+    ///
+    /// This method consumes the input spill files, and returns a new compacted
+    /// spill file. After returning, the input files will be cleaned up (deleted).
+    ///
+    /// # Example:
+    /// Input spill files:
+    ///     SpillFile1 (sorted by SortKeys):
+    ///         [batch1(100 rows)], [batch2(100 rows)]
+    ///     SpillFile2 (sorted by SortKeys):
+    ///         [batch1(100 rows)]
+    ///
+    /// After merging, it returns a new spill file:
+    ///     MergedSpillFile (sorted by SortKeys):
+    ///         [batch1(100 rows)], [batch2(100 rows)], [batch3(100 rows)]
+    async fn consume_and_merge_spill_files(
+        &mut self,
+        input_spill_files: Vec<RefCountedTempFile>,
+    ) -> Result<RefCountedTempFile> {
+        // ==== Convert each spill file into a stream ====
+        let partially_sorted_streams = input_spill_files
+            .into_iter()
+            .map(|spill_file| {
+                if !spill_file.path().exists() {
+                    return internal_err!(
+                        "Spill file {:?} does not exist",
+                        spill_file.path()
+                    );
+                }
+
+                self.spill_manager.read_spill_as_stream(spill_file)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let sort_exprs: LexOrdering = self.expr.iter().cloned().collect();
+
+        // ==== Do a sort-preserving merge on the input partially sorted streams ====
+        let spm_stream = StreamingMergeBuilder::new()
+            .with_streams(partially_sorted_streams)
+            .with_schema(Arc::clone(&self.schema))
+            .with_expressions(sort_exprs.as_ref())
+            .with_metrics(self.metrics.baseline.clone())
+            .with_batch_size(self.batch_size)
+            .with_fetch(None)
+            .with_reservation(self.merge_reservation.new_empty())
+            .build()?;
+
+        debug!("Combining spilled files");
+
+        // ==== Write to a single merged spill file ====
+        let merged_spill_file = self.write_stream_to_spill_file(spm_stream).await?;
+
+        Ok(merged_spill_file)
+    }
+
+    /// Maximum number of spill files to merge in a single pass
+    const MAX_SPILL_MERGE_DEGREE: usize = 8;
+
+    /// Performs a multi-pass merge of spilled files to create a globally sorted
+    /// stream. The merge degree is limited by memory constraints.
+    /// - In each pass, existing spill files are split into groups, then
+    ///   sort-preserving merged, and re-spilled to a smaller number of spill files.
+    /// - For each combining step, up to the maximum merge degree of spill files
+    ///   are merged.
+    ///
+    /// # Example
+    /// ```text
+    /// Notation: batch(n) means a batch with n rows
+    ///
+    /// Max merge degree: 2
+    /// Initial spill files:
+    ///     spill_file_1: batch(100)
+    ///     spill_file_2: batch(100)
+    ///     spill_file_3: batch(100)
+    ///     spill_file_4: batch(100)
+    ///
+    /// After pass 1:
+    ///     spill_file_1: batch(100), batch(100)
+    ///     spill_file_2: batch(100), batch(100)
+    ///
+    /// After pass 2:
+    ///     merged_stream: batch(100), batch(100), batch(100), batch(100)
+    /// ```
+    async fn merge_spilled_files_multi_pass(
+        &mut self,
+    ) -> Result<SendableRecordBatchStream> {
+        // ──────────────────────────────────────────────────────────────────────
+        // Edge cases
+        // ──────────────────────────────────────────────────────────────────────
+        if self.finished_spill_files.is_empty() {
+            return internal_err!("No spilled files to merge");
+        }
+        if self.finished_spill_files.len() == 1 {
+            return self
+                .spill_manager
+                .read_spill_as_stream(self.finished_spill_files.remove(0));
         }
 
-        // Reserve headroom for next sort/merge
-        self.reserve_memory_for_merge()?;
+        // ──────────────────────────────────────────────────────────────────────
+        // Merge spilled files in multiple passes
+        // ──────────────────────────────────────────────────────────────────────
+        let spill_files = std::mem::take(&mut self.finished_spill_files);
+        let spill_files = self.merge_spill_files_multipass(spill_files).await?;
+
+        // ──────────────────────────────────────────────────────────────────────
+        // Finally, <= max merge degree spilled files are left; merge them into a
+        // single globally sorted stream
+        // ──────────────────────────────────────────────────────────────────────
+        let partially_sorted_streams = spill_files
+            .into_iter()
+            .map(|spill_file| self.spill_manager.read_spill_as_stream(spill_file))
+            .collect::<Result<Vec<_>>>()?;
 
-        Ok(())
+        // Edge cases
+        if partially_sorted_streams.is_empty() {
+            return internal_err!("No spilled files to merge");
+        }
+        if partially_sorted_streams.len() == 1 {
+            return Ok(partially_sorted_streams.into_iter().next().unwrap());
+        }
+
+        let sort_exprs: LexOrdering = self.expr.iter().cloned().collect();
+
+        let spm_stream = StreamingMergeBuilder::new()
+            .with_streams(partially_sorted_streams)
+            .with_schema(Arc::clone(&self.schema))
+            .with_expressions(sort_exprs.as_ref())
+            .with_metrics(self.metrics.baseline.clone())
+            .with_batch_size(self.batch_size)
+            .with_fetch(None)
+            .with_reservation(self.merge_reservation.new_empty())
+            .build()?;
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            Arc::clone(&self.schema),
+            spm_stream,
+        )))
+    }
+
+    /// Iteratively merges and re-spills files until the number of spill files
+    /// is ≤ MAX_SPILL_MERGE_DEGREE
+    async fn merge_spill_files_multipass(
+        &mut self,
+        mut spill_files_cur_pass: Vec<RefCountedTempFile>,
+    ) -> Result<Vec<RefCountedTempFile>> {
+        let mut spill_files_next_pass: Vec<RefCountedTempFile> = vec![];
+
+        // Merge spill files down to the nearest power of the configured max merge
+        // degree, until the number of spill files is <= max merge degree
+        //
+        // Example:
+        // initial spill files count: 30
+        // max merge degree: 4
+        // pass 1: merge 30 files into 16(4^2) files
+        // pass 2: merge 16 files into 4(4^1) files
+        // pass 3: now the number of spill files is <= max merge degree: merge
+        //         them into a single sorted stream
+        while spill_files_cur_pass.len() > Self::MAX_SPILL_MERGE_DEGREE {
+            let log_base = Self::MAX_SPILL_MERGE_DEGREE as f64;
+            let num_files = spill_files_cur_pass.len() as f64;
+            let num_passes = num_files.log(log_base).ceil() as usize;
+            let next_pass_merge_degree = log_base.powi((num_passes - 1) as i32);

Review Comment:
   Maybe we can add a comment explaining what `next_pass_merge_degree` is?
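
   For reference, a standalone sketch of how that arithmetic plays out (an
   illustration in integer arithmetic, not the PR's code; it assumes
   `next_pass_merge_degree` is the target number of spill files after the
   current pass, which is what the in-code example suggests):

   ```rust
   /// Print the merge schedule for `num_files` spill files given a maximum
   /// merge degree. The inner loop computes d^(ceil(log_d(num_files)) - 1),
   /// the same value as `next_pass_merge_degree`, without floating point.
   fn print_merge_schedule(mut num_files: usize, max_degree: usize) {
       while num_files > max_degree {
           // Largest power of `max_degree` strictly below `num_files`
           // (stop once target * max_degree reaches num_files)
           let mut target = 1;
           while target * max_degree < num_files {
               target *= max_degree;
           }
           println!("pass: merge {num_files} files into {target} files");
           num_files = target;
       }
       println!("final pass: merge {num_files} files into one sorted stream");
   }

   fn main() {
       // Matches the example in the code comment above:
       //   pass: merge 30 files into 16 files
       //   pass: merge 16 files into 4 files
       //   final pass: merge 4 files into one sorted stream
       print_merge_schedule(30, 4);
   }
   ```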



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
