UBarney commented on code in PR #16889: URL: https://github.com/apache/datafusion/pull/16889#discussion_r2234007180
########## datafusion/physical-plan/src/joins/nlj.rs: ########## @@ -0,0 +1,803 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Implementation of the Nested Loop Join operator. +//! +//! For detailed information regarding the operator's state machine and execution flow, +//! please refer to the documentation provided in the `poll_next()` method. 
+ +use arrow::buffer::MutableBuffer; +use arrow::compute::BatchCoalescer; +use futures::{ready, StreamExt}; +use log::debug; +use std::sync::Arc; +use std::task::Poll; + +use crate::joins::nested_loop_join::JoinLeftData; +use crate::joins::utils::{ + apply_join_filter_to_indices, build_batch_from_indices_maybe_empty, + need_produce_result_in_final, BuildProbeJoinMetrics, ColumnIndex, JoinFilter, + OnceFut, +}; +use crate::metrics::Count; +use crate::{RecordBatchStream, SendableRecordBatchStream}; + +use arrow::array::{ + BooleanArray, BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, + UInt64Builder, +}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{ + internal_datafusion_err, unwrap_or_internal_err, DataFusionError, JoinSide, Result, +}; +use datafusion_expr::JoinType; + +use futures::Stream; + +/// States for join processing. See `poll_next()` comment for more details about +/// state transitions. +#[derive(Debug, Clone, Copy)] +enum NLJState { + BufferingLeft, + FetchingRight, + ProbeRight, + EmitRightUnmatched, + EmitLeftUnmatched, + Done, +} + +pub(crate) struct NLJStream { + // ======================================================================== + // PROPERTIES: + // Operator's properties that remain constant + // ======================================================================== + /// Output schema + pub(crate) output_schema: Arc<Schema>, + /// join filter + pub(crate) join_filter: Option<JoinFilter>, + /// type of the join + pub(crate) join_type: JoinType, + /// the outer table data of the nested loop join + pub(crate) outer_table: SendableRecordBatchStream, + /// the inner table data of the nested loop join + pub(crate) inner_table: OnceFut<JoinLeftData>, + /// Information of index and left / right placement of columns + pub(crate) column_indices: Vec<ColumnIndex>, + /// Join execution metrics + pub(crate) join_metrics: BuildProbeJoinMetrics, + + /// `batch_size` from 
configuration + cfg_batch_size: usize, + + /// Should we use a bitmap to track each incoming right batch's each row's + /// 'joined' status. + /// For example in right joins, we have to use a bit map to track matched + /// right side rows, and later enter a `EmitRightUnmatched` stage to emit + /// unmatched right rows. + should_track_unmatched_right: bool, + + // ======================================================================== + // STATE FLAGS/BUFFERS: + // Fields that hold intermediate data/flags during execution + // ======================================================================== + /// State Tracking + state: NLJState, + /// Output buffer holds the join result to output. It will emit eagerly when + /// the threshold is reached. + output_buffer: Box<BatchCoalescer>, + /// See comments in `NLJState::Done` for its purpose + handled_empty_output: bool, + + // Buffer(left) side + // ----------------- + /// The current buffered left data to join + buffered_left_data: Option<Arc<JoinLeftData>>, + /// Index into the left buffered batch. Used in `ProbeRight` state + l_index: usize, + /// Index into the left buffered batch. Used in `EmitLeftUnmatched` state + emit_cursor: u64, + /// Should we go back to `BufferingLeft` state again after `EmitLeftUnmatched` + /// state is over. + left_exhausted: bool, + /// If we can buffer all left data in one pass + /// TODO(now): this is for the (unimplemented) memory-limited execution + #[allow(dead_code)] + left_buffered_in_one_pass: bool, + + // Probe(right) side + // ----------------- + /// The current probe batch to process + current_right_batch: Option<RecordBatch>, + // For right join, keep track of matched rows in `current_right_batch` + // Constructed when fetching each new incoming right batch in `FetchingRight` state. 
+ current_right_batch_matched: Option<BooleanBufferBuilder>, +} + +impl Stream for NLJStream { + type Item = Result<RecordBatch>; + + /// # Design + /// + /// The high-level control flow for this operator is: + /// 1. Buffer all batches from the left side (unless memory limit is reached, + /// in which case see notes at 'Memory-limited Execution'). + /// - Rationale: The right side scanning can be expensive (it might + /// include decoding Parquet files), so it tries to buffer more left + /// batches at once to minimize the scan passes. + /// 2. Read right side batch one at a time. For each iteration, it only + /// evaluates the join filter on (1-left-row x right-batch), and puts the Review Comment: > The new implementation can also preserve left order in some cases Only when `right_table.num_rows() < batch_size`? For join types like inner join, we are no longer preserving the order of the right table. I'm **not** sure if this is a breaking change. It's possible that some users rely on this behavior, especially those who create an ExecutionPlan directly, bypassing the DataFusion optimizer. (Are there any users who do this?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org