andygrove opened a new issue, #3194:
URL: https://github.com/apache/datafusion-comet/issues/3194
### What is the problem the feature request solves?
Add support for bucketed writes to the native Parquet write path.
## Problem Description
Currently, native Parquet write support is blocked for any write operation
involving bucketed tables. This is enforced in `CometDataWritingCommand.scala`
at lines 62-64:
```scala
if (cmd.bucketSpec.isDefined) {
  return Unsupported(Some("Bucketed writes are not supported"))
}
```
This limitation prevents native Parquet writes from being used with bucketed
tables, which are commonly used to optimize join and aggregation performance in
Spark SQL.
## Current Behavior
When attempting to write bucketed data using native Parquet writes:
- Any operation with a bucket specification falls back to Spark's default writer
- Tables created with CLUSTERED BY cannot use native writes (see the DDL sketch below)
- Native write acceleration cannot be used for bucketed tables
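For reference, a minimal sketch of the CLUSTERED BY case mentioned above, wrapped in `spark.sql` for consistency with the Scala examples later in this issue (table and column names are illustrative); any write into such a table currently falls back to Spark's default writer:
```scala
// Illustrative bucketed table DDL plus a write into it; because the table carries a
// bucket spec, the write currently cannot use the native Parquet writer.
spark.sql("""
  CREATE TABLE users_clustered (user_id BIGINT, name STRING)
  USING parquet
  CLUSTERED BY (user_id) SORTED BY (user_id) INTO 4 BUCKETS
""")

spark.sql(
  "INSERT INTO users_clustered SELECT id AS user_id, CAST(id AS STRING) AS name FROM range(100)")
```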
## Expected Behavior
Native Parquet writes should support:
- Hash-based bucketing - distribute data across buckets using a hash function
- Multiple bucket columns - support bucketing by multiple columns
- Sort within buckets - support the SORTED BY clause for intra-bucket ordering
- Configurable bucket count - support different numbers of buckets
- Deterministic bucketing - ensure the same bucketing behavior as Spark's default writer
## Impact
This is a medium-priority blocker for production readiness. Without bucketed write support:
- Cannot use native writes with bucketed tables
- Cannot leverage bucketing optimization for joins (bucket pruning, bucketed joins)
- Cannot leverage bucketing optimization for aggregations
- Tables optimized for specific query patterns cannot benefit from native writes
While not as common as partitioning, bucketing is used in production for:
- Large fact tables in star schemas
- Tables frequently joined on the same keys
- Pre-aggregated data marts
- Performance-critical analytical workloads
## Technical Context
Affected Files:
- `spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala:62-64` - where the check blocks bucketed writes
- `native/core/src/execution/operators/parquet_writer.rs` - native writer implementation that needs bucketing logic
Implementation Requirements:
- Extract the bucket specification (bucket columns, sort columns, number of buckets)
- Implement a Spark-compatible hash function for bucket assignment
- Distribute rows to the correct bucket files based on the hash of the bucket columns
- Support intra-bucket sorting when specified
- Follow Spark's file naming convention for bucketed files, which encodes the bucket id as an `_NNNNN` suffix (e.g., part-00000-<uuid>_00003.c000.snappy.parquet); see the naming sketch after this list
- Integration with FileCommitProtocol for atomic bucketed writes
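For illustration only, a hypothetical helper (not an existing Spark or Comet API) showing the shape of the bucketed file name the native writer would need to produce; in Spark the pieces are assembled by the file writer together with the commit protocol, and the important part is the `_NNNNN` bucket-id suffix that Spark's reader uses to map files to buckets:
```scala
// Hypothetical helper sketching the bucketed file naming shape. Spark encodes the
// bucket id as an "_NNNNN" suffix after the job UUID (see BucketingUtils.bucketIdToString),
// e.g. part-00000-<uuid>_00003.c000.snappy.parquet for bucket 3.
def bucketedFileName(taskId: Int, jobUuid: String, bucketId: Int, fileCounter: Int): String =
  f"part-$taskId%05d-$jobUuid" + f"_$bucketId%05d" + f".c$fileCounter%03d" + ".snappy.parquet"

// bucketedFileName(0, "1b2c3d4e", 3, 0) => "part-00000-1b2c3d4e_00003.c000.snappy.parquet"
```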
Bucketing Algorithm:
Spark assigns rows to buckets by computing a Murmur3 hash (seed 42) of the bucket columns and taking a positive modulo over the number of buckets:
`bucket_id = pmod(murmur3_hash(bucket_columns), num_buckets)`
The native implementation must use the same hash function and the same modulo semantics (non-negative results even for negative hashes) to ensure compatible bucket assignments.
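As a reference point, a minimal sketch (in terms of Catalyst expressions) of how Spark itself derives the bucket id, mirroring `HashPartitioning.partitionIdExpression`; the native writer has to reproduce the same Murmur3-with-seed-42 plus positive-modulo semantics on the Rust side:
```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Murmur3Hash, Pmod}

// bucket_id = pmod(murmur3(bucket columns, seed = 42), numBuckets)
// Pmod (rather than %) guarantees a non-negative bucket id even when the hash is negative.
def bucketIdExpression(bucketColumns: Seq[Expression], numBuckets: Int): Expression =
  Pmod(new Murmur3Hash(bucketColumns), Literal(numBuckets))
```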
Considerations:
- The number of buckets must remain stable for the lifetime of the table
- Bucket file naming must follow Spark conventions
- Sorting within buckets adds write cost but improves read performance
- Bucketing can be combined with partitioning (bucketed partitioned tables)
- Bucket ID assignment must be handled correctly to match Spark's behavior
## Related Work
This is part of making native Parquet writes production-ready. Other blockers include:
- Complex types support (#TBD)
- Partitioned writes support (#TBD)
- Cloud storage support (S3/GCS/Azure) (#TBD)
## Acceptance Criteria
- Support bucketing by a single column
- Support bucketing by multiple columns
- Support a configurable number of buckets
- Support the SORTED BY clause (sort within buckets)
- Hash function produces the same bucket assignments as Spark's default writer
- Correct bucket file naming convention
- Support the combination of bucketing and partitioning
- Test coverage for various bucketing configurations
- Roundtrip tests (write + read) verify bucket assignments are correct
- Join performance tests show that bucketed joins avoid shuffles and bucket pruning works correctly (see the test sketch after this list)
- Performance benchmarks show native bucketed writes match or exceed Spark's default writer
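A rough sketch of the kind of check the roundtrip/join criteria imply (an active `spark` session with native writes enabled, plus two DataFrames `df1` and `df2` sharing a join key, are assumed; names are illustrative): two tables bucketed on the same key with the same bucket count should join without introducing a shuffle on either side.
```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Disable broadcast joins so the plan exercises the bucketed (shuffle-free) join path.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

df1.write.bucketBy(8, "user_id").sortBy("user_id").saveAsTable("t1")
df2.write.bucketBy(8, "user_id").sortBy("user_id").saveAsTable("t2")

val joined = spark.table("t1").join(spark.table("t2"), "user_id")

// A bucketed-to-bucketed join on the bucket key should not add exchanges to the plan.
val shuffles = joined.queryExecution.executedPlan.collect { case s: ShuffleExchangeExec => s }
assert(shuffles.isEmpty, "bucketed join should not require a shuffle")
```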
## Example Use Cases
```scala
// Basic bucketed table
df.write
  .bucketBy(4, "user_id")
  .saveAsTable("users_bucketed")

// Bucketed and sorted
df.write
  .bucketBy(8, "customer_id")
  .sortBy("order_date")
  .saveAsTable("orders_bucketed")

// Bucketed on multiple columns
df.write
  .bucketBy(16, "year", "month")
  .saveAsTable("events_bucketed")

// Combination of bucketing and partitioning
df.write
  .partitionBy("date")
  .bucketBy(32, "user_id")
  .saveAsTable("user_events")
```
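For completeness, the bucket metadata written by either path can be inspected from the Spark side; bucket columns are flagged in the catalog, and "Num Buckets" / "Bucket Columns" appear in the extended table description:
```scala
// Bucket columns show up with isBucket = true in the catalog listing.
spark.catalog.listColumns("orders_bucketed").filter(_.isBucket).show()

// The extended description includes the "Num Buckets" and "Bucket Columns" rows.
spark.sql("DESCRIBE TABLE EXTENDED orders_bucketed").show(100, truncate = false)
```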
## References
- https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#bucketing
- Spark's bucketing implementation: `org.apache.spark.sql.catalyst.expressions.Murmur3Hash`
### Describe the potential solution
_No response_
### Additional context
_No response_