Kontinuation commented on code in PR #1511:
URL: https://github.com/apache/datafusion-comet/pull/1511#discussion_r2017871489
##########
native/core/src/execution/shuffle/shuffle_writer.rs:
##########
@@ -498,81 +502,63 @@ impl ShuffleRepartitioner {
Ok(())
}
- /// Writes buffered shuffled record batches into Arrow IPC bytes.
- async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
- let mut elapsed_compute =
self.metrics.baseline.elapsed_compute().timer();
- let buffered_partitions = &mut self.buffered_partitions;
- let num_output_partitions = buffered_partitions.len();
- let mut output_batches: Vec<Vec<u8>> = vec![vec![];
num_output_partitions];
- let mut offsets = vec![0; num_output_partitions + 1];
- for i in 0..num_output_partitions {
- buffered_partitions[i].flush(&self.metrics)?;
- output_batches[i] = std::mem::take(&mut
buffered_partitions[i].frozen);
- }
-
- let data_file = self.output_data_file.clone();
- let index_file = self.output_index_file.clone();
-
- let mut write_time = self.metrics.write_time.timer();
-
- let output_data = OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .open(data_file)
- .map_err(|e| DataFusionError::Execution(format!("shuffle write
error: {:?}", e)))?;
-
- let mut output_data = BufWriter::new(output_data);
-
- for i in 0..num_output_partitions {
- offsets[i] = output_data.stream_position()?;
- output_data.write_all(&output_batches[i])?;
- output_batches[i].clear();
+ async fn buffer_partitioned_batch_may_spill(
+ &mut self,
+ input: RecordBatch,
+ shuffled_partition_ids: &[u32],
+ partition_starts: &[u32],
+ ) -> Result<()> {
+ let mut mem_growth: usize = input.get_array_memory_size();
+ let buffered_partition_idx = self.buffered_batches.len() as u32;
+ self.buffered_batches.push(input);
- // if we wrote a spill file for this partition then copy the
- // contents into the shuffle file
- if let Some(spill_data) =
self.buffered_partitions[i].spill_file.as_ref() {
- let mut spill_file = BufReader::new(
-
File::open(spill_data.temp_file.path()).map_err(Self::to_df_err)?,
- );
- std::io::copy(&mut spill_file, &mut
output_data).map_err(Self::to_df_err)?;
+ for (partition_id, (&start, &end)) in partition_starts
Review Comment:
I have added comments to describe what we are doing here. I have also found
that `shuffle_partition_ids` is not a good name, I have renamed it to
`partition_row_indices` and the code become easier to understand.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]