This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-53 by this push:
     new 35749607f5 [branch-53] perf: sort replace free()->try_grow() pattern with try_resize() to reduce memory pool interactions (#20733)
35749607f5 is described below

commit 35749607f585b3bf25b66b7d2289c56c18d03e4f
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Mar 5 23:05:19 2026 -0500

    [branch-53] perf: sort replace free()->try_grow() pattern with try_resize() to reduce memory pool interactions (#20733)
    
    Backport #20729 to `branch-53`.
---
 datafusion/physical-plan/src/sorts/sort.rs | 50 ++++++++++++------------------
 1 file changed, 20 insertions(+), 30 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index b3ea548d53..5b64f0b2a6 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -730,37 +730,27 @@ impl ExternalSorter {
             // Sort the batch immediately and get all output batches
             let sorted_batches = sort_batch_chunked(&batch, &expressions, 
batch_size)?;
 
-            // Free the old reservation and grow it to match the actual sorted 
output size
-            reservation.free();
+            // Resize the reservation to match the actual sorted output size.
+            // Using try_resize avoids a release-then-reacquire cycle, which
+            // matters for MemoryPool implementations where grow/shrink have
+            // non-trivial cost (e.g. JNI calls in Comet).
+            let total_sorted_size: usize = sorted_batches
+                .iter()
+                .map(get_record_batch_memory_size)
+                .sum();
+            reservation
+                .try_resize(total_sorted_size)
+                .map_err(Self::err_with_oom_context)?;
 
-            Result::<_, DataFusionError>::Ok((schema, sorted_batches, 
reservation))
-        })
-        .then({
-            move |batches| async move {
-                match batches {
-                    Ok((schema, sorted_batches, reservation)) => {
-                        // Calculate the total size of sorted batches
-                        let total_sorted_size: usize = sorted_batches
-                            .iter()
-                            .map(get_record_batch_memory_size)
-                            .sum();
-                        reservation
-                            .try_grow(total_sorted_size)
-                            .map_err(Self::err_with_oom_context)?;
-
-                        // Wrap in ReservationStream to hold the reservation
-                        Ok(Box::pin(ReservationStream::new(
-                            Arc::clone(&schema),
-                            Box::pin(RecordBatchStreamAdapter::new(
-                                schema,
-                                
futures::stream::iter(sorted_batches.into_iter().map(Ok)),
-                            )),
-                            reservation,
-                        )) as SendableRecordBatchStream)
-                    }
-                    Err(e) => Err(e),
-                }
-            }
+            // Wrap in ReservationStream to hold the reservation
+            Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
+                Arc::clone(&schema),
+                Box::pin(RecordBatchStreamAdapter::new(
+                    Arc::clone(&schema),
+                    futures::stream::iter(sorted_batches.into_iter().map(Ok)),
+                )),
+                reservation,
+            )) as SendableRecordBatchStream)
         })
         .try_flatten()
         .map(move |batch| match batch {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to