This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new ec5df97b4 feat: Add support for round-robin partitioning in native
shuffle (#3076)
ec5df97b4 is described below
commit ec5df97b40c366646aa7c59514f49c8ba64d588a
Author: Andy Grove <[email protected]>
AuthorDate: Sun Jan 25 05:41:41 2026 -0700
feat: Add support for round-robin partitioning in native shuffle (#3076)
---
.../main/scala/org/apache/comet/CometConf.scala | 30 ++++
docs/source/contributor-guide/jvm_shuffle.md | 8 +-
docs/source/contributor-guide/native_shuffle.md | 22 ++-
docs/source/user-guide/latest/compatibility.md | 26 ++++
native/core/src/execution/planner.rs | 12 ++
.../src/execution/shuffle/comet_partitioning.rs | 6 +-
.../core/src/execution/shuffle/shuffle_writer.rs | 161 ++++++++++++++++++++-
native/proto/src/proto/partitioning.proto | 7 +
.../shuffle/CometNativeShuffleWriter.scala | 12 +-
.../shuffle/CometShuffleExchangeExec.scala | 31 +++-
.../comet/exec/CometNativeShuffleSuite.scala | 49 +++++++
11 files changed, 348 insertions(+), 16 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 656dbc9a5..1e7adfd58 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -375,6 +375,36 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)
+ val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED:
ConfigEntry[Boolean] =
+ conf("spark.comet.native.shuffle.partitioning.roundrobin.enabled")
+ .category(CATEGORY_SHUFFLE)
+ .doc(
+ "Whether to enable round robin partitioning for Comet native shuffle.
" +
+ "This is disabled by default because Comet's round-robin produces
different " +
+ "partition assignments than Spark. Spark sorts rows by their binary
UnsafeRow " +
+ "representation before assigning partitions, but Comet uses Arrow
format which " +
+ "has a different binary layout. Instead, Comet implements
round-robin as hash " +
+ "partitioning on all columns, which achieves the same goals: even
distribution, " +
+ "deterministic output (for fault tolerance), and no semantic
grouping. " +
+ "Sorted output will be identical to Spark, but unsorted row ordering
may differ.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_MAX_HASH_COLUMNS:
ConfigEntry[Int] =
+ conf("spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns")
+ .category(CATEGORY_SHUFFLE)
+ .doc(
+ "The maximum number of columns to hash for round robin partitioning. "
+
+ "When set to 0 (the default), all columns are hashed. " +
+ "When set to a positive value, only the first N columns are used for
hashing, " +
+ "which can improve performance for wide tables while still providing
" +
+ "reasonable distribution.")
+ .intConf
+ .checkValue(
+ v => v >= 0,
+ "The maximum number of columns to hash for round robin partitioning
must be non-negative.")
+ .createWithDefault(0)
+
val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
.category(CATEGORY_SHUFFLE)
diff --git a/docs/source/contributor-guide/jvm_shuffle.md
b/docs/source/contributor-guide/jvm_shuffle.md
index e011651d2..2145c82eb 100644
--- a/docs/source/contributor-guide/jvm_shuffle.md
+++ b/docs/source/contributor-guide/jvm_shuffle.md
@@ -46,12 +46,10 @@ JVM shuffle (`CometColumnarExchange`) is used instead of
native shuffle (`CometE
(not a `CometPlan`), JVM shuffle is the only option since native shuffle
requires columnar input
from Comet operators.
-3. **Unsupported partitioning type**: Native shuffle only supports
`HashPartitioning`, `RangePartitioning`,
- and `SinglePartition`. JVM shuffle additionally supports
`RoundRobinPartitioning`.
-
-4. **Unsupported partition key types**: For `HashPartitioning` and
`RangePartitioning`, native shuffle
+3. **Unsupported partition key types**: For `HashPartitioning` and
`RangePartitioning`, native shuffle
only supports primitive types as partition keys. Complex types (struct,
array, map) cannot be used
- as partition keys in native shuffle, though they are fully supported as
data columns in both implementations.
+ as partition keys in native shuffle and will fall back to JVM columnar
shuffle. Note that complex types are
+ fully supported as data columns in both implementations.
## Input Handling
diff --git a/docs/source/contributor-guide/native_shuffle.md
b/docs/source/contributor-guide/native_shuffle.md
index e3d2dea47..18e80a90c 100644
--- a/docs/source/contributor-guide/native_shuffle.md
+++ b/docs/source/contributor-guide/native_shuffle.md
@@ -52,8 +52,7 @@ Native shuffle (`CometExchange`) is selected when all of the
following condition
- `HashPartitioning`
- `RangePartitioning`
- `SinglePartition`
-
- `RoundRobinPartitioning` requires JVM shuffle.
+ - `RoundRobinPartitioning`
4. **Supported partition key types**: For `HashPartitioning` and
`RangePartitioning`, partition
keys must be primitive types. Complex types (struct, array, map) as
partition keys require
@@ -131,7 +130,7 @@ Native shuffle (`CometExchange`) is selected when all of
the following condition
2. **Native execution**: `CometExec.getCometIterator()` executes the plan in
Rust.
3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the
appropriate partitioner:
- - `MultiPartitionShuffleRepartitioner`: For hash/range partitioning
+ - `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin
partitioning
- `SinglePartitionShufflePartitioner`: For single partition (simpler path)
4. **Buffering and spilling**: The partitioner buffers rows per partition.
When memory pressure
@@ -187,6 +186,19 @@ For range partitioning:
The simplest case: all rows go to partition 0. Uses
`SinglePartitionShufflePartitioner` which
simply concatenates batches to reach the configured batch size.
+### Round Robin Partitioning
+
+Comet implements round robin partitioning using hash-based assignment for
determinism:
+
+1. Computes a Murmur3 hash of columns (using seed 42)
+2. Assigns partitions directly using the hash: `partition_id = hash %
num_partitions`
+
+This approach guarantees determinism across retries, which is critical for
fault tolerance.
+However, unlike true round robin which cycles through partitions row-by-row,
hash-based
+assignment only provides even distribution when the data has sufficient
variation in the
+hashed columns. Data with low cardinality or identical values may result in
skewed partition
+sizes.
+
## Memory Management
Native shuffle uses DataFusion's memory management with spilling support:
@@ -235,8 +247,8 @@ independently compressed, allowing parallel decompression
during reads.
| ------------------- | -------------------------------------- |
--------------------------------- |
| Input format | Columnar (direct from Comet operators) | Row-based
(via ColumnarToRowExec) |
| Partitioning logic | Rust implementation | Spark's
partitioner |
-| Supported schemes | Hash, Range, Single | Hash, Range,
Single, RoundRobin |
-| Partition key types | Primitives only | Any type
|
+| Supported schemes | Hash, Range, Single, RoundRobin | Hash, Range,
Single, RoundRobin |
+| Partition key types | Primitives only (Hash, Range) | Any type
|
| Performance | Higher (no format conversion) | Lower
(columnar→row→columnar) |
| Writer variants | Single path | Bypass (hash)
and sort-based |
diff --git a/docs/source/user-guide/latest/compatibility.md
b/docs/source/user-guide/latest/compatibility.md
index 64bd9d2bc..c09f6a61e 100644
--- a/docs/source/user-guide/latest/compatibility.md
+++ b/docs/source/user-guide/latest/compatibility.md
@@ -69,6 +69,32 @@ this can be overridden by setting
`spark.comet.regexp.allowIncompatible=true`.
Comet's support for window functions is incomplete and known to be incorrect.
It is disabled by default and
should not be used in production. The feature will be enabled in a future
release. Tracking issue:
[#2721](https://github.com/apache/datafusion-comet/issues/2721).
+## Round-Robin Partitioning
+
+Comet's native shuffle implementation of round-robin partitioning
(`df.repartition(n)`) is not compatible with
+Spark's implementation and is disabled by default. It can be enabled by setting
+`spark.comet.native.shuffle.partitioning.roundrobin.enabled=true`.
+
+**Why the incompatibility exists:**
+
+Spark's round-robin partitioning sorts rows by their binary `UnsafeRow`
representation before assigning them to
+partitions. This ensures deterministic output for fault tolerance (task
retries produce identical results).
+Comet uses Arrow format internally, which has a completely different binary
layout than `UnsafeRow`, making it
+impossible to match Spark's exact partition assignments.
+
+**Comet's approach:**
+
+Instead of true round-robin assignment, Comet implements round-robin as hash
partitioning on ALL columns. This
+achieves the same semantic goals:
+
+- **Even distribution**: Rows are distributed evenly across partitions (as
long as the hash varies sufficiently -
+ in some cases there could be skew)
+- **Deterministic**: Same input always produces the same partition assignments
(important for fault tolerance)
+- **No semantic grouping**: Unlike hash partitioning on specific columns, this
doesn't group related rows together
+
+The only difference is that Comet's partition assignments will differ from
Spark's. When results are sorted,
+they will be identical to Spark. Unsorted results may have different row
ordering.
+
## Cast
Cast operations in Comet fall into three levels of support:
diff --git a/native/core/src/execution/planner.rs
b/native/core/src/execution/planner.rs
index 6a4ad97f8..44ff20a44 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -2342,6 +2342,18 @@ impl PhysicalPlanner {
))
}
PartitioningStruct::SinglePartition(_) =>
Ok(CometPartitioning::SinglePartition),
+ PartitioningStruct::RoundRobinPartition(rr_partition) => {
+ // Treat negative max_hash_columns as 0 (no limit)
+ let max_hash_columns = if rr_partition.max_hash_columns <= 0 {
+ 0
+ } else {
+ rr_partition.max_hash_columns as usize
+ };
+ Ok(CometPartitioning::RoundRobin(
+ rr_partition.num_partitions as usize,
+ max_hash_columns,
+ ))
+ }
}
}
diff --git a/native/core/src/execution/shuffle/comet_partitioning.rs
b/native/core/src/execution/shuffle/comet_partitioning.rs
index a2422cf9e..b7ad15879 100644
--- a/native/core/src/execution/shuffle/comet_partitioning.rs
+++ b/native/core/src/execution/shuffle/comet_partitioning.rs
@@ -31,6 +31,10 @@ pub enum CometPartitioning {
/// Rows for comparing to 4) OwnedRows that represent the boundaries of
each partition, used with
/// LexOrdering to bin each value in the RecordBatch to a partition.
RangePartitioning(LexOrdering, usize, Arc<RowConverter>, Vec<OwnedRow>),
+    /// Round robin partitioning. Distributes rows across partitions by
hashing the columns
+    /// and assigning each row to `hash % num_partitions` (no sorting or
sequential cycling). Args are:
+ /// 1) number of partitions, 2) max columns to hash (0 means no limit).
+ RoundRobin(usize, usize),
}
impl CometPartitioning {
@@ -38,7 +42,7 @@ impl CometPartitioning {
use CometPartitioning::*;
match self {
SinglePartition => 1,
- Hash(_, n) | RangePartitioning(_, n, _, _) => *n,
+ Hash(_, n) | RangePartitioning(_, n, _, _) | RoundRobin(n, _) =>
*n,
}
}
}
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs
b/native/core/src/execution/shuffle/shuffle_writer.rs
index f21cde2ba..55d6a9ef9 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -382,8 +382,11 @@ impl MultiPartitionShuffleRepartitioner {
// The initial values are not used.
let scratch = ScratchSpace {
hashes_buf: match partitioning {
- // Only allocate the hashes_buf if hash partitioning.
- CometPartitioning::Hash(_, _) => vec![0; batch_size],
+ // Allocate hashes_buf for hash and round robin partitioning.
+ // Round robin hashes all columns to achieve even,
deterministic distribution.
+ CometPartitioning::Hash(_, _) |
CometPartitioning::RoundRobin(_, _) => {
+ vec![0; batch_size]
+ }
_ => vec![],
},
partition_ids: vec![0; batch_size],
@@ -598,6 +601,68 @@ impl MultiPartitionShuffleRepartitioner {
.await?;
self.scratch = scratch;
}
+ CometPartitioning::RoundRobin(num_output_partitions,
max_hash_columns) => {
+ // Comet implements "round robin" as hash partitioning on
columns.
+ // This achieves the same goal as Spark's round robin (even
distribution
+ // without semantic grouping) while being deterministic for
fault tolerance.
+ //
+ // Note: This produces different partition assignments than
Spark's round robin,
+ // which sorts by UnsafeRow binary representation before
assigning partitions.
+ // However, both approaches provide even distribution and
determinism.
+ let mut scratch = std::mem::take(&mut self.scratch);
+ let (partition_starts, partition_row_indices): (&Vec<u32>,
&Vec<u32>) = {
+ let mut timer = self.metrics.repart_time.timer();
+
+ let num_rows = input.num_rows();
+
+ // Collect columns for hashing, respecting
max_hash_columns limit
+ // max_hash_columns of 0 means no limit (hash all columns)
+ // Negative values are normalized to 0 in the planner
+ let num_columns_to_hash = if *max_hash_columns == 0 {
+ input.num_columns()
+ } else {
+ (*max_hash_columns).min(input.num_columns())
+ };
+ let columns_to_hash: Vec<ArrayRef> =
(0..num_columns_to_hash)
+ .map(|i| Arc::clone(input.column(i)))
+ .collect();
+
+ // Use identical seed as Spark hash partitioning.
+ let hashes_buf = &mut scratch.hashes_buf[..num_rows];
+ hashes_buf.fill(42_u32);
+
+ // Compute hash for selected columns
+ create_murmur3_hashes(&columns_to_hash, hashes_buf)?;
+
+ // Assign partition IDs based on hash (same as hash
partitioning)
+ let partition_ids = &mut scratch.partition_ids[..num_rows];
+ hashes_buf.iter().enumerate().for_each(|(idx, hash)| {
+ partition_ids[idx] = pmod(*hash,
*num_output_partitions) as u32;
+ });
+
+ // We now have partition ids for every input row, map that
to partition starts
+ // and partition indices to eventually write these rows to
partition buffers.
+ map_partition_ids_to_starts_and_indices(
+ &mut scratch,
+ *num_output_partitions,
+ num_rows,
+ );
+
+ timer.stop();
+ Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
+ &scratch.partition_starts,
+ &scratch.partition_row_indices,
+ ))
+ }?;
+
+ self.buffer_partitioned_batch_may_spill(
+ input,
+ partition_row_indices,
+ partition_starts,
+ )
+ .await?;
+ self.scratch = scratch;
+ }
other => {
// this should be unreachable as long as the validation logic
// in the constructor is kept up-to-date
@@ -1431,6 +1496,7 @@ mod test {
Arc::new(row_converter),
owned_rows,
),
+ CometPartitioning::RoundRobin(num_partitions, 0),
] {
let batches = (0..num_batches).map(|_|
batch.clone()).collect::<Vec<_>>();
@@ -1483,4 +1549,95 @@ mod test {
let expected = vec![69, 5, 193, 171, 115];
assert_eq!(result, expected);
}
+
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn test_round_robin_deterministic() {
+ // Test that round robin partitioning produces identical results when
run multiple times
+ use std::fs;
+ use std::io::Read;
+
+ let batch_size = 1000;
+ let num_batches = 10;
+ let num_partitions = 8;
+
+ let batch = create_batch(batch_size);
+ let batches = (0..num_batches).map(|_|
batch.clone()).collect::<Vec<_>>();
+
+ // Run shuffle twice and compare results
+ for run in 0..2 {
+ let data_file = format!("/tmp/rr_data_{}.out", run);
+ let index_file = format!("/tmp/rr_index_{}.out", run);
+
+ let partitions = std::slice::from_ref(&batches);
+ let exec = ShuffleWriterExec::try_new(
+ Arc::new(DataSourceExec::new(Arc::new(
+ MemorySourceConfig::try_new(partitions, batch.schema(),
None).unwrap(),
+ ))),
+ CometPartitioning::RoundRobin(num_partitions, 0),
+ CompressionCodec::Zstd(1),
+ data_file.clone(),
+ index_file.clone(),
+ false,
+ 1024 * 1024,
+ )
+ .unwrap();
+
+ let config = SessionConfig::new();
+ let runtime_env = Arc::new(
+ RuntimeEnvBuilder::new()
+ .with_memory_limit(10 * 1024 * 1024, 1.0)
+ .build()
+ .unwrap(),
+ );
+ let session_ctx =
Arc::new(SessionContext::new_with_config_rt(config, runtime_env));
+ let task_ctx = Arc::new(TaskContext::from(session_ctx.as_ref()));
+
+ // Execute the shuffle
+ futures::executor::block_on(async {
+ let mut stream = exec.execute(0,
Arc::clone(&task_ctx)).unwrap();
+ while stream.next().await.is_some() {}
+ });
+
+ if run == 1 {
+ // Compare data files
+ let mut data0 = Vec::new();
+ fs::File::open("/tmp/rr_data_0.out")
+ .unwrap()
+ .read_to_end(&mut data0)
+ .unwrap();
+ let mut data1 = Vec::new();
+ fs::File::open("/tmp/rr_data_1.out")
+ .unwrap()
+ .read_to_end(&mut data1)
+ .unwrap();
+ assert_eq!(
+ data0, data1,
+ "Round robin shuffle data should be identical across runs"
+ );
+
+ // Compare index files
+ let mut index0 = Vec::new();
+ fs::File::open("/tmp/rr_index_0.out")
+ .unwrap()
+ .read_to_end(&mut index0)
+ .unwrap();
+ let mut index1 = Vec::new();
+ fs::File::open("/tmp/rr_index_1.out")
+ .unwrap()
+ .read_to_end(&mut index1)
+ .unwrap();
+ assert_eq!(
+ index0, index1,
+ "Round robin shuffle index should be identical across runs"
+ );
+ }
+ }
+
+ // Clean up
+ let _ = fs::remove_file("/tmp/rr_data_0.out");
+ let _ = fs::remove_file("/tmp/rr_index_0.out");
+ let _ = fs::remove_file("/tmp/rr_data_1.out");
+ let _ = fs::remove_file("/tmp/rr_index_1.out");
+ }
}
diff --git a/native/proto/src/proto/partitioning.proto
b/native/proto/src/proto/partitioning.proto
index e11d7a384..e70b8264f 100644
--- a/native/proto/src/proto/partitioning.proto
+++ b/native/proto/src/proto/partitioning.proto
@@ -31,6 +31,7 @@ message Partitioning {
HashPartition hash_partition = 1;
SinglePartition single_partition = 2;
RangePartition range_partition = 3;
+ RoundRobinPartition round_robin_partition = 4;
}
}
@@ -51,3 +52,9 @@ message RangePartition {
int32 num_partitions = 2;
repeated BoundaryRow boundary_rows = 4;
}
+
+message RoundRobinPartition {
+ int32 num_partitions = 1;
+ // Maximum number of columns to hash. 0 means no limit (hash all columns).
+ int32 max_hash_columns = 2;
+}
diff --git
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
index b5d15b41f..3fc222bd1 100644
---
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
+++
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.{IndexShuffleBlockResolver,
ShuffleWriteMetricsReporter, ShuffleWriter}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
Literal}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning, RangePartitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
import org.apache.spark.sql.comet.{CometExec, CometMetricNode}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -292,6 +292,16 @@ class CometNativeShuffleWriter[K, V](
shuffleWriterBuilder.setPartitioning(
partitioningBuilder.setRangePartition(partitioning).build())
+ case _: RoundRobinPartitioning =>
+ val partitioning =
PartitioningOuterClass.RoundRobinPartition.newBuilder()
+ partitioning.setNumPartitions(outputPartitioning.numPartitions)
+ partitioning.setMaxHashColumns(
+
CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_MAX_HASH_COLUMNS.get())
+
+ val partitioningBuilder =
PartitioningOuterClass.Partitioning.newBuilder()
+ shuffleWriterBuilder.setPartitioning(
+ partitioningBuilder.setRoundRobinPartition(partitioning).build())
+
case _ =>
throw new UnsupportedOperationException(
s"Partitioning $outputPartitioning is not supported.")
diff --git
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 1805711d0..d65a6b21f 100644
---
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -266,12 +266,31 @@ object CometShuffleExchangeExec
def supportedHashPartitioningDataType(dt: DataType): Boolean = dt match {
case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _:
LongType |
_: FloatType | _: DoubleType | _: StringType | _: BinaryType | _:
TimestampType |
- _: TimestampNTZType | _: DecimalType | _: DateType =>
+ _: TimestampNTZType | _: DateType =>
+ true
+ case _: DecimalType =>
+ // TODO enforce this check
+ // https://github.com/apache/datafusion-comet/issues/3079
+ // Decimals with precision > 18 require Java BigDecimal conversion
before hashing
+ // d.precision <= 18
true
case _ =>
false
}
+ /**
+ * Check if a data type contains a decimal with precision > 18. Such
decimals require
+ * conversion to Java BigDecimal before hashing, which is not supported in
native shuffle.
+ */
+ def containsHighPrecisionDecimal(dt: DataType): Boolean = dt match {
+ case d: DecimalType => d.precision > 18
+ case StructType(fields) => fields.exists(f =>
containsHighPrecisionDecimal(f.dataType))
+ case ArrayType(elementType, _) =>
containsHighPrecisionDecimal(elementType)
+ case MapType(keyType, valueType, _) =>
+ containsHighPrecisionDecimal(keyType) ||
containsHighPrecisionDecimal(valueType)
+ case _ => false
+ }
+
/**
* Determine which data types are supported as partition columns in native
shuffle.
*
@@ -384,6 +403,14 @@ object CometShuffleExchangeExec
}
}
supported
+ case RoundRobinPartitioning(_) =>
+ val config =
CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED
+ if (!config.get(conf)) {
+ withInfo(s, s"${config.key} is disabled")
+ return false
+ }
+      // Comet implements round-robin as deterministic hash partitioning on
all columns; partition assignments differ from Spark's (see compatibility
docs)
+ true
case _ =>
withInfo(
s,
@@ -395,7 +422,7 @@ object CometShuffleExchangeExec
/**
* Check if JVM-based columnar shuffle (CometColumnarExchange) can be used
for this shuffle. JVM
* shuffle is used when the child plan is not a Comet native operator, or
when native shuffle
- * doesn't support the required partitioning type (e.g.,
RoundRobinPartitioning).
+ * doesn't support the required partitioning type.
*/
def columnarShuffleSupported(s: ShuffleExchangeExec): Boolean = {
diff --git
a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
index a682ff91a..1cf43ea59 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
@@ -388,4 +388,53 @@ class CometNativeShuffleSuite extends CometTestBase with
AdaptiveSparkPlanHelper
checkSparkAnswer(df)
}
}
+
+ test("native shuffle: round robin partitioning") {
+ withSQLConf(
+ CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key
-> "true") {
+ withParquetTable((0 until 100).map(i => (i, (i + 1).toLong, s"str$i")),
"tbl") {
+ val df = sql("SELECT * FROM tbl")
+
+ // Test basic round robin repartitioning
+ val shuffled = df.repartition(10)
+
+ // Just collect and verify row count - simpler test
+ val result = shuffled.collect()
+ assert(result.length == 100, s"Expected 100 rows, got
${result.length}")
+ }
+ }
+ }
+
+ test("native shuffle: round robin deterministic behavior") {
+ // Test that round robin produces consistent results across multiple
executions
+ withSQLConf(
+ CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key
-> "true") {
+ withParquetTable((0 until 1000).map(i => (i, (i + 1).toLong, s"str$i")),
"tbl") {
+ val df = sql("SELECT * FROM tbl")
+
+ // Execute shuffle twice and compare results
+ val result1 = df.repartition(8).collect().toSeq
+ val result2 = df.repartition(8).collect().toSeq
+
+ // Results should be identical (deterministic ordering)
+ assert(result1 == result2, "Round robin shuffle should produce
deterministic results")
+ }
+ }
+ }
+
+ test("native shuffle: round robin with filter") {
+ withSQLConf(
+ CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key
-> "true") {
+ withParquetTable((0 until 100).map(i => (i, (i + 1).toLong)), "tbl") {
+ val df = sql("SELECT * FROM tbl")
+ val shuffled = df
+ .filter($"_1" < 50)
+ .repartition(10)
+
+ // Just collect and verify - simpler test
+ val result = shuffled.collect()
+ assert(result.length == 50, s"Expected 50 rows after filter, got
${result.length}")
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]