andygrove opened a new pull request, #3295:
URL: https://github.com/apache/datafusion-comet/pull/3295
## Summary
This PR implements split serialization for Iceberg native scans to reduce
network transfer and deserialization overhead when scanning tables with many
partitions.
**⚠️ EXPERIMENTAL**: This is an experimental optimization that needs further
testing and benchmarking before production use.
### Problem
Currently, all partition metadata is serialized into a single `IcebergScan`
protobuf message that is sent to every executor task. Each task uses only one
partition's data but must deserialize all partitions.
**Impact for 10,000 partitions:**
- Each task receives ~10,000x more `file_partitions` data than needed
- Network transfer overhead
- Unnecessary deserialization of unused partition data
### Solution
Split the serialization into two parts:
- **Common data** (`IcebergScanCommon`): pools, metadata, and catalog
  properties, serialized once and captured in the RDD closure
- **Per-partition data** (`IcebergFilePartition[]`): file scan tasks, one per
  partition, stored in the `Partition` objects
**Data flow:**
```
Driver:
- Serialize IcebergScanCommon → commonBytes (once)
- Serialize IcebergFilePartition[i] → partitionBytes[i] (once per
partition)
Task N:
- Receives: commonBytes (from closure) + partitionBytes[N] (from Partition
object)
- Combines into IcebergScan{split_mode=true, common, partition}
- Passes to Rust
```
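The flow above can be sketched with plain Rust structs standing in for the
protobuf messages. Struct and field names mirror the PR description, but the
"serialization" here is a toy stand-in for real protobuf encoding, and the
helper functions are illustrative, not the actual implementation:

```rust
// Toy model of the split-serialization flow: the driver serializes the
// common data once, each task reconstructs a full scan from the shared
// common bytes plus only its own partition's bytes.

#[derive(Clone, Debug)]
struct IcebergScanCommon {
    catalog_properties: String, // shared metadata, identical for every task
}

#[derive(Clone, Debug)]
struct IcebergFilePartition {
    file_paths: Vec<String>, // scan tasks for ONE partition only
}

#[derive(Debug)]
struct IcebergScan {
    split_mode: bool,
    common: IcebergScanCommon,
    partition: IcebergFilePartition,
}

// Driver side: encode the common data once and each partition separately.
fn driver_serialize(
    common: &IcebergScanCommon,
    partitions: &[IcebergFilePartition],
) -> (Vec<u8>, Vec<Vec<u8>>) {
    let common_bytes = common.catalog_properties.as_bytes().to_vec();
    let partition_bytes = partitions
        .iter()
        .map(|p| p.file_paths.join(",").into_bytes())
        .collect();
    (common_bytes, partition_bytes)
}

// Task side: task N receives common_bytes (from the closure) plus only
// partition_bytes[N] (from its Partition object) and rebuilds the scan.
fn task_combine(common_bytes: &[u8], partition_bytes: &[u8]) -> IcebergScan {
    IcebergScan {
        split_mode: true,
        common: IcebergScanCommon {
            catalog_properties: String::from_utf8(common_bytes.to_vec()).unwrap(),
        },
        partition: IcebergFilePartition {
            file_paths: String::from_utf8(partition_bytes.to_vec())
                .unwrap()
                .split(',')
                .map(str::to_string)
                .collect(),
        },
    }
}

fn main() {
    let common = IcebergScanCommon {
        catalog_properties: "warehouse=s3://bucket".into(),
    };
    let partitions = vec![
        IcebergFilePartition { file_paths: vec!["p0/a.parquet".into()] },
        IcebergFilePartition { file_paths: vec!["p1/b.parquet".into()] },
    ];
    let (common_bytes, partition_bytes) = driver_serialize(&common, &partitions);

    // Task 1 only ever touches its own partition's bytes.
    let scan = task_combine(&common_bytes, &partition_bytes[1]);
    assert!(scan.split_mode);
    assert_eq!(scan.partition.file_paths, vec!["p1/b.parquet".to_string()]);
    println!("task 1 scan: {:?}", scan);
}
```

The key property is that `partition_bytes[N]` is the only per-partition
payload task N deserializes, so the per-task cost no longer scales with the
total partition count.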
### Changes
| File | Changes |
|------|---------|
| `native/proto/src/proto/operator.proto` | Add `IcebergScanCommon`; add `split_mode`/`common`/`partition` fields to `IcebergScan` |
| `native/core/src/execution/planner.rs` | Handle `split_mode`; add `parse_file_scan_tasks_from_common()` |
| `spark/.../CometIcebergSplitRDD.scala` | **New file**: RDD with a custom `Partition` holding per-partition bytes |
| `spark/.../CometIcebergNativeScan.scala` | Build common bytes and per-partition bytes; use a thread-local to pass them to `createExec` |
| `spark/.../CometIcebergNativeScanExec.scala` | Add split data fields; override `doExecuteColumnar` for split mode |
## Test plan
- [x] All existing Iceberg tests pass (49/49)
- [ ] Benchmark with large partition count to quantify improvement
- [ ] Test with different Spark versions
🤖 Generated with [Claude Code](https://claude.ai/code)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]