Kontinuation commented on code in PR #1511: URL: https://github.com/apache/datafusion-comet/pull/1511#discussion_r2022987423
########## native/core/src/execution/shuffle/shuffle_writer.rs: ########## @@ -852,16 +1079,64 @@ impl PartitionBuffer { file: spill_data, }); } - self.spill_file - .as_mut() - .unwrap() - .file - .write_all(&output_batches)?; + Ok(()) + } +} + +/// Write batches to writer while using a buffer to avoid frequent system calls. +/// The record batches were first written by ShuffleBlockWriter into an internal buffer. +/// Once the buffer exceeds the max size, the buffer will be flushed to the writer. +struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> { + shuffle_block_writer: S, + writer: W, + buffer: Vec<u8>, + buffer_max_size: usize, +} + +impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> { + fn new(shuffle_block_writer: S, writer: W) -> Self { + // 1MB should be good enough to avoid frequent system calls, + // and also won't cause too much memory usage + let buffer_max_size = 1024 * 1024; + Self { + shuffle_block_writer, + writer, + buffer: vec![], + buffer_max_size, + } + } + + fn write( + &mut self, + batch: &RecordBatch, + encode_time: &Time, + write_time: &Time, + ) -> Result<usize> { + let mut cursor = Cursor::new(&mut self.buffer); + cursor.seek(SeekFrom::End(0))?; + let mut write_timer = write_time.timer(); + let bytes_written = + self.shuffle_block_writer + .borrow() + .write_batch(batch, &mut cursor, encode_time)?; + let pos = cursor.position(); + if pos >= self.buffer_max_size as u64 { + self.writer.write_all(&self.buffer)?; + self.buffer.clear(); + } write_timer.stop(); Review Comment: I just noticed that I had removed the timing for std::io::copy and BufWriter.flush; I have added it back in the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org