This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b3329cdd chore: Move writer-related logic to "writers" module (#3385)
0b3329cdd is described below

commit 0b3329cdd45e4d6894cba774c56716537081786d
Author: Emily Matheys <[email protected]>
AuthorDate: Wed Feb 4 19:43:10 2026 +0200

    chore: Move writer-related logic to "writers" module (#3385)
---
 native/core/src/execution/shuffle/mod.rs           |   1 +
 .../core/src/execution/shuffle/shuffle_writer.rs   | 193 +++------------------
 .../execution/shuffle/writers/buf_batch_writer.rs  |  82 +++++++++
 .../src/execution/shuffle/{ => writers}/mod.rs     |  12 +-
 .../execution/shuffle/writers/partition_writer.rs  | 122 +++++++++++++
 5 files changed, 233 insertions(+), 177 deletions(-)

diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs
index a72258322..a41d269d8 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/mod.rs
@@ -20,6 +20,7 @@ mod comet_partitioning;
 mod metrics;
 mod shuffle_writer;
 pub mod spark_unsafe;
+mod writers;
 
 pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
 pub use comet_partitioning::CometPartitioning;
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index 5c68940b9..669a6df97 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -18,10 +18,12 @@
 //! Defines the External shuffle repartition plan.
 
 use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
+use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter};
 use crate::execution::shuffle::{CometPartitioning, CompressionCodec, 
ShuffleBlockWriter};
 use crate::execution::tracing::{with_trace, with_trace_async};
 use arrow::compute::interleave_record_batch;
 use async_trait::async_trait;
+use datafusion::common::exec_datafusion_err;
 use datafusion::common::utils::proxy::VecAllocExt;
 use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -31,7 +33,6 @@ use datafusion::{
     error::{DataFusionError, Result},
     execution::{
         context::TaskContext,
-        disk_manager::RefCountedTempFile,
         memory_pool::{MemoryConsumer, MemoryReservation},
         runtime_env::RuntimeEnv,
     },
@@ -45,8 +46,6 @@ use datafusion::{
 use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes;
 use futures::{StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
-use std::borrow::Borrow;
-use std::io::{Cursor, Error, SeekFrom};
 use std::{
     any::Any,
     fmt,
@@ -256,10 +255,15 @@ async fn external_shuffle(
             // into the corresponding partition buffer.
             // Otherwise, pull the next batch from the input stream might 
overwrite the
             // current batch in the repartitioner.
-            repartitioner.insert_batch(batch?).await?;
+            repartitioner
+                .insert_batch(batch?)
+                .await
+                .map_err(|err| exec_datafusion_err!("Error inserting batch: 
{err}"))?;
         }
 
-        repartitioner.shuffle_write()?;
+        repartitioner
+            .shuffle_write()
+            .map_err(|err| exec_datafusion_err!("Error in shuffle write: 
{err}"))?;
 
         // shuffle writer always has empty output
         Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(&schema))) as 
SendableRecordBatchStream)
@@ -803,11 +807,10 @@ impl ShufflePartitioner for 
MultiPartitionShuffleRepartitioner {
 
                 // if we wrote a spill file for this partition then copy the
                 // contents into the shuffle file
-                if let Some(spill_data) = 
self.partition_writers[i].spill_file.as_ref() {
-                    let mut spill_file =
-                        
BufReader::new(File::open(spill_data.temp_file.path()).map_err(to_df_err)?);
+                if let Some(spill_path) = self.partition_writers[i].path() {
+                    let mut spill_file = 
BufReader::new(File::open(spill_path)?);
                     let mut write_timer = self.metrics.write_time.timer();
-                    std::io::copy(&mut spill_file, &mut 
output_data).map_err(to_df_err)?;
+                    std::io::copy(&mut spill_file, &mut output_data)?;
                     write_timer.stop();
                 }
 
@@ -828,7 +831,7 @@ impl ShufflePartitioner for 
MultiPartitionShuffleRepartitioner {
             write_timer.stop();
 
             // add one extra offset at last to ease partition length 
computation
-            offsets[num_output_partitions] = 
output_data.stream_position().map_err(to_df_err)?;
+            offsets[num_output_partitions] = output_data.stream_position()?;
 
             let mut write_timer = self.metrics.write_time.timer();
             let mut output_index =
@@ -836,9 +839,7 @@ impl ShufflePartitioner for 
MultiPartitionShuffleRepartitioner {
                     DataFusionError::Execution(format!("shuffle write error: 
{e:?}"))
                 })?);
             for offset in offsets {
-                output_index
-                    .write_all(&(offset as i64).to_le_bytes()[..])
-                    .map_err(to_df_err)?;
+                output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
             }
             output_index.flush()?;
             write_timer.stop();
@@ -895,8 +896,7 @@ impl SinglePartitionShufflePartitioner {
             .write(true)
             .create(true)
             .truncate(true)
-            .open(output_data_path)
-            .map_err(to_df_err)?;
+            .open(output_data_path)?;
 
         let output_data_writer =
             BufBatchWriter::new(shuffle_block_writer, output_data_file, 
write_buffer_size);
@@ -1011,15 +1011,9 @@ impl ShufflePartitioner for 
SinglePartitionShufflePartitioner {
             .open(self.output_index_path.clone())
             .map_err(|e| DataFusionError::Execution(format!("shuffle write 
error: {e:?}")))?;
         let mut index_buf_writer = BufWriter::new(index_file);
-        let data_file_length = self
-            .output_data_writer
-            .writer
-            .stream_position()
-            .map_err(to_df_err)?;
+        let data_file_length = 
self.output_data_writer.writer_stream_position()?;
         for offset in [0, data_file_length] {
-            index_buf_writer
-                .write_all(&(offset as i64).to_le_bytes()[..])
-                .map_err(to_df_err)?;
+            index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?;
         }
         index_buf_writer.flush()?;
 
@@ -1031,21 +1025,17 @@ impl ShufflePartitioner for 
SinglePartitionShufflePartitioner {
     }
 }
 
-fn to_df_err(e: Error) -> DataFusionError {
-    DataFusionError::Execution(format!("shuffle write error: {e:?}"))
-}
-
 /// A helper struct to produce shuffled batches.
 /// This struct takes ownership of the buffered batches and partition indices 
from the
 /// ShuffleRepartitioner, and provides an iterator over the batches in the 
specified partitions.
-struct PartitionedBatchesProducer {
+pub(crate) struct PartitionedBatchesProducer {
     buffered_batches: Vec<RecordBatch>,
     partition_indices: Vec<Vec<(u32, u32)>>,
     batch_size: usize,
 }
 
 impl PartitionedBatchesProducer {
-    fn new(
+    pub(crate) fn new(
         buffered_batches: Vec<RecordBatch>,
         indices: Vec<Vec<(u32, u32)>>,
         batch_size: usize,
@@ -1066,7 +1056,7 @@ impl PartitionedBatchesProducer {
     }
 }
 
-struct PartitionedBatchIterator<'a> {
+pub(crate) struct PartitionedBatchIterator<'a> {
     record_batches: Vec<&'a RecordBatch>,
     batch_size: usize,
     indices: Vec<(usize, usize)>,
@@ -1125,141 +1115,6 @@ impl Iterator for PartitionedBatchIterator<'_> {
     }
 }
 
-struct PartitionWriter {
-    /// Spill file for intermediate shuffle output for this partition. Each 
spill event
-    /// will append to this file and the contents will be copied to the 
shuffle file at
-    /// the end of processing.
-    spill_file: Option<SpillFile>,
-    /// Writer that performs encoding and compression
-    shuffle_block_writer: ShuffleBlockWriter,
-}
-
-struct SpillFile {
-    temp_file: RefCountedTempFile,
-    file: File,
-}
-
-impl PartitionWriter {
-    fn try_new(shuffle_block_writer: ShuffleBlockWriter) -> Result<Self> {
-        Ok(Self {
-            spill_file: None,
-            shuffle_block_writer,
-        })
-    }
-
-    fn spill(
-        &mut self,
-        iter: &mut PartitionedBatchIterator,
-        runtime: &RuntimeEnv,
-        metrics: &ShufflePartitionerMetrics,
-        write_buffer_size: usize,
-    ) -> Result<usize> {
-        if let Some(batch) = iter.next() {
-            self.ensure_spill_file_created(runtime)?;
-
-            let total_bytes_written = {
-                let mut buf_batch_writer = BufBatchWriter::new(
-                    &mut self.shuffle_block_writer,
-                    &mut self.spill_file.as_mut().unwrap().file,
-                    write_buffer_size,
-                );
-                let mut bytes_written =
-                    buf_batch_writer.write(&batch?, &metrics.encode_time, 
&metrics.write_time)?;
-                for batch in iter {
-                    let batch = batch?;
-                    bytes_written += buf_batch_writer.write(
-                        &batch,
-                        &metrics.encode_time,
-                        &metrics.write_time,
-                    )?;
-                }
-                buf_batch_writer.flush(&metrics.write_time)?;
-                bytes_written
-            };
-
-            Ok(total_bytes_written)
-        } else {
-            Ok(0)
-        }
-    }
-
-    fn ensure_spill_file_created(&mut self, runtime: &RuntimeEnv) -> 
Result<()> {
-        if self.spill_file.is_none() {
-            // Spill file is not yet created, create it
-            let spill_file = runtime
-                .disk_manager
-                .create_tmp_file("shuffle writer spill")?;
-            let spill_data = OpenOptions::new()
-                .write(true)
-                .create(true)
-                .truncate(true)
-                .open(spill_file.path())
-                .map_err(|e| {
-                    DataFusionError::Execution(format!("Error occurred while 
spilling {e}"))
-                })?;
-            self.spill_file = Some(SpillFile {
-                temp_file: spill_file,
-                file: spill_data,
-            });
-        }
-        Ok(())
-    }
-}
-
-/// Write batches to writer while using a buffer to avoid frequent system 
calls.
-/// The record batches were first written by ShuffleBlockWriter into an 
internal buffer.
-/// Once the buffer exceeds the max size, the buffer will be flushed to the 
writer.
-struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
-    shuffle_block_writer: S,
-    writer: W,
-    buffer: Vec<u8>,
-    buffer_max_size: usize,
-}
-
-impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
-    fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self 
{
-        Self {
-            shuffle_block_writer,
-            writer,
-            buffer: vec![],
-            buffer_max_size,
-        }
-    }
-
-    fn write(
-        &mut self,
-        batch: &RecordBatch,
-        encode_time: &Time,
-        write_time: &Time,
-    ) -> Result<usize> {
-        let mut cursor = Cursor::new(&mut self.buffer);
-        cursor.seek(SeekFrom::End(0))?;
-        let bytes_written =
-            self.shuffle_block_writer
-                .borrow()
-                .write_batch(batch, &mut cursor, encode_time)?;
-        let pos = cursor.position();
-        if pos >= self.buffer_max_size as u64 {
-            let mut write_timer = write_time.timer();
-            self.writer.write_all(&self.buffer)?;
-            write_timer.stop();
-            self.buffer.clear();
-        }
-        Ok(bytes_written)
-    }
-
-    fn flush(&mut self, write_time: &Time) -> Result<()> {
-        let mut write_timer = write_time.timer();
-        if !self.buffer.is_empty() {
-            self.writer.write_all(&self.buffer)?;
-        }
-        self.writer.flush()?;
-        write_timer.stop();
-        self.buffer.clear();
-        Ok(())
-    }
-}
-
 fn pmod(hash: u32, n: usize) -> usize {
     let hash = hash as i32;
     let n = n as i32;
@@ -1371,14 +1226,14 @@ mod test {
 
         assert_eq!(2, repartitioner.partition_writers.len());
 
-        assert!(repartitioner.partition_writers[0].spill_file.is_none());
-        assert!(repartitioner.partition_writers[1].spill_file.is_none());
+        assert!(!repartitioner.partition_writers[0].has_spill_file());
+        assert!(!repartitioner.partition_writers[1].has_spill_file());
 
         repartitioner.spill().unwrap();
 
         // after spill, there should be spill files
-        assert!(repartitioner.partition_writers[0].spill_file.is_some());
-        assert!(repartitioner.partition_writers[1].spill_file.is_some());
+        assert!(repartitioner.partition_writers[0].has_spill_file());
+        assert!(repartitioner.partition_writers[1].has_spill_file());
 
         // insert another batch after spilling
         repartitioner.insert_batch(batch.clone()).await.unwrap();
diff --git a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
new file mode 100644
index 000000000..8428151dd
--- /dev/null
+++ b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::shuffle::ShuffleBlockWriter;
+use arrow::array::RecordBatch;
+use datafusion::physical_plan::metrics::Time;
+use std::borrow::Borrow;
+use std::io::{Cursor, Seek, SeekFrom, Write};
+
+/// Write batches to writer while using a buffer to avoid frequent system calls.
+/// The record batches are first written by ShuffleBlockWriter into an internal buffer.
+/// Once the buffer reaches or exceeds the max size, the buffer is flushed to the writer.
+pub(crate) struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
+    /// Encoder used to serialize each batch; may be owned or borrowed (any `Borrow`).
+    shuffle_block_writer: S,
+    /// Destination that receives the buffered bytes on overflow and on `flush`.
+    writer: W,
+    /// Encoded bytes that have not yet been written to `writer`.
+    buffer: Vec<u8>,
+    /// Buffered-byte threshold at which `write` drains `buffer` into `writer`.
+    buffer_max_size: usize,
+}
+
+impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
+    /// Creates a writer with an empty buffer. Encoded bytes accumulate in memory
+    /// until at least `buffer_max_size` bytes are pending, then go to `writer`.
+    pub(crate) fn new(shuffle_block_writer: S, writer: W, buffer_max_size: 
usize) -> Self {
+        Self {
+            shuffle_block_writer,
+            writer,
+            buffer: vec![],
+            buffer_max_size,
+        }
+    }
+
+    /// Encodes `batch` and appends it to the internal buffer. If the buffer then
+    /// holds at least `buffer_max_size` bytes, the whole buffer is written to the
+    /// underlying writer (timed into `write_time`) and cleared.
+    ///
+    /// `encode_time` is forwarded to `ShuffleBlockWriter::write_batch`. Returns the
+    /// byte count that `write_batch` reports for this batch.
+    pub(crate) fn write(
+        &mut self,
+        batch: &RecordBatch,
+        encode_time: &Time,
+        write_time: &Time,
+    ) -> datafusion::common::Result<usize> {
+        let mut cursor = Cursor::new(&mut self.buffer);
+        // Append after any bytes still pending from earlier batches.
+        cursor.seek(SeekFrom::End(0))?;
+        let bytes_written =
+            self.shuffle_block_writer
+                .borrow()
+                .write_batch(batch, &mut cursor, encode_time)?;
+        // NOTE(review): `pos` is treated as the buffered length; this assumes
+        // `write_batch` leaves the cursor at the end of its output — confirm.
+        let pos = cursor.position();
+        if pos >= self.buffer_max_size as u64 {
+            let mut write_timer = write_time.timer();
+            self.writer.write_all(&self.buffer)?;
+            write_timer.stop();
+            self.buffer.clear();
+        }
+        Ok(bytes_written)
+    }
+
+    /// Writes any pending buffered bytes, flushes the underlying writer, and
+    /// clears the buffer. The write and the flush are both timed into `write_time`.
+    pub(crate) fn flush(&mut self, write_time: &Time) -> 
datafusion::common::Result<()> {
+        let mut write_timer = write_time.timer();
+        if !self.buffer.is_empty() {
+            self.writer.write_all(&self.buffer)?;
+        }
+        self.writer.flush()?;
+        write_timer.stop();
+        self.buffer.clear();
+        Ok(())
+    }
+}
+
+impl<S: Borrow<ShuffleBlockWriter>, W: Write + Seek> BufBatchWriter<S, W> {
+    /// Returns the current stream position of the underlying writer.
+    ///
+    /// Bytes still held in the internal buffer are not counted; call `flush`
+    /// first if the position must reflect everything passed to `write`.
+    pub(crate) fn writer_stream_position(&mut self) -> 
datafusion::common::Result<u64> {
+        self.writer.stream_position().map_err(Into::into)
+    }
+}
diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/writers/mod.rs
similarity index 75%
copy from native/core/src/execution/shuffle/mod.rs
copy to native/core/src/execution/shuffle/writers/mod.rs
index a72258322..d41363b7f 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/writers/mod.rs
@@ -15,12 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub(crate) mod codec;
-mod comet_partitioning;
-mod metrics;
-mod shuffle_writer;
-pub mod spark_unsafe;
+mod buf_batch_writer;
+mod partition_writer;
 
-pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
-pub use comet_partitioning::CometPartitioning;
-pub use shuffle_writer::ShuffleWriterExec;
+pub(super) use buf_batch_writer::BufBatchWriter;
+pub(super) use partition_writer::PartitionWriter;
diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs
new file mode 100644
index 000000000..8b2555e09
--- /dev/null
+++ b/native/core/src/execution/shuffle/writers/partition_writer.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
+use crate::execution::shuffle::shuffle_writer::PartitionedBatchIterator;
+use crate::execution::shuffle::writers::buf_batch_writer::BufBatchWriter;
+use crate::execution::shuffle::ShuffleBlockWriter;
+use datafusion::common::DataFusionError;
+use datafusion::execution::disk_manager::RefCountedTempFile;
+use datafusion::execution::runtime_env::RuntimeEnv;
+use std::fs::{File, OpenOptions};
+
+/// A spill file created on demand for one partition: the temp file obtained from
+/// the runtime's disk manager, plus the open handle that spill output is written to.
+struct SpillFile {
+    /// Temp file from `RuntimeEnv::disk_manager`; its path is exposed via
+    /// `PartitionWriter::path`. Presumably held here so the temp file stays alive
+    /// for the writer's lifetime — confirm `RefCountedTempFile` drop semantics.
+    temp_file: RefCountedTempFile,
+    /// Write handle opened (create + truncate) on `temp_file`'s path.
+    file: File,
+}
+
+/// Per-partition writer that encodes batches and spills them to a temporary file.
+pub(crate) struct PartitionWriter {
+    /// Spill file for intermediate shuffle output for this partition. Each spill event
+    /// will append to this file and the contents will be copied to the shuffle file at
+    /// the end of processing. `None` until the first spill that has at least one batch.
+    spill_file: Option<SpillFile>,
+    /// Writer that performs encoding and compression
+    shuffle_block_writer: ShuffleBlockWriter,
+}
+
+impl PartitionWriter {
+    /// Creates a partition writer with no spill file; the file is created lazily
+    /// by the first `spill` call that has at least one batch.
+    /// Currently this never returns `Err`.
+    pub(crate) fn try_new(
+        shuffle_block_writer: ShuffleBlockWriter,
+    ) -> datafusion::common::Result<Self> {
+        Ok(Self {
+            spill_file: None,
+            shuffle_block_writer,
+        })
+    }
+
+    /// Creates the spill file if it does not exist yet: allocates a temp file from
+    /// `runtime.disk_manager` and opens it for writing (create + truncate).
+    /// Does nothing if the spill file already exists.
+    fn ensure_spill_file_created(
+        &mut self,
+        runtime: &RuntimeEnv,
+    ) -> datafusion::common::Result<()> {
+        if self.spill_file.is_none() {
+            // Spill file is not yet created, create it
+            let spill_file = runtime
+                .disk_manager
+                .create_tmp_file("shuffle writer spill")?;
+            let spill_data = OpenOptions::new()
+                .write(true)
+                .create(true)
+                .truncate(true)
+                .open(spill_file.path())
+                .map_err(|e| {
+                    DataFusionError::Execution(format!("Error occurred while 
spilling {e}"))
+                })?;
+            self.spill_file = Some(SpillFile {
+                temp_file: spill_file,
+                file: spill_data,
+            });
+        }
+        Ok(())
+    }
+
+    /// Encodes every batch remaining in `iter` and writes it to this partition's
+    /// spill file, creating the file first if needed. Output goes through a
+    /// `BufBatchWriter` with a `write_buffer_size`-byte buffer that is flushed
+    /// before returning; timings are recorded in `metrics`.
+    ///
+    /// Returns the sum of byte counts reported by `BufBatchWriter::write`, or `Ok(0)`
+    /// without creating a file when `iter` yields nothing. Errors yielded by the
+    /// iterator, and file-creation, encoding or I/O errors, are propagated.
+    pub(crate) fn spill(
+        &mut self,
+        iter: &mut PartitionedBatchIterator,
+        runtime: &RuntimeEnv,
+        metrics: &ShufflePartitionerMetrics,
+        write_buffer_size: usize,
+    ) -> datafusion::common::Result<usize> {
+        // Only touch the disk when there is at least one batch to spill.
+        if let Some(batch) = iter.next() {
+            self.ensure_spill_file_created(runtime)?;
+
+            let total_bytes_written = {
+                let mut buf_batch_writer = BufBatchWriter::new(
+                    &mut self.shuffle_block_writer,
+                    // `ensure_spill_file_created` above guarantees this is `Some`.
+                    &mut self.spill_file.as_mut().unwrap().file,
+                    write_buffer_size,
+                );
+                let mut bytes_written =
+                    buf_batch_writer.write(&batch?, &metrics.encode_time, 
&metrics.write_time)?;
+                for batch in iter {
+                    let batch = batch?;
+                    bytes_written += buf_batch_writer.write(
+                        &batch,
+                        &metrics.encode_time,
+                        &metrics.write_time,
+                    )?;
+                }
+                // Drain whatever is still buffered so the spill file is complete.
+                buf_batch_writer.flush(&metrics.write_time)?;
+                bytes_written
+            };
+
+            Ok(total_bytes_written)
+        } else {
+            Ok(0)
+        }
+    }
+
+    /// Path of this partition's spill file, or `None` if nothing has been spilled.
+    pub(crate) fn path(&self) -> Option<&std::path::Path> {
+        self.spill_file
+            .as_ref()
+            .map(|spill_file| spill_file.temp_file.path())
+    }
+
+    /// Test-only: whether a spill file has been created for this partition.
+    #[cfg(test)]
+    pub(crate) fn has_spill_file(&self) -> bool {
+        self.spill_file.is_some()
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to