andygrove opened a new pull request, #3693:
URL: https://github.com/apache/datafusion-comet/pull/3693

   ## Which issue does this PR close?
   
   Closes #3692.
   
   ## Rationale for this change
   
   CometBroadcastHashJoin has two performance bottlenecks:
   
   1. **Native side always uses `PartitionMode::Partitioned`** — All hash joins 
are hardcoded to `PartitionMode::Partitioned` in `planner.rs`, even broadcast 
joins. DataFusion supports `PartitionMode::CollectLeft` which is specifically 
designed for broadcast joins and avoids unnecessary repartitioning.
   
   2. **Per-task decompression of broadcast data** — Every task independently 
decompresses (LZ4) the entire broadcast payload. With N partitions on an 
executor, the same broadcast data is decompressed N times.
   
   ## What changes are included in this PR?
   
   **Fix 1 — Use `PartitionMode::CollectLeft` for broadcast joins:**
   - Added `bool is_broadcast = 6` field to `HashJoin` protobuf message
   - Set `is_broadcast = true` in Scala serde when the join is a 
`BroadcastHashJoinExec`
   - Use `PartitionMode::CollectLeft` in Rust planner when `is_broadcast` is 
true
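
The mode selection described in Fix 1 can be sketched roughly as follows. This is a minimal illustration, not the code in `planner.rs`: `PartitionMode` is redefined locally as a stand-in for DataFusion's enum (showing only the two variants discussed here) so the snippet compiles without the `datafusion` crate, and `HashJoinProto` and `partition_mode_for` are hypothetical names.

```rust
/// Local stand-in for DataFusion's `PartitionMode`, limited to the two
/// variants this PR talks about. Assumption: defined here only so the
/// sketch is self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionMode {
    Partitioned,
    CollectLeft,
}

/// Hypothetical mirror of the `HashJoin` protobuf message; only the new
/// `is_broadcast` field is modelled.
pub struct HashJoinProto {
    pub is_broadcast: bool,
}

/// Pick the partition mode from the flag set by the Scala serde.
/// Broadcast joins collect the build side once; every other hash join
/// keeps the previous `Partitioned` behaviour.
pub fn partition_mode_for(join: &HashJoinProto) -> PartitionMode {
    if join.is_broadcast {
        PartitionMode::CollectLeft
    } else {
        PartitionMode::Partitioned
    }
}
```

Because a protobuf `bool` defaults to `false`, plans serialized without the new field would still resolve to `Partitioned` under this scheme.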
   
   **Fix 2 — Cache decompressed broadcast bytes at executor level:**
   - Added `ConcurrentHashMap` in `CometBatchRDD` companion object keyed by 
broadcast ID
   - First task on an executor decompresses the broadcast data; subsequent 
tasks reuse cached decompressed Arrow IPC bytes
   - Each task still performs Arrow IPC parsing (for proper memory management) 
but skips LZ4 decompression
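
The caching pattern in Fix 2 is sketched below in Rust purely to illustrate the idea. The PR itself implements it on the JVM side with a `ConcurrentHashMap` in the `CometBatchRDD` companion object. The names `DECOMPRESSED`, `BroadcastId`, and `cached_decompressed` are hypothetical, and the `decompress` closure stands in for the real LZ4 step.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

/// Hypothetical key type standing in for the broadcast ID.
type BroadcastId = u64;

/// Process-wide cache of decompressed bytes, shared by all tasks in the
/// same process (the analogue of one executor JVM).
static DECOMPRESSED: OnceLock<Mutex<HashMap<BroadcastId, Arc<Vec<u8>>>>> = OnceLock::new();

/// Return the decompressed bytes for `id`, running `decompress` only if no
/// earlier caller has already populated the entry.
///
/// Simplification: the lock is held while `decompress` runs, so concurrent
/// callers wait instead of decompressing the same payload twice.
pub fn cached_decompressed<F>(id: BroadcastId, decompress: F) -> Arc<Vec<u8>>
where
    F: FnOnce() -> Vec<u8>,
{
    let cache = DECOMPRESSED.get_or_init(|| Mutex::new(HashMap::new()));
    let mut guard = cache.lock().unwrap();
    let entry = guard.entry(id).or_insert_with(|| Arc::new(decompress()));
    // Hand out a cheap reference-counted handle; the bytes are not copied.
    Arc::clone(&*entry)
}
```

Each caller would still parse the returned bytes itself, which matches the PR's note that Arrow IPC parsing stays per task. This sketch never evicts entries; a real implementation would need some way to release them once the broadcast is no longer in use.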
   
   **Benchmark:**
   - Added `CometBroadcastHashJoinBenchmark` for measuring broadcast join 
performance across join types and broadcast sizes
   
   ## How are these changes tested?
   
   - All 9 existing `CometJoinSuite` tests pass
   - New `CometBroadcastHashJoinBenchmark` measures broadcast join performance 
(see benchmark results below)
   - Clippy passes with no warnings
   
   ### Benchmark results
   
   ```
   OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.2, Apple M3 Ultra
   
   Broadcast Hash Join (INNER, stream=2097152, broadcast=1000):
   Spark                     45ms    1.0X
   Comet (Scan + Exec)       51ms    0.9X
   
   Broadcast Hash Join (LEFT, stream=2097152, broadcast=1000):
   Spark                     39ms    1.0X
   Comet (Scan + Exec)       48ms    0.8X
   
   Broadcast Hash Join (RIGHT, stream=2097152, broadcast=1000):
   Spark                    244ms    1.0X
   Comet (Scan + Exec)      104ms    2.3X
   
   Broadcast Hash Join (INNER, stream=2097152, broadcast=10000):
   Spark                     40ms    1.0X
   Comet (Scan + Exec)       47ms    0.9X
   ```
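
For reading the last column, the sketch below assumes the relative figure is the Spark baseline time divided by the Comet time, formatted to one decimal place, so values above 1.0X mean Comet was faster. The helper name `relative_speed` is hypothetical, and the real benchmark harness may compute the ratio from unrounded timings.

```rust
/// Format the speed of a candidate relative to a baseline, e.g. "2.3X".
/// Assumption: relative = baseline time / candidate time.
pub fn relative_speed(baseline_ms: f64, candidate_ms: f64) -> String {
    format!("{:.1}X", baseline_ms / candidate_ms)
}
```

With the RIGHT join figures above, `relative_speed(244.0, 104.0)` gives `2.3X`, and with the LEFT join figures, `relative_speed(39.0, 48.0)` gives `0.8X`.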


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

