rkrishn7 commented on code in PR #17529:
URL: https://github.com/apache/datafusion/pull/17529#discussion_r2356322889
##########
datafusion/physical-plan/src/joins/hash_join/information_passing.rs:
##########

@@ -0,0 +1,612 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities for passing information from the build side of a hash join to the probe side.
+//! This is also known as "sideways information passing", and enables optimizations that take
+//! advantage of the smaller build side to reduce data processed on the larger probe side.
+//!
+//! As an example, let's consider TPC-H Query 17:
+//!
+//! ```sql
+//! select
+//!     sum(l_extendedprice) / 7.0 as avg_yearly
+//! from
+//!     lineitem,
+//!     part
+//! where
+//!     p_partkey = l_partkey
+//!     and p_brand = 'Brand#23'
+//!     and p_container = 'MED BOX'
+//!     and l_quantity < (
+//!         select
+//!             0.2 * avg(l_quantity)
+//!         from
+//!             lineitem
+//!         where
+//!             l_partkey = p_partkey
+//!     );
+//! ```
+//!
+//! The join portion of the query should look something like this:
+//!
+//! ```text
+//!                                │
+//!                                │
+//!                     2044 Rows  │
+//!                                │
+//!                                ▼
+//!                       ┌────────────────┐
+//!                       │    HashJoin    │
+//!                       │  p_partkey =   │
+//!                       │   l_partkey    │
+//!                       └──┬─────────┬───┘
+//!               2M Rows    │         │    60M Rows
+//!                          │         │
+//!                          │         │
+//!                 ┌────────┘         └─────────┐
+//!                 │                            │    This scan decodes 60M values of l_quantity and l_extendedprice,
+//!                 ▼                            ▼    even though all but 2044 are filtered by the join!
+//!       ┌──────────────────┐        ┌─────────────────────┐
+//!       │Scan: part        │        │Scan: lineitem       │                  │
+//!       │projection:       │        │projection:          │                  │
+//!       │  p_partkey       │        │  l_quantity,        │                  │
+//!       │filters:          │        │  l_extendedprice,   │◀─ ─ ─ ─ ─ ─ ─ ─ ─
+//!       │  p_brand = ..    │        │  l_partkey          │
+//!       │  p_container = ..│        │filters:             │
+//!       │                  │        │  NONE               │
+//!       └──────────────────┘        └─────────────────────┘
+//! ```
+//!
+//! The key observation is that the scan of `lineitem` produces 60 million rows, but only 2044 of them
+//! will pass the join condition. If we can push down a filter to the scan of `lineitem` that limits
+//! the number of rows produced, we can avoid a lot of unnecessary work.
+//!
+//! Given that in a hash join we fully process the build side (in this case, `part`) before scanning partitions
+//! of the probe side (`lineitem`), we can collect information from the build side that helps us construct filters
+//! for the probe side. This allows us to transform the above plan into something like this:
+//!
+//! ```text
+//!                                │
+//!                                │
+//!                     2044 Rows  │
+//!                                │
+//!                                ▼
+//!                       ┌────────────────┐
+//!                       │    HashJoin    │
+//!                       │  p_partkey =   │
+//!                       │   l_partkey    │
+//!                       └──┬─────────┬───┘
+//!               2M Rows    │         │    60M Rows
+//!                          │         │
+//!                          │         │
+//!                 ┌────────┘         └─────────┐
+//!                 │                            │
+//!                 ▼                            ▼
+//!       ┌──────────────────┐        ┌──────────────────────────────┐
+//!       │Scan: part        │        │Scan: lineitem                │    Now, the scan contains a filter that takes into account
+//!       │projection:       │        │projection:                   │    min/max bounds from the left side. This enables scans that
+//!       │  p_partkey       │        │  l_quantity,                 │    take advantage of late materialization to avoid decoding
+//!       │filters:          │        │  l_extendedprice,            │    l_quantity and l_extendedprice for rows that do not match.
+//!       │  p_brand = ..    │        │  l_partkey                   │                  │
+//!       │  p_container = ..│        │filters:                      │                  │
+//!       └──────────────────┘        │  l_partkey >= min(p_partkey) │                  │
+//!                                   │  and                         │ ◀─ ─ ─ ─ ─ ─ ─ ─ ─
+//!                                   │  l_partkey <= max(p_partkey) │
+//!                                   │  and                         │
+//!                                   │  ...                         │
+//!                                   └──────────────────────────────┘
+//! ```
+//!
+//! Dynamic filters are expressions that allow us to pass information sideways in a query plan. They essentially start out
+//! as dummy filters that are always true (resulting in no selectivity), and may be updated later during query execution.
+//! In the case of a hash join, we update dynamic filters after fully processing the build side, and before scanning the probe side.
+//!
+//! References:
+//! - <https://github.com/apache/datafusion/issues/7955>
+//! - <https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/>
+
+use std::{any::Any, fmt, hash::Hash, sync::Arc};
+
+use crate::joins::utils::JoinHashMapType;
+use crate::joins::utils::NoHashHasher;
+use crate::joins::utils::NoHashSet;
+use crate::joins::PartitionMode;
+use crate::ExecutionPlan;
+use crate::ExecutionPlanProperties;
+
+use ahash::RandomState;
+use arrow::{
+    array::BooleanArray,
+    buffer::MutableBuffer,
+    datatypes::{DataType, Schema},
+    util::bit_util,
+};
+use datafusion_common::utils::memory::estimate_memory_size;
+use datafusion_common::HashSet;
+use datafusion_common::{hash_utils::create_hashes, Result, ScalarValue};
+use datafusion_execution::memory_pool::MemoryReservation;
+use datafusion_expr::{ColumnarValue, Operator};
+use datafusion_physical_expr::expressions::{lit, BinaryExpr, DynamicFilterPhysicalExpr};
+use datafusion_physical_expr::PhysicalExpr;
+
+use itertools::Itertools;
+use parking_lot::Mutex;
+use tokio::sync::Barrier;
+
+/// Represents the minimum and maximum values for a specific column.
+/// Used in dynamic filter pushdown to establish value boundaries.
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct ColumnBounds {
+    /// The minimum value observed for this column
+    min: ScalarValue,
+    /// The maximum value observed for this column
+    max: ScalarValue,
+}
+
+impl ColumnBounds {
+    pub(crate) fn new(min: ScalarValue, max: ScalarValue) -> Self {
+        Self { min, max }
+    }
+}
+
+/// Represents the bounds for all join key columns from a single partition.
+/// This contains the min/max values computed from one partition's build-side data.
+#[derive(Debug, Clone)]
+pub(crate) struct PartitionBounds {
+    /// Partition identifier for debugging and determinism (not strictly necessary)
+    partition: usize,
+    /// Min/max bounds for each join key column in this partition.
+    /// Index corresponds to the join key expression index.
+    column_bounds: Vec<ColumnBounds>,
+}
+
+impl PartitionBounds {
+    pub(crate) fn new(partition: usize, column_bounds: Vec<ColumnBounds>) -> Self {
+        Self {
+            partition,
+            column_bounds,
+        }
+    }
+
+    pub(crate) fn len(&self) -> usize {
+        self.column_bounds.len()
+    }
+
+    pub(crate) fn get_column_bounds(&self, index: usize) -> Option<&ColumnBounds> {
+        self.column_bounds.get(index)
+    }
+}
+
+/// Coordinates information collected from the build side of the join across multiple partitions.
+///
+/// This structure ensures that dynamic filters are built with complete information from all
+/// relevant partitions before being applied to probe-side scans. Incomplete filters would
+/// incorrectly eliminate valid join results.
+///
+/// ## Synchronization Strategy
+///
+/// 1. Each partition relays values computed from its build-side data (e.g. min/max bounds, available hashes)
+/// 2. The per-partition information is accumulated in shared state
+/// 3. A [`Barrier`] is used to track reporters. Once the last partition reports, the information is merged and the filter is updated exactly once
+///
+/// ## Partition Counting
+///
+/// The `total_partitions` count represents how many times `collect_build_side` will be called:
+///
+/// - **CollectLeft**: Number of output partitions (each accesses shared build data)
+/// - **Partitioned**: Number of input partitions (each builds independently)
+///
+/// ## Thread Safety
+///
+/// All fields use a single mutex to ensure correct coordination between concurrent
+/// partition executions.
+pub(crate) struct SharedBuildAccumulator {
+    /// Shared state protected by a single mutex to avoid ordering concerns
+    inner: Mutex<SharedBuildState>,
+    /// Synchronization barrier to track when all partitions have reported
+    barrier: Barrier,
+    /// Dynamic filter for pushdown to probe side
+    dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
+    /// Right side join expressions needed for creating filter bounds
+    on_right: Vec<Arc<dyn PhysicalExpr>>,
+    /// Random state used for hash computation
+    random_state: &'static RandomState,
+}
+
+/// State protected by SharedBuildAccumulator's mutex
+struct SharedBuildState {
+    /// Bounds from completed partitions.
+    /// Each element represents the column bounds computed by one partition.
+    bounds: Vec<PartitionBounds>,
+    /// Hashes from the left (build) side, if enabled
+    left_hashes: NoHashSet<u64>,

Review Comment:
   Ah yeah, that's essentially the same as what I was referring to earlier with using a `Vec<Arc<dyn JoinHashMapType>>` (sorry, probably should've clarified more).
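
For readers following the module docs above: the pushed-down predicate is just a conjunction of per-key range clauses built from the merged build-side bounds. Below is a minimal sketch of that construction using the `BinaryExpr`, `Operator`, and `lit` building blocks already imported in the diff. `MergedBounds` and `bounds_predicate` are hypothetical names for illustration, not part of the PR:

```rust
use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical merged bounds for one join key column (stand-in for the
/// per-partition `ColumnBounds` after merging across all partitions).
pub struct MergedBounds {
    pub min: ScalarValue,
    pub max: ScalarValue,
}

/// Build `key_i >= min_i AND key_i <= max_i` for each probe-side join key
/// and AND the clauses together. `on_right[i]` pairs with `bounds[i]`,
/// mirroring the index correspondence described in `PartitionBounds`.
pub fn bounds_predicate(
    on_right: &[Arc<dyn PhysicalExpr>],
    bounds: &[MergedBounds],
) -> Arc<dyn PhysicalExpr> {
    on_right
        .iter()
        .zip(bounds)
        .map(|(key, b)| {
            // key >= min
            let ge: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
                Arc::clone(key),
                Operator::GtEq,
                lit(b.min.clone()),
            ));
            // key <= max
            let le: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
                Arc::clone(key),
                Operator::LtEq,
                lit(b.max.clone()),
            ));
            Arc::new(BinaryExpr::new(ge, Operator::And, le)) as Arc<dyn PhysicalExpr>
        })
        // AND the per-column ranges together.
        .reduce(|acc, clause| {
            Arc::new(BinaryExpr::new(acc, Operator::And, clause)) as Arc<dyn PhysicalExpr>
        })
        // With no join keys, fall back to the dynamic filter's initial
        // "always true" state described in the module docs.
        .unwrap_or_else(|| lit(ScalarValue::Boolean(Some(true))))
}
```

Per the module docs, an expression like this is what replaces the dynamic filter's initial always-true placeholder once the build side completes.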
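
The synchronization strategy from the `SharedBuildAccumulator` docs (the last partition to report merges everything and updates the filter exactly once) can be sketched standalone with `tokio::sync::Barrier`, whose `wait()` elects a single leader. This is a toy illustration of the coordination pattern only, assuming 4 partitions reporting `i64` min/max pairs, with hypothetical `Report` and `Accumulator` types standing in for `PartitionBounds` and `SharedBuildAccumulator` (requires tokio's `rt-multi-thread`, `macros`, and `sync` features plus `parking_lot`):

```rust
use std::sync::Arc;

use parking_lot::Mutex;
use tokio::sync::Barrier;

/// Stand-in for one partition's contribution (e.g. `PartitionBounds`).
struct Report {
    min: i64,
    max: i64,
}

/// Mirrors the shape of `SharedBuildAccumulator`: one mutex for the
/// accumulated state, one barrier sized to the number of reporters.
struct Accumulator {
    inner: Mutex<Vec<Report>>,
    barrier: Barrier,
}

impl Accumulator {
    /// Called exactly once per partition. The barrier's leader (the last
    /// caller to arrive) merges all reports and "updates the filter" once.
    async fn report(&self, r: Report) {
        // The lock guard is a temporary dropped at the end of this
        // statement, so no lock is held across the await point below.
        self.inner.lock().push(r);
        if self.barrier.wait().await.is_leader() {
            let reports = self.inner.lock();
            let min = reports.iter().map(|r| r.min).min().unwrap();
            let max = reports.iter().map(|r| r.max).max().unwrap();
            // The real operator would update the dynamic filter here with
            // `key >= min AND key <= max`, exactly once.
            println!("merged {} reports: {min}..={max}", reports.len());
        }
    }
}

#[tokio::main]
async fn main() {
    let n = 4;
    let acc = Arc::new(Accumulator {
        inner: Mutex::new(Vec::with_capacity(n)),
        barrier: Barrier::new(n),
    });

    let tasks: Vec<_> = (0..n as i64)
        .map(|p| {
            let acc = Arc::clone(&acc);
            // Each "partition" reports bounds computed from its build input.
            tokio::spawn(async move {
                acc.report(Report { min: p * 10, max: p * 10 + 5 }).await
            })
        })
        .collect();

    for t in tasks {
        t.await.unwrap();
    }
}
```

Because every reporter passes through the same barrier, the merge runs after all partitions have contributed, which is exactly the property the docs call out: an incomplete filter would incorrectly eliminate valid join results.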