alamb commented on code in PR #14644:
URL: https://github.com/apache/datafusion/pull/14644#discussion_r1957155614


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -302,31 +299,16 @@ impl ExternalSorter {
         }
         self.reserve_memory_for_merge()?;
 
-        let size = get_record_batch_memory_size(&input);
-
+        let size = get_reserved_byte_for_record_batch(&input);
         if self.reservation.try_grow(size).is_err() {
-            let before = self.reservation.size();
-            self.in_mem_sort().await?;
-
-            // Sorting may have freed memory, especially if fetch is `Some`
-            //
-            // As such we check again, and if the memory usage has dropped by
-            // a factor of 2, and we can allocate the necessary capacity,
-            // we don't spill
-            //
-            // The factor of 2 aims to avoid a degenerate case where the
-            // memory required for `fetch` is just under the memory available,
-            // causing repeated re-sorting of data
-            if self.reservation.size() > before / 2
-                || self.reservation.try_grow(size).is_err()
-            {
-                self.spill().await?;
-                self.reservation.try_grow(size)?
-            }
+            self.sort_or_spill_in_mem_batches().await?;
+            // We've already freed more than half of reserved memory,
+            // so we can grow the reservation again. There's nothing we can do
+            // if this try_grow fails.

Review Comment:
   > We can implement a more complicated multi-stage merging phase which merges 
a small portion of streams each time, and perform multiple rounds of merges to 
produce the final merged stream. However this approach is quite complex and out 
of the scope of this PR. I believe that there should be dedicated discussions 
around this problem.
   
   I agree we should implement such a feature as a dedicated, follow-on PR / 
project.
   
   @zhuqi-lucas or @Kontinuation, could one of you file a ticket to track the 
work? I think it is especially important to highlight the cases where the 
current code does not work well.
   
   It sounds like there are several issues:
   1. Ensuring we can *always* spill data (currently, spilling can fail if we 
run out of memory while sorting the buffered batches)
   2. Ensuring we can *always* merge the data that was spilled, even if it has 
a very wide fanout (for example, 1000 spill files)
   
   I think problem 1 could be solved by spilling unsorted batches (and then 
sorting them separately). This would be less efficient (we would read/write 
some tuples twice) but it would work.
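   For illustration only, here is a sketch of that fallback. All names below 
are placeholders (not existing DataFusion APIs), and a plain `Vec<i32>` stands 
in for the buffered `RecordBatch`es:

```rust
// Sketch: spilling can always make progress, even when there is no memory
// left to sort. `Run` and the `can_afford_to_sort` flag are placeholders.
enum Run {
    Sorted(Vec<i32>),   // stand-in for a sorted spill file
    Unsorted(Vec<i32>), // sorted later, when read back for the final merge
}

fn spill(values: Vec<i32>, can_afford_to_sort: bool) -> Run {
    if can_afford_to_sort {
        let mut sorted = values;
        sorted.sort_unstable();
        Run::Sorted(sorted)
    } else {
        // Not enough memory to sort: write the data out as-is and defer the
        // sort, reading/writing these tuples one extra time.
        Run::Unsorted(values)
    }
}
```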
   
   Problem 2 could use the multi-pass merge that @Kontinuation describes.
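   For illustration only, here is a sketch of that multi-pass merge. 
`SpillFile` and `merge_and_respill` are placeholders, not existing DataFusion 
APIs:

```rust
// Sketch of a bounded-fanout, multi-pass merge over sorted spill files.
struct SpillFile; // stands in for one sorted run on disk

/// Stand-in for "stream-merge these sorted runs and write the result back to
/// disk as a new sorted run".
fn merge_and_respill(_runs: &[SpillFile]) -> SpillFile {
    SpillFile
}

/// Merge at most `max_fanout` runs per round, repeating until a single run
/// remains, so the number of concurrently open spill streams (and the memory
/// they pin) stays bounded no matter how many spill files were produced.
fn multi_pass_merge(mut spills: Vec<SpillFile>, max_fanout: usize) -> SpillFile {
    assert!(max_fanout >= 2, "need a fanout of at least 2 to make progress");
    while spills.len() > 1 {
        spills = spills.chunks(max_fanout).map(merge_and_respill).collect();
    }
    spills.pop().expect("at least one spill file")
}
```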



##########
datafusion/core/tests/fuzz_cases/sort_fuzz.rs:
##########
@@ -54,7 +54,9 @@ async fn test_sort_10k_mem() {
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_100k_mem() {

Review Comment:
   Can you please also add some fuzz tests for the cases you are covering in 
this PR? Specifically:
   1. `RecordBatch`es with String arrays
   2. `RecordBatch`es that have multiple columns (and thus use the Row format)
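   For example, the input batches for those cases could be generated along 
these lines (sketch only; `string_int_batch` is a hypothetical helper, not 
tied to the existing helpers in `sort_fuzz.rs`):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Hypothetical generator: a batch with a Utf8 column plus an Int32 column,
/// so sorting on both columns exercises string arrays and the Row format.
fn string_int_batch(n: usize) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("s", DataType::Utf8, false),
        Field::new("i", DataType::Int32, false),
    ]));
    let strings: StringArray = (0..n).map(|i| Some(format!("str-{}", i % 997))).collect();
    let ints: Int32Array = (0..n as i32).map(Some).collect();
    RecordBatch::try_new(
        schema,
        vec![Arc::new(strings) as ArrayRef, Arc::new(ints) as ArrayRef],
    )
    .unwrap()
}
```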


