alamb commented on code in PR #11218:
URL: https://github.com/apache/datafusion/pull/11218#discussion_r1676554150


##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -1005,6 +1035,71 @@ mod tests {
         assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
     }
 
+    #[test]
+    fn test_batch_spill_and_read() -> Result<()> {
+        let batch1 = build_table_i32(
+            ("a2", &vec![0, 1, 2]),
+            ("b2", &vec![3, 4, 5]),
+            ("c2", &vec![4, 5, 6]),
+        );
+
+        let batch2 = build_table_i32(
+            ("a2", &vec![10, 11, 12]),
+            ("b2", &vec![13, 14, 15]),
+            ("c2", &vec![14, 15, 16]),
+        );
+
+        let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
+
+        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
+        let schema = batch1.schema();
+        let num_rows = batch1.num_rows() + batch2.num_rows();
+        let cnt = spill_record_batches(
+            vec![batch1, batch2],
+            spill_file.path().into(),
+            Arc::clone(&schema),
+        );
+        assert_eq!(cnt.unwrap(), num_rows);
+
+        let file = BufReader::new(File::open(spill_file.path())?);
+        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+
+        assert_eq!(reader.num_batches(), 2);
+        assert_eq!(reader.schema(), schema);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_batch_spill_by_size() -> Result<()> {
+        let batch1 = build_table_i32(
+            ("a2", &vec![0, 1, 2, 3]),
+            ("b2", &vec![3, 4, 5, 6]),
+            ("c2", &vec![4, 5, 6, 7]),
+        );
+
+        let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
+
+        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
+        let schema = batch1.schema();
+        let num_rows = batch1.num_rows();
+        let cnt = spill_record_batch_by_size(
+            batch1,
+            spill_file.path().into(),
+            Arc::clone(&schema),
+            1,

Review Comment:
   👍 



##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -852,6 +852,30 @@ pub fn spill_record_batches(
     Ok(writer.num_rows)
 }
 
+/// Spill the `RecordBatch` to disk as smaller batches
+/// split by `batch_size`
+/// Return `total_rows` what is spilled
+pub fn spill_record_batch_by_size(
+    batch: RecordBatch,
+    path: PathBuf,
+    schema: SchemaRef,
+    batch_size: usize,
+) -> Result<usize, DataFusionError> {

Review Comment:
   What is the rationale for writing out small batch sizes? Isn't that less 
efficient than writing out one batch (which was already buffered)? 
   
   Maybe it helps keep memory low when reading 🤔 



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -602,15 +622,38 @@ impl BufferedBatch {
             + mem::size_of::<Range<usize>>()
             + mem::size_of::<usize>();
 
+        let num_rows = batch.num_rows();
         BufferedBatch {
             batch,
             range,
             join_arrays,
             null_joined: vec![],
             size_estimation,
             join_filter_failed_idxs: HashSet::new(),
+            num_rows,
+            spill_file: None,
         }
     }
+
+    fn spill_to_disk(
+        &mut self,
+        path: RefCountedTempFile,
+        buffered_schema: SchemaRef,
+        batch_size: usize,
+    ) -> Result<()> {
+        let batch = std::mem::replace(

Review Comment:
   I wonder if using `Option<RecordBatch>` would be better here, as it would 
make it clearer when a spill has happened versus when there were no buffered 
batches.



##########
datafusion/execution/src/memory_pool/mod.rs:
##########
@@ -220,6 +220,21 @@ impl MemoryReservation {
         self.size = new_size
     }
 
+    /// Tries to free `capacity` bytes from this reservation
+    /// if `capacity` does not exceed [`Self::size`]

Review Comment:
   I think it would help to document that this returns an error if `capacity` 
exceeds the size of the reservation
   
   I wonder in general if we should remove shrink and instead always use 
try_shrink (in a different PR of course) to catch any accounting bugs 🤔 



##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -852,6 +852,30 @@ pub fn spill_record_batches(
     Ok(writer.num_rows)
 }
 
+/// Spill the `RecordBatch` to disk as smaller batches
+/// split by `batch_size`
+/// Return `total_rows` what is spilled
+pub fn spill_record_batch_by_size(

Review Comment:
   maybe eventually the spill-related utilities could go into 
`physical-plan/src/spill.rs`, though I see this PR simply follows the existing 
pattern so I think it is fine



##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -852,6 +852,30 @@ pub fn spill_record_batches(
     Ok(writer.num_rows)
 }
 
+/// Spill the `RecordBatch` to disk as smaller batches
+/// split by `batch_size`

Review Comment:
   I had to read the code to see if this was bytes or rows. I see it is rows so 
maybe it makes sense to name it `batch_size_rows` 
   
   ```suggestion
   /// split by `batch_size_rows`
   ```
   
   (and the actual parameter too)



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -602,15 +622,38 @@ impl BufferedBatch {
             + mem::size_of::<Range<usize>>()
             + mem::size_of::<usize>();
 
+        let num_rows = batch.num_rows();
         BufferedBatch {
             batch,
             range,
             join_arrays,
             null_joined: vec![],
             size_estimation,
             join_filter_failed_idxs: HashSet::new(),
+            num_rows,
+            spill_file: None,
         }
     }
+
+    fn spill_to_disk(
+        &mut self,
+        path: RefCountedTempFile,
+        buffered_schema: SchemaRef,
+        batch_size: usize,
+    ) -> Result<()> {
+        let batch = std::mem::replace(
+            &mut self.batch,
+            RecordBatch::new_empty(Arc::clone(&buffered_schema)),
+        );
+        let _ = spill_record_batch_by_size(
+            batch,
+            path.path().into(),
+            buffered_schema,
+            batch_size,
+        );
+        self.spill_file = Some(path);

Review Comment:
   I couldn't quite tell -- is it guaranteed that `self.spill_file` is None 
here? Is it possible to spill more than once? And if so, would this spill file 
get lost?



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -602,15 +622,38 @@ impl BufferedBatch {
             + mem::size_of::<Range<usize>>()
             + mem::size_of::<usize>();
 
+        let num_rows = batch.num_rows();
         BufferedBatch {
             batch,
             range,
             join_arrays,
             null_joined: vec![],
             size_estimation,
             join_filter_failed_idxs: HashSet::new(),
+            num_rows,
+            spill_file: None,
         }
     }
+
+    fn spill_to_disk(
+        &mut self,
+        path: RefCountedTempFile,
+        buffered_schema: SchemaRef,
+        batch_size: usize,
+    ) -> Result<()> {
+        let batch = std::mem::replace(
+            &mut self.batch,
+            RecordBatch::new_empty(Arc::clone(&buffered_schema)),
+        );
+        let _ = spill_record_batch_by_size(

Review Comment:
   Should this return value be ignored? It seems like we should propagate the error to the caller



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -867,12 +951,16 @@ impl SMJStream {
                     while !self.buffered_data.batches.is_empty() {
                         let head_batch = self.buffered_data.head_batch();
                         // If the head batch is fully processed, dequeue it 
and produce output of it.
-                        if head_batch.range.end == head_batch.batch.num_rows() 
{
+                        if head_batch.range.end == head_batch.num_rows {
                             self.freeze_dequeuing_buffered()?;
                             if let Some(buffered_batch) =
                                 self.buffered_data.batches.pop_front()
                             {
-                                
self.reservation.shrink(buffered_batch.size_estimation);
+                                // Shrink mem usage for non spilled batches 
only
+                                if buffered_batch.spill_file.is_none() {

Review Comment:
   It might be easier to verify that the reservation is updated correctly if 
this logic were moved into a method on `BufferedBatch`.  
   
   Something like this perhaps
   ```rust
   buffered_batch.free_reservation(&reservation)?;
   ```
   
   That would also make it easier to verify that `num_rows` and `batch` and 
`spill_file` were kept in sync



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -577,6 +595,8 @@ struct BufferedBatch {
     /// The indices of buffered batch that failed the join filter.
     /// When dequeuing the buffered batch, we need to produce null joined rows 
for these indices.
     pub join_filter_failed_idxs: HashSet<u64>,
+    pub num_rows: usize,

Review Comment:
   It would also help if we could document how to interpret `null_joined` -- 
specifically, what do the indices represent if both `batch` and `spill_file` 
are set? Or is a single buffered batch either in memory or spilled?



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -577,6 +595,8 @@ struct BufferedBatch {
     /// The indices of buffered batch that failed the join filter.
     /// When dequeuing the buffered batch, we need to produce null joined rows 
for these indices.
     pub join_filter_failed_idxs: HashSet<u64>,
+    pub num_rows: usize,

Review Comment:
   perhaps we can also document the meaning of these fields. Specifically, is 
`num_rows` the total number of rows in the `batch` or in the `spill_file` or in 
both?



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1503,12 +1579,39 @@ fn get_buffered_columns(
     buffered_batch_idx: usize,
     buffered_indices: &UInt64Array,
 ) -> Result<Vec<ArrayRef>, ArrowError> {
-    buffered_data.batches[buffered_batch_idx]
-        .batch
-        .columns()
-        .iter()
-        .map(|column| take(column, &buffered_indices, None))
-        .collect::<Result<Vec<_>, ArrowError>>()
+    get_buffered_columns_from_batch(
+        &buffered_data.batches[buffered_batch_idx],
+        buffered_indices,
+    )
+}
+
+#[inline(always)]
+fn get_buffered_columns_from_batch(
+    buffered_batch: &BufferedBatch,
+    buffered_indices: &UInt64Array,
+) -> Result<Vec<ArrayRef>, ArrowError> {
+    if let Some(spill_file) = &buffered_batch.spill_file {
+        // if spilled read from disk in smaller sub batches
+        let mut buffered_cols: Vec<ArrayRef> = 
Vec::with_capacity(buffered_indices.len());
+
+        let file = BufReader::new(File::open(spill_file.path())?);
+        let reader = FileReader::try_new(file, None)?;
+
+        for batch in reader {
+            batch?.columns().iter().for_each(|column| {
+                buffered_cols.extend(take(column, &buffered_indices, None))

Review Comment:
   Is this right to take the indexes on each buffered batch? It seems like the 
indices need to account for the fact that multiple batches got read in from the 
spill file
   
   Or put another way, does `buffered_indices` refer to the overall logical 
indices within the `BufferedBatch` or is it within each potential `RecordBatch` 
read from the spill file?



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -333,10 +333,7 @@ impl ExternalSorter {
 
             for spill in self.spills.drain(..) {
                 if !spill.path().exists() {
-                    return Err(DataFusionError::Internal(format!(
-                        "Spill file {:?} does not exist",
-                        spill.path()
-                    )));
+                    return internal_err!("Spill file {:?} does not exist", 
spill.path());

Review Comment:
   drive by cleanups: 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to