This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 0b3329cdd chore: Move writer-related logic to "writers" module (#3385)
0b3329cdd is described below
commit 0b3329cdd45e4d6894cba774c56716537081786d
Author: Emily Matheys <[email protected]>
AuthorDate: Wed Feb 4 19:43:10 2026 +0200
chore: Move writer-related logic to "writers" module (#3385)
---
native/core/src/execution/shuffle/mod.rs | 1 +
.../core/src/execution/shuffle/shuffle_writer.rs | 193 +++------------------
.../execution/shuffle/writers/buf_batch_writer.rs | 82 +++++++++
.../src/execution/shuffle/{ => writers}/mod.rs | 12 +-
.../execution/shuffle/writers/partition_writer.rs | 122 +++++++++++++
5 files changed, 233 insertions(+), 177 deletions(-)
diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs
index a72258322..a41d269d8 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/mod.rs
@@ -20,6 +20,7 @@ mod comet_partitioning;
mod metrics;
mod shuffle_writer;
pub mod spark_unsafe;
+mod writers;
pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
pub use comet_partitioning::CometPartitioning;
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index 5c68940b9..669a6df97 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -18,10 +18,12 @@
//! Defines the External shuffle repartition plan.
use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
+use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter};
use crate::execution::shuffle::{CometPartitioning, CompressionCodec,
ShuffleBlockWriter};
use crate::execution::tracing::{with_trace, with_trace_async};
use arrow::compute::interleave_record_batch;
use async_trait::async_trait;
+use datafusion::common::exec_datafusion_err;
use datafusion::common::utils::proxy::VecAllocExt;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -31,7 +33,6 @@ use datafusion::{
error::{DataFusionError, Result},
execution::{
context::TaskContext,
- disk_manager::RefCountedTempFile,
memory_pool::{MemoryConsumer, MemoryReservation},
runtime_env::RuntimeEnv,
},
@@ -45,8 +46,6 @@ use datafusion::{
use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
-use std::borrow::Borrow;
-use std::io::{Cursor, Error, SeekFrom};
use std::{
any::Any,
fmt,
@@ -256,10 +255,15 @@ async fn external_shuffle(
// into the corresponding partition buffer.
// Otherwise, pull the next batch from the input stream might
overwrite the
// current batch in the repartitioner.
- repartitioner.insert_batch(batch?).await?;
+ repartitioner
+ .insert_batch(batch?)
+ .await
+ .map_err(|err| exec_datafusion_err!("Error inserting batch: {err}"))?;
}
- repartitioner.shuffle_write()?;
+ repartitioner
+ .shuffle_write()
+ .map_err(|err| exec_datafusion_err!("Error in shuffle write: {err}"))?;
// shuffle writer always has empty output
Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(&schema))) as
SendableRecordBatchStream)
@@ -803,11 +807,10 @@ impl ShufflePartitioner for
MultiPartitionShuffleRepartitioner {
// if we wrote a spill file for this partition then copy the
// contents into the shuffle file
- if let Some(spill_data) =
self.partition_writers[i].spill_file.as_ref() {
- let mut spill_file =
-
BufReader::new(File::open(spill_data.temp_file.path()).map_err(to_df_err)?);
+ if let Some(spill_path) = self.partition_writers[i].path() {
+ let mut spill_file =
BufReader::new(File::open(spill_path)?);
let mut write_timer = self.metrics.write_time.timer();
- std::io::copy(&mut spill_file, &mut
output_data).map_err(to_df_err)?;
+ std::io::copy(&mut spill_file, &mut output_data)?;
write_timer.stop();
}
@@ -828,7 +831,7 @@ impl ShufflePartitioner for
MultiPartitionShuffleRepartitioner {
write_timer.stop();
// add one extra offset at last to ease partition length
computation
- offsets[num_output_partitions] =
output_data.stream_position().map_err(to_df_err)?;
+ offsets[num_output_partitions] = output_data.stream_position()?;
let mut write_timer = self.metrics.write_time.timer();
let mut output_index =
@@ -836,9 +839,7 @@ impl ShufflePartitioner for
MultiPartitionShuffleRepartitioner {
DataFusionError::Execution(format!("shuffle write error: {e:?}"))
})?);
for offset in offsets {
- output_index
- .write_all(&(offset as i64).to_le_bytes()[..])
- .map_err(to_df_err)?;
+ output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
}
output_index.flush()?;
write_timer.stop();
@@ -895,8 +896,7 @@ impl SinglePartitionShufflePartitioner {
.write(true)
.create(true)
.truncate(true)
- .open(output_data_path)
- .map_err(to_df_err)?;
+ .open(output_data_path)?;
let output_data_writer =
BufBatchWriter::new(shuffle_block_writer, output_data_file,
write_buffer_size);
@@ -1011,15 +1011,9 @@ impl ShufflePartitioner for
SinglePartitionShufflePartitioner {
.open(self.output_index_path.clone())
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;
let mut index_buf_writer = BufWriter::new(index_file);
- let data_file_length = self
- .output_data_writer
- .writer
- .stream_position()
- .map_err(to_df_err)?;
+ let data_file_length =
self.output_data_writer.writer_stream_position()?;
for offset in [0, data_file_length] {
- index_buf_writer
- .write_all(&(offset as i64).to_le_bytes()[..])
- .map_err(to_df_err)?;
+ index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?;
}
index_buf_writer.flush()?;
@@ -1031,21 +1025,17 @@ impl ShufflePartitioner for
SinglePartitionShufflePartitioner {
}
}
-fn to_df_err(e: Error) -> DataFusionError {
- DataFusionError::Execution(format!("shuffle write error: {e:?}"))
-}
-
/// A helper struct to produce shuffled batches.
/// This struct takes ownership of the buffered batches and partition indices
from the
/// ShuffleRepartitioner, and provides an iterator over the batches in the
specified partitions.
-struct PartitionedBatchesProducer {
+pub(crate) struct PartitionedBatchesProducer {
buffered_batches: Vec<RecordBatch>,
partition_indices: Vec<Vec<(u32, u32)>>,
batch_size: usize,
}
impl PartitionedBatchesProducer {
- fn new(
+ pub(crate) fn new(
buffered_batches: Vec<RecordBatch>,
indices: Vec<Vec<(u32, u32)>>,
batch_size: usize,
@@ -1066,7 +1056,7 @@ impl PartitionedBatchesProducer {
}
}
-struct PartitionedBatchIterator<'a> {
+pub(crate) struct PartitionedBatchIterator<'a> {
record_batches: Vec<&'a RecordBatch>,
batch_size: usize,
indices: Vec<(usize, usize)>,
@@ -1125,141 +1115,6 @@ impl Iterator for PartitionedBatchIterator<'_> {
}
}
-struct PartitionWriter {
- /// Spill file for intermediate shuffle output for this partition. Each
spill event
- /// will append to this file and the contents will be copied to the
shuffle file at
- /// the end of processing.
- spill_file: Option<SpillFile>,
- /// Writer that performs encoding and compression
- shuffle_block_writer: ShuffleBlockWriter,
-}
-
-struct SpillFile {
- temp_file: RefCountedTempFile,
- file: File,
-}
-
-impl PartitionWriter {
- fn try_new(shuffle_block_writer: ShuffleBlockWriter) -> Result<Self> {
- Ok(Self {
- spill_file: None,
- shuffle_block_writer,
- })
- }
-
- fn spill(
- &mut self,
- iter: &mut PartitionedBatchIterator,
- runtime: &RuntimeEnv,
- metrics: &ShufflePartitionerMetrics,
- write_buffer_size: usize,
- ) -> Result<usize> {
- if let Some(batch) = iter.next() {
- self.ensure_spill_file_created(runtime)?;
-
- let total_bytes_written = {
- let mut buf_batch_writer = BufBatchWriter::new(
- &mut self.shuffle_block_writer,
- &mut self.spill_file.as_mut().unwrap().file,
- write_buffer_size,
- );
- let mut bytes_written =
- buf_batch_writer.write(&batch?, &metrics.encode_time,
&metrics.write_time)?;
- for batch in iter {
- let batch = batch?;
- bytes_written += buf_batch_writer.write(
- &batch,
- &metrics.encode_time,
- &metrics.write_time,
- )?;
- }
- buf_batch_writer.flush(&metrics.write_time)?;
- bytes_written
- };
-
- Ok(total_bytes_written)
- } else {
- Ok(0)
- }
- }
-
- fn ensure_spill_file_created(&mut self, runtime: &RuntimeEnv) ->
Result<()> {
- if self.spill_file.is_none() {
- // Spill file is not yet created, create it
- let spill_file = runtime
- .disk_manager
- .create_tmp_file("shuffle writer spill")?;
- let spill_data = OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .open(spill_file.path())
- .map_err(|e| {
- DataFusionError::Execution(format!("Error occurred while spilling {e}"))
- })?;
- self.spill_file = Some(SpillFile {
- temp_file: spill_file,
- file: spill_data,
- });
- }
- Ok(())
- }
-}
-
-/// Write batches to writer while using a buffer to avoid frequent system
calls.
-/// The record batches were first written by ShuffleBlockWriter into an
internal buffer.
-/// Once the buffer exceeds the max size, the buffer will be flushed to the
writer.
-struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
- shuffle_block_writer: S,
- writer: W,
- buffer: Vec<u8>,
- buffer_max_size: usize,
-}
-
-impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
- fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self
{
- Self {
- shuffle_block_writer,
- writer,
- buffer: vec![],
- buffer_max_size,
- }
- }
-
- fn write(
- &mut self,
- batch: &RecordBatch,
- encode_time: &Time,
- write_time: &Time,
- ) -> Result<usize> {
- let mut cursor = Cursor::new(&mut self.buffer);
- cursor.seek(SeekFrom::End(0))?;
- let bytes_written =
- self.shuffle_block_writer
- .borrow()
- .write_batch(batch, &mut cursor, encode_time)?;
- let pos = cursor.position();
- if pos >= self.buffer_max_size as u64 {
- let mut write_timer = write_time.timer();
- self.writer.write_all(&self.buffer)?;
- write_timer.stop();
- self.buffer.clear();
- }
- Ok(bytes_written)
- }
-
- fn flush(&mut self, write_time: &Time) -> Result<()> {
- let mut write_timer = write_time.timer();
- if !self.buffer.is_empty() {
- self.writer.write_all(&self.buffer)?;
- }
- self.writer.flush()?;
- write_timer.stop();
- self.buffer.clear();
- Ok(())
- }
-}
-
fn pmod(hash: u32, n: usize) -> usize {
let hash = hash as i32;
let n = n as i32;
@@ -1371,14 +1226,14 @@ mod test {
assert_eq!(2, repartitioner.partition_writers.len());
- assert!(repartitioner.partition_writers[0].spill_file.is_none());
- assert!(repartitioner.partition_writers[1].spill_file.is_none());
+ assert!(!repartitioner.partition_writers[0].has_spill_file());
+ assert!(!repartitioner.partition_writers[1].has_spill_file());
repartitioner.spill().unwrap();
// after spill, there should be spill files
- assert!(repartitioner.partition_writers[0].spill_file.is_some());
- assert!(repartitioner.partition_writers[1].spill_file.is_some());
+ assert!(repartitioner.partition_writers[0].has_spill_file());
+ assert!(repartitioner.partition_writers[1].has_spill_file());
// insert another batch after spilling
repartitioner.insert_batch(batch.clone()).await.unwrap();
diff --git a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
new file mode 100644
index 000000000..8428151dd
--- /dev/null
+++ b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::shuffle::ShuffleBlockWriter;
+use arrow::array::RecordBatch;
+use datafusion::physical_plan::metrics::Time;
+use std::borrow::Borrow;
+use std::io::{Cursor, Seek, SeekFrom, Write};
+
+/// Write batches to writer while using a buffer to avoid frequent system
calls.
+/// The record batches were first written by ShuffleBlockWriter into an
internal buffer.
+/// Once the buffer exceeds the max size, the buffer will be flushed to the
writer.
+pub(crate) struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
+ shuffle_block_writer: S,
+ writer: W,
+ buffer: Vec<u8>,
+ buffer_max_size: usize,
+}
+
+impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
+ pub(crate) fn new(shuffle_block_writer: S, writer: W, buffer_max_size:
usize) -> Self {
+ Self {
+ shuffle_block_writer,
+ writer,
+ buffer: vec![],
+ buffer_max_size,
+ }
+ }
+
+ pub(crate) fn write(
+ &mut self,
+ batch: &RecordBatch,
+ encode_time: &Time,
+ write_time: &Time,
+ ) -> datafusion::common::Result<usize> {
+ let mut cursor = Cursor::new(&mut self.buffer);
+ cursor.seek(SeekFrom::End(0))?;
+ let bytes_written =
+ self.shuffle_block_writer
+ .borrow()
+ .write_batch(batch, &mut cursor, encode_time)?;
+ let pos = cursor.position();
+ if pos >= self.buffer_max_size as u64 {
+ let mut write_timer = write_time.timer();
+ self.writer.write_all(&self.buffer)?;
+ write_timer.stop();
+ self.buffer.clear();
+ }
+ Ok(bytes_written)
+ }
+
+ pub(crate) fn flush(&mut self, write_time: &Time) ->
datafusion::common::Result<()> {
+ let mut write_timer = write_time.timer();
+ if !self.buffer.is_empty() {
+ self.writer.write_all(&self.buffer)?;
+ }
+ self.writer.flush()?;
+ write_timer.stop();
+ self.buffer.clear();
+ Ok(())
+ }
+}
+
+impl<S: Borrow<ShuffleBlockWriter>, W: Write + Seek> BufBatchWriter<S, W> {
+ pub(crate) fn writer_stream_position(&mut self) ->
datafusion::common::Result<u64> {
+ self.writer.stream_position().map_err(Into::into)
+ }
+}
diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/writers/mod.rs
similarity index 75%
copy from native/core/src/execution/shuffle/mod.rs
copy to native/core/src/execution/shuffle/writers/mod.rs
index a72258322..d41363b7f 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/writers/mod.rs
@@ -15,12 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-pub(crate) mod codec;
-mod comet_partitioning;
-mod metrics;
-mod shuffle_writer;
-pub mod spark_unsafe;
+mod buf_batch_writer;
+mod partition_writer;
-pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
-pub use comet_partitioning::CometPartitioning;
-pub use shuffle_writer::ShuffleWriterExec;
+pub(super) use buf_batch_writer::BufBatchWriter;
+pub(super) use partition_writer::PartitionWriter;
diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs
new file mode 100644
index 000000000..8b2555e09
--- /dev/null
+++ b/native/core/src/execution/shuffle/writers/partition_writer.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
+use crate::execution::shuffle::shuffle_writer::PartitionedBatchIterator;
+use crate::execution::shuffle::writers::buf_batch_writer::BufBatchWriter;
+use crate::execution::shuffle::ShuffleBlockWriter;
+use datafusion::common::DataFusionError;
+use datafusion::execution::disk_manager::RefCountedTempFile;
+use datafusion::execution::runtime_env::RuntimeEnv;
+use std::fs::{File, OpenOptions};
+
+struct SpillFile {
+ temp_file: RefCountedTempFile,
+ file: File,
+}
+
+pub(crate) struct PartitionWriter {
+ /// Spill file for intermediate shuffle output for this partition. Each
spill event
+ /// will append to this file and the contents will be copied to the
shuffle file at
+ /// the end of processing.
+ spill_file: Option<SpillFile>,
+ /// Writer that performs encoding and compression
+ shuffle_block_writer: ShuffleBlockWriter,
+}
+
+impl PartitionWriter {
+ pub(crate) fn try_new(
+ shuffle_block_writer: ShuffleBlockWriter,
+ ) -> datafusion::common::Result<Self> {
+ Ok(Self {
+ spill_file: None,
+ shuffle_block_writer,
+ })
+ }
+
+ fn ensure_spill_file_created(
+ &mut self,
+ runtime: &RuntimeEnv,
+ ) -> datafusion::common::Result<()> {
+ if self.spill_file.is_none() {
+ // Spill file is not yet created, create it
+ let spill_file = runtime
+ .disk_manager
+ .create_tmp_file("shuffle writer spill")?;
+ let spill_data = OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .open(spill_file.path())
+ .map_err(|e| {
+ DataFusionError::Execution(format!("Error occurred while spilling {e}"))
+ })?;
+ self.spill_file = Some(SpillFile {
+ temp_file: spill_file,
+ file: spill_data,
+ });
+ }
+ Ok(())
+ }
+
+ pub(crate) fn spill(
+ &mut self,
+ iter: &mut PartitionedBatchIterator,
+ runtime: &RuntimeEnv,
+ metrics: &ShufflePartitionerMetrics,
+ write_buffer_size: usize,
+ ) -> datafusion::common::Result<usize> {
+ if let Some(batch) = iter.next() {
+ self.ensure_spill_file_created(runtime)?;
+
+ let total_bytes_written = {
+ let mut buf_batch_writer = BufBatchWriter::new(
+ &mut self.shuffle_block_writer,
+ &mut self.spill_file.as_mut().unwrap().file,
+ write_buffer_size,
+ );
+ let mut bytes_written =
+ buf_batch_writer.write(&batch?, &metrics.encode_time,
&metrics.write_time)?;
+ for batch in iter {
+ let batch = batch?;
+ bytes_written += buf_batch_writer.write(
+ &batch,
+ &metrics.encode_time,
+ &metrics.write_time,
+ )?;
+ }
+ buf_batch_writer.flush(&metrics.write_time)?;
+ bytes_written
+ };
+
+ Ok(total_bytes_written)
+ } else {
+ Ok(0)
+ }
+ }
+
+ pub(crate) fn path(&self) -> Option<&std::path::Path> {
+ self.spill_file
+ .as_ref()
+ .map(|spill_file| spill_file.temp_file.path())
+ }
+
+ #[cfg(test)]
+ pub(crate) fn has_spill_file(&self) -> bool {
+ self.spill_file.is_some()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]