andygrove commented on code in PR #1511:
URL: https://github.com/apache/datafusion-comet/pull/1511#discussion_r2022924240


##########
native/core/src/execution/shuffle/shuffle_writer.rs:
##########
@@ -852,16 +1079,64 @@ impl PartitionBuffer {
                 file: spill_data,
             });
         }
-        self.spill_file
-            .as_mut()
-            .unwrap()
-            .file
-            .write_all(&output_batches)?;
+        Ok(())
+    }
+}
+
+/// Write batches to writer while using a buffer to avoid frequent system calls.
+/// The record batches were first written by ShuffleBlockWriter into an internal buffer.
+/// Once the buffer exceeds the max size, the buffer will be flushed to the writer.
+struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
+    shuffle_block_writer: S,
+    writer: W,
+    buffer: Vec<u8>,
+    buffer_max_size: usize,
+}
+
+impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
+    fn new(shuffle_block_writer: S, writer: W) -> Self {
+        // 1MB should be good enough to avoid frequent system calls,
+        // and also won't cause too much memory usage
+        let buffer_max_size = 1024 * 1024;
+        Self {
+            shuffle_block_writer,
+            writer,
+            buffer: vec![],
+            buffer_max_size,
+        }
+    }
+
+    fn write(
+        &mut self,
+        batch: &RecordBatch,
+        encode_time: &Time,
+        write_time: &Time,
+    ) -> Result<usize> {
+        let mut cursor = Cursor::new(&mut self.buffer);
+        cursor.seek(SeekFrom::End(0))?;
+        let mut write_timer = write_time.timer();
+        let bytes_written =
+            self.shuffle_block_writer
+                .borrow()
+                .write_batch(batch, &mut cursor, encode_time)?;
+        let pos = cursor.position();
+        if pos >= self.buffer_max_size as u64 {
+            self.writer.write_all(&self.buffer)?;
+            self.buffer.clear();
+        }
         write_timer.stop();

Review Comment:
   The `write_time` should only measure the time for writing bytes to disk, not 
the encoding and buffering. I think that the following code would be correct 
but I haven't tested this yet. I will test with this change today.
   
   ```suggestion
           let bytes_written =
               self.shuffle_block_writer
                   .borrow()
                   .write_batch(batch, &mut cursor, encode_time)?;
           let pos = cursor.position();
           if pos >= self.buffer_max_size as u64 {
               let mut write_timer = write_time.timer();
               self.writer.write_all(&self.buffer)?;
               write_timer.stop();
               self.buffer.clear();
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to