2010YOUY01 commented on code in PR #16996: URL: https://github.com/apache/datafusion/pull/16996#discussion_r2262474885
########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -660,529 +684,1180 @@ async fn collect_left_input( )) } -/// This enumeration represents various states of the nested loop join algorithm. -#[derive(Debug, Clone)] -enum NestedLoopJoinStreamState { - /// The initial state, indicating that build-side data not collected yet - WaitBuildSide, - /// Indicates that build-side has been collected, and stream is ready for - /// fetching probe-side - FetchProbeBatch, - /// Indicates that a non-empty batch has been fetched from probe-side, and - /// is ready to be processed - ProcessProbeBatch(RecordBatch), - /// Preparation phase: Gathers the indices of unmatched rows from the build-side. - /// This state is entered for join types that emit unmatched build-side rows - /// (e.g., LEFT and FULL joins) after the entire probe-side input has been consumed. - PrepareUnmatchedBuildRows, - /// Output unmatched build-side rows. - /// The indices for rows to output has already been calculated in the previous - /// `PrepareUnmatchedBuildRows` state. In this state the final batch will be materialized incrementally. - // The inner `RecordBatch` is an empty dummy batch used to get right schema. - OutputUnmatchedBuildRows(RecordBatch), - /// Indicates that NestedLoopJoinStream execution is completed - Completed, -} - -impl NestedLoopJoinStreamState { - /// Tries to extract a `ProcessProbeBatchState` from the - /// `NestedLoopJoinStreamState` enum. Returns an error if state is not - /// `ProcessProbeBatchState`. - fn try_as_process_probe_batch(&mut self) -> Result<&RecordBatch> { - match self { - NestedLoopJoinStreamState::ProcessProbeBatch(state) => Ok(state), - _ => internal_err!("Expected join stream in ProcessProbeBatch state"), - } - } -} - -/// Tracks incremental output of join result batches. -/// -/// Initialized with all matching pairs that satisfy the join predicate. 
-/// Pairs are stored as indices in `build_indices` and `probe_indices` -/// Each poll outputs a batch within the configured size limit and updates -/// processed_count until all pairs are consumed. -/// -/// Example: 5000 matches, batch size limit is 100 -/// - Poll 1: output batch[0..100], processed_count = 100 -/// - Poll 2: output batch[100..200], processed_count = 200 -/// - ...continues until processed_count = 5000 -struct JoinResultProgress { - /// Row indices from build-side table (left table). - build_indices: PrimitiveArray<UInt64Type>, - /// Row indices from probe-side table (right table). - probe_indices: PrimitiveArray<UInt32Type>, - /// Number of index pairs already processed into output batches. - /// We have completed join result for indices [0..processed_count). - processed_count: usize, +/// States for join processing. See `poll_next()` comment for more details about +/// state transitions. +#[derive(Debug, Clone, Copy)] +enum NLJState { + BufferingLeft, + FetchingRight, + ProbeRight, + EmitRightUnmatched, + EmitLeftUnmatched, + Done, } - -/// A stream that issues [RecordBatch]es as they arrive from the right of the join. -struct NestedLoopJoinStream { - /// Input schema - schema: Arc<Schema>, +pub(crate) struct NLJStream { + // ======================================================================== + // PROPERTIES: + // Operator's properties that remain constant + // + // Note: The implementation uses the terms left/inner/build-side table and + // right/outer/probe-side table interchangeably. 
+ // ======================================================================== + /// Output schema + pub(crate) output_schema: Arc<Schema>, /// join filter - filter: Option<JoinFilter>, + pub(crate) join_filter: Option<JoinFilter>, /// type of the join - join_type: JoinType, - /// the outer table data of the nested loop join - outer_table: SendableRecordBatchStream, - /// the inner table data of the nested loop join - inner_table: OnceFut<JoinLeftData>, - /// Information of index and left / right placement of columns - column_indices: Vec<ColumnIndex>, - // TODO: support null aware equal - // null_equality: NullEquality, + pub(crate) join_type: JoinType, + /// the outer(right) table data of the nested loop join + pub(crate) outer_table: SendableRecordBatchStream, + /// the inner(left) table data of the nested loop join + pub(crate) inner_table: OnceFut<JoinLeftData>, + /// Projection to construct the output schema from the left and right tables. + /// Example: + /// - output_schema: ['a', 'c'] + /// - left_schema: ['a', 'b'] + /// - right_schema: ['c'] + /// + /// The column indices would be [(left, 0), (right, 0)] -- taking the left + /// 0th column and right 0th column can construct the output schema. + /// + /// Note there are other columns ('b' in the example) still kept after + /// projection pushdown; this is because they might be used to evaluate + /// the join filter (e.g., `JOIN ON (b+c)>0`). + pub(crate) column_indices: Vec<ColumnIndex>, /// Join execution metrics - join_metrics: BuildProbeJoinMetrics, - /// Cache for join indices calculations - indices_cache: (UInt64Array, UInt32Array), - /// Whether the right side is ordered - right_side_ordered: bool, - /// Current state of the stream - state: NestedLoopJoinStreamState, - /// Result of the left data future - left_data: Option<Arc<JoinLeftData>>, - - /// Tracks progress when building join result batches incrementally. 
- join_result_status: Option<JoinResultProgress>, - - intermediate_batch_size: usize, + pub(crate) join_metrics: BuildProbeJoinMetrics, + + /// `batch_size` from configuration + cfg_batch_size: usize, + + /// Whether to use a bitmap to track the 'joined' status of each row in + /// every incoming right batch. + /// For example, in right joins we have to use a bitmap to track matched + /// right-side rows, and later enter an `EmitRightUnmatched` stage to emit + /// unmatched right rows. + should_track_unmatched_right: bool, Review Comment: I have updated the comment to better explain it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org