rluvaton commented on code in PR #22945:
URL: https://github.com/apache/datafusion/pull/22945#discussion_r3412849721
##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -427,8 +470,111 @@ impl MultiLevelMergeBuilder {
.drain(..number_of_spills_to_read_for_current_phase)
.collect::<Vec<_>>();
- Ok((spills, buffer_len))
+ Ok(SpillFilesToMerge::Ready(spills, buffer_len))
+ }
+
+ /// Re-spill the spill file at `index` with half its batch size, putting
it back
+ /// at the same position. We read the file back and re-spill it through
the normal
+ /// spill API (which owns batch layout).
+ /// Slicing each batch in two halves the largest written batch,
+ /// which lowers the per-stream merge reservation so the
+ /// next attempt can seat both streams. One stream's worth of memory is
reserved
+ /// for the duration and freed afterwards. Makes the merge resilient to
skew.
+ async fn split_spill_file_in_half(&mut self, index: usize) -> Result<()> {
+ let target = self.sorted_spill_files.remove(index);
+ let old_max = target.max_record_batch_memory;
+
+ // Reserve enough to hold a single stream of this file while we
re-spill it.
+ let reservation = self.reservation.new_empty();
+ reservation
+ .try_grow(get_reserved_bytes_for_record_batch_size(old_max,
old_max))?;
+
+ let source = self
+ .spill_manager
+ .read_spill_as_stream(target.file, Some(old_max))?;
+ // Re-spill with half the batch size: slice every batch in two. The
spill
+ // writer owns the batch layout, we only change how many rows per
batch.
+ let mut halved: SendableRecordBatchStream =
+ Box::pin(RecordBatchStreamAdapter::new(
+ Arc::clone(&self.schema),
+ source.flat_map(|batch| {
+ futures::stream::iter(match batch {
+ Ok(batch) => split_batch_in_half(batch)
+ .into_iter()
+ .map(Ok)
+ .collect::<Vec<_>>(),
+ Err(e) => vec![Err(e)],
+ })
+ }),
+ ));
+
+ let result = self
+ .spill_manager
+ .spill_record_batch_stream_and_return_max_batch_memory(
+ &mut halved,
+ "MultiLevelMergeBuilder split skewed spill",
+ )
+ .await?;
+
+ reservation.free();
+
+ let Some((file, new_max)) = result else {
+ return internal_err!("re-spilling a skewed spill file produced no
data");
+ };
+
+ // If halving could not reduce the largest batch (e.g. a single row
that is
+ // itself wider than the budget), there is nothing more we can do -
surface
+ // the out-of-memory condition instead of looping forever.
+ if new_max >= old_max {
+ return resources_err!(
+ "Cannot merge sorted runs: a single record batch of {old_max}
bytes \
+ exceeds the available merge memory and cannot be split
further"
+ );
+ }
Review Comment:
This is why I said strings and not string views.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]