milenkovicm commented on code in PR #18972:
URL: https://github.com/apache/datafusion/pull/18972#discussion_r2595341236
##########
datafusion/physical-plan/src/joins/hash_join/stream.rs:
##########
@@ -348,6 +371,14 @@ impl HashJoinStream {
build_accumulator: Option<Arc<SharedBuildAccumulator>>,
mode: PartitionMode,
) -> Self {
+ // Create output buffer with coalescing.
+ // Use biggest_coalesce_batch_size to bypass coalescing for batches
+ // that are already close to target size (within 50%).
+ let output_buffer = Box::new(
Review Comment:
This looks consistent with `CoalesceBatches` behaviour
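To spell out the heuristic as I read the comment in the diff (the exact comparison is my guess, not necessarily what the PR does):
```rust
// Sketch only: "bypass coalescing for batches within 50% of the target size".
fn bypasses_coalescing(batch_rows: usize, target_batch_size: usize) -> bool {
    // A batch that is at least half the configured target is emitted as-is
    // instead of being copied into the coalescing buffer.
    batch_rows >= target_batch_size / 2
}

fn main() {
    // With the default `datafusion.execution.batch_size` of 8192 rows, a
    // 6000-row batch passes straight through while a 1000-row batch is buffered.
    assert!(bypasses_coalescing(6000, 8192));
    assert!(!bypasses_coalescing(1000, 8192));
}
```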
##########
datafusion/physical-plan/src/joins/hash_join/stream.rs:
##########
@@ -388,14 +430,20 @@ impl HashJoinStream {
handle_state!(ready!(self.fetch_probe_batch(cx)))
}
HashJoinStreamState::ProcessProbeBatch(_) => {
- let poll = handle_state!(self.process_probe_batch());
- self.join_metrics.baseline.record_poll(poll)
+ handle_state!(self.process_probe_batch())
}
HashJoinStreamState::ExhaustedProbeSide => {
- let poll =
- handle_state!(self.process_unmatched_build_batch());
- self.join_metrics.baseline.record_poll(poll)
+ handle_state!(self.process_unmatched_build_batch())
+ }
+ HashJoinStreamState::Completed => {
+ // Flush any remaining buffered data
+ if !self.output_buffer.is_empty() {
+ self.output_buffer.finish_buffered_batch()?;
+ // Continue loop to emit the flushed batch
+ continue;
+ }
+ Poll::Ready(None)
}
Review Comment:
Cosmetic change, I would change it like this:
```suggestion
HashJoinStreamState::Completed if !self.output_buffer.is_empty() => {
    // Flush any remaining buffered data
    self.output_buffer.finish_buffered_batch()?;
    // Continue loop to emit the flushed batch
    continue;
}
HashJoinStreamState::Completed => Poll::Ready(None),
```
to make the match slightly easier to read. I guess it should not have any
performance implications.

Btw, moving `next_completed_batch()` to the top of the method makes it much
easier to read and understand.
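Roughly this shape at the top of the poll loop is what I mean (untested sketch; I'm assuming `next_completed_batch()` hands back an `Option<RecordBatch>`):
```rust
loop {
    // Emit any batch the output buffer has already completed before driving
    // the state machine, so none of the match arms below have to care about
    // buffered output.
    if let Some(batch) = self.output_buffer.next_completed_batch() {
        return Poll::Ready(Some(Ok(batch)));
    }

    match self.state {
        // ... existing arms, unchanged ...
    }
}
```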
##########
datafusion/physical-plan/src/joins/join_hash_map.rs:
##########
@@ -388,14 +398,16 @@ pub fn get_matched_indices_with_limit_offset<T>(
hash_values: &[u64],
limit: usize,
offset: JoinHashMapOffset,
-) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>)
+ input_indices: &mut Vec<u32>,
+ match_indices: &mut Vec<u64>,
+) -> Option<JoinHashMapOffset>
where
T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
<T as TryFrom<usize>>::Error: Debug,
T: ArrowNativeType,
{
- let mut input_indices = Vec::with_capacity(limit);
- let mut match_indices = Vec::with_capacity(limit);
+ input_indices.clear();
Review Comment:
Minor comment: since the whole hash join code is quite complex, a comment here
explaining why the indices are cleared would make it easier to understand in
the future.
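Something along these lines is what I had in mind (just suggested wording; adjust it to whatever the actual reuse contract is):
```rust
// The caller owns these buffers and passes them in for every probe batch so
// the `limit`-sized allocations can be reused instead of re-created on each
// call; clear them here so indices left over from the previous batch are not
// reported as matches for this one.
input_indices.clear();
match_indices.clear();
```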
##########
datafusion/physical-optimizer/src/coalesce_batches.rs:
##########
@@ -57,17 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches {
let target_batch_size = config.execution.batch_size;
plan.transform_up(|plan| {
let plan_any = plan.as_any();
- let wrap_in_coalesce = plan_any.downcast_ref::<HashJoinExec>().is_some()
+ let wrap_in_coalesce = plan_any
Review Comment:
One slightly unrelated comment: there is no need to add a `CoalesceBatches`
if the input is a `CoalesceBatches` as well. I have experienced multiple
chained CBs when this rule is called multiple times on the same plan. But we
can take that as a different PR.
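Roughly the check I mean, as a free-standing helper (sketch only; the helper name is mine and I haven't wired it into the rule or tested it):
```rust
use std::sync::Arc;

use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::ExecutionPlan;

/// True when `plan` is a CoalesceBatchesExec sitting directly on top of
/// another CoalesceBatchesExec, i.e. the redundant chain the rule could avoid
/// creating (or collapse) when it runs more than once on the same plan.
fn directly_double_coalesced(plan: &Arc<dyn ExecutionPlan>) -> bool {
    plan.as_any()
        .downcast_ref::<CoalesceBatchesExec>()
        .map(|cb| {
            cb.input()
                .as_any()
                .downcast_ref::<CoalesceBatchesExec>()
                .is_some()
        })
        .unwrap_or(false)
}
```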
##########
datafusion/physical-plan/src/joins/hash_join/stream.rs:
##########
@@ -280,21 +289,35 @@ pub(super) fn lookup_join_hashmap(
hashes_buffer: &[u64],
limit: usize,
offset: JoinHashMapOffset,
+ probe_indices_buffer: &mut Vec<u32>,
+ build_indices_buffer: &mut Vec<u64>,
) -> Result<(UInt64Array, UInt32Array, Option<JoinHashMapOffset>)> {
- let (probe_indices, build_indices, next_offset) =
- build_hashmap.get_matched_indices_with_limit_offset(hashes_buffer, limit, offset);
-
- let build_indices: UInt64Array = build_indices.into();
- let probe_indices: UInt32Array = probe_indices.into();
-
+ let next_offset = build_hashmap.get_matched_indices_with_limit_offset(
+ hashes_buffer,
+ limit,
+ offset,
+ probe_indices_buffer,
+ build_indices_buffer,
+ );
+
+ let build_indices_unfiltered: UInt64Array =
+ std::mem::take(build_indices_buffer).into();
+ let probe_indices_unfiltered: UInt32Array =
+ std::mem::take(probe_indices_buffer).into();
+
+ // TODO: optimize equal_rows_arr to avoid allocation of new buffers
Review Comment:
Just a kind reminder: not sure whether you intend to follow up on this TODO as
part of this PR or in a different one?