Kontinuation commented on code in PR #1511:
URL: https://github.com/apache/datafusion-comet/pull/1511#discussion_r2022987423


##########
native/core/src/execution/shuffle/shuffle_writer.rs:
##########
@@ -852,16 +1079,64 @@ impl PartitionBuffer {
                 file: spill_data,
             });
         }
-        self.spill_file
-            .as_mut()
-            .unwrap()
-            .file
-            .write_all(&output_batches)?;
+        Ok(())
+    }
+}
+
+/// Write batches to writer while using a buffer to avoid frequent system calls.
+/// The record batches were first written by ShuffleBlockWriter into an internal buffer.
+/// Once the buffer exceeds the max size, the buffer will be flushed to the writer.
+struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
+    shuffle_block_writer: S,
+    writer: W,
+    buffer: Vec<u8>,
+    buffer_max_size: usize,
+}
+
+impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
+    fn new(shuffle_block_writer: S, writer: W) -> Self {
+        // 1MB should be good enough to avoid frequent system calls,
+        // and also won't cause too much memory usage
+        let buffer_max_size = 1024 * 1024;
+        Self {
+            shuffle_block_writer,
+            writer,
+            buffer: vec![],
+            buffer_max_size,
+        }
+    }
+
+    fn write(
+        &mut self,
+        batch: &RecordBatch,
+        encode_time: &Time,
+        write_time: &Time,
+    ) -> Result<usize> {
+        let mut cursor = Cursor::new(&mut self.buffer);
+        cursor.seek(SeekFrom::End(0))?;
+        let mut write_timer = write_time.timer();
+        let bytes_written =
+            self.shuffle_block_writer
+                .borrow()
+                .write_batch(batch, &mut cursor, encode_time)?;
+        let pos = cursor.position();
+        if pos >= self.buffer_max_size as u64 {
+            self.writer.write_all(&self.buffer)?;
+            self.buffer.clear();
+        }
         write_timer.stop();

Review Comment:
   I just found that I had removed the timing for `std::io::copy` and
   `BufWriter.flush`; I have added them back in the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to