UBarney opened a new issue, #16364:
URL: https://github.com/apache/datafusion/issues/16364

   ### Is your feature request related to a problem or challenge?
   
   The current Nested Loop Join implementation follows this simplified logic:
   1. Buffer the Build Side: All data from the left (build) side of the join is 
collected and held in memory.
   2. Iterate the Probe Side: The operator iterates through the right (probe) 
side, processing one RecordBatch at a time.
   3. Process Batch Pairs: For each buffered RecordBatch from the left side and 
the incoming RecordBatch from the right side, the core [join 
logic](https://github.com/apache/datafusion/blob/69dfe6c499d39563f4e6d9835fcdf3793f7d98c8/datafusion/physical-plan/src/joins/nested_loop_join.rs#L975-L1021)
 is executed: 
        1. [Creates a Cartesian product of two input batches and apply filter 
-> (left_side_indices, 
right_side_indices)](https://github.com/apache/datafusion/blob/69dfe6c499d39563f4e6d9835fcdf3793f7d98c8/datafusion/physical-plan/src/joins/nested_loop_join.rs#L752)
        2. adjust_indices_by_join_type
        3. `build_batch_from_indices(left_side_indices, right_side_indices)`
   
   
   It has following problems
   - It put all indices of Cartesian product of two input batches in memory 
with length of `left_batch.num_rows() * right_batch.num_rows()`
   - It may create extreme large record_batch at a time
        - In 3.i if query has filter, it will call 
`build_batch_from_indices(left_side_indices, right_side_indices) -> 
RecordBatch`. It will return 
[**`left_side_indices.len()*right_side_indices().len()`**](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/utils.rs#L830-L838)
 rows
        - In 3.iii it will return at most 
`left_side_indices.len()*right_side_indices().len()` rows
   
   I add some log to show that
   ```
   > select t1.value from range(40960) t1 join range(8192) t2 on t1.value + 
t2.value < t1.value * t2.value;
   [datafusion/physical-plan/src/joins/utils.rs:864:5] 
intermediate_batch.num_rows() = 335544320
   [datafusion/physical-plan/src/joins/utils.rs:864:5] 
intermediate_batch.get_array_memory_size() = 5368709312
   [datafusion/physical-plan/src/joins/nested_loop_join.rs:1018:5] 
&left_side.len() = 335446019
   [datafusion/physical-plan/src/joins/nested_loop_join.rs:906:17] 
result.num_rows() = 335446019
   [datafusion/physical-plan/src/joins/nested_loop_join.rs:906:17] 
result.get_array_memory_size() = 2683568248
   ```
   
   
   ### Describe the solution you'd like
   
   1. Process the Cartesian Product Incrementally. At any given time, it will 
only generate the Cartesian product for a slice of the left batch against a 
slice of the right batch
        1. Special Handling for Right Joins: For Right, RightSemi, and 
RightAnti joins, a `probe_side_bitmap` must be maintained. This bitmap is 
populated as each chunk is processed to track which rows from the right side 
have found a match. After all chunks have been evaluated, this bitmap is used 
to generate the final output for the unmatched right-side rows.
   
   2. Limit `intermediate_batch` Size During Filtering: When applying the join 
filter in `apply_join_filter_to_indices`, avoid creating a single, massive 
`intermediate_batch` for evaluation. Instead, process the indices in batches:
        1. Iteratively build smaller intermediate batches by calling 
build_batch_from_indices on slices of the input indices (e.g., 
build_indices.slice(i, N) and probe_indices.slice(i, N)).
        2. Apply the filter expression to each small intermediate batch.
        3. Concatenate the filtered results from each chunk to produce the 
final set of matched indices.
   3. Yield Partial Batches on Demand: On each call, the stream will use its 
cursor to process the next chunk of indices (e.g., from i to i + N). It will 
then build and return a small RecordBatch from only that slice
   
   (we can do 2, 3 first)
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to