Copilot commented on code in PR #17632:
URL: https://github.com/apache/datafusion/pull/17632#discussion_r2356586737
##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -244,39 +304,65 @@ impl SharedBoundsAccumulator {
         Ok(combined_predicate)
     }

-    /// Report bounds from a completed partition and update dynamic filter if all partitions are done
-    ///
-    /// This method coordinates the dynamic filter updates across all partitions. It stores the
-    /// bounds from the current partition, increments the completion counter, and when all
-    /// partitions have reported, creates an OR'd filter from individual partition bounds.
-    ///
-    /// This method is async and uses a [`tokio::sync::Barrier`] to wait for all partitions
-    /// to report their bounds. Once that occurs, the method will resolve for all callers and the
-    /// dynamic filter will be updated exactly once.
+    /// Create progressive filter from completed partitions using AND logic
+    pub(crate) fn create_progressive_filter_from_partition_bounds(
+        &self,
+        bounds: &[PartitionBounds],
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        if bounds.is_empty() {
+            return Ok(lit(true));
+        }
+
+        // Create a progressive filter for each completed partition
+        let mut partition_filters = Vec::with_capacity(bounds.len());
+
+        for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) {
+            let progressive_filter =
+                self.create_progressive_partition_filter(partition_bounds)?;
+            partition_filters.push(progressive_filter);
+        }
+
+        // Combine all partition filters with AND
+        let combined_filter = partition_filters
+            .into_iter()
+            .reduce(|acc, filter| {
+                Arc::new(BinaryExpr::new(acc, Operator::And, filter))
+                    as Arc<dyn PhysicalExpr>
+            })
+            .unwrap_or_else(|| lit(true));
+
+        Ok(combined_filter)
+    }
+
+    /// Report bounds from a completed partition and immediately update the dynamic filter
     ///
-    /// # Note
+    /// This method applies progressive filtering by immediately injecting a filter for each
+    /// completed partition. The filter uses hash-based expressions to ensure correctness:
+    /// `(hash(cols) % num_partitions != partition_id OR col >= min AND col <= max)`
     ///
-    /// As barriers are reusable, it is likely an error to call this method more times than the
-    /// total number of partitions - as it can lead to pending futures that never resolve. We rely
-    /// on correct usage from the caller rather than imposing additional checks here. If this is a concern,
-    /// consider making the resulting future shared so the ready result can be reused.
+    /// When all partitions have completed, the filter is optimized to remove hash checks.
     ///
     /// # Arguments
     /// * `left_side_partition_id` - The identifier for the **left-side** partition reporting its bounds
     /// * `partition_bounds` - The bounds computed by this partition (if any)
     ///
     /// # Returns
     /// * `Result<()>` - Ok if successful, Err if filter update failed
-    pub(crate) async fn report_partition_bounds(
+    pub(crate) fn report_partition_bounds(
         &self,
         left_side_partition_id: usize,
         partition_bounds: Option<Vec<ColumnBounds>>,
     ) -> Result<()> {
-        // Store bounds in the accumulator - this runs once per partition
-        if let Some(bounds) = partition_bounds {
-            let mut guard = self.inner.lock();
+        let mut inner = self.inner.lock();

-            let should_push = if let Some(last_bound) = guard.bounds.last() {
+        // Skip if this partition already reported (using improved deduplication logic from HEAD)

Review Comment:
   The comment references 'improved deduplication logic from HEAD', which is unclear and may become outdated. Consider updating this comment to describe the actual deduplication behavior being implemented:

```suggestion
        // Skip processing if this partition has already reported its bounds to prevent duplicate updates
```
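   For illustration, a minimal, self-contained sketch of the deduplication behavior that comment should describe (the `should_process` helper and `main` are hypothetical; only the `completed_partitions: HashSet<usize>` field comes from this PR):

```rust
use std::collections::HashSet;

/// Hypothetical helper: returns true only the first time a partition reports.
fn should_process(completed_partitions: &mut HashSet<usize>, partition_id: usize) -> bool {
    // HashSet::insert returns false when the id was already present,
    // i.e. this partition has already reported its bounds.
    completed_partitions.insert(partition_id)
}

fn main() {
    let mut completed = HashSet::new();
    assert!(should_process(&mut completed, 0)); // first report: process it
    assert!(!should_process(&mut completed, 0)); // duplicate report: skip it
}
```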
##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -167,24 +175,107 @@ impl SharedBoundsAccumulator {
         };
         Self {
             inner: Mutex::new(SharedBoundsState {
-                bounds: Vec::with_capacity(expected_calls),
+                bounds: Vec::with_capacity(total_partitions),
+                completed_partitions: HashSet::new(),
+                filter_optimized: false,
             }),
-            barrier: Barrier::new(expected_calls),
+            total_partitions,
             dynamic_filter,
             on_right,
         }
     }

-    /// Create a filter expression from individual partition bounds using OR logic.
-    ///
-    /// This creates a filter where each partition's bounds form a conjunction (AND)
-    /// of column range predicates, and all partitions are combined with OR.
-    ///
-    /// For example, with 2 partitions and 2 columns:
-    /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1)
-    /// OR
-    /// (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1))
-    pub(crate) fn create_filter_from_partition_bounds(
+    /// Create hash expression for the join keys: hash(col1, col2, ...)
+    fn create_hash_expression(&self) -> Result<Arc<dyn PhysicalExpr>> {
+        // Use the hash function with the same random state as hash joins for consistency
+        let hash_udf = Arc::new(ScalarUDF::from(Hash::new()));
+
+        // Create the hash expression using ScalarFunctionExpr
+        Ok(Arc::new(ScalarFunctionExpr::new(
+            "hash",
+            hash_udf,
+            self.on_right.clone(),
+            Field::new("hash_result", DataType::UInt64, false).into(),
+            Arc::new(ConfigOptions::default()),
+        )))

Review Comment:
   Creating a new `ConfigOptions::default()` wrapped in `Arc` for every hash expression is inefficient. Consider caching this value as a field in `SharedBoundsAccumulator` to avoid repeated allocations.
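   A minimal sketch of that caching idea, assuming a simplified `SharedBoundsAccumulator` (the real struct has more fields; the `config` accessor is hypothetical):

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;

struct SharedBoundsAccumulator {
    // ...other fields from the PR elided for brevity...
    /// Built once in the constructor so create_hash_expression() can
    /// hand out cheap Arc clones instead of allocating each time.
    config_options: Arc<ConfigOptions>,
}

impl SharedBoundsAccumulator {
    fn new() -> Self {
        Self {
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Hypothetical accessor: Arc::clone is a reference-count bump,
    /// not a fresh ConfigOptions allocation.
    fn config(&self) -> Arc<ConfigOptions> {
        Arc::clone(&self.config_options)
    }
}
```

   The call site would then pass `self.config()` where it currently constructs `Arc::new(ConfigOptions::default())`.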
##########
datafusion/functions/src/hash.rs:
##########
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash Arrays UDF: A scalar function that hashes one or more arrays using the same algorithm as DataFusion's join operations
+
+use std::any::Any;
+use std::sync::Arc;
+
+use ahash::RandomState;
+use arrow::array::{Array, UInt64Array};
+use arrow::datatypes::DataType;
+use datafusion_common::hash_utils::create_hashes;
+use datafusion_common::{plan_err, Result};
+use datafusion_expr::{
+    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
+    TypeSignature, Volatility,
+};
+use datafusion_macros::user_doc;
+
+#[user_doc(
+    doc_section(label = "Hashing Functions"),
+    description = "Computes hash values for one or more arrays using DataFusion's internal hash algorithm.
+When multiple arrays are provided, their hash values are combined using the same logic as multi-column joins.",
+    syntax_example = "hash(array1 [, array2, ...])",
+    sql_example = r#"```sql
+-- Hash a single array
+SELECT hash([1, 2, 3]);
+
+-- Hash multiple arrays (combines their values)
+SELECT hash([1, 2, 3], ['a', 'b', 'c']);
+
+-- Hash arrays from table columns
+SELECT hash(col1, col2) FROM table;
+```"#,
+    standard_argument(
+        name = "array1",
+        prefix = "First array to hash (any supported type)"
+    )
+)]
+#[derive(Debug)]
+pub struct Hash {
+    signature: Signature,
+    /// RandomState for consistent hashing - using the same seed as hash joins
+    random_state: RandomState,
+}
+
+impl PartialEq for Hash {
+    fn eq(&self, other: &Self) -> bool {
+        // RandomState doesn't implement PartialEq, so we just compare signatures
+        self.signature == other.signature
+    }
+}
+
+impl Eq for Hash {}
+
+impl std::hash::Hash for Hash {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        // Only hash the signature since RandomState doesn't implement Hash
+        self.signature.hash(state);
+    }
+}
+
+impl Default for Hash {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Hash {
+    pub fn new() -> Self {
+        // Use the same seed as hash joins for consistency
+        let random_state =
+            RandomState::with_seeds('H' as u64, 'A' as u64, 'S' as u64, 'H' as u64);

Review Comment:
   The hardcoded seed values ('H', 'A', 'S', 'H') should be defined as constants to ensure consistency across the codebase and make them easier to maintain. Consider defining these as module-level constants.
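   A minimal sketch of that suggestion (the constant and helper names are hypothetical, not from the PR):

```rust
use ahash::RandomState;

/// Seeds shared by the hash join and the hash UDF. Keeping them in one
/// module-level constant prevents the two call sites from drifting apart.
const HASH_JOIN_SEEDS: (u64, u64, u64, u64) =
    ('H' as u64, 'A' as u64, 'S' as u64, 'H' as u64);

/// Hypothetical helper both call sites could use instead of
/// repeating RandomState::with_seeds('H' as u64, ...) inline.
fn hash_join_random_state() -> RandomState {
    let (s0, s1, s2, s3) = HASH_JOIN_SEEDS;
    RandomState::with_seeds(s0, s1, s2, s3)
}

fn main() {
    // Two states built from the same constant hash identical values identically,
    // which is the consistency property the join filter relies on.
    let a = hash_join_random_state().hash_one(42u64);
    let b = hash_join_random_state().hash_one(42u64);
    assert_eq!(a, b);
}
```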