alamb commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2035293849
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -535,56 +457,262 @@ impl ExternalSorter {
         // reserved again for the next spill.
         self.merge_reservation.free();

-        let mut sorted_stream =
+        let sorted_stream =
             self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
+        debug!("SPM stream is constructed");
+
         // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken
         // to construct a globally sorted stream.
         if !self.in_mem_batches.is_empty() {
             return internal_err!(
                 "in_mem_batches should be empty after constructing sorted stream"
             );
         }

-        // 'global' here refers to all buffered batches when the memory limit is
-        // reached. This variable will buffer the sorted batches after
-        // sort-preserving merge and incrementally append to spill files.
-        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
+        let spill_file = self.write_stream_to_spill_file(sorted_stream).await?;
+        self.finished_spill_files.push(spill_file);
+
+        // Reserve headroom for next sort/merge
+        self.reserve_memory_for_merge()?;
+
+        Ok(())
+    }
+
+    /// Create a new spill file, and write all batches from the stream to the file.
+    ///
+    /// Note: After the spill is done, the memory reservation will be freed to 0,
+    /// because `sorted_stream` holds all buffered batches.
+    async fn write_stream_to_spill_file(
+        &mut self,
+        mut sorted_stream: SendableRecordBatchStream,
+    ) -> Result<RefCountedTempFile> {
+        // Release the memory reserved for merge back to the pool so there is some
+        // left when the executed stream requests an allocation (now the stream to
+        // write is a SortPreservingMergeStream, which requires memory).
+        // At the end of this function, memory will be reserved again for the next spill.
+        self.merge_reservation.free();
+
+        let mut in_progress_spill_file =
+            self.spill_manager.create_in_progress_file("Sorting")?;
+
+        // Incrementally append globally sorted batches to the spill file, because
+        // there might not be enough memory to materialize all batches at once.
         while let Some(batch) = sorted_stream.next().await {
-            let batch = batch?;
-            let sorted_size = get_reserved_byte_for_record_batch(&batch);
-            if self.reservation.try_grow(sorted_size).is_err() {
-                // Although the reservation is not enough, the batch is
-                // already in memory, so it's okay to combine it with previously
-                // sorted batches, and spill together.
-                globally_sorted_batches.push(batch);
-                self.consume_and_spill_append(&mut globally_sorted_batches)
-                    .await?; // reservation is freed in spill()
-            } else {
-                globally_sorted_batches.push(batch);
-            }
+            let mut batch = vec![batch?];
+            Self::organize_stringview_arrays(&mut batch)?;
+            in_progress_spill_file.append_batch(&batch[0])?;
         }

         // Drop early to free up memory reserved by the sorted stream, otherwise the
         // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
         drop(sorted_stream);

-        self.consume_and_spill_append(&mut globally_sorted_batches)
-            .await?;
-        self.spill_finish().await?;
+        // Reserve headroom for next sort/merge
+        self.reserve_memory_for_merge()?;

-        // Sanity check after spilling
-        let buffers_cleared_property =
-            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
-        if !buffers_cleared_property {
-            return internal_err!(
-                "in_mem_batches and globally_sorted_batches should be cleared before"
-            );
+        let spill_file = in_progress_spill_file.finish()?.ok_or_else(|| {
+            internal_datafusion_err!("Writing stream with 0 batch is not allowed")
+        })?;
+
+        Ok(spill_file)
+    }
+
+    /// Sort-preserving merges the spilled files into a single file.
+    ///
+    /// All input spill files are sorted by the sort keys within each file, and the
+    /// returned file is also sorted by the sort keys.
+    ///
+    /// This method consumes the input spill files, and returns a new compacted
+    /// spill file. After returning, the input files will be cleaned up (deleted).
+    ///
+    /// # Example:
+    /// Input spill files:
+    ///   SpillFile1 (sorted by SortKeys):
+    ///     [batch1(100 rows)], [batch2(100 rows)]
+    ///   SpillFile2 (sorted by SortKeys):
+    ///     [batch1(100 rows)]
+    ///
+    /// After merging, it returns a new spill file:
+    ///   MergedSpillFile (sorted by SortKeys):
+    ///     [batch1(100 rows)], [batch2(100 rows)]
+    async fn consume_and_merge_spill_files(
+        &mut self,
+        input_spill_files: Vec<RefCountedTempFile>,
+    ) -> Result<RefCountedTempFile> {
+        // ==== Convert each spill file into a stream ====
+        let partially_sorted_streams = input_spill_files
+            .into_iter()
+            .map(|spill_file| {
+                if !spill_file.path().exists() {
+                    return internal_err!(
+                        "Spill file {:?} does not exist",
+                        spill_file.path()
+                    );
+                }
+
+                self.spill_manager.read_spill_as_stream(spill_file)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let sort_exprs: LexOrdering = self.expr.iter().cloned().collect();
+
+        // ==== Do a sort-preserving merge on the input partially sorted streams ====
+        let spm_stream = StreamingMergeBuilder::new()

Review Comment:
   It is already pretty well optimized (not that it couldn't be made better), but there isn't a lot of low-hanging fruit, in my opinion.
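For readers skimming the diff: the `consume_and_merge_spill_files` doc comment above boils down to a k-way, sort-preserving merge of runs that are each already sorted by the sort keys. Below is a minimal, self-contained sketch of that idea in plain Rust (std only), using made-up integer runs in place of spilled RecordBatches; it deliberately does not use the real StreamingMergeBuilder or SpillManager APIs.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Merge several individually sorted runs into one globally sorted sequence,
// analogous to sort-preserving merging of sorted spill files.
fn merge_sorted_runs(runs: Vec<Vec<i32>>) -> Vec<i32> {
    // Heap entries are (value, run index, position in run); `Reverse` turns the
    // max-heap into a min-heap so the smallest pending value pops first.
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }

    let mut merged = Vec::with_capacity(runs.iter().map(|r| r.len()).sum());
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        merged.push(value);
        // Advance only the run that the popped value came from.
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    merged
}

fn main() {
    // Two "spill files", each already sorted by the sort key.
    let spill_file_1 = vec![1, 4, 7, 9];
    let spill_file_2 = vec![2, 3, 8];
    let merged = merge_sorted_runs(vec![spill_file_1, spill_file_2]);
    assert_eq!(merged, vec![1, 2, 3, 4, 7, 8, 9]);
    println!("merged: {merged:?}");
}

The code in the PR applies the same pattern to streams of RecordBatches read back from the spill files and, as its comments note, appends the merged output to a new spill file incrementally rather than materializing all batches in memory at once.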