Kontinuation commented on code in PR #1511: URL: https://github.com/apache/datafusion-comet/pull/1511#discussion_r2022553633
########## native/core/src/execution/shuffle/shuffle_writer.rs: ########## @@ -667,175 +740,322 @@ impl Debug for ShuffleRepartitioner { } } -/// The status of appending rows to a partition buffer. -#[derive(Debug)] -enum AppendRowStatus { - /// Rows were appended - Appended, - /// Not all rows were appended due to lack of available memory - StartIndex(usize), -} - -struct PartitionBuffer { - /// The schema of batches to be partitioned. - schema: SchemaRef, - /// The "frozen" Arrow IPC bytes of active data. They are frozen when `flush` is called. - frozen: Vec<u8>, - /// Array builders for appending rows into buffering batches. - active: Vec<Box<dyn ArrayBuilder>>, - /// The estimation of memory size of active builders in bytes when they are filled. - active_slots_mem_size: usize, - /// Number of rows in active builders. - num_active_rows: usize, - /// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`, - /// the active array builders will be frozen and appended to frozen buffer `frozen`. +/// A partitioner that writes all shuffle data to a single file and a single index file +struct SinglePartitionShufflePartitioner { + // output_data_file: File, + output_data_writer: BufBatchWriter<ShuffleBlockWriter, File>, + output_index_path: String, + /// Batches that are smaller than the batch size and to be concatenated + buffered_batches: Vec<RecordBatch>, + /// Number of rows in the concatenating batches + num_buffered_rows: usize, + /// Metrics for the repartitioner + metrics: ShuffleRepartitionerMetrics, + /// The configured batch size batch_size: usize, - /// Memory reservation for this partition buffer. - reservation: MemoryReservation, - /// Spill file for intermediate shuffle output for this partition. Each spill event - /// will append to this file and the contents will be copied to the shuffle file at - /// the end of processing. 
- spill_file: Option<SpillFile>, - /// Writer that performs encoding and compression - shuffle_block_writer: ShuffleBlockWriter, -} - -struct SpillFile { - temp_file: RefCountedTempFile, - file: File, } -impl PartitionBuffer { +impl SinglePartitionShufflePartitioner { fn try_new( + output_data_path: String, + output_index_path: String, schema: SchemaRef, + metrics: ShuffleRepartitionerMetrics, batch_size: usize, - reservation: MemoryReservation, codec: CompressionCodec, enable_fast_encoding: bool, ) -> Result<Self> { let shuffle_block_writer = - ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec)?; - let active_slots_mem_size = schema - .fields() - .iter() - .map(|field| slot_size(batch_size, field.data_type())) - .sum::<usize>(); + ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec.clone())?; + + let output_data_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(output_data_path) + .map_err(to_df_err)?; + + let output_data_writer = BufBatchWriter::new(shuffle_block_writer, output_data_file); + Ok(Self { - schema, - frozen: vec![], - active: vec![], - active_slots_mem_size, - num_active_rows: 0, + output_data_writer, + output_index_path, + buffered_batches: vec![], + num_buffered_rows: 0, + metrics, batch_size, - reservation, - spill_file: None, - shuffle_block_writer, }) } - /// Initializes active builders if necessary. - /// Returns error if memory reservation fails. 
- fn allocate_active_builders(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<()> { - if self.active.is_empty() { - let mut mempool_timer = metrics.mempool_time.timer(); - self.reservation.try_grow(self.active_slots_mem_size)?; - mempool_timer.stop(); - - let mut repart_timer = metrics.repart_time.timer(); - self.active = new_array_builders(&self.schema, self.batch_size); - repart_timer.stop(); - } - Ok(()) + /// Add a batch to the buffer of the partitioner, these buffered batches will be concatenated + /// and written to the output data file when the number of rows in the buffer reaches the batch size. + fn add_buffered_batch(&mut self, batch: RecordBatch) { + self.num_buffered_rows += batch.num_rows(); + self.buffered_batches.push(batch); } - /// Appends rows of specified indices from columns into active array builders. - fn append_rows( - &mut self, - columns: &[ArrayRef], - indices: &[usize], - start_index: usize, - metrics: &ShuffleRepartitionerMetrics, - ) -> Result<AppendRowStatus> { - let mut start = start_index; - - // loop until all indices are processed - while start < indices.len() { - let end = (start + self.batch_size).min(indices.len()); - - // allocate builders - if self.allocate_active_builders(metrics).is_err() { - // could not allocate memory for builders, so abort - // and return the current index - return Ok(AppendRowStatus::StartIndex(start)); + /// Consumes buffered batches and return a concatenated batch if successful + fn concat_buffered_batches(&mut self) -> Result<Option<RecordBatch>> { + if self.buffered_batches.is_empty() { + Ok(None) + } else if self.buffered_batches.len() == 1 { + let batch = self.buffered_batches.remove(0); + self.num_buffered_rows = 0; + Ok(Some(batch)) + } else { + let schema = &self.buffered_batches[0].schema(); + match arrow::compute::concat_batches(schema, self.buffered_batches.iter()) { + Ok(concatenated) => { + self.buffered_batches.clear(); + self.num_buffered_rows = 0; + Ok(Some(concatenated)) + 
} + Err(e) => Err(DataFusionError::ArrowError( + e, + Some(DataFusionError::get_back_trace()), + )), } + } + } +} - let mut repart_timer = metrics.repart_time.timer(); - self.active - .iter_mut() - .zip(columns) - .for_each(|(builder, column)| { - append_columns(builder, column, &indices[start..end], column.data_type()); - }); - self.num_active_rows += end - start; - repart_timer.stop(); - start = end; +#[async_trait::async_trait] +impl ShufflePartitioner for SinglePartitionShufflePartitioner { + async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + let start_time = Instant::now(); + let num_rows = batch.num_rows(); + + if num_rows > 0 { + self.metrics.data_size.add(batch.get_array_memory_size()); + self.metrics.baseline.record_output(num_rows); + + if num_rows >= self.batch_size || num_rows + self.num_buffered_rows > self.batch_size { + let concatenated_batch = self.concat_buffered_batches()?; + + let write_start_time = Instant::now(); - if self.num_active_rows >= self.batch_size { - self.flush(metrics)?; + // Write the concatenated buffered batch + if let Some(batch) = concatenated_batch { + self.output_data_writer + .write(&batch, &self.metrics.encode_time)?; + } + + if num_rows >= self.batch_size { + // Write the new batch + self.output_data_writer + .write(&batch, &self.metrics.encode_time)?; + } else { + // Add the new batch to the buffer + self.add_buffered_batch(batch); + } + + self.metrics + .write_time + .add_duration(write_start_time.elapsed()); Review Comment: Added a `write_time` parameter to the `write` method of `BufBatchWriter`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org