andygrove opened a new pull request, #3693: URL: https://github.com/apache/datafusion-comet/pull/3693
## Which issue does this PR close?

Closes #3692.

## Rationale for this change

`CometBroadcastHashJoin` has two performance bottlenecks:

1. **Native side always uses `PartitionMode::Partitioned`**: All hash joins are hardcoded to `PartitionMode::Partitioned` in `planner.rs`, even broadcast joins. DataFusion supports `PartitionMode::CollectLeft`, which is specifically designed for broadcast joins and avoids unnecessary repartitioning.
2. **Per-task decompression of broadcast data**: Every task independently decompresses (LZ4) the entire broadcast payload. With N partitions on an executor, the same broadcast data is decompressed N times.

## What changes are included in this PR?

**Fix 1: Use `PartitionMode::CollectLeft` for broadcast joins**

- Added a `bool is_broadcast = 6` field to the `HashJoin` protobuf message
- Set `is_broadcast = true` in the Scala serde when the join is a `BroadcastHashJoinExec`
- Use `PartitionMode::CollectLeft` in the Rust planner when `is_broadcast` is true

**Fix 2: Cache decompressed broadcast bytes at executor level**

- Added a `ConcurrentHashMap` in the `CometBatchRDD` companion object, keyed by broadcast ID
- The first task on an executor decompresses the broadcast data; subsequent tasks reuse the cached decompressed Arrow IPC bytes
- Each task still performs Arrow IPC parsing (for proper memory management) but skips LZ4 decompression

**Benchmark**

- Added `CometBroadcastHashJoinBenchmark` for measuring broadcast join performance across join types and broadcast sizes

## How are these changes tested?
- All 9 existing `CometJoinSuite` tests pass
- New `CometBroadcastHashJoinBenchmark` validates performance (see benchmark results below)
- Clippy passes with no warnings

### Benchmark results

```
OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.2, Apple M3 Ultra

Broadcast Hash Join (INNER, stream=2097152, broadcast=1000):
  Spark                    45ms    1.0X
  Comet (Scan + Exec)      51ms    0.9X

Broadcast Hash Join (LEFT, stream=2097152, broadcast=1000):
  Spark                    39ms    1.0X
  Comet (Scan + Exec)      48ms    0.8X

Broadcast Hash Join (RIGHT, stream=2097152, broadcast=1000):
  Spark                   244ms    1.0X
  Comet (Scan + Exec)     104ms    2.3X

Broadcast Hash Join (INNER, stream=2097152, broadcast=10000):
  Spark                    40ms    1.0X
  Comet (Scan + Exec)      47ms    0.9X
```
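
### Illustrative sketch of the two fixes

For readers who want the shape of the changes without opening the diff, the two fixes described in this PR can be sketched roughly as below. This is not the PR's code. `PartitionMode` and `HashJoin` here are local stand-ins for the DataFusion enum and the protobuf message, and `partition_mode_for`, `BroadcastCache`, and `get_or_decompress` are hypothetical names chosen for illustration. The actual cache in this PR is a Scala `ConcurrentHashMap` in the `CometBatchRDD` companion object; the Rust version only shows the same decompress-once pattern.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Local stand-in for DataFusion's `PartitionMode` (assumption: only the
/// two variants mentioned in this PR are modeled).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PartitionMode {
    Partitioned,
    CollectLeft,
}

/// Local stand-in for the `HashJoin` protobuf message; only the new
/// `is_broadcast` field is modeled.
struct HashJoin {
    is_broadcast: bool,
}

/// Fix 1: choose the partition mode from the flag instead of hardcoding
/// `Partitioned` for every hash join.
fn partition_mode_for(join: &HashJoin) -> PartitionMode {
    if join.is_broadcast {
        PartitionMode::CollectLeft
    } else {
        PartitionMode::Partitioned
    }
}

/// Fix 2 (pattern only): a process-wide cache of decompressed broadcast
/// bytes, keyed by broadcast ID.
struct BroadcastCache {
    entries: Mutex<HashMap<u64, Arc<Vec<u8>>>>,
}

impl BroadcastCache {
    fn new() -> Self {
        Self {
            entries: Mutex::new(HashMap::new()),
        }
    }

    /// Returns the cached bytes for `broadcast_id`, running `decompress`
    /// only if no entry exists yet. The lock is held while decompressing,
    /// which keeps the sketch simple but serializes concurrent callers.
    fn get_or_decompress<F>(&self, broadcast_id: u64, decompress: F) -> Arc<Vec<u8>>
    where
        F: FnOnce() -> Vec<u8>,
    {
        let mut entries = self.entries.lock().unwrap();
        let slot = entries
            .entry(broadcast_id)
            .or_insert_with(|| Arc::new(decompress()));
        Arc::clone(&*slot)
    }
}
```

In this sketch, a second call to `get_or_decompress` with the same broadcast ID returns a handle to the same buffer without running the decompression closure again. Each caller would still parse the shared bytes on its own, mirroring the per-task Arrow IPC parsing described above.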
