qstommyshu commented on code in PR #15610: URL: https://github.com/apache/datafusion/pull/15610#discussion_r2034251397
########## datafusion/physical-plan/src/sorts/sort.rs: ########## @@ -535,56 +457,262 @@ impl ExternalSorter { // reserved again for the next spill. self.merge_reservation.free(); - let mut sorted_stream = + let sorted_stream = self.in_mem_sort_stream(self.metrics.baseline.intermediate())?; + debug!("SPM stream is constructed"); + // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken // to construct a globally sorted stream. if !self.in_mem_batches.is_empty() { return internal_err!( "in_mem_batches should be empty after constructing sorted stream" ); } - // 'global' here refers to all buffered batches when the memory limit is - // reached. This variable will buffer the sorted batches after - // sort-preserving merge and incrementally append to spill files. - let mut globally_sorted_batches: Vec<RecordBatch> = vec![]; + let spill_file = self.write_stream_to_spill_file(sorted_stream).await?; + self.finished_spill_files.push(spill_file); + + // Reserve headroom for next sort/merge + self.reserve_memory_for_merge()?; + + Ok(()) + } + + /// Create a new spill file, and write all batches from the stream to the file. + /// + /// Note: After the spill is done, the memory reservation will be freed to 0, + /// because `sorted_stream` holds all buffered batches. + async fn write_stream_to_spill_file( + &mut self, + mut sorted_stream: SendableRecordBatchStream, + ) -> Result<RefCountedTempFile> { + // Release the memory reserved for merge back to the pool so there is some + // left when the executed stream requests an allocation (now the stream to + // write are SortPreservingMergeStream, which requires memory). + // At the end of this function, memory will be reserved again for the next spill. 
+ self.merge_reservation.free(); + + let mut in_progress_spill_file = + self.spill_manager.create_in_progress_file("Sorting")?; + + // Incrementally append globally sorted batches to the spill file, because + // there might not be enough memory to materialize all batches at once. while let Some(batch) = sorted_stream.next().await { - let batch = batch?; - let sorted_size = get_reserved_byte_for_record_batch(&batch); - if self.reservation.try_grow(sorted_size).is_err() { - // Although the reservation is not enough, the batch is - // already in memory, so it's okay to combine it with previously - // sorted batches, and spill together. - globally_sorted_batches.push(batch); - self.consume_and_spill_append(&mut globally_sorted_batches) - .await?; // reservation is freed in spill() - } else { - globally_sorted_batches.push(batch); - } + let mut batch = vec![batch?]; + Self::organize_stringview_arrays(&mut batch)?; + in_progress_spill_file.append_batch(&batch[0])?; } // Drop early to free up memory reserved by the sorted stream, otherwise the // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory. drop(sorted_stream); - self.consume_and_spill_append(&mut globally_sorted_batches) - .await?; - self.spill_finish().await?; + // Reserve headroom for next sort/merge + self.reserve_memory_for_merge()?; - // Sanity check after spilling - let buffers_cleared_property = - self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty(); - if !buffers_cleared_property { - return internal_err!( - "in_mem_batches and globally_sorted_batches should be cleared before" - ); + let spill_file = in_progress_spill_file.finish()?.ok_or_else(|| { + internal_datafusion_err!("Writing stream with 0 batch is not allowed") + })?; + + Ok(spill_file) + } + + /// Sort-preserving merges the spilled files into a single file. + /// + /// All of input spill files are sorted by sort keys within each file, and the + /// returned file is also sorted by sort keys. 
+ /// + /// This method consumes the input spill files, and returns a new compacted + /// spill file. After returnning, the input files will be cleaned up (deleted). + /// + /// # Example: + /// Input spill files: + /// SpillFile1 (sorted by SortKeys): + /// [batch1(100 rows)], [batch2(100 rows)] + /// SpillFile2 (sorted by SortKeys): + /// [batch1(100 rows)] + /// + /// After merging, it returns a new spill file: + /// returns MergedSpillFile (sorted by SortKeys): + /// [batch1(100 rows)], [batch2(100 rows)] + async fn consume_and_merge_spill_files( + &mut self, + input_spill_files: Vec<RefCountedTempFile>, + ) -> Result<RefCountedTempFile> { + // ==== Convert each spill file into a stream ==== + let partially_sorted_streams = input_spill_files + .into_iter() + .map(|spill_file| { + if !spill_file.path().exists() { + return internal_err!( + "Spill file {:?} does not exist", + spill_file.path() + ); + } + + self.spill_manager.read_spill_as_stream(spill_file) + }) + .collect::<Result<Vec<_>>>()?; + + let sort_exprs: LexOrdering = self.expr.iter().cloned().collect(); + + // ==== Doing sort-preserving merge on input partially sorted streams ==== + let spm_stream = StreamingMergeBuilder::new() + .with_streams(partially_sorted_streams) + .with_schema(Arc::clone(&self.schema)) + .with_expressions(sort_exprs.as_ref()) + .with_metrics(self.metrics.baseline.clone()) + .with_batch_size(self.batch_size) + .with_fetch(None) + .with_reservation(self.merge_reservation.new_empty()) + .build()?; + + debug!("Combining spilled files"); + + // ==== Write to a single merged spill file ==== + let merged_spill_file = self.write_stream_to_spill_file(spm_stream).await?; + + Ok(merged_spill_file) + } + + /// Maximum number of spill files to merge in a single pass + const MAX_SPILL_MERGE_DEGREE: usize = 8; + + /// Performs a multi-pass merge of spilled files to create a globally sorted stream. The merge degree is limited by memory constraints. 
+ /// - In each pass, existing spill files are split into groups, then sort-preserving merged, and re-spilled to a smaller number of spill files. + /// - For each combining step, up to the maximum merge degree of spill files are merged. + /// + /// # Example + /// ```text + /// Notation: batch(n) means a batch with n rows + /// + /// Max merge degree: 2 + /// Initial spill files: + /// spill_file_1: batch(100) + /// spill_file_2: batch(100) + /// spill_file_3: batch(100) + /// spill_file_4: batch(100) + /// + /// After pass 1: + /// spill_file_1: batch(100), batch(100) + /// spill_file_2: batch(100), batch(100) + /// + /// After pass 2: + /// merged_stream: batch(100), batch(100), batch(100), batch(100) + /// ``` + async fn merge_spilled_files_multi_pass( + &mut self, + ) -> Result<SendableRecordBatchStream> { + // ────────────────────────────────────────────────────────────────────── + // Edge cases + // ────────────────────────────────────────────────────────────────────── + if self.finished_spill_files.is_empty() { + return internal_err!("No spilled files to merge"); + } + if self.finished_spill_files.len() == 1 { + return self + .spill_manager + .read_spill_as_stream(self.finished_spill_files.remove(0)); } - // Reserve headroom for next sort/merge - self.reserve_memory_for_merge()?; + // ────────────────────────────────────────────────────────────────────── + // Recursively merge spilled files + // ────────────────────────────────────────────────────────────────────── + let spill_files = std::mem::take(&mut self.finished_spill_files); + let spill_files = self.recursively_merge_spill_files(spill_files).await?; Review Comment: Maybe we can avoid recursion here if we don't need it? For a d-way multi-pass external merge sort, the maximum number of passes is known in advance: total passes = 1 (initial run) + ⌈log_d(number of runs)⌉. We can use this bound to convert the recursion into a `for` loop (recursion has several performance disadvantages compared to a loop).
########## datafusion/physical-plan/src/sorts/sort.rs: ########## @@ -535,56 +457,262 @@ impl ExternalSorter { // reserved again for the next spill. self.merge_reservation.free(); - let mut sorted_stream = + let sorted_stream = self.in_mem_sort_stream(self.metrics.baseline.intermediate())?; + debug!("SPM stream is constructed"); + // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken // to construct a globally sorted stream. if !self.in_mem_batches.is_empty() { return internal_err!( "in_mem_batches should be empty after constructing sorted stream" ); } - // 'global' here refers to all buffered batches when the memory limit is - // reached. This variable will buffer the sorted batches after - // sort-preserving merge and incrementally append to spill files. - let mut globally_sorted_batches: Vec<RecordBatch> = vec![]; + let spill_file = self.write_stream_to_spill_file(sorted_stream).await?; + self.finished_spill_files.push(spill_file); + + // Reserve headroom for next sort/merge + self.reserve_memory_for_merge()?; + + Ok(()) + } + + /// Create a new spill file, and write all batches from the stream to the file. + /// + /// Note: After the spill is done, the memory reservation will be freed to 0, + /// because `sorted_stream` holds all buffered batches. + async fn write_stream_to_spill_file( + &mut self, + mut sorted_stream: SendableRecordBatchStream, + ) -> Result<RefCountedTempFile> { + // Release the memory reserved for merge back to the pool so there is some + // left when the executed stream requests an allocation (now the stream to + // write are SortPreservingMergeStream, which requires memory). + // At the end of this function, memory will be reserved again for the next spill. 
+ self.merge_reservation.free(); + + let mut in_progress_spill_file = + self.spill_manager.create_in_progress_file("Sorting")?; + + // Incrementally append globally sorted batches to the spill file, because + // there might not be enough memory to materialize all batches at once. while let Some(batch) = sorted_stream.next().await { - let batch = batch?; - let sorted_size = get_reserved_byte_for_record_batch(&batch); - if self.reservation.try_grow(sorted_size).is_err() { - // Although the reservation is not enough, the batch is - // already in memory, so it's okay to combine it with previously - // sorted batches, and spill together. - globally_sorted_batches.push(batch); - self.consume_and_spill_append(&mut globally_sorted_batches) - .await?; // reservation is freed in spill() - } else { - globally_sorted_batches.push(batch); - } + let mut batch = vec![batch?]; + Self::organize_stringview_arrays(&mut batch)?; + in_progress_spill_file.append_batch(&batch[0])?; } // Drop early to free up memory reserved by the sorted stream, otherwise the // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory. drop(sorted_stream); - self.consume_and_spill_append(&mut globally_sorted_batches) - .await?; - self.spill_finish().await?; + // Reserve headroom for next sort/merge + self.reserve_memory_for_merge()?; - // Sanity check after spilling - let buffers_cleared_property = - self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty(); - if !buffers_cleared_property { - return internal_err!( - "in_mem_batches and globally_sorted_batches should be cleared before" - ); + let spill_file = in_progress_spill_file.finish()?.ok_or_else(|| { + internal_datafusion_err!("Writing stream with 0 batch is not allowed") + })?; + + Ok(spill_file) + } + + /// Sort-preserving merges the spilled files into a single file. + /// + /// All of input spill files are sorted by sort keys within each file, and the + /// returned file is also sorted by sort keys. 
+ /// + /// This method consumes the input spill files, and returns a new compacted + /// spill file. After returnning, the input files will be cleaned up (deleted). + /// + /// # Example: + /// Input spill files: + /// SpillFile1 (sorted by SortKeys): + /// [batch1(100 rows)], [batch2(100 rows)] + /// SpillFile2 (sorted by SortKeys): + /// [batch1(100 rows)] + /// + /// After merging, it returns a new spill file: + /// returns MergedSpillFile (sorted by SortKeys): + /// [batch1(100 rows)], [batch2(100 rows)] + async fn consume_and_merge_spill_files( + &mut self, + input_spill_files: Vec<RefCountedTempFile>, + ) -> Result<RefCountedTempFile> { + // ==== Convert each spill file into a stream ==== + let partially_sorted_streams = input_spill_files + .into_iter() + .map(|spill_file| { + if !spill_file.path().exists() { + return internal_err!( + "Spill file {:?} does not exist", + spill_file.path() + ); + } + + self.spill_manager.read_spill_as_stream(spill_file) + }) + .collect::<Result<Vec<_>>>()?; + + let sort_exprs: LexOrdering = self.expr.iter().cloned().collect(); + + // ==== Doing sort-preserving merge on input partially sorted streams ==== + let spm_stream = StreamingMergeBuilder::new() + .with_streams(partially_sorted_streams) + .with_schema(Arc::clone(&self.schema)) + .with_expressions(sort_exprs.as_ref()) + .with_metrics(self.metrics.baseline.clone()) + .with_batch_size(self.batch_size) + .with_fetch(None) + .with_reservation(self.merge_reservation.new_empty()) + .build()?; + + debug!("Combining spilled files"); + + // ==== Write to a single merged spill file ==== + let merged_spill_file = self.write_stream_to_spill_file(spm_stream).await?; + + Ok(merged_spill_file) + } + + /// Maximum number of spill files to merge in a single pass + const MAX_SPILL_MERGE_DEGREE: usize = 8; + + /// Performs a multi-pass merge of spilled files to create a globally sorted stream. The merge degree is limited by memory constraints. 
+ /// - In each pass, existing spill files are split into groups, then sort-preserving merged, and re-spilled to a smaller number of spill files. + /// - For each combining step, up to the maximum merge degree of spill files are merged. + /// + /// # Example + /// ```text + /// Notation: batch(n) means a batch with n rows + /// + /// Max merge degree: 2 + /// Initial spill files: + /// spill_file_1: batch(100) + /// spill_file_2: batch(100) + /// spill_file_3: batch(100) + /// spill_file_4: batch(100) + /// + /// After pass 1: + /// spill_file_1: batch(100), batch(100) + /// spill_file_2: batch(100), batch(100) + /// + /// After pass 2: + /// merged_stream: batch(100), batch(100), batch(100), batch(100) + /// ``` + async fn merge_spilled_files_multi_pass( + &mut self, + ) -> Result<SendableRecordBatchStream> { + // ────────────────────────────────────────────────────────────────────── + // Edge cases + // ────────────────────────────────────────────────────────────────────── + if self.finished_spill_files.is_empty() { + return internal_err!("No spilled files to merge"); + } + if self.finished_spill_files.len() == 1 { + return self + .spill_manager + .read_spill_as_stream(self.finished_spill_files.remove(0)); } - // Reserve headroom for next sort/merge - self.reserve_memory_for_merge()?; + // ────────────────────────────────────────────────────────────────────── + // Recursively merge spilled files + // ────────────────────────────────────────────────────────────────────── + let spill_files = std::mem::take(&mut self.finished_spill_files); + let spill_files = self.recursively_merge_spill_files(spill_files).await?; + + // ────────────────────────────────────────────────────────────────────── + // Finally, <= max merge degree spilled files are left, merge them into a + // single globally sorted stream + // ────────────────────────────────────────────────────────────────────── + let partially_sorted_streams = spill_files + .into_iter() + .map(|spill_file| 
self.spill_manager.read_spill_as_stream(spill_file)) + .collect::<Result<Vec<_>>>()?; - Ok(()) + // Edge cases + if partially_sorted_streams.is_empty() { + return internal_err!("No spilled files to merge"); + } + if partially_sorted_streams.len() == 1 { + return Ok(partially_sorted_streams.into_iter().next().unwrap()); + } + + let sort_exprs: LexOrdering = self.expr.iter().cloned().collect(); + + let spm_stream = StreamingMergeBuilder::new() + .with_streams(partially_sorted_streams) + .with_schema(Arc::clone(&self.schema)) + .with_expressions(sort_exprs.as_ref()) + .with_metrics(self.metrics.baseline.clone()) + .with_batch_size(self.batch_size) + .with_fetch(None) + .with_reservation(self.merge_reservation.new_empty()) + .build()?; + + Ok(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.schema), + spm_stream, + ))) + } + + /// Recursively merges and re-spills files until the number of spill files is ≤ MAX_SPILL_MERGE_DEGREE + async fn recursively_merge_spill_files( Review Comment: I find this name misleading: this does not look like a recursive function. ########## datafusion/physical-plan/src/sorts/sort.rs: ########## @@ -535,56 +457,262 @@ impl ExternalSorter { // reserved again for the next spill. self.merge_reservation.free(); - let mut sorted_stream = + let sorted_stream = self.in_mem_sort_stream(self.metrics.baseline.intermediate())?; + debug!("SPM stream is constructed"); + // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken // to construct a globally sorted stream. if !self.in_mem_batches.is_empty() { return internal_err!( "in_mem_batches should be empty after constructing sorted stream" ); } - // 'global' here refers to all buffered batches when the memory limit is - // reached. This variable will buffer the sorted batches after - // sort-preserving merge and incrementally append to spill files.
- let mut globally_sorted_batches: Vec<RecordBatch> = vec![]; + let spill_file = self.write_stream_to_spill_file(sorted_stream).await?; + self.finished_spill_files.push(spill_file); + + // Reserve headroom for next sort/merge + self.reserve_memory_for_merge()?; + + Ok(()) + } + + /// Create a new spill file, and write all batches from the stream to the file. + /// + /// Note: After the spill is done, the memory reservation will be freed to 0, + /// because `sorted_stream` holds all buffered batches. + async fn write_stream_to_spill_file( + &mut self, + mut sorted_stream: SendableRecordBatchStream, + ) -> Result<RefCountedTempFile> { + // Release the memory reserved for merge back to the pool so there is some + // left when the executed stream requests an allocation (now the stream to + // write are SortPreservingMergeStream, which requires memory). + // At the end of this function, memory will be reserved again for the next spill. + self.merge_reservation.free(); + + let mut in_progress_spill_file = + self.spill_manager.create_in_progress_file("Sorting")?; + + // Incrementally append globally sorted batches to the spill file, because + // there might not be enough memory to materialize all batches at once. while let Some(batch) = sorted_stream.next().await { - let batch = batch?; - let sorted_size = get_reserved_byte_for_record_batch(&batch); - if self.reservation.try_grow(sorted_size).is_err() { - // Although the reservation is not enough, the batch is - // already in memory, so it's okay to combine it with previously - // sorted batches, and spill together. 
- globally_sorted_batches.push(batch); - self.consume_and_spill_append(&mut globally_sorted_batches) - .await?; // reservation is freed in spill() - } else { - globally_sorted_batches.push(batch); - } + let mut batch = vec![batch?]; + Self::organize_stringview_arrays(&mut batch)?; + in_progress_spill_file.append_batch(&batch[0])?; } // Drop early to free up memory reserved by the sorted stream, otherwise the // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory. drop(sorted_stream); - self.consume_and_spill_append(&mut globally_sorted_batches) - .await?; - self.spill_finish().await?; + // Reserve headroom for next sort/merge + self.reserve_memory_for_merge()?; - // Sanity check after spilling - let buffers_cleared_property = - self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty(); - if !buffers_cleared_property { - return internal_err!( - "in_mem_batches and globally_sorted_batches should be cleared before" - ); + let spill_file = in_progress_spill_file.finish()?.ok_or_else(|| { + internal_datafusion_err!("Writing stream with 0 batch is not allowed") + })?; + + Ok(spill_file) + } + + /// Sort-preserving merges the spilled files into a single file. + /// + /// All of input spill files are sorted by sort keys within each file, and the + /// returned file is also sorted by sort keys. + /// + /// This method consumes the input spill files, and returns a new compacted + /// spill file. After returnning, the input files will be cleaned up (deleted). 
+ /// + /// # Example: + /// Input spill files: + /// SpillFile1 (sorted by SortKeys): + /// [batch1(100 rows)], [batch2(100 rows)] + /// SpillFile2 (sorted by SortKeys): + /// [batch1(100 rows)] + /// + /// After merging, it returns a new spill file: + /// returns MergedSpillFile (sorted by SortKeys): + /// [batch1(100 rows)], [batch2(100 rows)] + async fn consume_and_merge_spill_files( + &mut self, + input_spill_files: Vec<RefCountedTempFile>, + ) -> Result<RefCountedTempFile> { + // ==== Convert each spill file into a stream ==== + let partially_sorted_streams = input_spill_files + .into_iter() + .map(|spill_file| { + if !spill_file.path().exists() { + return internal_err!( + "Spill file {:?} does not exist", + spill_file.path() + ); + } + + self.spill_manager.read_spill_as_stream(spill_file) + }) + .collect::<Result<Vec<_>>>()?; + + let sort_exprs: LexOrdering = self.expr.iter().cloned().collect(); + + // ==== Doing sort-preserving merge on input partially sorted streams ==== + let spm_stream = StreamingMergeBuilder::new() Review Comment: I wonder whether this StreamingMergeBuilder uses a heap under the hood; using a heap is a common way to optimize external merge sort performance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org