This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 2e24695d2 chore: Clean up and split shuffle module (#3395)
2e24695d2 is described below
commit 2e24695d2a7853b43cc85357f6e67061006fe3f2
Author: Emily Matheys <[email protected]>
AuthorDate: Thu Feb 5 15:23:13 2026 +0200
chore: Clean up and split shuffle module (#3395)
---
.../src/execution/shuffle/comet_partitioning.rs | 23 +
native/core/src/execution/shuffle/mod.rs | 1 +
.../execution/shuffle/{ => partitioners}/mod.rs | 26 +-
.../shuffle/partitioners/multi_partition.rs | 635 ++++++++++++++
.../partitioners/partitioned_batch_iterator.rs | 110 +++
.../shuffle/partitioners/single_partition.rs | 187 +++++
.../core/src/execution/shuffle/shuffle_writer.rs | 913 +--------------------
.../execution/shuffle/writers/partition_writer.rs | 2 +-
8 files changed, 1000 insertions(+), 897 deletions(-)
diff --git a/native/core/src/execution/shuffle/comet_partitioning.rs
b/native/core/src/execution/shuffle/comet_partitioning.rs
index b7ad15879..b8d68cd21 100644
--- a/native/core/src/execution/shuffle/comet_partitioning.rs
+++ b/native/core/src/execution/shuffle/comet_partitioning.rs
@@ -46,3 +46,26 @@ impl CometPartitioning {
}
}
}
+
+pub(super) fn pmod(hash: u32, n: usize) -> usize {
+ let hash = hash as i32;
+ let n = n as i32;
+ let r = hash % n;
+ let result = if r < 0 { (r + n) % n } else { r };
+ result as usize
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_pmod() {
+ let i: Vec<u32> = vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b,
0xcd1e64fb];
+ let result = i.into_iter().map(|i| pmod(i,
200)).collect::<Vec<usize>>();
+
+ // expected partition from Spark with n=200
+ let expected = vec![69, 5, 193, 171, 115];
+ assert_eq!(result, expected);
+ }
+}
diff --git a/native/core/src/execution/shuffle/mod.rs
b/native/core/src/execution/shuffle/mod.rs
index a41d269d8..6018cff50 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/mod.rs
@@ -18,6 +18,7 @@
pub(crate) mod codec;
mod comet_partitioning;
mod metrics;
+mod partitioners;
mod shuffle_writer;
pub mod spark_unsafe;
mod writers;
diff --git a/native/core/src/execution/shuffle/mod.rs
b/native/core/src/execution/shuffle/partitioners/mod.rs
similarity index 55%
copy from native/core/src/execution/shuffle/mod.rs
copy to native/core/src/execution/shuffle/partitioners/mod.rs
index a41d269d8..b9058f66f 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/partitioners/mod.rs
@@ -15,13 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-pub(crate) mod codec;
-mod comet_partitioning;
-mod metrics;
-mod shuffle_writer;
-pub mod spark_unsafe;
-mod writers;
+mod multi_partition;
+mod partitioned_batch_iterator;
+mod single_partition;
-pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
-pub use comet_partitioning::CometPartitioning;
-pub use shuffle_writer::ShuffleWriterExec;
+use arrow::record_batch::RecordBatch;
+use datafusion::common::Result;
+
+pub(super) use multi_partition::MultiPartitionShuffleRepartitioner;
+pub(super) use partitioned_batch_iterator::PartitionedBatchIterator;
+pub(super) use single_partition::SinglePartitionShufflePartitioner;
+
+#[async_trait::async_trait]
+pub(super) trait ShufflePartitioner: Send + Sync {
+ /// Insert a batch into the partitioner
+ async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;
+ /// Write shuffle data and shuffle index file to disk
+ fn shuffle_write(&mut self) -> Result<()>;
+}
diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs
b/native/core/src/execution/shuffle/partitioners/multi_partition.rs
new file mode 100644
index 000000000..35f754695
--- /dev/null
+++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs
@@ -0,0 +1,635 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
+use crate::execution::shuffle::partitioners::partitioned_batch_iterator::{
+ PartitionedBatchIterator, PartitionedBatchesProducer,
+};
+use crate::execution::shuffle::partitioners::ShufflePartitioner;
+use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter};
+use crate::execution::shuffle::{
+ comet_partitioning, CometPartitioning, CompressionCodec,
ShuffleBlockWriter,
+};
+use crate::execution::tracing::{with_trace, with_trace_async};
+use arrow::array::{ArrayRef, RecordBatch};
+use arrow::datatypes::SchemaRef;
+use datafusion::common::utils::proxy::VecAllocExt;
+use datafusion::common::DataFusionError;
+use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::physical_plan::metrics::Time;
+use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes;
+use itertools::Itertools;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::{File, OpenOptions};
+use std::io::{BufReader, BufWriter, Seek, Write};
+use std::sync::Arc;
+use tokio::time::Instant;
+
+#[derive(Default)]
+struct ScratchSpace {
+ /// Hashes for each row in the current batch.
+ hashes_buf: Vec<u32>,
+ /// Partition ids for each row in the current batch.
+ partition_ids: Vec<u32>,
+ /// The row indices of the rows in each partition. This array is
conceptually divided into
+ /// partitions, where each partition contains the row indices of the rows
in that partition.
+ /// The length of this array is the same as the number of rows in the
batch.
+ partition_row_indices: Vec<u32>,
+ /// The start indices of partitions in partition_row_indices.
partition_starts[K] and
+ /// partition_starts[K + 1] are the start and end indices of partition K
in partition_row_indices.
+ /// The length of this array is 1 + the number of partitions.
+ partition_starts: Vec<u32>,
+}
+
+impl ScratchSpace {
+ fn map_partition_ids_to_starts_and_indices(
+ &mut self,
+ num_output_partitions: usize,
+ num_rows: usize,
+ ) {
+ let partition_ids = &mut self.partition_ids[..num_rows];
+
+ // count each partition size, while leaving the last extra element as 0
+ let partition_counters = &mut self.partition_starts;
+ partition_counters.resize(num_output_partitions + 1, 0);
+ partition_counters.fill(0);
+ partition_ids
+ .iter()
+ .for_each(|partition_id| partition_counters[*partition_id as
usize] += 1);
+
+ // accumulate partition counters into partition ends
+ // e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7]
+ let partition_ends = partition_counters;
+ let mut accum = 0;
+ partition_ends.iter_mut().for_each(|v| {
+ *v += accum;
+ accum = *v;
+ });
+
+ // calculate partition row indices and partition starts
+ // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the
following partition_row_indices
+ // and partition_starts arrays:
+ //
+ // partition_row_indices: [6, 1, 2, 3, 4, 5, 0]
+ // partition_starts: [0, 1, 4, 6, 7]
+ //
+ // partition_starts conceptually splits partition_row_indices into
smaller slices.
+ // Each slice
partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the
+ // row indices of the input batch that are partitioned into partition
K. For example,
+ // first partition 0 has one row index [6], partition 1 has row
indices [1, 2, 3], etc.
+ let partition_row_indices = &mut self.partition_row_indices;
+ partition_row_indices.resize(num_rows, 0);
+ for (index, partition_id) in partition_ids.iter().enumerate().rev() {
+ partition_ends[*partition_id as usize] -= 1;
+ let end = partition_ends[*partition_id as usize];
+ partition_row_indices[end as usize] = index as u32;
+ }
+
+ // after calculating, partition ends become partition starts
+ }
+}
+
+/// A partitioner that uses a hash function to partition data into multiple
partitions
+pub(crate) struct MultiPartitionShuffleRepartitioner {
+ output_data_file: String,
+ output_index_file: String,
+ buffered_batches: Vec<RecordBatch>,
+ partition_indices: Vec<Vec<(u32, u32)>>,
+ partition_writers: Vec<PartitionWriter>,
+ shuffle_block_writer: ShuffleBlockWriter,
+ /// Partitioning scheme to use
+ partitioning: CometPartitioning,
+ runtime: Arc<RuntimeEnv>,
+ metrics: ShufflePartitionerMetrics,
+ /// Reused scratch space for computing partition indices
+ scratch: ScratchSpace,
+ /// The configured batch size
+ batch_size: usize,
+ /// Reservation for repartitioning
+ reservation: MemoryReservation,
+ tracing_enabled: bool,
+ /// Size of the write buffer in bytes
+ write_buffer_size: usize,
+}
+
+impl MultiPartitionShuffleRepartitioner {
+ #[allow(clippy::too_many_arguments)]
+ pub(crate) fn try_new(
+ partition: usize,
+ output_data_file: String,
+ output_index_file: String,
+ schema: SchemaRef,
+ partitioning: CometPartitioning,
+ metrics: ShufflePartitionerMetrics,
+ runtime: Arc<RuntimeEnv>,
+ batch_size: usize,
+ codec: CompressionCodec,
+ tracing_enabled: bool,
+ write_buffer_size: usize,
+ ) -> datafusion::common::Result<Self> {
+ let num_output_partitions = partitioning.partition_count();
+ assert_ne!(
+ num_output_partitions, 1,
+ "Use SinglePartitionShufflePartitioner for 1 output partition."
+ );
+
+ // Vectors in the scratch space will be filled with valid values
before being used, this
+ // initialization code is simply initializing the vectors to the
desired size.
+ // The initial values are not used.
+ let scratch = ScratchSpace {
+ hashes_buf: match partitioning {
+ // Allocate hashes_buf for hash and round robin partitioning.
+ // Round robin hashes all columns to achieve even,
deterministic distribution.
+ CometPartitioning::Hash(_, _) |
CometPartitioning::RoundRobin(_, _) => {
+ vec![0; batch_size]
+ }
+ _ => vec![],
+ },
+ partition_ids: vec![0; batch_size],
+ partition_row_indices: vec![0; batch_size],
+ partition_starts: vec![0; num_output_partitions + 1],
+ };
+
+ let shuffle_block_writer =
ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;
+
+ let partition_writers = (0..num_output_partitions)
+ .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone()))
+ .collect::<datafusion::common::Result<Vec<_>>>()?;
+
+ let reservation =
MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]"))
+ .with_can_spill(true)
+ .register(&runtime.memory_pool);
+
+ Ok(Self {
+ output_data_file,
+ output_index_file,
+ buffered_batches: vec![],
+ partition_indices: vec![vec![]; num_output_partitions],
+ partition_writers,
+ shuffle_block_writer,
+ partitioning,
+ runtime,
+ metrics,
+ scratch,
+ batch_size,
+ reservation,
+ tracing_enabled,
+ write_buffer_size,
+ })
+ }
+
+ /// Shuffles rows in input batch into corresponding partition buffer.
+ /// This function first calculates hashes for rows and then takes rows in the
same
+ /// partition as a record batch which is appended into the partition buffer.
+ /// This should not be called directly. Use `insert_batch` instead.
+ async fn partitioning_batch(&mut self, input: RecordBatch) ->
datafusion::common::Result<()> {
+ if input.num_rows() == 0 {
+ // skip empty batch
+ return Ok(());
+ }
+
+ if input.num_rows() > self.batch_size {
+ return Err(DataFusionError::Internal(
+ "Input batch size exceeds configured batch size. Call
`insert_batch` instead."
+ .to_string(),
+ ));
+ }
+
+ // Update data size metric
+ self.metrics.data_size.add(input.get_array_memory_size());
+
+ // NOTE: in shuffle writer exec, the output_rows metrics represents the
+ // number of rows those are written to output data file.
+ self.metrics.baseline.record_output(input.num_rows());
+
+ match &self.partitioning {
+ CometPartitioning::Hash(exprs, num_output_partitions) => {
+ let mut scratch = std::mem::take(&mut self.scratch);
+ let (partition_starts, partition_row_indices): (&Vec<u32>,
&Vec<u32>) = {
+ let mut timer = self.metrics.repart_time.timer();
+
+ // Evaluate partition expressions to get rows to apply
partitioning scheme.
+ let arrays = exprs
+ .iter()
+ .map(|expr|
expr.evaluate(&input)?.into_array(input.num_rows()))
+ .collect::<datafusion::common::Result<Vec<_>>>()?;
+
+ let num_rows = arrays[0].len();
+
+ // Use identical seed as Spark hash partitioning.
+ let hashes_buf = &mut scratch.hashes_buf[..num_rows];
+ hashes_buf.fill(42_u32);
+
+ // Generate partition ids for every row.
+ {
+ // Hash arrays and compute partition ids based on
number of partitions.
+ let partition_ids = &mut
scratch.partition_ids[..num_rows];
+ create_murmur3_hashes(&arrays, hashes_buf)?
+ .iter()
+ .enumerate()
+ .for_each(|(idx, hash)| {
+ partition_ids[idx] =
+ comet_partitioning::pmod(*hash,
*num_output_partitions) as u32;
+ });
+ }
+
+ // We now have partition ids for every input row, map that
to partition starts
+ // and partition indices to eventually write these rows to
partition buffers.
+ scratch
+
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
+
+ timer.stop();
+ Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
+ &scratch.partition_starts,
+ &scratch.partition_row_indices,
+ ))
+ }?;
+
+ self.buffer_partitioned_batch_may_spill(
+ input,
+ partition_row_indices,
+ partition_starts,
+ )
+ .await?;
+ self.scratch = scratch;
+ }
+ CometPartitioning::RangePartitioning(
+ lex_ordering,
+ num_output_partitions,
+ row_converter,
+ bounds,
+ ) => {
+ let mut scratch = std::mem::take(&mut self.scratch);
+ let (partition_starts, partition_row_indices): (&Vec<u32>,
&Vec<u32>) = {
+ let mut timer = self.metrics.repart_time.timer();
+
+ // Evaluate partition expressions for values to apply
partitioning scheme on.
+ let arrays = lex_ordering
+ .iter()
+ .map(|expr|
expr.expr.evaluate(&input)?.into_array(input.num_rows()))
+ .collect::<datafusion::common::Result<Vec<_>>>()?;
+
+ let num_rows = arrays[0].len();
+
+ // Generate partition ids for every row, first by
converting the partition
+ // arrays to Rows, and then doing binary search for each
Row against the
+ // bounds Rows.
+ {
+ let row_batch =
row_converter.convert_columns(arrays.as_slice())?;
+ let partition_ids = &mut
scratch.partition_ids[..num_rows];
+
+ row_batch.iter().enumerate().for_each(|(row_idx, row)|
{
+ partition_ids[row_idx] = bounds
+ .as_slice()
+ .partition_point(|bound| bound.row() <= row)
+ as u32
+ });
+ }
+
+ // We now have partition ids for every input row, map that
to partition starts
+ // and partition indices to eventually write these rows to
partition buffers.
+ scratch
+
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
+
+ timer.stop();
+ Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
+ &scratch.partition_starts,
+ &scratch.partition_row_indices,
+ ))
+ }?;
+
+ self.buffer_partitioned_batch_may_spill(
+ input,
+ partition_row_indices,
+ partition_starts,
+ )
+ .await?;
+ self.scratch = scratch;
+ }
+ CometPartitioning::RoundRobin(num_output_partitions,
max_hash_columns) => {
+ // Comet implements "round robin" as hash partitioning on
columns.
+ // This achieves the same goal as Spark's round robin (even
distribution
+ // without semantic grouping) while being deterministic for
fault tolerance.
+ //
+ // Note: This produces different partition assignments than
Spark's round robin,
+ // which sorts by UnsafeRow binary representation before
assigning partitions.
+ // However, both approaches provide even distribution and
determinism.
+ let mut scratch = std::mem::take(&mut self.scratch);
+ let (partition_starts, partition_row_indices): (&Vec<u32>,
&Vec<u32>) = {
+ let mut timer = self.metrics.repart_time.timer();
+
+ let num_rows = input.num_rows();
+
+ // Collect columns for hashing, respecting
max_hash_columns limit
+ // max_hash_columns of 0 means no limit (hash all columns)
+ // Negative values are normalized to 0 in the planner
+ let num_columns_to_hash = if *max_hash_columns == 0 {
+ input.num_columns()
+ } else {
+ (*max_hash_columns).min(input.num_columns())
+ };
+ let columns_to_hash: Vec<ArrayRef> =
(0..num_columns_to_hash)
+ .map(|i| Arc::clone(input.column(i)))
+ .collect();
+
+ // Use identical seed as Spark hash partitioning.
+ let hashes_buf = &mut scratch.hashes_buf[..num_rows];
+ hashes_buf.fill(42_u32);
+
+ // Compute hash for selected columns
+ create_murmur3_hashes(&columns_to_hash, hashes_buf)?;
+
+ // Assign partition IDs based on hash (same as hash
partitioning)
+ let partition_ids = &mut scratch.partition_ids[..num_rows];
+ hashes_buf.iter().enumerate().for_each(|(idx, hash)| {
+ partition_ids[idx] =
+ comet_partitioning::pmod(*hash,
*num_output_partitions) as u32;
+ });
+
+ // We now have partition ids for every input row, map that
to partition starts
+ // and partition indices to eventually write these rows to
partition buffers.
+ scratch
+
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
+
+ timer.stop();
+ Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
+ &scratch.partition_starts,
+ &scratch.partition_row_indices,
+ ))
+ }?;
+
+ self.buffer_partitioned_batch_may_spill(
+ input,
+ partition_row_indices,
+ partition_starts,
+ )
+ .await?;
+ self.scratch = scratch;
+ }
+ other => {
+ // this should be unreachable as long as the validation logic
+ // in the constructor is kept up-to-date
+ return Err(DataFusionError::NotImplemented(format!(
+ "Unsupported shuffle partitioning scheme {other:?}"
+ )));
+ }
+ }
+ Ok(())
+ }
+
+ async fn buffer_partitioned_batch_may_spill(
+ &mut self,
+ input: RecordBatch,
+ partition_row_indices: &[u32],
+ partition_starts: &[u32],
+ ) -> datafusion::common::Result<()> {
+ let mut mem_growth: usize = input.get_array_memory_size();
+ let buffered_partition_idx = self.buffered_batches.len() as u32;
+ self.buffered_batches.push(input);
+
+ // partition_starts conceptually slices partition_row_indices into
smaller slices,
+ // each slice contains the indices of rows in input that will go into
the corresponding
+ // partition. The following loop iterates over the slices and put the
row indices into
+ // the indices array of the corresponding partition.
+ for (partition_id, (&start, &end)) in partition_starts
+ .iter()
+ .tuple_windows()
+ .enumerate()
+ .filter(|(_, (start, end))| start < end)
+ {
+ let row_indices = &partition_row_indices[start as usize..end as
usize];
+
+ // Put row indices for the current partition into the indices
array of that partition.
+ // This indices array will be used for calling
interleave_record_batch to produce
+ // shuffled batches.
+ let indices = &mut self.partition_indices[partition_id];
+ let before_size = indices.allocated_size();
+ indices.reserve(row_indices.len());
+ for row_idx in row_indices {
+ indices.push((buffered_partition_idx, *row_idx));
+ }
+ let after_size = indices.allocated_size();
+ mem_growth += after_size.saturating_sub(before_size);
+ }
+
+ if self.reservation.try_grow(mem_growth).is_err() {
+ self.spill()?;
+ }
+
+ Ok(())
+ }
+
+ fn shuffle_write_partition(
+ partition_iter: &mut PartitionedBatchIterator,
+ shuffle_block_writer: &mut ShuffleBlockWriter,
+ output_data: &mut BufWriter<File>,
+ encode_time: &Time,
+ write_time: &Time,
+ write_buffer_size: usize,
+ ) -> datafusion::common::Result<()> {
+ let mut buf_batch_writer =
+ BufBatchWriter::new(shuffle_block_writer, output_data,
write_buffer_size);
+ for batch in partition_iter {
+ let batch = batch?;
+ buf_batch_writer.write(&batch, encode_time, write_time)?;
+ }
+ buf_batch_writer.flush(write_time)?;
+ Ok(())
+ }
+
+ fn used(&self) -> usize {
+ self.reservation.size()
+ }
+
+ fn spilled_bytes(&self) -> usize {
+ self.metrics.spilled_bytes.value()
+ }
+
+ fn spill_count(&self) -> usize {
+ self.metrics.spill_count.value()
+ }
+
+ fn data_size(&self) -> usize {
+ self.metrics.data_size.value()
+ }
+
+ /// This function transfers the ownership of the buffered batches and
partition indices from the
+ /// ShuffleRepartitioner to a new PartitionedBatches struct. The returned
PartitionedBatches struct
+ /// can be used to produce shuffled batches.
+ fn partitioned_batches(&mut self) -> PartitionedBatchesProducer {
+ let num_output_partitions = self.partition_indices.len();
+ let buffered_batches = std::mem::take(&mut self.buffered_batches);
+ // let indices = std::mem::take(&mut self.partition_indices);
+ let indices = std::mem::replace(
+ &mut self.partition_indices,
+ vec![vec![]; num_output_partitions],
+ );
+ PartitionedBatchesProducer::new(buffered_batches, indices,
self.batch_size)
+ }
+
+ pub(crate) fn spill(&mut self) -> datafusion::common::Result<()> {
+ log::info!(
+ "ShuffleRepartitioner spilling shuffle data of {} to disk while
inserting ({} time(s) so far)",
+ self.used(),
+ self.spill_count()
+ );
+
+ // we could always get a chance to free some memory as long as we are
holding some
+ if self.buffered_batches.is_empty() {
+ return Ok(());
+ }
+
+ with_trace("shuffle_spill", self.tracing_enabled, || {
+ let num_output_partitions = self.partition_writers.len();
+ let mut partitioned_batches = self.partitioned_batches();
+ let mut spilled_bytes = 0;
+
+ for partition_id in 0..num_output_partitions {
+ let partition_writer = &mut
self.partition_writers[partition_id];
+ let mut iter = partitioned_batches.produce(partition_id);
+ spilled_bytes += partition_writer.spill(
+ &mut iter,
+ &self.runtime,
+ &self.metrics,
+ self.write_buffer_size,
+ )?;
+ }
+
+ self.reservation.free();
+ self.metrics.spill_count.add(1);
+ self.metrics.spilled_bytes.add(spilled_bytes);
+ Ok(())
+ })
+ }
+
+ #[cfg(test)]
+ pub(crate) fn partition_writers(&self) -> &[PartitionWriter] {
+ &self.partition_writers
+ }
+}
+
+#[async_trait::async_trait]
+impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
+ /// Shuffles rows in input batch into corresponding partition buffer.
+ /// This function will slice input batch according to configured batch
size and then
+ /// shuffle rows into corresponding partition buffer.
+ async fn insert_batch(&mut self, batch: RecordBatch) ->
datafusion::common::Result<()> {
+ with_trace_async("shuffle_insert_batch", self.tracing_enabled, ||
async {
+ let start_time = Instant::now();
+ let mut start = 0;
+ while start < batch.num_rows() {
+ let end = (start + self.batch_size).min(batch.num_rows());
+ let batch = batch.slice(start, end - start);
+ self.partitioning_batch(batch).await?;
+ start = end;
+ }
+ self.metrics.input_batches.add(1);
+ self.metrics
+ .baseline
+ .elapsed_compute()
+ .add_duration(start_time.elapsed());
+ Ok(())
+ })
+ .await
+ }
+
+ /// Writes buffered shuffled record batches into Arrow IPC bytes.
+ fn shuffle_write(&mut self) -> datafusion::common::Result<()> {
+ with_trace("shuffle_write", self.tracing_enabled, || {
+ let start_time = Instant::now();
+
+ let mut partitioned_batches = self.partitioned_batches();
+ let num_output_partitions = self.partition_indices.len();
+ let mut offsets = vec![0; num_output_partitions + 1];
+
+ let data_file = self.output_data_file.clone();
+ let index_file = self.output_index_file.clone();
+
+ let output_data = OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .open(data_file)
+ .map_err(|e| DataFusionError::Execution(format!("shuffle write
error: {e:?}")))?;
+
+ let mut output_data = BufWriter::new(output_data);
+
+ #[allow(clippy::needless_range_loop)]
+ for i in 0..num_output_partitions {
+ offsets[i] = output_data.stream_position()?;
+
+ // if we wrote a spill file for this partition then copy the
+ // contents into the shuffle file
+ if let Some(spill_path) = self.partition_writers[i].path() {
+ let mut spill_file =
BufReader::new(File::open(spill_path)?);
+ let mut write_timer = self.metrics.write_time.timer();
+ std::io::copy(&mut spill_file, &mut output_data)?;
+ write_timer.stop();
+ }
+
+ // Write in memory batches to output data file
+ let mut partition_iter = partitioned_batches.produce(i);
+ Self::shuffle_write_partition(
+ &mut partition_iter,
+ &mut self.shuffle_block_writer,
+ &mut output_data,
+ &self.metrics.encode_time,
+ &self.metrics.write_time,
+ self.write_buffer_size,
+ )?;
+ }
+
+ let mut write_timer = self.metrics.write_time.timer();
+ output_data.flush()?;
+ write_timer.stop();
+
+ // add one extra offset at last to ease partition length
computation
+ offsets[num_output_partitions] = output_data.stream_position()?;
+
+ let mut write_timer = self.metrics.write_time.timer();
+ let mut output_index =
+ BufWriter::new(File::create(index_file).map_err(|e| {
+ DataFusionError::Execution(format!("shuffle write error:
{e:?}"))
+ })?);
+ for offset in offsets {
+ output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
+ }
+ output_index.flush()?;
+ write_timer.stop();
+
+ self.metrics
+ .baseline
+ .elapsed_compute()
+ .add_duration(start_time.elapsed());
+
+ Ok(())
+ })
+ }
+}
+
+impl Debug for MultiPartitionShuffleRepartitioner {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ShuffleRepartitioner")
+ .field("memory_used", &self.used())
+ .field("spilled_bytes", &self.spilled_bytes())
+ .field("spilled_count", &self.spill_count())
+ .field("data_size", &self.data_size())
+ .finish()
+ }
+}
diff --git
a/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs
b/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs
new file mode 100644
index 000000000..77010938c
--- /dev/null
+++
b/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::RecordBatch;
+use arrow::compute::interleave_record_batch;
+use datafusion::common::DataFusionError;
+
+/// A helper struct to produce shuffled batches.
+/// This struct takes ownership of the buffered batches and partition indices
from the
+/// ShuffleRepartitioner, and provides an iterator over the batches in the
specified partitions.
+pub(super) struct PartitionedBatchesProducer {
+ buffered_batches: Vec<RecordBatch>,
+ partition_indices: Vec<Vec<(u32, u32)>>,
+ batch_size: usize,
+}
+
+impl PartitionedBatchesProducer {
+ pub(super) fn new(
+ buffered_batches: Vec<RecordBatch>,
+ indices: Vec<Vec<(u32, u32)>>,
+ batch_size: usize,
+ ) -> Self {
+ Self {
+ partition_indices: indices,
+ buffered_batches,
+ batch_size,
+ }
+ }
+
+ pub(super) fn produce(&mut self, partition_id: usize) ->
PartitionedBatchIterator<'_> {
+ PartitionedBatchIterator::new(
+ &self.partition_indices[partition_id],
+ &self.buffered_batches,
+ self.batch_size,
+ )
+ }
+}
+
+pub(crate) struct PartitionedBatchIterator<'a> {
+ record_batches: Vec<&'a RecordBatch>,
+ batch_size: usize,
+ indices: Vec<(usize, usize)>,
+ pos: usize,
+}
+
+impl<'a> PartitionedBatchIterator<'a> {
+ fn new(
+ indices: &'a [(u32, u32)],
+ buffered_batches: &'a [RecordBatch],
+ batch_size: usize,
+ ) -> Self {
+ if indices.is_empty() {
+ // Avoid unnecessary allocations when the partition is empty
+ return Self {
+ record_batches: vec![],
+ batch_size,
+ indices: vec![],
+ pos: 0,
+ };
+ }
+ let record_batches = buffered_batches.iter().collect::<Vec<_>>();
+ let current_indices = indices
+ .iter()
+ .map(|(i_batch, i_row)| (*i_batch as usize, *i_row as usize))
+ .collect::<Vec<_>>();
+ Self {
+ record_batches,
+ batch_size,
+ indices: current_indices,
+ pos: 0,
+ }
+ }
+}
+
+impl Iterator for PartitionedBatchIterator<'_> {
+ type Item = datafusion::common::Result<RecordBatch>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.pos >= self.indices.len() {
+ return None;
+ }
+
+ let indices_end = std::cmp::min(self.pos + self.batch_size,
self.indices.len());
+ let indices = &self.indices[self.pos..indices_end];
+ match interleave_record_batch(&self.record_batches, indices) {
+ Ok(batch) => {
+ self.pos = indices_end;
+ Some(Ok(batch))
+ }
+ Err(e) => Some(Err(DataFusionError::ArrowError(
+ Box::from(e),
+ Some(DataFusionError::get_back_trace()),
+ ))),
+ }
+ }
+}
diff --git a/native/core/src/execution/shuffle/partitioners/single_partition.rs
b/native/core/src/execution/shuffle/partitioners/single_partition.rs
new file mode 100644
index 000000000..4ee5bd2bf
--- /dev/null
+++ b/native/core/src/execution/shuffle/partitioners/single_partition.rs
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
+use crate::execution::shuffle::partitioners::ShufflePartitioner;
+use crate::execution::shuffle::writers::BufBatchWriter;
+use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter};
+use arrow::array::RecordBatch;
+use arrow::datatypes::SchemaRef;
+use datafusion::common::DataFusionError;
+use std::fs::{File, OpenOptions};
+use std::io::{BufWriter, Write};
+use tokio::time::Instant;
+
+/// A partitioner that writes all shuffle data to a single file and a single
index file
+pub(crate) struct SinglePartitionShufflePartitioner {
+ // output_data_file: File,
+ output_data_writer: BufBatchWriter<ShuffleBlockWriter, File>,
+ output_index_path: String,
+ /// Batches that are smaller than the batch size and to be concatenated
+ buffered_batches: Vec<RecordBatch>,
+ /// Number of rows in the concatenating batches
+ num_buffered_rows: usize,
+ /// Metrics for the repartitioner
+ metrics: ShufflePartitionerMetrics,
+ /// The configured batch size
+ batch_size: usize,
+}
+
+impl SinglePartitionShufflePartitioner {
+ pub(crate) fn try_new(
+ output_data_path: String,
+ output_index_path: String,
+ schema: SchemaRef,
+ metrics: ShufflePartitionerMetrics,
+ batch_size: usize,
+ codec: CompressionCodec,
+ write_buffer_size: usize,
+ ) -> datafusion::common::Result<Self> {
+ let shuffle_block_writer =
ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;
+
+ let output_data_file = OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .open(output_data_path)?;
+
+ let output_data_writer =
+ BufBatchWriter::new(shuffle_block_writer, output_data_file,
write_buffer_size);
+
+ Ok(Self {
+ output_data_writer,
+ output_index_path,
+ buffered_batches: vec![],
+ num_buffered_rows: 0,
+ metrics,
+ batch_size,
+ })
+ }
+
+ /// Add a batch to the buffer of the partitioner, these buffered batches
will be concatenated
+ /// and written to the output data file when the number of rows in the
buffer reaches the batch size.
+ fn add_buffered_batch(&mut self, batch: RecordBatch) {
+ self.num_buffered_rows += batch.num_rows();
+ self.buffered_batches.push(batch);
+ }
+
+    /// Consumes buffered batches and returns a concatenated batch if successful
+ fn concat_buffered_batches(&mut self) ->
datafusion::common::Result<Option<RecordBatch>> {
+ if self.buffered_batches.is_empty() {
+ Ok(None)
+ } else if self.buffered_batches.len() == 1 {
+ let batch = self.buffered_batches.remove(0);
+ self.num_buffered_rows = 0;
+ Ok(Some(batch))
+ } else {
+ let schema = &self.buffered_batches[0].schema();
+ match arrow::compute::concat_batches(schema,
self.buffered_batches.iter()) {
+ Ok(concatenated) => {
+ self.buffered_batches.clear();
+ self.num_buffered_rows = 0;
+ Ok(Some(concatenated))
+ }
+ Err(e) => Err(DataFusionError::ArrowError(
+ Box::from(e),
+ Some(DataFusionError::get_back_trace()),
+ )),
+ }
+ }
+ }
+}
+
+#[async_trait::async_trait]
+impl ShufflePartitioner for SinglePartitionShufflePartitioner {
+ async fn insert_batch(&mut self, batch: RecordBatch) ->
datafusion::common::Result<()> {
+ let start_time = Instant::now();
+ let num_rows = batch.num_rows();
+
+ if num_rows > 0 {
+ self.metrics.data_size.add(batch.get_array_memory_size());
+ self.metrics.baseline.record_output(num_rows);
+
+ if num_rows >= self.batch_size || num_rows +
self.num_buffered_rows > self.batch_size {
+ let concatenated_batch = self.concat_buffered_batches()?;
+
+ // Write the concatenated buffered batch
+ if let Some(batch) = concatenated_batch {
+ self.output_data_writer.write(
+ &batch,
+ &self.metrics.encode_time,
+ &self.metrics.write_time,
+ )?;
+ }
+
+ if num_rows >= self.batch_size {
+ // Write the new batch
+ self.output_data_writer.write(
+ &batch,
+ &self.metrics.encode_time,
+ &self.metrics.write_time,
+ )?;
+ } else {
+ // Add the new batch to the buffer
+ self.add_buffered_batch(batch);
+ }
+ } else {
+ self.add_buffered_batch(batch);
+ }
+ }
+
+ self.metrics.input_batches.add(1);
+ self.metrics
+ .baseline
+ .elapsed_compute()
+ .add_duration(start_time.elapsed());
+ Ok(())
+ }
+
+ fn shuffle_write(&mut self) -> datafusion::common::Result<()> {
+ let start_time = Instant::now();
+ let concatenated_batch = self.concat_buffered_batches()?;
+
+ // Write the concatenated buffered batch
+ if let Some(batch) = concatenated_batch {
+ self.output_data_writer.write(
+ &batch,
+ &self.metrics.encode_time,
+ &self.metrics.write_time,
+ )?;
+ }
+ self.output_data_writer.flush(&self.metrics.write_time)?;
+
+ // Write index file. It should only contain 2 entries: 0 and the total
number of bytes written
+ let index_file = OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .open(self.output_index_path.clone())
+ .map_err(|e| DataFusionError::Execution(format!("shuffle write
error: {e:?}")))?;
+ let mut index_buf_writer = BufWriter::new(index_file);
+ let data_file_length =
self.output_data_writer.writer_stream_position()?;
+ for offset in [0, data_file_length] {
+ index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?;
+ }
+ index_buf_writer.flush()?;
+
+ self.metrics
+ .baseline
+ .elapsed_compute()
+ .add_duration(start_time.elapsed());
+ Ok(())
+ }
+}
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs
b/native/core/src/execution/shuffle/shuffle_writer.rs
index 669a6df97..a7e689a69 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -18,43 +18,34 @@
//! Defines the External shuffle repartition plan.
use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
-use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter};
-use crate::execution::shuffle::{CometPartitioning, CompressionCodec,
ShuffleBlockWriter};
-use crate::execution::tracing::{with_trace, with_trace_async};
-use arrow::compute::interleave_record_batch;
+use crate::execution::shuffle::partitioners::{
+ MultiPartitionShuffleRepartitioner, ShufflePartitioner,
SinglePartitionShufflePartitioner,
+};
+use crate::execution::shuffle::{CometPartitioning, CompressionCodec};
+use crate::execution::tracing::with_trace_async;
use async_trait::async_trait;
use datafusion::common::exec_datafusion_err;
-use datafusion::common::utils::proxy::VecAllocExt;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::EmptyRecordBatchStream;
use datafusion::{
- arrow::{array::*, datatypes::SchemaRef, error::ArrowError,
record_batch::RecordBatch},
- error::{DataFusionError, Result},
- execution::{
- context::TaskContext,
- memory_pool::{MemoryConsumer, MemoryReservation},
- runtime_env::RuntimeEnv,
- },
+ arrow::{datatypes::SchemaRef, error::ArrowError},
+ error::Result,
+ execution::context::TaskContext,
physical_plan::{
- metrics::{ExecutionPlanMetricsSet, MetricsSet, Time},
+ metrics::{ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
SendableRecordBatchStream,
Statistics,
},
};
-use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
-use itertools::Itertools;
use std::{
any::Any,
fmt,
fmt::{Debug, Formatter},
- fs::{File, OpenOptions},
- io::{BufReader, BufWriter, Seek, Write},
sync::Arc,
};
-use tokio::time::Instant;
/// The shuffle writer operator maps each input partition to M output
partitions based on a
/// partitioning scheme. No guarantees are made about the order of the
resulting partitions.
@@ -271,872 +262,24 @@ async fn external_shuffle(
.await
}
-#[async_trait::async_trait]
-trait ShufflePartitioner: Send + Sync {
- /// Insert a batch into the partitioner
- async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;
- /// Write shuffle data and shuffle index file to disk
- fn shuffle_write(&mut self) -> Result<()>;
-}
-
-/// A partitioner that uses a hash function to partition data into multiple
partitions
-struct MultiPartitionShuffleRepartitioner {
- output_data_file: String,
- output_index_file: String,
- buffered_batches: Vec<RecordBatch>,
- partition_indices: Vec<Vec<(u32, u32)>>,
- partition_writers: Vec<PartitionWriter>,
- shuffle_block_writer: ShuffleBlockWriter,
- /// Partitioning scheme to use
- partitioning: CometPartitioning,
- runtime: Arc<RuntimeEnv>,
- metrics: ShufflePartitionerMetrics,
- /// Reused scratch space for computing partition indices
- scratch: ScratchSpace,
- /// The configured batch size
- batch_size: usize,
- /// Reservation for repartitioning
- reservation: MemoryReservation,
- tracing_enabled: bool,
- /// Size of the write buffer in bytes
- write_buffer_size: usize,
-}
-
-#[derive(Default)]
-struct ScratchSpace {
- /// Hashes for each row in the current batch.
- hashes_buf: Vec<u32>,
- /// Partition ids for each row in the current batch.
- partition_ids: Vec<u32>,
- /// The row indices of the rows in each partition. This array is
conceptually divided into
- /// partitions, where each partition contains the row indices of the rows
in that partition.
- /// The length of this array is the same as the number of rows in the
batch.
- partition_row_indices: Vec<u32>,
- /// The start indices of partitions in partition_row_indices.
partition_starts[K] and
- /// partition_starts[K + 1] are the start and end indices of partition K
in partition_row_indices.
- /// The length of this array is 1 + the number of partitions.
- partition_starts: Vec<u32>,
-}
-
-impl ScratchSpace {
- fn map_partition_ids_to_starts_and_indices(
- &mut self,
- num_output_partitions: usize,
- num_rows: usize,
- ) {
- let partition_ids = &mut self.partition_ids[..num_rows];
-
- // count each partition size, while leaving the last extra element as 0
- let partition_counters = &mut self.partition_starts;
- partition_counters.resize(num_output_partitions + 1, 0);
- partition_counters.fill(0);
- partition_ids
- .iter()
- .for_each(|partition_id| partition_counters[*partition_id as
usize] += 1);
-
- // accumulate partition counters into partition ends
- // e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7]
- let partition_ends = partition_counters;
- let mut accum = 0;
- partition_ends.iter_mut().for_each(|v| {
- *v += accum;
- accum = *v;
- });
-
- // calculate partition row indices and partition starts
- // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the
following partition_row_indices
- // and partition_starts arrays:
- //
- // partition_row_indices: [6, 1, 2, 3, 4, 5, 0]
- // partition_starts: [0, 1, 4, 6, 7]
- //
- // partition_starts conceptually splits partition_row_indices into
smaller slices.
- // Each slice
partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the
- // row indices of the input batch that are partitioned into partition
K. For example,
- // first partition 0 has one row index [6], partition 1 has row
indices [1, 2, 3], etc.
- let partition_row_indices = &mut self.partition_row_indices;
- partition_row_indices.resize(num_rows, 0);
- for (index, partition_id) in partition_ids.iter().enumerate().rev() {
- partition_ends[*partition_id as usize] -= 1;
- let end = partition_ends[*partition_id as usize];
- partition_row_indices[end as usize] = index as u32;
- }
-
- // after calculating, partition ends become partition starts
- }
-}
-
-impl MultiPartitionShuffleRepartitioner {
- #[allow(clippy::too_many_arguments)]
- pub fn try_new(
- partition: usize,
- output_data_file: String,
- output_index_file: String,
- schema: SchemaRef,
- partitioning: CometPartitioning,
- metrics: ShufflePartitionerMetrics,
- runtime: Arc<RuntimeEnv>,
- batch_size: usize,
- codec: CompressionCodec,
- tracing_enabled: bool,
- write_buffer_size: usize,
- ) -> Result<Self> {
- let num_output_partitions = partitioning.partition_count();
- assert_ne!(
- num_output_partitions, 1,
- "Use SinglePartitionShufflePartitioner for 1 output partition."
- );
-
- // Vectors in the scratch space will be filled with valid values
before being used, this
- // initialization code is simply initializing the vectors to the
desired size.
- // The initial values are not used.
- let scratch = ScratchSpace {
- hashes_buf: match partitioning {
- // Allocate hashes_buf for hash and round robin partitioning.
- // Round robin hashes all columns to achieve even,
deterministic distribution.
- CometPartitioning::Hash(_, _) |
CometPartitioning::RoundRobin(_, _) => {
- vec![0; batch_size]
- }
- _ => vec![],
- },
- partition_ids: vec![0; batch_size],
- partition_row_indices: vec![0; batch_size],
- partition_starts: vec![0; num_output_partitions + 1],
- };
-
- let shuffle_block_writer =
ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;
-
- let partition_writers = (0..num_output_partitions)
- .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone()))
- .collect::<Result<Vec<_>>>()?;
-
- let reservation =
MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]"))
- .with_can_spill(true)
- .register(&runtime.memory_pool);
-
- Ok(Self {
- output_data_file,
- output_index_file,
- buffered_batches: vec![],
- partition_indices: vec![vec![]; num_output_partitions],
- partition_writers,
- shuffle_block_writer,
- partitioning,
- runtime,
- metrics,
- scratch,
- batch_size,
- reservation,
- tracing_enabled,
- write_buffer_size,
- })
- }
-
- /// Shuffles rows in input batch into corresponding partition buffer.
- /// This function first calculates hashes for rows and then takes rows in
same
- /// partition as a record batch which is appended into partition buffer.
- /// This should not be called directly. Use `insert_batch` instead.
- async fn partitioning_batch(&mut self, input: RecordBatch) -> Result<()> {
- if input.num_rows() == 0 {
- // skip empty batch
- return Ok(());
- }
-
- if input.num_rows() > self.batch_size {
- return Err(DataFusionError::Internal(
- "Input batch size exceeds configured batch size. Call
`insert_batch` instead."
- .to_string(),
- ));
- }
-
- // Update data size metric
- self.metrics.data_size.add(input.get_array_memory_size());
-
- // NOTE: in shuffle writer exec, the output_rows metrics represents the
- // number of rows those are written to output data file.
- self.metrics.baseline.record_output(input.num_rows());
-
- match &self.partitioning {
- CometPartitioning::Hash(exprs, num_output_partitions) => {
- let mut scratch = std::mem::take(&mut self.scratch);
- let (partition_starts, partition_row_indices): (&Vec<u32>,
&Vec<u32>) = {
- let mut timer = self.metrics.repart_time.timer();
-
- // Evaluate partition expressions to get rows to apply
partitioning scheme.
- let arrays = exprs
- .iter()
- .map(|expr|
expr.evaluate(&input)?.into_array(input.num_rows()))
- .collect::<Result<Vec<_>>>()?;
-
- let num_rows = arrays[0].len();
-
- // Use identical seed as Spark hash partitioning.
- let hashes_buf = &mut scratch.hashes_buf[..num_rows];
- hashes_buf.fill(42_u32);
-
- // Generate partition ids for every row.
- {
- // Hash arrays and compute partition ids based on
number of partitions.
- let partition_ids = &mut
scratch.partition_ids[..num_rows];
- create_murmur3_hashes(&arrays, hashes_buf)?
- .iter()
- .enumerate()
- .for_each(|(idx, hash)| {
- partition_ids[idx] = pmod(*hash,
*num_output_partitions) as u32;
- });
- }
-
- // We now have partition ids for every input row, map that
to partition starts
- // and partition indices to eventually right these rows to
partition buffers.
- scratch
-
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
-
- timer.stop();
- Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
- &scratch.partition_starts,
- &scratch.partition_row_indices,
- ))
- }?;
-
- self.buffer_partitioned_batch_may_spill(
- input,
- partition_row_indices,
- partition_starts,
- )
- .await?;
- self.scratch = scratch;
- }
- CometPartitioning::RangePartitioning(
- lex_ordering,
- num_output_partitions,
- row_converter,
- bounds,
- ) => {
- let mut scratch = std::mem::take(&mut self.scratch);
- let (partition_starts, partition_row_indices): (&Vec<u32>,
&Vec<u32>) = {
- let mut timer = self.metrics.repart_time.timer();
-
- // Evaluate partition expressions for values to apply
partitioning scheme on.
- let arrays = lex_ordering
- .iter()
- .map(|expr|
expr.expr.evaluate(&input)?.into_array(input.num_rows()))
- .collect::<Result<Vec<_>>>()?;
-
- let num_rows = arrays[0].len();
-
- // Generate partition ids for every row, first by
converting the partition
- // arrays to Rows, and then doing binary search for each
Row against the
- // bounds Rows.
- {
- let row_batch =
row_converter.convert_columns(arrays.as_slice())?;
- let partition_ids = &mut
scratch.partition_ids[..num_rows];
-
- row_batch.iter().enumerate().for_each(|(row_idx, row)|
{
- partition_ids[row_idx] = bounds
- .as_slice()
- .partition_point(|bound| bound.row() <= row)
- as u32
- });
- }
-
- // We now have partition ids for every input row, map that
to partition starts
- // and partition indices to eventually right these rows to
partition buffers.
- scratch
-
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
-
- timer.stop();
- Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
- &scratch.partition_starts,
- &scratch.partition_row_indices,
- ))
- }?;
-
- self.buffer_partitioned_batch_may_spill(
- input,
- partition_row_indices,
- partition_starts,
- )
- .await?;
- self.scratch = scratch;
- }
- CometPartitioning::RoundRobin(num_output_partitions,
max_hash_columns) => {
- // Comet implements "round robin" as hash partitioning on
columns.
- // This achieves the same goal as Spark's round robin (even
distribution
- // without semantic grouping) while being deterministic for
fault tolerance.
- //
- // Note: This produces different partition assignments than
Spark's round robin,
- // which sorts by UnsafeRow binary representation before
assigning partitions.
- // However, both approaches provide even distribution and
determinism.
- let mut scratch = std::mem::take(&mut self.scratch);
- let (partition_starts, partition_row_indices): (&Vec<u32>,
&Vec<u32>) = {
- let mut timer = self.metrics.repart_time.timer();
-
- let num_rows = input.num_rows();
-
- // Collect columns for hashing, respecting
max_hash_columns limit
- // max_hash_columns of 0 means no limit (hash all columns)
- // Negative values are normalized to 0 in the planner
- let num_columns_to_hash = if *max_hash_columns == 0 {
- input.num_columns()
- } else {
- (*max_hash_columns).min(input.num_columns())
- };
- let columns_to_hash: Vec<ArrayRef> =
(0..num_columns_to_hash)
- .map(|i| Arc::clone(input.column(i)))
- .collect();
-
- // Use identical seed as Spark hash partitioning.
- let hashes_buf = &mut scratch.hashes_buf[..num_rows];
- hashes_buf.fill(42_u32);
-
- // Compute hash for selected columns
- create_murmur3_hashes(&columns_to_hash, hashes_buf)?;
-
- // Assign partition IDs based on hash (same as hash
partitioning)
- let partition_ids = &mut scratch.partition_ids[..num_rows];
- hashes_buf.iter().enumerate().for_each(|(idx, hash)| {
- partition_ids[idx] = pmod(*hash,
*num_output_partitions) as u32;
- });
-
- // We now have partition ids for every input row, map that
to partition starts
- // and partition indices to eventually write these rows to
partition buffers.
- scratch
-
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);
-
- timer.stop();
- Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
- &scratch.partition_starts,
- &scratch.partition_row_indices,
- ))
- }?;
-
- self.buffer_partitioned_batch_may_spill(
- input,
- partition_row_indices,
- partition_starts,
- )
- .await?;
- self.scratch = scratch;
- }
- other => {
- // this should be unreachable as long as the validation logic
- // in the constructor is kept up-to-date
- return Err(DataFusionError::NotImplemented(format!(
- "Unsupported shuffle partitioning scheme {other:?}"
- )));
- }
- }
- Ok(())
- }
-
- async fn buffer_partitioned_batch_may_spill(
- &mut self,
- input: RecordBatch,
- partition_row_indices: &[u32],
- partition_starts: &[u32],
- ) -> Result<()> {
- let mut mem_growth: usize = input.get_array_memory_size();
- let buffered_partition_idx = self.buffered_batches.len() as u32;
- self.buffered_batches.push(input);
-
- // partition_starts conceptually slices partition_row_indices into
smaller slices,
- // each slice contains the indices of rows in input that will go into
the corresponding
- // partition. The following loop iterates over the slices and put the
row indices into
- // the indices array of the corresponding partition.
- for (partition_id, (&start, &end)) in partition_starts
- .iter()
- .tuple_windows()
- .enumerate()
- .filter(|(_, (start, end))| start < end)
- {
- let row_indices = &partition_row_indices[start as usize..end as
usize];
-
- // Put row indices for the current partition into the indices
array of that partition.
- // This indices array will be used for calling
interleave_record_batch to produce
- // shuffled batches.
- let indices = &mut self.partition_indices[partition_id];
- let before_size = indices.allocated_size();
- indices.reserve(row_indices.len());
- for row_idx in row_indices {
- indices.push((buffered_partition_idx, *row_idx));
- }
- let after_size = indices.allocated_size();
- mem_growth += after_size.saturating_sub(before_size);
- }
-
- if self.reservation.try_grow(mem_growth).is_err() {
- self.spill()?;
- }
-
- Ok(())
- }
-
- fn shuffle_write_partition(
- partition_iter: &mut PartitionedBatchIterator,
- shuffle_block_writer: &mut ShuffleBlockWriter,
- output_data: &mut BufWriter<File>,
- encode_time: &Time,
- write_time: &Time,
- write_buffer_size: usize,
- ) -> Result<()> {
- let mut buf_batch_writer =
- BufBatchWriter::new(shuffle_block_writer, output_data,
write_buffer_size);
- for batch in partition_iter {
- let batch = batch?;
- buf_batch_writer.write(&batch, encode_time, write_time)?;
- }
- buf_batch_writer.flush(write_time)?;
- Ok(())
- }
-
- fn used(&self) -> usize {
- self.reservation.size()
- }
-
- fn spilled_bytes(&self) -> usize {
- self.metrics.spilled_bytes.value()
- }
-
- fn spill_count(&self) -> usize {
- self.metrics.spill_count.value()
- }
-
- fn data_size(&self) -> usize {
- self.metrics.data_size.value()
- }
-
- /// This function transfers the ownership of the buffered batches and
partition indices from the
- /// ShuffleRepartitioner to a new PartitionedBatches struct. The returned
PartitionedBatches struct
- /// can be used to produce shuffled batches.
- fn partitioned_batches(&mut self) -> PartitionedBatchesProducer {
- let num_output_partitions = self.partition_indices.len();
- let buffered_batches = std::mem::take(&mut self.buffered_batches);
- // let indices = std::mem::take(&mut self.partition_indices);
- let indices = std::mem::replace(
- &mut self.partition_indices,
- vec![vec![]; num_output_partitions],
- );
- PartitionedBatchesProducer::new(buffered_batches, indices,
self.batch_size)
- }
-
- fn spill(&mut self) -> Result<()> {
- log::info!(
- "ShuffleRepartitioner spilling shuffle data of {} to disk while
inserting ({} time(s) so far)",
- self.used(),
- self.spill_count()
- );
-
- // we could always get a chance to free some memory as long as we are
holding some
- if self.buffered_batches.is_empty() {
- return Ok(());
- }
-
- with_trace("shuffle_spill", self.tracing_enabled, || {
- let num_output_partitions = self.partition_writers.len();
- let mut partitioned_batches = self.partitioned_batches();
- let mut spilled_bytes = 0;
-
- for partition_id in 0..num_output_partitions {
- let partition_writer = &mut
self.partition_writers[partition_id];
- let mut iter = partitioned_batches.produce(partition_id);
- spilled_bytes += partition_writer.spill(
- &mut iter,
- &self.runtime,
- &self.metrics,
- self.write_buffer_size,
- )?;
- }
-
- self.reservation.free();
- self.metrics.spill_count.add(1);
- self.metrics.spilled_bytes.add(spilled_bytes);
- Ok(())
- })
- }
-}
-
-#[async_trait::async_trait]
-impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
- /// Shuffles rows in input batch into corresponding partition buffer.
- /// This function will slice input batch according to configured batch
size and then
- /// shuffle rows into corresponding partition buffer.
- async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
- with_trace_async("shuffle_insert_batch", self.tracing_enabled, ||
async {
- let start_time = Instant::now();
- let mut start = 0;
- while start < batch.num_rows() {
- let end = (start + self.batch_size).min(batch.num_rows());
- let batch = batch.slice(start, end - start);
- self.partitioning_batch(batch).await?;
- start = end;
- }
- self.metrics.input_batches.add(1);
- self.metrics
- .baseline
- .elapsed_compute()
- .add_duration(start_time.elapsed());
- Ok(())
- })
- .await
- }
-
- /// Writes buffered shuffled record batches into Arrow IPC bytes.
- fn shuffle_write(&mut self) -> Result<()> {
- with_trace("shuffle_write", self.tracing_enabled, || {
- let start_time = Instant::now();
-
- let mut partitioned_batches = self.partitioned_batches();
- let num_output_partitions = self.partition_indices.len();
- let mut offsets = vec![0; num_output_partitions + 1];
-
- let data_file = self.output_data_file.clone();
- let index_file = self.output_index_file.clone();
-
- let output_data = OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .open(data_file)
- .map_err(|e| DataFusionError::Execution(format!("shuffle write
error: {e:?}")))?;
-
- let mut output_data = BufWriter::new(output_data);
-
- #[allow(clippy::needless_range_loop)]
- for i in 0..num_output_partitions {
- offsets[i] = output_data.stream_position()?;
-
- // if we wrote a spill file for this partition then copy the
- // contents into the shuffle file
- if let Some(spill_path) = self.partition_writers[i].path() {
- let mut spill_file =
BufReader::new(File::open(spill_path)?);
- let mut write_timer = self.metrics.write_time.timer();
- std::io::copy(&mut spill_file, &mut output_data)?;
- write_timer.stop();
- }
-
- // Write in memory batches to output data file
- let mut partition_iter = partitioned_batches.produce(i);
- Self::shuffle_write_partition(
- &mut partition_iter,
- &mut self.shuffle_block_writer,
- &mut output_data,
- &self.metrics.encode_time,
- &self.metrics.write_time,
- self.write_buffer_size,
- )?;
- }
-
- let mut write_timer = self.metrics.write_time.timer();
- output_data.flush()?;
- write_timer.stop();
-
- // add one extra offset at last to ease partition length
computation
- offsets[num_output_partitions] = output_data.stream_position()?;
-
- let mut write_timer = self.metrics.write_time.timer();
- let mut output_index =
- BufWriter::new(File::create(index_file).map_err(|e| {
- DataFusionError::Execution(format!("shuffle write error:
{e:?}"))
- })?);
- for offset in offsets {
- output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
- }
- output_index.flush()?;
- write_timer.stop();
-
- self.metrics
- .baseline
- .elapsed_compute()
- .add_duration(start_time.elapsed());
-
- Ok(())
- })
- }
-}
-
-impl Debug for MultiPartitionShuffleRepartitioner {
- fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- f.debug_struct("ShuffleRepartitioner")
- .field("memory_used", &self.used())
- .field("spilled_bytes", &self.spilled_bytes())
- .field("spilled_count", &self.spill_count())
- .field("data_size", &self.data_size())
- .finish()
- }
-}
-
-/// A partitioner that writes all shuffle data to a single file and a single
index file
-struct SinglePartitionShufflePartitioner {
- // output_data_file: File,
- output_data_writer: BufBatchWriter<ShuffleBlockWriter, File>,
- output_index_path: String,
- /// Batches that are smaller than the batch size and to be concatenated
- buffered_batches: Vec<RecordBatch>,
- /// Number of rows in the concatenating batches
- num_buffered_rows: usize,
- /// Metrics for the repartitioner
- metrics: ShufflePartitionerMetrics,
- /// The configured batch size
- batch_size: usize,
-}
-
-impl SinglePartitionShufflePartitioner {
- fn try_new(
- output_data_path: String,
- output_index_path: String,
- schema: SchemaRef,
- metrics: ShufflePartitionerMetrics,
- batch_size: usize,
- codec: CompressionCodec,
- write_buffer_size: usize,
- ) -> Result<Self> {
- let shuffle_block_writer =
ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;
-
- let output_data_file = OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .open(output_data_path)?;
-
- let output_data_writer =
- BufBatchWriter::new(shuffle_block_writer, output_data_file,
write_buffer_size);
-
- Ok(Self {
- output_data_writer,
- output_index_path,
- buffered_batches: vec![],
- num_buffered_rows: 0,
- metrics,
- batch_size,
- })
- }
-
- /// Add a batch to the buffer of the partitioner, these buffered batches
will be concatenated
- /// and written to the output data file when the number of rows in the
buffer reaches the batch size.
- fn add_buffered_batch(&mut self, batch: RecordBatch) {
- self.num_buffered_rows += batch.num_rows();
- self.buffered_batches.push(batch);
- }
-
- /// Consumes buffered batches and return a concatenated batch if successful
- fn concat_buffered_batches(&mut self) -> Result<Option<RecordBatch>> {
- if self.buffered_batches.is_empty() {
- Ok(None)
- } else if self.buffered_batches.len() == 1 {
- let batch = self.buffered_batches.remove(0);
- self.num_buffered_rows = 0;
- Ok(Some(batch))
- } else {
- let schema = &self.buffered_batches[0].schema();
- match arrow::compute::concat_batches(schema,
self.buffered_batches.iter()) {
- Ok(concatenated) => {
- self.buffered_batches.clear();
- self.num_buffered_rows = 0;
- Ok(Some(concatenated))
- }
- Err(e) => Err(DataFusionError::ArrowError(
- Box::from(e),
- Some(DataFusionError::get_back_trace()),
- )),
- }
- }
- }
-}
-
-#[async_trait::async_trait]
-impl ShufflePartitioner for SinglePartitionShufflePartitioner {
- async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
- let start_time = Instant::now();
- let num_rows = batch.num_rows();
-
- if num_rows > 0 {
- self.metrics.data_size.add(batch.get_array_memory_size());
- self.metrics.baseline.record_output(num_rows);
-
- if num_rows >= self.batch_size || num_rows +
self.num_buffered_rows > self.batch_size {
- let concatenated_batch = self.concat_buffered_batches()?;
-
- // Write the concatenated buffered batch
- if let Some(batch) = concatenated_batch {
- self.output_data_writer.write(
- &batch,
- &self.metrics.encode_time,
- &self.metrics.write_time,
- )?;
- }
-
- if num_rows >= self.batch_size {
- // Write the new batch
- self.output_data_writer.write(
- &batch,
- &self.metrics.encode_time,
- &self.metrics.write_time,
- )?;
- } else {
- // Add the new batch to the buffer
- self.add_buffered_batch(batch);
- }
- } else {
- self.add_buffered_batch(batch);
- }
- }
-
- self.metrics.input_batches.add(1);
- self.metrics
- .baseline
- .elapsed_compute()
- .add_duration(start_time.elapsed());
- Ok(())
- }
-
- fn shuffle_write(&mut self) -> Result<()> {
- let start_time = Instant::now();
- let concatenated_batch = self.concat_buffered_batches()?;
-
- // Write the concatenated buffered batch
- if let Some(batch) = concatenated_batch {
- self.output_data_writer.write(
- &batch,
- &self.metrics.encode_time,
- &self.metrics.write_time,
- )?;
- }
- self.output_data_writer.flush(&self.metrics.write_time)?;
-
- // Write index file. It should only contain 2 entries: 0 and the total
number of bytes written
- let index_file = OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .open(self.output_index_path.clone())
- .map_err(|e| DataFusionError::Execution(format!("shuffle write
error: {e:?}")))?;
- let mut index_buf_writer = BufWriter::new(index_file);
- let data_file_length =
self.output_data_writer.writer_stream_position()?;
- for offset in [0, data_file_length] {
- index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?;
- }
- index_buf_writer.flush()?;
-
- self.metrics
- .baseline
- .elapsed_compute()
- .add_duration(start_time.elapsed());
- Ok(())
- }
-}
-
-/// A helper struct to produce shuffled batches.
-/// This struct takes ownership of the buffered batches and partition indices
from the
-/// ShuffleRepartitioner, and provides an iterator over the batches in the
specified partitions.
-pub(crate) struct PartitionedBatchesProducer {
- buffered_batches: Vec<RecordBatch>,
- partition_indices: Vec<Vec<(u32, u32)>>,
- batch_size: usize,
-}
-
-impl PartitionedBatchesProducer {
- pub(crate) fn new(
- buffered_batches: Vec<RecordBatch>,
- indices: Vec<Vec<(u32, u32)>>,
- batch_size: usize,
- ) -> Self {
- Self {
- partition_indices: indices,
- buffered_batches,
- batch_size,
- }
- }
-
- fn produce(&mut self, partition_id: usize) -> PartitionedBatchIterator<'_>
{
- PartitionedBatchIterator::new(
- &self.partition_indices[partition_id],
- &self.buffered_batches,
- self.batch_size,
- )
- }
-}
-
-pub(crate) struct PartitionedBatchIterator<'a> {
- record_batches: Vec<&'a RecordBatch>,
- batch_size: usize,
- indices: Vec<(usize, usize)>,
- pos: usize,
-}
-
-impl<'a> PartitionedBatchIterator<'a> {
- fn new(
- indices: &'a [(u32, u32)],
- buffered_batches: &'a [RecordBatch],
- batch_size: usize,
- ) -> Self {
- if indices.is_empty() {
- // Avoid unnecessary allocations when the partition is empty
- return Self {
- record_batches: vec![],
- batch_size,
- indices: vec![],
- pos: 0,
- };
- }
- let record_batches = buffered_batches.iter().collect::<Vec<_>>();
- let current_indices = indices
- .iter()
- .map(|(i_batch, i_row)| (*i_batch as usize, *i_row as usize))
- .collect::<Vec<_>>();
- Self {
- record_batches,
- batch_size,
- indices: current_indices,
- pos: 0,
- }
- }
-}
-
-impl Iterator for PartitionedBatchIterator<'_> {
- type Item = Result<RecordBatch>;
-
- fn next(&mut self) -> Option<Self::Item> {
- if self.pos >= self.indices.len() {
- return None;
- }
-
- let indices_end = std::cmp::min(self.pos + self.batch_size,
self.indices.len());
- let indices = &self.indices[self.pos..indices_end];
- match interleave_record_batch(&self.record_batches, indices) {
- Ok(batch) => {
- self.pos = indices_end;
- Some(Ok(batch))
- }
- Err(e) => Some(Err(DataFusionError::ArrowError(
- Box::from(e),
- Some(DataFusionError::get_back_trace()),
- ))),
- }
- }
-}
-
-fn pmod(hash: u32, n: usize) -> usize {
- let hash = hash as i32;
- let n = n as i32;
- let r = hash % n;
- let result = if r < 0 { (r + n) % n } else { r };
- result as usize
-}
-
#[cfg(test)]
mod test {
use super::*;
- use crate::execution::shuffle::read_ipc_compressed;
+ use crate::execution::shuffle::{read_ipc_compressed, ShuffleBlockWriter};
+ use arrow::array::{Array, StringArray, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
+ use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::config::SessionConfig;
- use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+ use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use datafusion::physical_expr::expressions::{col, Column};
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion::physical_plan::common::collect;
+ use datafusion::physical_plan::metrics::Time;
use datafusion::prelude::SessionContext;
+ use itertools::Itertools;
use std::io::Cursor;
use tokio::runtime::Runtime;
@@ -1224,16 +367,22 @@ mod test {
repartitioner.insert_batch(batch.clone()).await.unwrap();
- assert_eq!(2, repartitioner.partition_writers.len());
+ {
+ let partition_writers = repartitioner.partition_writers();
+ assert_eq!(partition_writers.len(), 2);
- assert!(!repartitioner.partition_writers[0].has_spill_file());
- assert!(!repartitioner.partition_writers[1].has_spill_file());
+ assert!(!partition_writers[0].has_spill_file());
+ assert!(!partition_writers[1].has_spill_file());
+ }
repartitioner.spill().unwrap();
// after spill, there should be spill files
- assert!(repartitioner.partition_writers[0].has_spill_file());
- assert!(repartitioner.partition_writers[1].has_spill_file());
+ {
+ let partition_writers = repartitioner.partition_writers();
+ assert!(partition_writers[0].has_spill_file());
+ assert!(partition_writers[1].has_spill_file());
+ }
// insert another batch after spilling
repartitioner.insert_batch(batch.clone()).await.unwrap();
@@ -1346,16 +495,6 @@ mod test {
RecordBatch::try_new(Arc::clone(&schema),
vec![Arc::new(array)]).unwrap()
}
- #[test]
- fn test_pmod() {
- let i: Vec<u32> = vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b,
0xcd1e64fb];
- let result = i.into_iter().map(|i| pmod(i,
200)).collect::<Vec<usize>>();
-
- // expected partition from Spark with n=200
- let expected = vec![69, 5, 193, 171, 115];
- assert_eq!(result, expected);
- }
-
#[test]
#[cfg_attr(miri, ignore)]
fn test_round_robin_deterministic() {
diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs
b/native/core/src/execution/shuffle/writers/partition_writer.rs
index 8b2555e09..8b5163d44 100644
--- a/native/core/src/execution/shuffle/writers/partition_writer.rs
+++ b/native/core/src/execution/shuffle/writers/partition_writer.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
-use crate::execution::shuffle::shuffle_writer::PartitionedBatchIterator;
+use crate::execution::shuffle::partitioners::PartitionedBatchIterator;
use crate::execution::shuffle::writers::buf_batch_writer::BufBatchWriter;
use crate::execution::shuffle::ShuffleBlockWriter;
use datafusion::common::DataFusionError;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]