alamb commented on code in PR #14644:
URL: https://github.com/apache/datafusion/pull/14644#discussion_r1957155614
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -302,31 +299,16 @@ impl ExternalSorter {
         }

         self.reserve_memory_for_merge()?;

-        let size = get_record_batch_memory_size(&input);
-
+        let size = get_reserved_byte_for_record_batch(&input);
         if self.reservation.try_grow(size).is_err() {
-            let before = self.reservation.size();
-            self.in_mem_sort().await?;
-
-            // Sorting may have freed memory, especially if fetch is `Some`
-            //
-            // As such we check again, and if the memory usage has dropped by
-            // a factor of 2, and we can allocate the necessary capacity,
-            // we don't spill
-            //
-            // The factor of 2 aims to avoid a degenerate case where the
-            // memory required for `fetch` is just under the memory available,
-            // causing repeated re-sorting of data
-            if self.reservation.size() > before / 2
-                || self.reservation.try_grow(size).is_err()
-            {
-                self.spill().await?;
-                self.reservation.try_grow(size)?
-            }
+            self.sort_or_spill_in_mem_batches().await?;
+            // We've already freed more than half of reserved memory,
+            // so we can grow the reservation again. There's nothing we can do
+            // if this try_grow fails.

Review Comment:
> We can implement a more complicated multi-stage merging phase which merges a small portion of streams each time, and perform multiple rounds of merges to produce the final merged stream. However this approach is quite complex and out of the scope of this PR. I believe that there should be dedicated discussions around this problem.

I agree we should implement such a feature as a dedicated, follow-on PR / project. @zhuqi-lucas or @Kontinuation, could one of you file a ticket to track the work? I think it is especially important to highlight the cases the current code won't handle well.

It sounds like there are several issues:

1. Ensuring we can *always* spill data (currently, spilling can fail if we run out of memory while sorting the batches)
2. Ensuring we can *always* merge the data that was spilled, even if it has a really wide fanout (like 1000 spill files)

I think problem 1 could be solved by spilling unsorted batches (and then sorting them separately). This would be less efficient (it would read/write some tuples twice) but would work. Problem 2 could use the multi-pass merge that @Kontinuation describes (a self-contained sketch of that idea is at the end of this message).

##########
datafusion/core/tests/fuzz_cases/sort_fuzz.rs:
##########
@@ -54,7 +54,9 @@ async fn test_sort_10k_mem() {

 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_100k_mem() {

Review Comment:
Can you please also add some fuzz tests for the cases you are covering in this PR? Specifically:

1. `RecordBatch`es with String arrays
2. `RecordBatch`es that have multiple columns (and thus use the Row format)

A sketch of an input generator for these cases follows this comment.
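As a rough illustration of the inputs such a fuzz test would need, here is a minimal sketch of a generator for random two-column (Utf8 + Int32) batches using arrow-rs and the rand crate. It is not part of the existing `sort_fuzz.rs` helpers; `random_two_column_batch` is a hypothetical name, and it assumes rand 0.8-style `thread_rng`/`gen` APIs:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use rand::Rng;

/// Build a batch with one Utf8 column and one Int32 column of random
/// values, so a sort exercises both variable-width string data and the
/// multi-column (Row format) comparison path.
fn random_two_column_batch(rows: usize) -> RecordBatch {
    let mut rng = rand::thread_rng();
    let strings: StringArray = (0..rows)
        .map(|_| Some(format!("str-{:08}", rng.gen_range(0..100_000_000))))
        .collect();
    let ints: Int32Array = (0..rows).map(|_| Some(rng.gen::<i32>())).collect();
    let schema = Arc::new(Schema::new(vec![
        Field::new("utf8", DataType::Utf8, true),
        Field::new("int", DataType::Int32, true),
    ]));
    RecordBatch::try_new(
        schema,
        vec![Arc::new(strings) as ArrayRef, Arc::new(ints) as ArrayRef],
    )
    .expect("schema and columns match")
}

fn main() {
    let batch = random_two_column_batch(1024);
    assert_eq!(batch.num_rows(), 1024);
    println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
}
```

Feeding batches like these through a memory-limited sort would exercise both the variable-width string buffers and the multi-column Row-format comparator that this PR touches.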
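Returning to problem 2 above, here is a minimal, self-contained sketch of the multi-pass merge idea. It does not use DataFusion's actual spill or stream APIs: sorted `Vec<i32>` runs stand in for spill files, and `merge_runs`, `multi_pass_merge`, and `fan_in` are illustrative names. The point is that each pass opens at most `fan_in` runs, so per-pass memory stays bounded no matter how many spill files exist:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// K-way merge of sorted runs using a min-heap keyed on the next value
/// of each run.
fn merge_runs(runs: Vec<Vec<i32>>) -> Vec<i32> {
    // Heap entries are (value, run index, position in run); `Reverse`
    // turns the max-heap into a min-heap so the smallest value pops first.
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(|r| r.len()).sum());
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

/// Merge at most `fan_in` runs per pass, over multiple passes, until a
/// single sorted run remains.
fn multi_pass_merge(mut runs: Vec<Vec<i32>>, fan_in: usize) -> Vec<i32> {
    assert!(fan_in >= 2, "need a fan-in of at least 2 to make progress");
    while runs.len() > 1 {
        runs = runs
            .chunks(fan_in)
            .map(|chunk| merge_runs(chunk.to_vec()))
            .collect();
    }
    runs.pop().unwrap_or_default()
}

fn main() {
    // 1000 small sorted runs stand in for 1000 spill files.
    let runs: Vec<Vec<i32>> = (0..1000).map(|i| vec![i, i + 1000, i + 2000]).collect();
    let merged = multi_pass_merge(runs, 8);
    assert_eq!(merged.len(), 3000);
    assert!(merged.windows(2).all(|w| w[0] <= w[1]));
    println!("merged {} values", merged.len());
}
```

With `fan_in = 8`, 1000 spill files would be fully merged in four passes (1000 → 125 → 16 → 2 → 1), at the cost of rewriting the data once per pass.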