This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e24695d2 chore: Clean up and split shuffle module (#3395)
2e24695d2 is described below

commit 2e24695d2a7853b43cc85357f6e67061006fe3f2
Author: Emily Matheys <[email protected]>
AuthorDate: Thu Feb 5 15:23:13 2026 +0200

    chore: Clean up and split shuffle module (#3395)
---
 .../src/execution/shuffle/comet_partitioning.rs    |  23 +
 native/core/src/execution/shuffle/mod.rs           |   1 +
 .../execution/shuffle/{ => partitioners}/mod.rs    |  26 +-
 .../shuffle/partitioners/multi_partition.rs        | 635 ++++++++++++++
 .../partitioners/partitioned_batch_iterator.rs     | 110 +++
 .../shuffle/partitioners/single_partition.rs       | 187 +++++
 .../core/src/execution/shuffle/shuffle_writer.rs   | 913 +--------------------
 .../execution/shuffle/writers/partition_writer.rs  |   2 +-
 8 files changed, 1000 insertions(+), 897 deletions(-)

diff --git a/native/core/src/execution/shuffle/comet_partitioning.rs 
b/native/core/src/execution/shuffle/comet_partitioning.rs
index b7ad15879..b8d68cd21 100644
--- a/native/core/src/execution/shuffle/comet_partitioning.rs
+++ b/native/core/src/execution/shuffle/comet_partitioning.rs
@@ -46,3 +46,26 @@ impl CometPartitioning {
         }
     }
 }
+
+pub(super) fn pmod(hash: u32, n: usize) -> usize {
+    let hash = hash as i32;
+    let n = n as i32;
+    let r = hash % n;
+    let result = if r < 0 { (r + n) % n } else { r };
+    result as usize
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_pmod() {
+        let i: Vec<u32> = vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b, 
0xcd1e64fb];
+        let result = i.into_iter().map(|i| pmod(i, 
200)).collect::<Vec<usize>>();
+
+        // expected partition from Spark with n=200
+        let expected = vec![69, 5, 193, 171, 115];
+        assert_eq!(result, expected);
+    }
+}
diff --git a/native/core/src/execution/shuffle/mod.rs 
b/native/core/src/execution/shuffle/mod.rs
index a41d269d8..6018cff50 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/mod.rs
@@ -18,6 +18,7 @@
 pub(crate) mod codec;
 mod comet_partitioning;
 mod metrics;
+mod partitioners;
 mod shuffle_writer;
 pub mod spark_unsafe;
 mod writers;
diff --git a/native/core/src/execution/shuffle/mod.rs 
b/native/core/src/execution/shuffle/partitioners/mod.rs
similarity index 55%
copy from native/core/src/execution/shuffle/mod.rs
copy to native/core/src/execution/shuffle/partitioners/mod.rs
index a41d269d8..b9058f66f 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/partitioners/mod.rs
@@ -15,13 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub(crate) mod codec;
-mod comet_partitioning;
-mod metrics;
-mod shuffle_writer;
-pub mod spark_unsafe;
-mod writers;
+mod multi_partition;
+mod partitioned_batch_iterator;
+mod single_partition;
 
-pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
-pub use comet_partitioning::CometPartitioning;
-pub use shuffle_writer::ShuffleWriterExec;
+use arrow::record_batch::RecordBatch;
+use datafusion::common::Result;
+
+pub(super) use multi_partition::MultiPartitionShuffleRepartitioner;
+pub(super) use partitioned_batch_iterator::PartitionedBatchIterator;
+pub(super) use single_partition::SinglePartitionShufflePartitioner;
+
+#[async_trait::async_trait]
+pub(super) trait ShufflePartitioner: Send + Sync {
+    /// Insert a batch into the partitioner
+    async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;
+    /// Write shuffle data and shuffle index file to disk
+    fn shuffle_write(&mut self) -> Result<()>;
+}
diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs 
b/native/core/src/execution/shuffle/partitioners/multi_partition.rs
new file mode 100644
index 000000000..35f754695
--- /dev/null
+++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs
@@ -0,0 +1,635 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
+use crate::execution::shuffle::partitioners::partitioned_batch_iterator::{
+    PartitionedBatchIterator, PartitionedBatchesProducer,
+};
+use crate::execution::shuffle::partitioners::ShufflePartitioner;
+use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter};
+use crate::execution::shuffle::{
+    comet_partitioning, CometPartitioning, CompressionCodec, 
ShuffleBlockWriter,
+};
+use crate::execution::tracing::{with_trace, with_trace_async};
+use arrow::array::{ArrayRef, RecordBatch};
+use arrow::datatypes::SchemaRef;
+use datafusion::common::utils::proxy::VecAllocExt;
+use datafusion::common::DataFusionError;
+use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::physical_plan::metrics::Time;
+use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes;
+use itertools::Itertools;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::{File, OpenOptions};
+use std::io::{BufReader, BufWriter, Seek, Write};
+use std::sync::Arc;
+use tokio::time::Instant;
+
+#[derive(Default)]
+struct ScratchSpace {
+    /// Hashes for each row in the current batch.
+    hashes_buf: Vec<u32>,
+    /// Partition ids for each row in the current batch.
+    partition_ids: Vec<u32>,
+    /// The row indices of the rows in each partition. This array is 
conceptually divided into
+    /// partitions, where each partition contains the row indices of the rows 
in that partition.
+    /// The length of this array is the same as the number of rows in the 
batch.
+    partition_row_indices: Vec<u32>,
+    /// The start indices of partitions in partition_row_indices. 
partition_starts[K] and
+    /// partition_starts[K + 1] are the start and end indices of partition K 
in partition_row_indices.
+    /// The length of this array is 1 + the number of partitions.
+    partition_starts: Vec<u32>,
+}
+
+impl ScratchSpace {
+    fn map_partition_ids_to_starts_and_indices(
+        &mut self,
+        num_output_partitions: usize,
+        num_rows: usize,
+    ) {
+        let partition_ids = &mut self.partition_ids[..num_rows];
+
+        // count each partition size, while leaving the last extra element as 0
+        let partition_counters = &mut self.partition_starts;
+        partition_counters.resize(num_output_partitions + 1, 0);
+        partition_counters.fill(0);
+        partition_ids
+            .iter()
+            .for_each(|partition_id| partition_counters[*partition_id as 
usize] += 1);
+
+        // accumulate partition counters into partition ends
+        // e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7]
+        let partition_ends = partition_counters;
+        let mut accum = 0;
+        partition_ends.iter_mut().for_each(|v| {
+            *v += accum;
+            accum = *v;
+        });
+
+        // calculate partition row indices and partition starts
+        // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the 
following partition_row_indices
+        // and partition_starts arrays:
+        //
+        //  partition_row_indices: [6, 1, 2, 3, 4, 5, 0]
+        //  partition_starts: [0, 1, 4, 6, 7]
+        //
+        // partition_starts conceptually splits partition_row_indices into 
smaller slices.
+        // Each slice 
partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the
+        // row indices of the input batch that are partitioned into partition 
K. For example,
+        // first partition 0 has one row index [6], partition 1 has row 
indices [1, 2, 3], etc.
+        let partition_row_indices = &mut self.partition_row_indices;
+        partition_row_indices.resize(num_rows, 0);
+        for (index, partition_id) in partition_ids.iter().enumerate().rev() {
+            partition_ends[*partition_id as usize] -= 1;
+            let end = partition_ends[*partition_id as usize];
+            partition_row_indices[end as usize] = index as u32;
+        }
+
+        // after calculating, partition ends become partition starts
+    }
+}
+
+/// A partitioner that uses a hash function to partition data into multiple 
partitions
+pub(crate) struct MultiPartitionShuffleRepartitioner {
+    output_data_file: String,
+    output_index_file: String,
+    buffered_batches: Vec<RecordBatch>,
+    partition_indices: Vec<Vec<(u32, u32)>>,
+    partition_writers: Vec<PartitionWriter>,
+    shuffle_block_writer: ShuffleBlockWriter,
+    /// Partitioning scheme to use
+    partitioning: CometPartitioning,
+    runtime: Arc<RuntimeEnv>,
+    metrics: ShufflePartitionerMetrics,
+    /// Reused scratch space for computing partition indices
+    scratch: ScratchSpace,
+    /// The configured batch size
+    batch_size: usize,
+    /// Reservation for repartitioning
+    reservation: MemoryReservation,
+    tracing_enabled: bool,
+    /// Size of the write buffer in bytes
+    write_buffer_size: usize,
+}
+
+impl MultiPartitionShuffleRepartitioner {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn try_new(
+        partition: usize,
+        output_data_file: String,
+        output_index_file: String,
+        schema: SchemaRef,
+        partitioning: CometPartitioning,
+        metrics: ShufflePartitionerMetrics,
+        runtime: Arc<RuntimeEnv>,
+        batch_size: usize,
+        codec: CompressionCodec,
+        tracing_enabled: bool,
+        write_buffer_size: usize,
+    ) -> datafusion::common::Result<Self> {
+        let num_output_partitions = partitioning.partition_count();
+        assert_ne!(
+            num_output_partitions, 1,
+            "Use SinglePartitionShufflePartitioner for 1 output partition."
+        );
+
+        // Vectors in the scratch space will be filled with valid values 
before being used, this
+        // initialization code is simply initializing the vectors to the 
desired size.
+        // The initial values are not used.
+        let scratch = ScratchSpace {
+            hashes_buf: match partitioning {
+                // Allocate hashes_buf for hash and round robin partitioning.
+                // Round robin hashes all columns to achieve even, 
deterministic distribution.
+                CometPartitioning::Hash(_, _) | 
CometPartitioning::RoundRobin(_, _) => {
+                    vec![0; batch_size]
+                }
+                _ => vec![],
+            },
+            partition_ids: vec![0; batch_size],
+            partition_row_indices: vec![0; batch_size],
+            partition_starts: vec![0; num_output_partitions + 1],
+        };
+
+        let shuffle_block_writer = 
ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;
+
+        let partition_writers = (0..num_output_partitions)
+            .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone()))
+            .collect::<datafusion::common::Result<Vec<_>>>()?;
+
+        let reservation = 
MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]"))
+            .with_can_spill(true)
+            .register(&runtime.memory_pool);
+
+        Ok(Self {
+            output_data_file,
+            output_index_file,
+            buffered_batches: vec![],
+            partition_indices: vec![vec![]; num_output_partitions],
+            partition_writers,
+            shuffle_block_writer,
+            partitioning,
+            runtime,
+            metrics,
+            scratch,
+            batch_size,
+            reservation,
+            tracing_enabled,
+            write_buffer_size,
+        })
+    }
+
+    /// Shuffles rows in input batch into corresponding partition buffer.
+    /// This function first calculates hashes for rows and then takes rows in 
same
+    /// partition as a record batch which is appended into partition buffer.
+    /// This should not be called directly. Use `insert_batch` instead.
+    async fn partitioning_batch(&mut self, input: RecordBatch) -> 
datafusion::common::Result<()> {
+        if input.num_rows() == 0 {
+            // skip empty batch
+            return Ok(());
+        }
+
+        if input.num_rows() > self.batch_size {
+            return Err(DataFusionError::Internal(
+                "Input batch size exceeds configured batch size. Call 
`insert_batch` instead."
+                    .to_string(),
+            ));
+        }
+
+        // Update data size metric
+        self.metrics.data_size.add(input.get_array_memory_size());
+
+        // NOTE: in shuffle writer exec, the output_rows metrics represents the
+        // number of rows those are written to output data file.
+        self.metrics.baseline.record_output(input.num_rows());
+
+        match &self.partitioning {
+            CometPartitioning::Hash(exprs, num_output_partitions) => {
+                let mut scratch = std::mem::take(&mut self.scratch);
+                let (partition_starts, partition_row_indices): (&Vec<u32>, 
&Vec<u32>) = {
+                    let mut timer = self.metrics.repart_time.timer();
+
+                    // Evaluate partition expressions to get rows to apply 
partitioning scheme.
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| 
expr.evaluate(&input)?.into_array(input.num_rows()))
+                        .collect::<datafusion::common::Result<Vec<_>>>()?;
+
+                    let num_rows = arrays[0].len();
+
+                    // Use identical seed as Spark hash partitioning.
+                    let hashes_buf = &mut scratch.hashes_buf[..num_rows];
+                    hashes_buf.fill(42_u32);
+
+                    // Generate partition ids for every row.
+                    {
+                        // Hash arrays and compute partition ids based on 
number of partitions.
+                        let partition_ids = &mut 
scratch.partition_ids[..num_rows];
+                        create_murmur3_hashes(&arrays, hashes_buf)?
+                            .iter()
+                            .enumerate()
+                            .for_each(|(idx, hash)| {
+                                partition_ids[idx] =
+                                    comet_partitioning::pmod(*hash, 
*num_output_partitions) as u32;
+                            });
+                    }
+
+                    // We now have partition ids for every input row, map that 
to partition starts
+                    // and partition indices to eventually write these rows to 
partition buffers.
+                    scratch
+                        
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
+
+                    timer.stop();
+                    Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
+                        &scratch.partition_starts,
+                        &scratch.partition_row_indices,
+                    ))
+                }?;
+
+                self.buffer_partitioned_batch_may_spill(
+                    input,
+                    partition_row_indices,
+                    partition_starts,
+                )
+                .await?;
+                self.scratch = scratch;
+            }
+            CometPartitioning::RangePartitioning(
+                lex_ordering,
+                num_output_partitions,
+                row_converter,
+                bounds,
+            ) => {
+                let mut scratch = std::mem::take(&mut self.scratch);
+                let (partition_starts, partition_row_indices): (&Vec<u32>, 
&Vec<u32>) = {
+                    let mut timer = self.metrics.repart_time.timer();
+
+                    // Evaluate partition expressions for values to apply 
partitioning scheme on.
+                    let arrays = lex_ordering
+                        .iter()
+                        .map(|expr| 
expr.expr.evaluate(&input)?.into_array(input.num_rows()))
+                        .collect::<datafusion::common::Result<Vec<_>>>()?;
+
+                    let num_rows = arrays[0].len();
+
+                    // Generate partition ids for every row, first by 
converting the partition
+                    // arrays to Rows, and then doing binary search for each 
Row against the
+                    // bounds Rows.
+                    {
+                        let row_batch = 
row_converter.convert_columns(arrays.as_slice())?;
+                        let partition_ids = &mut 
scratch.partition_ids[..num_rows];
+
+                        row_batch.iter().enumerate().for_each(|(row_idx, row)| 
{
+                            partition_ids[row_idx] = bounds
+                                .as_slice()
+                                .partition_point(|bound| bound.row() <= row)
+                                as u32
+                        });
+                    }
+
+                    // We now have partition ids for every input row, map that 
to partition starts
+                    // and partition indices to eventually write these rows to 
partition buffers.
+                    scratch
+                        
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
+
+                    timer.stop();
+                    Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
+                        &scratch.partition_starts,
+                        &scratch.partition_row_indices,
+                    ))
+                }?;
+
+                self.buffer_partitioned_batch_may_spill(
+                    input,
+                    partition_row_indices,
+                    partition_starts,
+                )
+                .await?;
+                self.scratch = scratch;
+            }
+            CometPartitioning::RoundRobin(num_output_partitions, 
max_hash_columns) => {
+                // Comet implements "round robin" as hash partitioning on 
columns.
+                // This achieves the same goal as Spark's round robin (even 
distribution
+                // without semantic grouping) while being deterministic for 
fault tolerance.
+                //
+                // Note: This produces different partition assignments than 
Spark's round robin,
+                // which sorts by UnsafeRow binary representation before 
assigning partitions.
+                // However, both approaches provide even distribution and 
determinism.
+                let mut scratch = std::mem::take(&mut self.scratch);
+                let (partition_starts, partition_row_indices): (&Vec<u32>, 
&Vec<u32>) = {
+                    let mut timer = self.metrics.repart_time.timer();
+
+                    let num_rows = input.num_rows();
+
+                    // Collect columns for hashing, respecting 
max_hash_columns limit
+                    // max_hash_columns of 0 means no limit (hash all columns)
+                    // Negative values are normalized to 0 in the planner
+                    let num_columns_to_hash = if *max_hash_columns == 0 {
+                        input.num_columns()
+                    } else {
+                        (*max_hash_columns).min(input.num_columns())
+                    };
+                    let columns_to_hash: Vec<ArrayRef> = 
(0..num_columns_to_hash)
+                        .map(|i| Arc::clone(input.column(i)))
+                        .collect();
+
+                    // Use identical seed as Spark hash partitioning.
+                    let hashes_buf = &mut scratch.hashes_buf[..num_rows];
+                    hashes_buf.fill(42_u32);
+
+                    // Compute hash for selected columns
+                    create_murmur3_hashes(&columns_to_hash, hashes_buf)?;
+
+                    // Assign partition IDs based on hash (same as hash 
partitioning)
+                    let partition_ids = &mut scratch.partition_ids[..num_rows];
+                    hashes_buf.iter().enumerate().for_each(|(idx, hash)| {
+                        partition_ids[idx] =
+                            comet_partitioning::pmod(*hash, 
*num_output_partitions) as u32;
+                    });
+
+                    // We now have partition ids for every input row, map that 
to partition starts
+                    // and partition indices to eventually write these rows to 
partition buffers.
+                    scratch
+                        
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
+
+                    timer.stop();
+                    Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
+                        &scratch.partition_starts,
+                        &scratch.partition_row_indices,
+                    ))
+                }?;
+
+                self.buffer_partitioned_batch_may_spill(
+                    input,
+                    partition_row_indices,
+                    partition_starts,
+                )
+                .await?;
+                self.scratch = scratch;
+            }
+            other => {
+                // this should be unreachable as long as the validation logic
+                // in the constructor is kept up-to-date
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Unsupported shuffle partitioning scheme {other:?}"
+                )));
+            }
+        }
+        Ok(())
+    }
+
+    async fn buffer_partitioned_batch_may_spill(
+        &mut self,
+        input: RecordBatch,
+        partition_row_indices: &[u32],
+        partition_starts: &[u32],
+    ) -> datafusion::common::Result<()> {
+        let mut mem_growth: usize = input.get_array_memory_size();
+        let buffered_partition_idx = self.buffered_batches.len() as u32;
+        self.buffered_batches.push(input);
+
+        // partition_starts conceptually slices partition_row_indices into 
smaller slices,
+        // each slice contains the indices of rows in input that will go into 
the corresponding
+        // partition. The following loop iterates over the slices and put the 
row indices into
+        // the indices array of the corresponding partition.
+        for (partition_id, (&start, &end)) in partition_starts
+            .iter()
+            .tuple_windows()
+            .enumerate()
+            .filter(|(_, (start, end))| start < end)
+        {
+            let row_indices = &partition_row_indices[start as usize..end as 
usize];
+
+            // Put row indices for the current partition into the indices 
array of that partition.
+            // This indices array will be used for calling 
interleave_record_batch to produce
+            // shuffled batches.
+            let indices = &mut self.partition_indices[partition_id];
+            let before_size = indices.allocated_size();
+            indices.reserve(row_indices.len());
+            for row_idx in row_indices {
+                indices.push((buffered_partition_idx, *row_idx));
+            }
+            let after_size = indices.allocated_size();
+            mem_growth += after_size.saturating_sub(before_size);
+        }
+
+        if self.reservation.try_grow(mem_growth).is_err() {
+            self.spill()?;
+        }
+
+        Ok(())
+    }
+
+    fn shuffle_write_partition(
+        partition_iter: &mut PartitionedBatchIterator,
+        shuffle_block_writer: &mut ShuffleBlockWriter,
+        output_data: &mut BufWriter<File>,
+        encode_time: &Time,
+        write_time: &Time,
+        write_buffer_size: usize,
+    ) -> datafusion::common::Result<()> {
+        let mut buf_batch_writer =
+            BufBatchWriter::new(shuffle_block_writer, output_data, 
write_buffer_size);
+        for batch in partition_iter {
+            let batch = batch?;
+            buf_batch_writer.write(&batch, encode_time, write_time)?;
+        }
+        buf_batch_writer.flush(write_time)?;
+        Ok(())
+    }
+
+    fn used(&self) -> usize {
+        self.reservation.size()
+    }
+
+    fn spilled_bytes(&self) -> usize {
+        self.metrics.spilled_bytes.value()
+    }
+
+    fn spill_count(&self) -> usize {
+        self.metrics.spill_count.value()
+    }
+
+    fn data_size(&self) -> usize {
+        self.metrics.data_size.value()
+    }
+
+    /// This function transfers the ownership of the buffered batches and 
partition indices from the
+    /// ShuffleRepartitioner to a new PartitionedBatches struct. The returned 
PartitionedBatches struct
+    /// can be used to produce shuffled batches.
+    fn partitioned_batches(&mut self) -> PartitionedBatchesProducer {
+        let num_output_partitions = self.partition_indices.len();
+        let buffered_batches = std::mem::take(&mut self.buffered_batches);
+        // let indices = std::mem::take(&mut self.partition_indices);
+        let indices = std::mem::replace(
+            &mut self.partition_indices,
+            vec![vec![]; num_output_partitions],
+        );
+        PartitionedBatchesProducer::new(buffered_batches, indices, 
self.batch_size)
+    }
+
+    pub(crate) fn spill(&mut self) -> datafusion::common::Result<()> {
+        log::info!(
+            "ShuffleRepartitioner spilling shuffle data of {} to disk while 
inserting ({} time(s) so far)",
+            self.used(),
+            self.spill_count()
+        );
+
+        // we could always get a chance to free some memory as long as we are 
holding some
+        if self.buffered_batches.is_empty() {
+            return Ok(());
+        }
+
+        with_trace("shuffle_spill", self.tracing_enabled, || {
+            let num_output_partitions = self.partition_writers.len();
+            let mut partitioned_batches = self.partitioned_batches();
+            let mut spilled_bytes = 0;
+
+            for partition_id in 0..num_output_partitions {
+                let partition_writer = &mut 
self.partition_writers[partition_id];
+                let mut iter = partitioned_batches.produce(partition_id);
+                spilled_bytes += partition_writer.spill(
+                    &mut iter,
+                    &self.runtime,
+                    &self.metrics,
+                    self.write_buffer_size,
+                )?;
+            }
+
+            self.reservation.free();
+            self.metrics.spill_count.add(1);
+            self.metrics.spilled_bytes.add(spilled_bytes);
+            Ok(())
+        })
+    }
+
+    #[cfg(test)]
+    pub(crate) fn partition_writers(&self) -> &[PartitionWriter] {
+        &self.partition_writers
+    }
+}
+
+#[async_trait::async_trait]
+impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
+    /// Shuffles rows in input batch into corresponding partition buffer.
+    /// This function will slice input batch according to configured batch 
size and then
+    /// shuffle rows into corresponding partition buffer.
+    async fn insert_batch(&mut self, batch: RecordBatch) -> 
datafusion::common::Result<()> {
+        with_trace_async("shuffle_insert_batch", self.tracing_enabled, || 
async {
+            let start_time = Instant::now();
+            let mut start = 0;
+            while start < batch.num_rows() {
+                let end = (start + self.batch_size).min(batch.num_rows());
+                let batch = batch.slice(start, end - start);
+                self.partitioning_batch(batch).await?;
+                start = end;
+            }
+            self.metrics.input_batches.add(1);
+            self.metrics
+                .baseline
+                .elapsed_compute()
+                .add_duration(start_time.elapsed());
+            Ok(())
+        })
+        .await
+    }
+
+    /// Writes buffered shuffled record batches into Arrow IPC bytes.
+    fn shuffle_write(&mut self) -> datafusion::common::Result<()> {
+        with_trace("shuffle_write", self.tracing_enabled, || {
+            let start_time = Instant::now();
+
+            let mut partitioned_batches = self.partitioned_batches();
+            let num_output_partitions = self.partition_indices.len();
+            let mut offsets = vec![0; num_output_partitions + 1];
+
+            let data_file = self.output_data_file.clone();
+            let index_file = self.output_index_file.clone();
+
+            let output_data = OpenOptions::new()
+                .write(true)
+                .create(true)
+                .truncate(true)
+                .open(data_file)
+                .map_err(|e| DataFusionError::Execution(format!("shuffle write 
error: {e:?}")))?;
+
+            let mut output_data = BufWriter::new(output_data);
+
+            #[allow(clippy::needless_range_loop)]
+            for i in 0..num_output_partitions {
+                offsets[i] = output_data.stream_position()?;
+
+                // if we wrote a spill file for this partition then copy the
+                // contents into the shuffle file
+                if let Some(spill_path) = self.partition_writers[i].path() {
+                    let mut spill_file = 
BufReader::new(File::open(spill_path)?);
+                    let mut write_timer = self.metrics.write_time.timer();
+                    std::io::copy(&mut spill_file, &mut output_data)?;
+                    write_timer.stop();
+                }
+
+                // Write in memory batches to output data file
+                let mut partition_iter = partitioned_batches.produce(i);
+                Self::shuffle_write_partition(
+                    &mut partition_iter,
+                    &mut self.shuffle_block_writer,
+                    &mut output_data,
+                    &self.metrics.encode_time,
+                    &self.metrics.write_time,
+                    self.write_buffer_size,
+                )?;
+            }
+
+            let mut write_timer = self.metrics.write_time.timer();
+            output_data.flush()?;
+            write_timer.stop();
+
+            // add one extra offset at last to ease partition length 
computation
+            offsets[num_output_partitions] = output_data.stream_position()?;
+
+            let mut write_timer = self.metrics.write_time.timer();
+            let mut output_index =
+                BufWriter::new(File::create(index_file).map_err(|e| {
+                    DataFusionError::Execution(format!("shuffle write error: 
{e:?}"))
+                })?);
+            for offset in offsets {
+                output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
+            }
+            output_index.flush()?;
+            write_timer.stop();
+
+            self.metrics
+                .baseline
+                .elapsed_compute()
+                .add_duration(start_time.elapsed());
+
+            Ok(())
+        })
+    }
+}
+
+impl Debug for MultiPartitionShuffleRepartitioner {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ShuffleRepartitioner")
+            .field("memory_used", &self.used())
+            .field("spilled_bytes", &self.spilled_bytes())
+            .field("spilled_count", &self.spill_count())
+            .field("data_size", &self.data_size())
+            .finish()
+    }
+}
diff --git 
a/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs 
b/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs
new file mode 100644
index 000000000..77010938c
--- /dev/null
+++ 
b/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::RecordBatch;
+use arrow::compute::interleave_record_batch;
+use datafusion::common::DataFusionError;
+
/// A helper struct to produce shuffled batches.
/// This struct takes ownership of the buffered batches and partition indices from the
/// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions.
pub(super) struct PartitionedBatchesProducer {
    /// Input batches buffered by the repartitioner; rows are addressed by the
    /// (batch_index, row_index) pairs stored below.
    buffered_batches: Vec<RecordBatch>,
    /// For each output partition, the (batch_index, row_index) pairs of the rows
    /// that belong to that partition.
    partition_indices: Vec<Vec<(u32, u32)>>,
    /// Maximum number of rows per produced batch.
    batch_size: usize,
}
+
+impl PartitionedBatchesProducer {
+    pub(super) fn new(
+        buffered_batches: Vec<RecordBatch>,
+        indices: Vec<Vec<(u32, u32)>>,
+        batch_size: usize,
+    ) -> Self {
+        Self {
+            partition_indices: indices,
+            buffered_batches,
+            batch_size,
+        }
+    }
+
+    pub(super) fn produce(&mut self, partition_id: usize) -> 
PartitionedBatchIterator<'_> {
+        PartitionedBatchIterator::new(
+            &self.partition_indices[partition_id],
+            &self.buffered_batches,
+            self.batch_size,
+        )
+    }
+}
+
/// Iterator yielding shuffled `RecordBatch`es for a single output partition.
pub(crate) struct PartitionedBatchIterator<'a> {
    // Borrowed views of all buffered input batches; addressed by the batch
    // index stored in `indices`.
    record_batches: Vec<&'a RecordBatch>,
    // Maximum number of rows per yielded batch.
    batch_size: usize,
    // (batch_index, row_index) pairs of the rows belonging to this partition.
    indices: Vec<(usize, usize)>,
    // Cursor into `indices` marking how far iteration has progressed.
    pos: usize,
}
+
+impl<'a> PartitionedBatchIterator<'a> {
+    fn new(
+        indices: &'a [(u32, u32)],
+        buffered_batches: &'a [RecordBatch],
+        batch_size: usize,
+    ) -> Self {
+        if indices.is_empty() {
+            // Avoid unnecessary allocations when the partition is empty
+            return Self {
+                record_batches: vec![],
+                batch_size,
+                indices: vec![],
+                pos: 0,
+            };
+        }
+        let record_batches = buffered_batches.iter().collect::<Vec<_>>();
+        let current_indices = indices
+            .iter()
+            .map(|(i_batch, i_row)| (*i_batch as usize, *i_row as usize))
+            .collect::<Vec<_>>();
+        Self {
+            record_batches,
+            batch_size,
+            indices: current_indices,
+            pos: 0,
+        }
+    }
+}
+
+impl Iterator for PartitionedBatchIterator<'_> {
+    type Item = datafusion::common::Result<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.pos >= self.indices.len() {
+            return None;
+        }
+
+        let indices_end = std::cmp::min(self.pos + self.batch_size, 
self.indices.len());
+        let indices = &self.indices[self.pos..indices_end];
+        match interleave_record_batch(&self.record_batches, indices) {
+            Ok(batch) => {
+                self.pos = indices_end;
+                Some(Ok(batch))
+            }
+            Err(e) => Some(Err(DataFusionError::ArrowError(
+                Box::from(e),
+                Some(DataFusionError::get_back_trace()),
+            ))),
+        }
+    }
+}
diff --git a/native/core/src/execution/shuffle/partitioners/single_partition.rs 
b/native/core/src/execution/shuffle/partitioners/single_partition.rs
new file mode 100644
index 000000000..4ee5bd2bf
--- /dev/null
+++ b/native/core/src/execution/shuffle/partitioners/single_partition.rs
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
+use crate::execution::shuffle::partitioners::ShufflePartitioner;
+use crate::execution::shuffle::writers::BufBatchWriter;
+use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter};
+use arrow::array::RecordBatch;
+use arrow::datatypes::SchemaRef;
+use datafusion::common::DataFusionError;
+use std::fs::{File, OpenOptions};
+use std::io::{BufWriter, Write};
+use tokio::time::Instant;
+
/// A partitioner that writes all shuffle data to a single file and a single index file
pub(crate) struct SinglePartitionShufflePartitioner {
    /// Buffered writer that encodes and appends batches to the output data file.
    output_data_writer: BufBatchWriter<ShuffleBlockWriter, File>,
    /// Path of the index file written when `shuffle_write` is called.
    output_index_path: String,
    /// Batches that are smaller than the batch size and to be concatenated
    buffered_batches: Vec<RecordBatch>,
    /// Number of rows in the concatenating batches
    num_buffered_rows: usize,
    /// Metrics for the repartitioner
    metrics: ShufflePartitionerMetrics,
    /// The configured batch size
    batch_size: usize,
}
+
+impl SinglePartitionShufflePartitioner {
+    pub(crate) fn try_new(
+        output_data_path: String,
+        output_index_path: String,
+        schema: SchemaRef,
+        metrics: ShufflePartitionerMetrics,
+        batch_size: usize,
+        codec: CompressionCodec,
+        write_buffer_size: usize,
+    ) -> datafusion::common::Result<Self> {
+        let shuffle_block_writer = 
ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;
+
+        let output_data_file = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(output_data_path)?;
+
+        let output_data_writer =
+            BufBatchWriter::new(shuffle_block_writer, output_data_file, 
write_buffer_size);
+
+        Ok(Self {
+            output_data_writer,
+            output_index_path,
+            buffered_batches: vec![],
+            num_buffered_rows: 0,
+            metrics,
+            batch_size,
+        })
+    }
+
+    /// Add a batch to the buffer of the partitioner, these buffered batches 
will be concatenated
+    /// and written to the output data file when the number of rows in the 
buffer reaches the batch size.
+    fn add_buffered_batch(&mut self, batch: RecordBatch) {
+        self.num_buffered_rows += batch.num_rows();
+        self.buffered_batches.push(batch);
+    }
+
+    /// Consumes buffered batches and return a concatenated batch if successful
+    fn concat_buffered_batches(&mut self) -> 
datafusion::common::Result<Option<RecordBatch>> {
+        if self.buffered_batches.is_empty() {
+            Ok(None)
+        } else if self.buffered_batches.len() == 1 {
+            let batch = self.buffered_batches.remove(0);
+            self.num_buffered_rows = 0;
+            Ok(Some(batch))
+        } else {
+            let schema = &self.buffered_batches[0].schema();
+            match arrow::compute::concat_batches(schema, 
self.buffered_batches.iter()) {
+                Ok(concatenated) => {
+                    self.buffered_batches.clear();
+                    self.num_buffered_rows = 0;
+                    Ok(Some(concatenated))
+                }
+                Err(e) => Err(DataFusionError::ArrowError(
+                    Box::from(e),
+                    Some(DataFusionError::get_back_trace()),
+                )),
+            }
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl ShufflePartitioner for SinglePartitionShufflePartitioner {
+    async fn insert_batch(&mut self, batch: RecordBatch) -> 
datafusion::common::Result<()> {
+        let start_time = Instant::now();
+        let num_rows = batch.num_rows();
+
+        if num_rows > 0 {
+            self.metrics.data_size.add(batch.get_array_memory_size());
+            self.metrics.baseline.record_output(num_rows);
+
+            if num_rows >= self.batch_size || num_rows + 
self.num_buffered_rows > self.batch_size {
+                let concatenated_batch = self.concat_buffered_batches()?;
+
+                // Write the concatenated buffered batch
+                if let Some(batch) = concatenated_batch {
+                    self.output_data_writer.write(
+                        &batch,
+                        &self.metrics.encode_time,
+                        &self.metrics.write_time,
+                    )?;
+                }
+
+                if num_rows >= self.batch_size {
+                    // Write the new batch
+                    self.output_data_writer.write(
+                        &batch,
+                        &self.metrics.encode_time,
+                        &self.metrics.write_time,
+                    )?;
+                } else {
+                    // Add the new batch to the buffer
+                    self.add_buffered_batch(batch);
+                }
+            } else {
+                self.add_buffered_batch(batch);
+            }
+        }
+
+        self.metrics.input_batches.add(1);
+        self.metrics
+            .baseline
+            .elapsed_compute()
+            .add_duration(start_time.elapsed());
+        Ok(())
+    }
+
+    fn shuffle_write(&mut self) -> datafusion::common::Result<()> {
+        let start_time = Instant::now();
+        let concatenated_batch = self.concat_buffered_batches()?;
+
+        // Write the concatenated buffered batch
+        if let Some(batch) = concatenated_batch {
+            self.output_data_writer.write(
+                &batch,
+                &self.metrics.encode_time,
+                &self.metrics.write_time,
+            )?;
+        }
+        self.output_data_writer.flush(&self.metrics.write_time)?;
+
+        // Write index file. It should only contain 2 entries: 0 and the total 
number of bytes written
+        let index_file = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(self.output_index_path.clone())
+            .map_err(|e| DataFusionError::Execution(format!("shuffle write 
error: {e:?}")))?;
+        let mut index_buf_writer = BufWriter::new(index_file);
+        let data_file_length = 
self.output_data_writer.writer_stream_position()?;
+        for offset in [0, data_file_length] {
+            index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?;
+        }
+        index_buf_writer.flush()?;
+
+        self.metrics
+            .baseline
+            .elapsed_compute()
+            .add_duration(start_time.elapsed());
+        Ok(())
+    }
+}
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs 
b/native/core/src/execution/shuffle/shuffle_writer.rs
index 669a6df97..a7e689a69 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -18,43 +18,34 @@
 //! Defines the External shuffle repartition plan.
 
 use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
-use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter};
-use crate::execution::shuffle::{CometPartitioning, CompressionCodec, 
ShuffleBlockWriter};
-use crate::execution::tracing::{with_trace, with_trace_async};
-use arrow::compute::interleave_record_batch;
+use crate::execution::shuffle::partitioners::{
+    MultiPartitionShuffleRepartitioner, ShufflePartitioner, 
SinglePartitionShufflePartitioner,
+};
+use crate::execution::shuffle::{CometPartitioning, CompressionCodec};
+use crate::execution::tracing::with_trace_async;
 use async_trait::async_trait;
 use datafusion::common::exec_datafusion_err;
-use datafusion::common::utils::proxy::VecAllocExt;
 use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::EmptyRecordBatchStream;
 use datafusion::{
-    arrow::{array::*, datatypes::SchemaRef, error::ArrowError, 
record_batch::RecordBatch},
-    error::{DataFusionError, Result},
-    execution::{
-        context::TaskContext,
-        memory_pool::{MemoryConsumer, MemoryReservation},
-        runtime_env::RuntimeEnv,
-    },
+    arrow::{datatypes::SchemaRef, error::ArrowError},
+    error::Result,
+    execution::context::TaskContext,
     physical_plan::{
-        metrics::{ExecutionPlanMetricsSet, MetricsSet, Time},
+        metrics::{ExecutionPlanMetricsSet, MetricsSet},
         stream::RecordBatchStreamAdapter,
         DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, 
SendableRecordBatchStream,
         Statistics,
     },
 };
-use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes;
 use futures::{StreamExt, TryFutureExt, TryStreamExt};
-use itertools::Itertools;
 use std::{
     any::Any,
     fmt,
     fmt::{Debug, Formatter},
-    fs::{File, OpenOptions},
-    io::{BufReader, BufWriter, Seek, Write},
     sync::Arc,
 };
-use tokio::time::Instant;
 
 /// The shuffle writer operator maps each input partition to M output 
partitions based on a
 /// partitioning scheme. No guarantees are made about the order of the 
resulting partitions.
@@ -271,872 +262,24 @@ async fn external_shuffle(
     .await
 }
 
-#[async_trait::async_trait]
-trait ShufflePartitioner: Send + Sync {
-    /// Insert a batch into the partitioner
-    async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;
-    /// Write shuffle data and shuffle index file to disk
-    fn shuffle_write(&mut self) -> Result<()>;
-}
-
-/// A partitioner that uses a hash function to partition data into multiple 
partitions
-struct MultiPartitionShuffleRepartitioner {
-    output_data_file: String,
-    output_index_file: String,
-    buffered_batches: Vec<RecordBatch>,
-    partition_indices: Vec<Vec<(u32, u32)>>,
-    partition_writers: Vec<PartitionWriter>,
-    shuffle_block_writer: ShuffleBlockWriter,
-    /// Partitioning scheme to use
-    partitioning: CometPartitioning,
-    runtime: Arc<RuntimeEnv>,
-    metrics: ShufflePartitionerMetrics,
-    /// Reused scratch space for computing partition indices
-    scratch: ScratchSpace,
-    /// The configured batch size
-    batch_size: usize,
-    /// Reservation for repartitioning
-    reservation: MemoryReservation,
-    tracing_enabled: bool,
-    /// Size of the write buffer in bytes
-    write_buffer_size: usize,
-}
-
-#[derive(Default)]
-struct ScratchSpace {
-    /// Hashes for each row in the current batch.
-    hashes_buf: Vec<u32>,
-    /// Partition ids for each row in the current batch.
-    partition_ids: Vec<u32>,
-    /// The row indices of the rows in each partition. This array is 
conceptually divided into
-    /// partitions, where each partition contains the row indices of the rows 
in that partition.
-    /// The length of this array is the same as the number of rows in the 
batch.
-    partition_row_indices: Vec<u32>,
-    /// The start indices of partitions in partition_row_indices. 
partition_starts[K] and
-    /// partition_starts[K + 1] are the start and end indices of partition K 
in partition_row_indices.
-    /// The length of this array is 1 + the number of partitions.
-    partition_starts: Vec<u32>,
-}
-
-impl ScratchSpace {
-    fn map_partition_ids_to_starts_and_indices(
-        &mut self,
-        num_output_partitions: usize,
-        num_rows: usize,
-    ) {
-        let partition_ids = &mut self.partition_ids[..num_rows];
-
-        // count each partition size, while leaving the last extra element as 0
-        let partition_counters = &mut self.partition_starts;
-        partition_counters.resize(num_output_partitions + 1, 0);
-        partition_counters.fill(0);
-        partition_ids
-            .iter()
-            .for_each(|partition_id| partition_counters[*partition_id as 
usize] += 1);
-
-        // accumulate partition counters into partition ends
-        // e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7]
-        let partition_ends = partition_counters;
-        let mut accum = 0;
-        partition_ends.iter_mut().for_each(|v| {
-            *v += accum;
-            accum = *v;
-        });
-
-        // calculate partition row indices and partition starts
-        // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the 
following partition_row_indices
-        // and partition_starts arrays:
-        //
-        //  partition_row_indices: [6, 1, 2, 3, 4, 5, 0]
-        //  partition_starts: [0, 1, 4, 6, 7]
-        //
-        // partition_starts conceptually splits partition_row_indices into 
smaller slices.
-        // Each slice 
partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the
-        // row indices of the input batch that are partitioned into partition 
K. For example,
-        // first partition 0 has one row index [6], partition 1 has row 
indices [1, 2, 3], etc.
-        let partition_row_indices = &mut self.partition_row_indices;
-        partition_row_indices.resize(num_rows, 0);
-        for (index, partition_id) in partition_ids.iter().enumerate().rev() {
-            partition_ends[*partition_id as usize] -= 1;
-            let end = partition_ends[*partition_id as usize];
-            partition_row_indices[end as usize] = index as u32;
-        }
-
-        // after calculating, partition ends become partition starts
-    }
-}
-
-impl MultiPartitionShuffleRepartitioner {
-    #[allow(clippy::too_many_arguments)]
-    pub fn try_new(
-        partition: usize,
-        output_data_file: String,
-        output_index_file: String,
-        schema: SchemaRef,
-        partitioning: CometPartitioning,
-        metrics: ShufflePartitionerMetrics,
-        runtime: Arc<RuntimeEnv>,
-        batch_size: usize,
-        codec: CompressionCodec,
-        tracing_enabled: bool,
-        write_buffer_size: usize,
-    ) -> Result<Self> {
-        let num_output_partitions = partitioning.partition_count();
-        assert_ne!(
-            num_output_partitions, 1,
-            "Use SinglePartitionShufflePartitioner for 1 output partition."
-        );
-
-        // Vectors in the scratch space will be filled with valid values 
before being used, this
-        // initialization code is simply initializing the vectors to the 
desired size.
-        // The initial values are not used.
-        let scratch = ScratchSpace {
-            hashes_buf: match partitioning {
-                // Allocate hashes_buf for hash and round robin partitioning.
-                // Round robin hashes all columns to achieve even, 
deterministic distribution.
-                CometPartitioning::Hash(_, _) | 
CometPartitioning::RoundRobin(_, _) => {
-                    vec![0; batch_size]
-                }
-                _ => vec![],
-            },
-            partition_ids: vec![0; batch_size],
-            partition_row_indices: vec![0; batch_size],
-            partition_starts: vec![0; num_output_partitions + 1],
-        };
-
-        let shuffle_block_writer = 
ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;
-
-        let partition_writers = (0..num_output_partitions)
-            .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone()))
-            .collect::<Result<Vec<_>>>()?;
-
-        let reservation = 
MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]"))
-            .with_can_spill(true)
-            .register(&runtime.memory_pool);
-
-        Ok(Self {
-            output_data_file,
-            output_index_file,
-            buffered_batches: vec![],
-            partition_indices: vec![vec![]; num_output_partitions],
-            partition_writers,
-            shuffle_block_writer,
-            partitioning,
-            runtime,
-            metrics,
-            scratch,
-            batch_size,
-            reservation,
-            tracing_enabled,
-            write_buffer_size,
-        })
-    }
-
-    /// Shuffles rows in input batch into corresponding partition buffer.
-    /// This function first calculates hashes for rows and then takes rows in 
same
-    /// partition as a record batch which is appended into partition buffer.
-    /// This should not be called directly. Use `insert_batch` instead.
-    async fn partitioning_batch(&mut self, input: RecordBatch) -> Result<()> {
-        if input.num_rows() == 0 {
-            // skip empty batch
-            return Ok(());
-        }
-
-        if input.num_rows() > self.batch_size {
-            return Err(DataFusionError::Internal(
-                "Input batch size exceeds configured batch size. Call 
`insert_batch` instead."
-                    .to_string(),
-            ));
-        }
-
-        // Update data size metric
-        self.metrics.data_size.add(input.get_array_memory_size());
-
-        // NOTE: in shuffle writer exec, the output_rows metrics represents the
-        // number of rows those are written to output data file.
-        self.metrics.baseline.record_output(input.num_rows());
-
-        match &self.partitioning {
-            CometPartitioning::Hash(exprs, num_output_partitions) => {
-                let mut scratch = std::mem::take(&mut self.scratch);
-                let (partition_starts, partition_row_indices): (&Vec<u32>, 
&Vec<u32>) = {
-                    let mut timer = self.metrics.repart_time.timer();
-
-                    // Evaluate partition expressions to get rows to apply 
partitioning scheme.
-                    let arrays = exprs
-                        .iter()
-                        .map(|expr| 
expr.evaluate(&input)?.into_array(input.num_rows()))
-                        .collect::<Result<Vec<_>>>()?;
-
-                    let num_rows = arrays[0].len();
-
-                    // Use identical seed as Spark hash partitioning.
-                    let hashes_buf = &mut scratch.hashes_buf[..num_rows];
-                    hashes_buf.fill(42_u32);
-
-                    // Generate partition ids for every row.
-                    {
-                        // Hash arrays and compute partition ids based on 
number of partitions.
-                        let partition_ids = &mut 
scratch.partition_ids[..num_rows];
-                        create_murmur3_hashes(&arrays, hashes_buf)?
-                            .iter()
-                            .enumerate()
-                            .for_each(|(idx, hash)| {
-                                partition_ids[idx] = pmod(*hash, 
*num_output_partitions) as u32;
-                            });
-                    }
-
-                    // We now have partition ids for every input row, map that 
to partition starts
-                    // and partition indices to eventually right these rows to 
partition buffers.
-                    scratch
-                        
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
-
-                    timer.stop();
-                    Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
-                        &scratch.partition_starts,
-                        &scratch.partition_row_indices,
-                    ))
-                }?;
-
-                self.buffer_partitioned_batch_may_spill(
-                    input,
-                    partition_row_indices,
-                    partition_starts,
-                )
-                .await?;
-                self.scratch = scratch;
-            }
-            CometPartitioning::RangePartitioning(
-                lex_ordering,
-                num_output_partitions,
-                row_converter,
-                bounds,
-            ) => {
-                let mut scratch = std::mem::take(&mut self.scratch);
-                let (partition_starts, partition_row_indices): (&Vec<u32>, 
&Vec<u32>) = {
-                    let mut timer = self.metrics.repart_time.timer();
-
-                    // Evaluate partition expressions for values to apply 
partitioning scheme on.
-                    let arrays = lex_ordering
-                        .iter()
-                        .map(|expr| 
expr.expr.evaluate(&input)?.into_array(input.num_rows()))
-                        .collect::<Result<Vec<_>>>()?;
-
-                    let num_rows = arrays[0].len();
-
-                    // Generate partition ids for every row, first by 
converting the partition
-                    // arrays to Rows, and then doing binary search for each 
Row against the
-                    // bounds Rows.
-                    {
-                        let row_batch = 
row_converter.convert_columns(arrays.as_slice())?;
-                        let partition_ids = &mut 
scratch.partition_ids[..num_rows];
-
-                        row_batch.iter().enumerate().for_each(|(row_idx, row)| 
{
-                            partition_ids[row_idx] = bounds
-                                .as_slice()
-                                .partition_point(|bound| bound.row() <= row)
-                                as u32
-                        });
-                    }
-
-                    // We now have partition ids for every input row, map that 
to partition starts
-                    // and partition indices to eventually right these rows to 
partition buffers.
-                    scratch
-                        
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
-
-                    timer.stop();
-                    Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
-                        &scratch.partition_starts,
-                        &scratch.partition_row_indices,
-                    ))
-                }?;
-
-                self.buffer_partitioned_batch_may_spill(
-                    input,
-                    partition_row_indices,
-                    partition_starts,
-                )
-                .await?;
-                self.scratch = scratch;
-            }
-            CometPartitioning::RoundRobin(num_output_partitions, 
max_hash_columns) => {
-                // Comet implements "round robin" as hash partitioning on 
columns.
-                // This achieves the same goal as Spark's round robin (even 
distribution
-                // without semantic grouping) while being deterministic for 
fault tolerance.
-                //
-                // Note: This produces different partition assignments than 
Spark's round robin,
-                // which sorts by UnsafeRow binary representation before 
assigning partitions.
-                // However, both approaches provide even distribution and 
determinism.
-                let mut scratch = std::mem::take(&mut self.scratch);
-                let (partition_starts, partition_row_indices): (&Vec<u32>, 
&Vec<u32>) = {
-                    let mut timer = self.metrics.repart_time.timer();
-
-                    let num_rows = input.num_rows();
-
-                    // Collect columns for hashing, respecting 
max_hash_columns limit
-                    // max_hash_columns of 0 means no limit (hash all columns)
-                    // Negative values are normalized to 0 in the planner
-                    let num_columns_to_hash = if *max_hash_columns == 0 {
-                        input.num_columns()
-                    } else {
-                        (*max_hash_columns).min(input.num_columns())
-                    };
-                    let columns_to_hash: Vec<ArrayRef> = 
(0..num_columns_to_hash)
-                        .map(|i| Arc::clone(input.column(i)))
-                        .collect();
-
-                    // Use identical seed as Spark hash partitioning.
-                    let hashes_buf = &mut scratch.hashes_buf[..num_rows];
-                    hashes_buf.fill(42_u32);
-
-                    // Compute hash for selected columns
-                    create_murmur3_hashes(&columns_to_hash, hashes_buf)?;
-
-                    // Assign partition IDs based on hash (same as hash 
partitioning)
-                    let partition_ids = &mut scratch.partition_ids[..num_rows];
-                    hashes_buf.iter().enumerate().for_each(|(idx, hash)| {
-                        partition_ids[idx] = pmod(*hash, 
*num_output_partitions) as u32;
-                    });
-
-                    // We now have partition ids for every input row, map that 
to partition starts
-                    // and partition indices to eventually write these rows to 
partition buffers.
-                    scratch
-                        
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
-
-                    timer.stop();
-                    Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
-                        &scratch.partition_starts,
-                        &scratch.partition_row_indices,
-                    ))
-                }?;
-
-                self.buffer_partitioned_batch_may_spill(
-                    input,
-                    partition_row_indices,
-                    partition_starts,
-                )
-                .await?;
-                self.scratch = scratch;
-            }
-            other => {
-                // this should be unreachable as long as the validation logic
-                // in the constructor is kept up-to-date
-                return Err(DataFusionError::NotImplemented(format!(
-                    "Unsupported shuffle partitioning scheme {other:?}"
-                )));
-            }
-        }
-        Ok(())
-    }
-
-    async fn buffer_partitioned_batch_may_spill(
-        &mut self,
-        input: RecordBatch,
-        partition_row_indices: &[u32],
-        partition_starts: &[u32],
-    ) -> Result<()> {
-        let mut mem_growth: usize = input.get_array_memory_size();
-        let buffered_partition_idx = self.buffered_batches.len() as u32;
-        self.buffered_batches.push(input);
-
-        // partition_starts conceptually slices partition_row_indices into 
smaller slices,
-        // each slice contains the indices of rows in input that will go into 
the corresponding
-        // partition. The following loop iterates over the slices and put the 
row indices into
-        // the indices array of the corresponding partition.
-        for (partition_id, (&start, &end)) in partition_starts
-            .iter()
-            .tuple_windows()
-            .enumerate()
-            .filter(|(_, (start, end))| start < end)
-        {
-            let row_indices = &partition_row_indices[start as usize..end as 
usize];
-
-            // Put row indices for the current partition into the indices 
array of that partition.
-            // This indices array will be used for calling 
interleave_record_batch to produce
-            // shuffled batches.
-            let indices = &mut self.partition_indices[partition_id];
-            let before_size = indices.allocated_size();
-            indices.reserve(row_indices.len());
-            for row_idx in row_indices {
-                indices.push((buffered_partition_idx, *row_idx));
-            }
-            let after_size = indices.allocated_size();
-            mem_growth += after_size.saturating_sub(before_size);
-        }
-
-        if self.reservation.try_grow(mem_growth).is_err() {
-            self.spill()?;
-        }
-
-        Ok(())
-    }
-
-    fn shuffle_write_partition(
-        partition_iter: &mut PartitionedBatchIterator,
-        shuffle_block_writer: &mut ShuffleBlockWriter,
-        output_data: &mut BufWriter<File>,
-        encode_time: &Time,
-        write_time: &Time,
-        write_buffer_size: usize,
-    ) -> Result<()> {
-        let mut buf_batch_writer =
-            BufBatchWriter::new(shuffle_block_writer, output_data, 
write_buffer_size);
-        for batch in partition_iter {
-            let batch = batch?;
-            buf_batch_writer.write(&batch, encode_time, write_time)?;
-        }
-        buf_batch_writer.flush(write_time)?;
-        Ok(())
-    }
-
-    fn used(&self) -> usize {
-        self.reservation.size()
-    }
-
-    fn spilled_bytes(&self) -> usize {
-        self.metrics.spilled_bytes.value()
-    }
-
-    fn spill_count(&self) -> usize {
-        self.metrics.spill_count.value()
-    }
-
-    fn data_size(&self) -> usize {
-        self.metrics.data_size.value()
-    }
-
-    /// This function transfers the ownership of the buffered batches and 
partition indices from the
-    /// ShuffleRepartitioner to a new PartitionedBatches struct. The returned 
PartitionedBatches struct
-    /// can be used to produce shuffled batches.
-    fn partitioned_batches(&mut self) -> PartitionedBatchesProducer {
-        let num_output_partitions = self.partition_indices.len();
-        let buffered_batches = std::mem::take(&mut self.buffered_batches);
-        // let indices = std::mem::take(&mut self.partition_indices);
-        let indices = std::mem::replace(
-            &mut self.partition_indices,
-            vec![vec![]; num_output_partitions],
-        );
-        PartitionedBatchesProducer::new(buffered_batches, indices, 
self.batch_size)
-    }
-
-    fn spill(&mut self) -> Result<()> {
-        log::info!(
-            "ShuffleRepartitioner spilling shuffle data of {} to disk while 
inserting ({} time(s) so far)",
-            self.used(),
-            self.spill_count()
-        );
-
-        // we could always get a chance to free some memory as long as we are 
holding some
-        if self.buffered_batches.is_empty() {
-            return Ok(());
-        }
-
-        with_trace("shuffle_spill", self.tracing_enabled, || {
-            let num_output_partitions = self.partition_writers.len();
-            let mut partitioned_batches = self.partitioned_batches();
-            let mut spilled_bytes = 0;
-
-            for partition_id in 0..num_output_partitions {
-                let partition_writer = &mut 
self.partition_writers[partition_id];
-                let mut iter = partitioned_batches.produce(partition_id);
-                spilled_bytes += partition_writer.spill(
-                    &mut iter,
-                    &self.runtime,
-                    &self.metrics,
-                    self.write_buffer_size,
-                )?;
-            }
-
-            self.reservation.free();
-            self.metrics.spill_count.add(1);
-            self.metrics.spilled_bytes.add(spilled_bytes);
-            Ok(())
-        })
-    }
-}
-
-#[async_trait::async_trait]
-impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
-    /// Shuffles rows in input batch into corresponding partition buffer.
-    /// This function will slice input batch according to configured batch 
size and then
-    /// shuffle rows into corresponding partition buffer.
-    async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
-        with_trace_async("shuffle_insert_batch", self.tracing_enabled, || 
async {
-            let start_time = Instant::now();
-            let mut start = 0;
-            while start < batch.num_rows() {
-                let end = (start + self.batch_size).min(batch.num_rows());
-                let batch = batch.slice(start, end - start);
-                self.partitioning_batch(batch).await?;
-                start = end;
-            }
-            self.metrics.input_batches.add(1);
-            self.metrics
-                .baseline
-                .elapsed_compute()
-                .add_duration(start_time.elapsed());
-            Ok(())
-        })
-        .await
-    }
-
-    /// Writes buffered shuffled record batches into Arrow IPC bytes.
-    fn shuffle_write(&mut self) -> Result<()> {
-        with_trace("shuffle_write", self.tracing_enabled, || {
-            let start_time = Instant::now();
-
-            let mut partitioned_batches = self.partitioned_batches();
-            let num_output_partitions = self.partition_indices.len();
-            let mut offsets = vec![0; num_output_partitions + 1];
-
-            let data_file = self.output_data_file.clone();
-            let index_file = self.output_index_file.clone();
-
-            let output_data = OpenOptions::new()
-                .write(true)
-                .create(true)
-                .truncate(true)
-                .open(data_file)
-                .map_err(|e| DataFusionError::Execution(format!("shuffle write 
error: {e:?}")))?;
-
-            let mut output_data = BufWriter::new(output_data);
-
-            #[allow(clippy::needless_range_loop)]
-            for i in 0..num_output_partitions {
-                offsets[i] = output_data.stream_position()?;
-
-                // if we wrote a spill file for this partition then copy the
-                // contents into the shuffle file
-                if let Some(spill_path) = self.partition_writers[i].path() {
-                    let mut spill_file = 
BufReader::new(File::open(spill_path)?);
-                    let mut write_timer = self.metrics.write_time.timer();
-                    std::io::copy(&mut spill_file, &mut output_data)?;
-                    write_timer.stop();
-                }
-
-                // Write in memory batches to output data file
-                let mut partition_iter = partitioned_batches.produce(i);
-                Self::shuffle_write_partition(
-                    &mut partition_iter,
-                    &mut self.shuffle_block_writer,
-                    &mut output_data,
-                    &self.metrics.encode_time,
-                    &self.metrics.write_time,
-                    self.write_buffer_size,
-                )?;
-            }
-
-            let mut write_timer = self.metrics.write_time.timer();
-            output_data.flush()?;
-            write_timer.stop();
-
-            // add one extra offset at last to ease partition length 
computation
-            offsets[num_output_partitions] = output_data.stream_position()?;
-
-            let mut write_timer = self.metrics.write_time.timer();
-            let mut output_index =
-                BufWriter::new(File::create(index_file).map_err(|e| {
-                    DataFusionError::Execution(format!("shuffle write error: 
{e:?}"))
-                })?);
-            for offset in offsets {
-                output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
-            }
-            output_index.flush()?;
-            write_timer.stop();
-
-            self.metrics
-                .baseline
-                .elapsed_compute()
-                .add_duration(start_time.elapsed());
-
-            Ok(())
-        })
-    }
-}
-
-impl Debug for MultiPartitionShuffleRepartitioner {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        f.debug_struct("ShuffleRepartitioner")
-            .field("memory_used", &self.used())
-            .field("spilled_bytes", &self.spilled_bytes())
-            .field("spilled_count", &self.spill_count())
-            .field("data_size", &self.data_size())
-            .finish()
-    }
-}
-
-/// A partitioner that writes all shuffle data to a single file and a single 
index file
-struct SinglePartitionShufflePartitioner {
-    // output_data_file: File,
-    output_data_writer: BufBatchWriter<ShuffleBlockWriter, File>,
-    output_index_path: String,
-    /// Batches that are smaller than the batch size and to be concatenated
-    buffered_batches: Vec<RecordBatch>,
-    /// Number of rows in the concatenating batches
-    num_buffered_rows: usize,
-    /// Metrics for the repartitioner
-    metrics: ShufflePartitionerMetrics,
-    /// The configured batch size
-    batch_size: usize,
-}
-
-impl SinglePartitionShufflePartitioner {
-    fn try_new(
-        output_data_path: String,
-        output_index_path: String,
-        schema: SchemaRef,
-        metrics: ShufflePartitionerMetrics,
-        batch_size: usize,
-        codec: CompressionCodec,
-        write_buffer_size: usize,
-    ) -> Result<Self> {
-        let shuffle_block_writer = 
ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;
-
-        let output_data_file = OpenOptions::new()
-            .write(true)
-            .create(true)
-            .truncate(true)
-            .open(output_data_path)?;
-
-        let output_data_writer =
-            BufBatchWriter::new(shuffle_block_writer, output_data_file, 
write_buffer_size);
-
-        Ok(Self {
-            output_data_writer,
-            output_index_path,
-            buffered_batches: vec![],
-            num_buffered_rows: 0,
-            metrics,
-            batch_size,
-        })
-    }
-
-    /// Add a batch to the buffer of the partitioner, these buffered batches 
will be concatenated
-    /// and written to the output data file when the number of rows in the 
buffer reaches the batch size.
-    fn add_buffered_batch(&mut self, batch: RecordBatch) {
-        self.num_buffered_rows += batch.num_rows();
-        self.buffered_batches.push(batch);
-    }
-
-    /// Consumes buffered batches and return a concatenated batch if successful
-    fn concat_buffered_batches(&mut self) -> Result<Option<RecordBatch>> {
-        if self.buffered_batches.is_empty() {
-            Ok(None)
-        } else if self.buffered_batches.len() == 1 {
-            let batch = self.buffered_batches.remove(0);
-            self.num_buffered_rows = 0;
-            Ok(Some(batch))
-        } else {
-            let schema = &self.buffered_batches[0].schema();
-            match arrow::compute::concat_batches(schema, 
self.buffered_batches.iter()) {
-                Ok(concatenated) => {
-                    self.buffered_batches.clear();
-                    self.num_buffered_rows = 0;
-                    Ok(Some(concatenated))
-                }
-                Err(e) => Err(DataFusionError::ArrowError(
-                    Box::from(e),
-                    Some(DataFusionError::get_back_trace()),
-                )),
-            }
-        }
-    }
-}
-
-#[async_trait::async_trait]
-impl ShufflePartitioner for SinglePartitionShufflePartitioner {
-    async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
-        let start_time = Instant::now();
-        let num_rows = batch.num_rows();
-
-        if num_rows > 0 {
-            self.metrics.data_size.add(batch.get_array_memory_size());
-            self.metrics.baseline.record_output(num_rows);
-
-            if num_rows >= self.batch_size || num_rows + 
self.num_buffered_rows > self.batch_size {
-                let concatenated_batch = self.concat_buffered_batches()?;
-
-                // Write the concatenated buffered batch
-                if let Some(batch) = concatenated_batch {
-                    self.output_data_writer.write(
-                        &batch,
-                        &self.metrics.encode_time,
-                        &self.metrics.write_time,
-                    )?;
-                }
-
-                if num_rows >= self.batch_size {
-                    // Write the new batch
-                    self.output_data_writer.write(
-                        &batch,
-                        &self.metrics.encode_time,
-                        &self.metrics.write_time,
-                    )?;
-                } else {
-                    // Add the new batch to the buffer
-                    self.add_buffered_batch(batch);
-                }
-            } else {
-                self.add_buffered_batch(batch);
-            }
-        }
-
-        self.metrics.input_batches.add(1);
-        self.metrics
-            .baseline
-            .elapsed_compute()
-            .add_duration(start_time.elapsed());
-        Ok(())
-    }
-
-    fn shuffle_write(&mut self) -> Result<()> {
-        let start_time = Instant::now();
-        let concatenated_batch = self.concat_buffered_batches()?;
-
-        // Write the concatenated buffered batch
-        if let Some(batch) = concatenated_batch {
-            self.output_data_writer.write(
-                &batch,
-                &self.metrics.encode_time,
-                &self.metrics.write_time,
-            )?;
-        }
-        self.output_data_writer.flush(&self.metrics.write_time)?;
-
-        // Write index file. It should only contain 2 entries: 0 and the total 
number of bytes written
-        let index_file = OpenOptions::new()
-            .write(true)
-            .create(true)
-            .truncate(true)
-            .open(self.output_index_path.clone())
-            .map_err(|e| DataFusionError::Execution(format!("shuffle write 
error: {e:?}")))?;
-        let mut index_buf_writer = BufWriter::new(index_file);
-        let data_file_length = 
self.output_data_writer.writer_stream_position()?;
-        for offset in [0, data_file_length] {
-            index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?;
-        }
-        index_buf_writer.flush()?;
-
-        self.metrics
-            .baseline
-            .elapsed_compute()
-            .add_duration(start_time.elapsed());
-        Ok(())
-    }
-}
-
-/// A helper struct to produce shuffled batches.
-/// This struct takes ownership of the buffered batches and partition indices 
from the
-/// ShuffleRepartitioner, and provides an iterator over the batches in the 
specified partitions.
-pub(crate) struct PartitionedBatchesProducer {
-    buffered_batches: Vec<RecordBatch>,
-    partition_indices: Vec<Vec<(u32, u32)>>,
-    batch_size: usize,
-}
-
-impl PartitionedBatchesProducer {
-    pub(crate) fn new(
-        buffered_batches: Vec<RecordBatch>,
-        indices: Vec<Vec<(u32, u32)>>,
-        batch_size: usize,
-    ) -> Self {
-        Self {
-            partition_indices: indices,
-            buffered_batches,
-            batch_size,
-        }
-    }
-
-    fn produce(&mut self, partition_id: usize) -> PartitionedBatchIterator<'_> 
{
-        PartitionedBatchIterator::new(
-            &self.partition_indices[partition_id],
-            &self.buffered_batches,
-            self.batch_size,
-        )
-    }
-}
-
-pub(crate) struct PartitionedBatchIterator<'a> {
-    record_batches: Vec<&'a RecordBatch>,
-    batch_size: usize,
-    indices: Vec<(usize, usize)>,
-    pos: usize,
-}
-
-impl<'a> PartitionedBatchIterator<'a> {
-    fn new(
-        indices: &'a [(u32, u32)],
-        buffered_batches: &'a [RecordBatch],
-        batch_size: usize,
-    ) -> Self {
-        if indices.is_empty() {
-            // Avoid unnecessary allocations when the partition is empty
-            return Self {
-                record_batches: vec![],
-                batch_size,
-                indices: vec![],
-                pos: 0,
-            };
-        }
-        let record_batches = buffered_batches.iter().collect::<Vec<_>>();
-        let current_indices = indices
-            .iter()
-            .map(|(i_batch, i_row)| (*i_batch as usize, *i_row as usize))
-            .collect::<Vec<_>>();
-        Self {
-            record_batches,
-            batch_size,
-            indices: current_indices,
-            pos: 0,
-        }
-    }
-}
-
-impl Iterator for PartitionedBatchIterator<'_> {
-    type Item = Result<RecordBatch>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.pos >= self.indices.len() {
-            return None;
-        }
-
-        let indices_end = std::cmp::min(self.pos + self.batch_size, 
self.indices.len());
-        let indices = &self.indices[self.pos..indices_end];
-        match interleave_record_batch(&self.record_batches, indices) {
-            Ok(batch) => {
-                self.pos = indices_end;
-                Some(Ok(batch))
-            }
-            Err(e) => Some(Err(DataFusionError::ArrowError(
-                Box::from(e),
-                Some(DataFusionError::get_back_trace()),
-            ))),
-        }
-    }
-}
-
-fn pmod(hash: u32, n: usize) -> usize {
-    let hash = hash as i32;
-    let n = n as i32;
-    let r = hash % n;
-    let result = if r < 0 { (r + n) % n } else { r };
-    result as usize
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::execution::shuffle::read_ipc_compressed;
+    use crate::execution::shuffle::{read_ipc_compressed, ShuffleBlockWriter};
+    use arrow::array::{Array, StringArray, StringBuilder};
     use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::RecordBatch;
     use arrow::row::{RowConverter, SortField};
     use datafusion::datasource::memory::MemorySourceConfig;
     use datafusion::datasource::source::DataSourceExec;
     use datafusion::execution::config::SessionConfig;
-    use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+    use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
     use datafusion::physical_expr::expressions::{col, Column};
     use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
     use datafusion::physical_plan::common::collect;
+    use datafusion::physical_plan::metrics::Time;
     use datafusion::prelude::SessionContext;
+    use itertools::Itertools;
     use std::io::Cursor;
     use tokio::runtime::Runtime;
 
@@ -1224,16 +367,22 @@ mod test {
 
         repartitioner.insert_batch(batch.clone()).await.unwrap();
 
-        assert_eq!(2, repartitioner.partition_writers.len());
+        {
+            let partition_writers = repartitioner.partition_writers();
+            assert_eq!(partition_writers.len(), 2);
 
-        assert!(!repartitioner.partition_writers[0].has_spill_file());
-        assert!(!repartitioner.partition_writers[1].has_spill_file());
+            assert!(!partition_writers[0].has_spill_file());
+            assert!(!partition_writers[1].has_spill_file());
+        }
 
         repartitioner.spill().unwrap();
 
         // after spill, there should be spill files
-        assert!(repartitioner.partition_writers[0].has_spill_file());
-        assert!(repartitioner.partition_writers[1].has_spill_file());
+        {
+            let partition_writers = repartitioner.partition_writers();
+            assert!(partition_writers[0].has_spill_file());
+            assert!(partition_writers[1].has_spill_file());
+        }
 
         // insert another batch after spilling
         repartitioner.insert_batch(batch.clone()).await.unwrap();
@@ -1346,16 +495,6 @@ mod test {
         RecordBatch::try_new(Arc::clone(&schema), 
vec![Arc::new(array)]).unwrap()
     }
 
-    #[test]
-    fn test_pmod() {
-        let i: Vec<u32> = vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b, 
0xcd1e64fb];
-        let result = i.into_iter().map(|i| pmod(i, 
200)).collect::<Vec<usize>>();
-
-        // expected partition from Spark with n=200
-        let expected = vec![69, 5, 193, 171, 115];
-        assert_eq!(result, expected);
-    }
-
     #[test]
     #[cfg_attr(miri, ignore)]
     fn test_round_robin_deterministic() {
diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs 
b/native/core/src/execution/shuffle/writers/partition_writer.rs
index 8b2555e09..8b5163d44 100644
--- a/native/core/src/execution/shuffle/writers/partition_writer.rs
+++ b/native/core/src/execution/shuffle/writers/partition_writer.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
-use crate::execution::shuffle::shuffle_writer::PartitionedBatchIterator;
+use crate::execution::shuffle::partitioners::PartitionedBatchIterator;
 use crate::execution::shuffle::writers::buf_batch_writer::BufBatchWriter;
 use crate::execution::shuffle::ShuffleBlockWriter;
 use datafusion::common::DataFusionError;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to