SubhamSinghal opened a new pull request, #22371:
URL: https://github.com/apache/datafusion/pull/22371

    ## Which issue does this PR close?
   
     - Resolves the TODO at `datafusion/physical-plan/src/joins/cross_join.rs:164-165`

     ## Rationale for this change                                               
                                                        
                                                               
     `CrossJoinExec` currently forces the left child into a single partition via `Distribution::SinglePartition`, which caps parallelism at N, the right partition count. On a machine with 32 cores and N=4 right partitions, 28 cores sit idle during the cross join.

     With M×N partitioning, a cross join with 8 left partitions and 4 right partitions produces 32 independent output tasks instead of 4, providing 8× more parallelism.
                                                                                
                                                        
     ## What changes are included in this PR?                                   
                                                        
      
     - **M×N output partitioning**: `required_input_distribution` returns `[Unspecified, Unspecified]`, and the output partitioning is `UnknownPartitioning(M*N)`. No `CoalescePartitionsExec` is inserted on the left.
     - **Buffer both sides**: Both left and right partitions are buffered via per-partition `OnceAsync` vectors (`left_futs[M]` and `right_futs[N]`). Tasks that read the same left or right partition share its buffer through `Arc`, eliminating redundant I/O.
     - **Synchronous right iteration**: `CrossJoinStream` replaces the live right stream with a buffered `Arc<Vec<RecordBatch>>` plus index advancement. `FetchProbeBatch` becomes synchronous, with no `Poll::Pending` after the initial load.
     - **Concurrent future polling**: `collect_build_side` polls both left and right futures on every call, so they load in parallel and the load takes `max(left_time, right_time)` rather than the sum of the two.
     - **Partition statistics**: `partition_statistics(Some(p))` decomposes `p` into `(p/N, p%N)`, the left and right partition indices, for per-partition estimates.
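
The `(p/N, p%N)` decomposition and the load-once, share-many buffering described in the list above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the PR's code: `decompose`, `SharedBuffer`, `make_buffers` and `run` are hypothetical names, `std::sync::OnceLock` stands in for DataFusion's `OnceAsync`, OS threads stand in for async tasks, and plain `Vec<usize>` rows stand in for `RecordBatch`es.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::thread;

/// Maps a flat output partition index `p` to (left index, right index),
/// given `n` right partitions, following the `(p/N, p%N)` rule.
fn decompose(p: usize, n: usize) -> (usize, usize) {
    (p / n, p % n)
}

/// Hypothetical stand-in for one buffered input partition: filled at most
/// once, then shared by every task that reads it.
type SharedBuffer = Arc<OnceLock<Vec<usize>>>;

fn make_buffers(count: usize) -> Vec<SharedBuffer> {
    (0..count).map(|_| Arc::new(OnceLock::new())).collect()
}

/// Runs one task per output partition. Returns the row count produced by
/// each task and the total number of input loads that actually happened.
fn run(m: usize, n: usize, rows_per_input: usize) -> (Vec<usize>, usize) {
    let left = make_buffers(m);
    let right = make_buffers(n);
    let loads = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..m * n)
        .map(|p| {
            let (l, r) = decompose(p, n);
            let lbuf = Arc::clone(&left[l]);
            let rbuf = Arc::clone(&right[r]);
            let loads = Arc::clone(&loads);
            thread::spawn(move || {
                // Pretend to read an input partition; count every real load.
                let load = |seed: usize| {
                    loads.fetch_add(1, Ordering::SeqCst);
                    (0..rows_per_input).map(|i| i + seed).collect::<Vec<usize>>()
                };
                // `get_or_init` runs the closure once per buffer, even when
                // several tasks race for the same partition.
                let left_rows = lbuf.get_or_init(|| load(l));
                let right_rows = rbuf.get_or_init(|| load(1000 + r));
                // A cross join pairs every left row with every right row.
                left_rows.len() * right_rows.len()
            })
        })
        .collect();

    let rows = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (rows, loads.load(Ordering::SeqCst))
}

fn main() {
    let (m, n) = (8, 4);
    let (rows, loads) = run(m, n, 3);
    assert_eq!(rows.len(), m * n); // one task per (left, right) pair
    assert_eq!(loads, m + n); // each input partition is loaded once
    println!("{} output partitions, {} input loads", rows.len(), loads);
}
```

With M=8 and N=4 the sketch runs 32 tasks but loads only 12 input partitions, which is the sharing property the buffering bullet relies on.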
                                                                                
                                                        
     ## Are these changes tested?                              
                                                                                
                                                        
     - Existing unit tests pass (6/6 cross_join tests, 1415 total in physical-plan crate)
     - `cross_join.slt` passes
     - `explain_tree.slt` updated to reflect new plan shape (`RepartitionExec` for parallelism instead of `CoalescePartitionsExec`)
     - OOM test updated for new memory consumer name (`CrossJoinExec[left]`)
                                                        
                                                                                
                                                        
     ## Are there any user-facing changes?                                      
                                                        
                                                                                
                                                        
     - Cross join queries will automatically use more parallelism (M×N output partitions instead of N)
     - Peak memory increases from `|L| + N×batch_size` to `|L| + |R|`, which is negligible for any feasible cross join (since output is `|L|×|R|` rows)
     - `EXPLAIN` plans for cross joins will no longer show `CoalescePartitionsExec` below the left child
     - Output partitioning changes from the right side's hash/distribution to `UnknownPartitioning(M*N)`
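
The peak-memory comparison in the list above can be made concrete with a small calculation. The sketch below is only illustrative: it assumes every term is measured in rows, the `CrossJoinSizes` type and its methods are hypothetical, and the input sizes are made-up numbers rather than measurements from this PR.

```rust
/// Hypothetical sizes for one cross join, all counted in rows (an assumption).
struct CrossJoinSizes {
    left_rows: u64,
    right_rows: u64,
    right_partitions: u64,
    batch_size: u64,
}

impl CrossJoinSizes {
    /// Before: the whole left side plus one in-flight batch per right partition.
    fn peak_before(&self) -> u64 {
        self.left_rows + self.right_partitions * self.batch_size
    }

    /// After: both sides fully buffered.
    fn peak_after(&self) -> u64 {
        self.left_rows + self.right_rows
    }

    /// Rows produced by the cross join itself.
    fn output_rows(&self) -> u64 {
        self.left_rows * self.right_rows
    }
}

fn main() {
    // Illustrative numbers only.
    let s = CrossJoinSizes {
        left_rows: 100_000,
        right_rows: 50_000,
        right_partitions: 4,
        batch_size: 8_192,
    };
    println!("peak before: {} rows", s.peak_before());
    println!("peak after:  {} rows", s.peak_after());
    println!("output:      {} rows", s.output_rows());
}
```

For these inputs the buffered rows grow from 132,768 to 150,000 while the join emits 5,000,000,000 rows, so the extra buffering is small next to the output volume.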


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

