jonathanc-n commented on code in PR #16443:
URL: https://github.com/apache/datafusion/pull/16443#discussion_r2181125973
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,13 +828,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
handle_state!(self.process_probe_batch())
}
NestedLoopJoinStreamState::ExhaustedProbeSide => {
- handle_state!(self.process_unmatched_build_batch())
+ handle_state!(self.prepare_unmatched_output_indices())
+ }
+ NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+ handle_state!(self.build_unmatched_output())
}
NestedLoopJoinStreamState::Completed => Poll::Ready(None),
};
}
}
+ fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+ let (left_indices, right_indices, start) =
+ self.join_result_status.as_mut().ok_or_else(|| {
+ datafusion_common::_internal_datafusion_err!(
+ "get_next_join_result called without initializing join_result_status"
+ )
+ })?;
+
+ let left_batch = self
+ .left_data
+ .as_ref()
+ .ok_or_else(|| {
+ datafusion_common::_internal_datafusion_err!("should have left_batch")
+ })?
+ .batch();
+
+ let right_batch = match &self.state {
+ NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) =>
record_batch,
+ NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch)
=> {
+ record_batch
+ }
+ _ => {
+ return internal_err!(
+ "state should be ProcessProbeBatch or OutputUnmatchBatch"
Review Comment:
```suggestion
"State should be ProcessProbeBatch or OutputUnmatchedBuildRows"
```
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,13 +828,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
handle_state!(self.process_probe_batch())
}
NestedLoopJoinStreamState::ExhaustedProbeSide => {
- handle_state!(self.process_unmatched_build_batch())
+ handle_state!(self.prepare_unmatched_output_indices())
+ }
+ NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+ handle_state!(self.build_unmatched_output())
}
NestedLoopJoinStreamState::Completed => Poll::Ready(None),
};
}
}
+ fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+ let (left_indices, right_indices, start) =
+ self.join_result_status.as_mut().ok_or_else(|| {
+ datafusion_common::_internal_datafusion_err!(
+ "get_next_join_result called without initializing join_result_status"
+ )
+ })?;
+
+ let left_batch = self
+ .left_data
+ .as_ref()
+ .ok_or_else(|| {
+ datafusion_common::_internal_datafusion_err!("should have left_batch")
+ })?
+ .batch();
+
+ let right_batch = match &self.state {
+ NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) =>
record_batch,
+ NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch)
=> {
Review Comment:
```suggestion
NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) |
NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => {
```
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -883,44 +1000,63 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
let visited_left_side = left_data.bitmap();
let batch = self.state.try_as_process_probe_batch()?;
- match self.batch_transformer.next() {
- None => {
- // Setting up timer & updating input metrics
- self.join_metrics.input_batches.add(1);
- self.join_metrics.input_rows.add(batch.num_rows());
- let timer = self.join_metrics.join_time.timer();
-
- let result = join_left_and_right_batch(
- left_data.batch(),
- batch,
- self.join_type,
- self.filter.as_ref(),
- &self.column_indices,
- &self.schema,
- visited_left_side,
- &mut self.indices_cache,
- self.right_side_ordered,
- );
- timer.done();
+ let binding = self.join_metrics.join_time.clone();
+ let _timer = binding.timer();
+
+ if self.join_result_status.is_none() {
+ let (left_side_indices, right_side_indices) =
join_left_and_right_batch(
+ left_data.batch(),
+ batch,
+ self.join_type,
+ self.filter.as_ref(),
+ visited_left_side,
+ &mut self.indices_cache,
+ self.right_side_ordered,
+ self.intermediate_batch_size,
+ )?;
+ self.join_result_status = Some((left_side_indices,
right_side_indices, 0))
+ }
+
+ let join_result = self.get_next_join_result()?;
- self.batch_transformer.set_batch(result?);
+ match join_result {
+ Some(res) => {
+ self.join_metrics.output_batches.add(1);
+ self.join_metrics.output_rows.add(res.num_rows());
+
+ Ok(StatefulStreamResult::Ready(Some(res)))
+ }
+ None => {
+ self.state = NestedLoopJoinStreamState::FetchProbeBatch;
+ self.join_result_status = None;
Ok(StatefulStreamResult::Continue)
}
- Some((batch, last)) => {
- if last {
- self.state = NestedLoopJoinStreamState::FetchProbeBatch;
- }
+ }
+ }
- self.join_metrics.output_batches.add(1);
- self.join_metrics.output_rows.add(batch.num_rows());
- Ok(StatefulStreamResult::Ready(Some(batch)))
+ fn build_unmatched_output(
+ &mut self,
+ ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+ if matches!(
+ self.state,
+ NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_)
+ ) {
+ let start = Instant::now();
+ let res = self.get_next_join_result()?;
+ self.join_metrics.join_time.add_elapsed(start);
+ match res {
+ Some(res) => Ok(StatefulStreamResult::Ready(Some(res))),
+ None => {
+ self.state = NestedLoopJoinStreamState::Completed;
+ Ok(StatefulStreamResult::Ready(None))
+ }
}
+ } else {
+ internal_err!("state should be OutputUnmatchBatch")
Review Comment:
```suggestion
internal_err!("State should be OutputUnmatchedBuildRows")
```
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -729,10 +716,26 @@ struct NestedLoopJoinStream<T> {
right_side_ordered: bool,
/// Current state of the stream
state: NestedLoopJoinStreamState,
+ #[allow(dead_code)]
+ // TODO: remove this field ??
/// Transforms the output batch before returning.
batch_transformer: T,
/// Result of the left data future
left_data: Option<Arc<JoinLeftData>>,
+
+ // Tracks progress when building join result batches incrementally
+ // Contains (build_indices, probe_indices, processed_count) where:
+ // - build_indices: row indices from build-side table (left table)
+ // - probe_indices: row indices from probe-side table (right table)
+ // - processed_count: number of index pairs already processed into output batches
+ // We have completed join result for indices [0..processed_count)
+ join_result_status: Option<(
Review Comment:
I still think we should make this into a struct
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]