Copilot commented on code in PR #17632:
URL: https://github.com/apache/datafusion/pull/17632#discussion_r2356586737


##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -244,39 +304,65 @@ impl SharedBoundsAccumulator {
         Ok(combined_predicate)
     }
 
-    /// Report bounds from a completed partition and update dynamic filter if 
all partitions are done
-    ///
-    /// This method coordinates the dynamic filter updates across all 
partitions. It stores the
-    /// bounds from the current partition, increments the completion counter, 
and when all
-    /// partitions have reported, creates an OR'd filter from individual 
partition bounds.
-    ///
-    /// This method is async and uses a [`tokio::sync::Barrier`] to wait for 
all partitions
-    /// to report their bounds. Once that occurs, the method will resolve for 
all callers and the
-    /// dynamic filter will be updated exactly once.
+    /// Create progressive filter from completed partitions using AND logic
+    pub(crate) fn create_progressive_filter_from_partition_bounds(
+        &self,
+        bounds: &[PartitionBounds],
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        if bounds.is_empty() {
+            return Ok(lit(true));
+        }
+
+        // Create a progressive filter for each completed partition
+        let mut partition_filters = Vec::with_capacity(bounds.len());
+
+        for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) {
+            let progressive_filter =
+                self.create_progressive_partition_filter(partition_bounds)?;
+            partition_filters.push(progressive_filter);
+        }
+
+        // Combine all partition filters with AND
+        let combined_filter = partition_filters
+            .into_iter()
+            .reduce(|acc, filter| {
+                Arc::new(BinaryExpr::new(acc, Operator::And, filter))
+                    as Arc<dyn PhysicalExpr>
+            })
+            .unwrap_or_else(|| lit(true));
+
+        Ok(combined_filter)
+    }
+
+    /// Report bounds from a completed partition and immediately update the 
dynamic filter
     ///
-    /// # Note
+    /// This method applies progressive filtering by immediately injecting a 
filter for each
+    /// completed partition. The filter uses hash-based expressions to ensure 
correctness:
+    /// `(hash(cols) % num_partitions != partition_id OR col >= min AND col <= 
max)`
     ///
-    /// As barriers are reusable, it is likely an error to call this method 
more times than the
-    /// total number of partitions - as it can lead to pending futures that 
never resolve. We rely
-    /// on correct usage from the caller rather than imposing additional 
checks here. If this is a concern,
-    /// consider making the resulting future shared so the ready result can be 
reused.
+    /// When all partitions have completed, the filter is optimized to remove 
hash checks.
     ///
     /// # Arguments
     /// * `left_side_partition_id` - The identifier for the **left-side** 
partition reporting its bounds
     /// * `partition_bounds` - The bounds computed by this partition (if any)
     ///
     /// # Returns
     /// * `Result<()>` - Ok if successful, Err if filter update failed
-    pub(crate) async fn report_partition_bounds(
+    pub(crate) fn report_partition_bounds(
         &self,
         left_side_partition_id: usize,
         partition_bounds: Option<Vec<ColumnBounds>>,
     ) -> Result<()> {
-        // Store bounds in the accumulator - this runs once per partition
-        if let Some(bounds) = partition_bounds {
-            let mut guard = self.inner.lock();
+        let mut inner = self.inner.lock();
 
-            let should_push = if let Some(last_bound) = guard.bounds.last() {
+        // Skip if this partition already reported (using improved 
deduplication logic from HEAD)

Review Comment:
   The comment references 'improved deduplication logic from HEAD' which is 
unclear and may become outdated. Consider updating this comment to describe the 
actual deduplication behavior being implemented.
   ```suggestion
           // Skip processing if this partition has already reported its bounds to prevent duplicate updates
   ```



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -167,24 +175,107 @@ impl SharedBoundsAccumulator {
         };
         Self {
             inner: Mutex::new(SharedBoundsState {
-                bounds: Vec::with_capacity(expected_calls),
+                bounds: Vec::with_capacity(total_partitions),
+                completed_partitions: HashSet::new(),
+                filter_optimized: false,
             }),
-            barrier: Barrier::new(expected_calls),
+            total_partitions,
             dynamic_filter,
             on_right,
         }
     }
 
-    /// Create a filter expression from individual partition bounds using OR 
logic.
-    ///
-    /// This creates a filter where each partition's bounds form a conjunction 
(AND)
-    /// of column range predicates, and all partitions are combined with OR.
-    ///
-    /// For example, with 2 partitions and 2 columns:
-    /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= 
p0_max1)
-    ///  OR
-    ///  (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= 
p1_max1))
-    pub(crate) fn create_filter_from_partition_bounds(
+    /// Create hash expression for the join keys: hash(col1, col2, ...)
+    fn create_hash_expression(&self) -> Result<Arc<dyn PhysicalExpr>> {
+        // Use the hash function with the same random state as hash joins for 
consistency
+        let hash_udf = Arc::new(ScalarUDF::from(Hash::new()));
+
+        // Create the hash expression using ScalarFunctionExpr
+        Ok(Arc::new(ScalarFunctionExpr::new(
+            "hash",
+            hash_udf,
+            self.on_right.clone(),
+            Field::new("hash_result", DataType::UInt64, false).into(),
+            Arc::new(ConfigOptions::default()),
+        )))

Review Comment:
   Creating a new `ConfigOptions::default()` wrapped in `Arc` for every hash 
expression is inefficient. Consider caching this value as a field in 
`SharedBoundsAccumulator` to avoid repeated allocations.



##########
datafusion/functions/src/hash.rs:
##########
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash Arrays UDF: A scalar function that hashes one or more arrays using 
the same algorithm as DataFusion's join operations
+
+use std::any::Any;
+use std::sync::Arc;
+
+use ahash::RandomState;
+use arrow::array::{Array, UInt64Array};
+use arrow::datatypes::DataType;
+use datafusion_common::hash_utils::create_hashes;
+use datafusion_common::{plan_err, Result};
+use datafusion_expr::{
+    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
+    TypeSignature, Volatility,
+};
+use datafusion_macros::user_doc;
+
+#[user_doc(
+    doc_section(label = "Hashing Functions"),
+    description = "Computes hash values for one or more arrays using 
DataFusion's internal hash algorithm. When multiple arrays are provided, their 
hash values are combined using the same logic as multi-column joins.",
+    syntax_example = "hash(array1 [, array2, ...])",
+    sql_example = r#"```sql
+-- Hash a single array
+SELECT hash([1, 2, 3]);
+
+-- Hash multiple arrays (combines their values)
+SELECT hash([1, 2, 3], ['a', 'b', 'c']);
+
+-- Hash arrays from table columns
+SELECT hash(col1, col2) FROM table;
+```"#,
+    standard_argument(
+        name = "array1",
+        prefix = "First array to hash (any supported type)"
+    )
+)]
+#[derive(Debug)]
+pub struct Hash {
+    signature: Signature,
+    /// RandomState for consistent hashing - using the same seed as hash joins
+    random_state: RandomState,
+}
+
+impl PartialEq for Hash {
+    fn eq(&self, other: &Self) -> bool {
+        // RandomState doesn't implement PartialEq, so we just compare 
signatures
+        self.signature == other.signature
+    }
+}
+
+impl Eq for Hash {}
+
+impl std::hash::Hash for Hash {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        // Only hash the signature since RandomState doesn't implement Hash
+        self.signature.hash(state);
+    }
+}
+
+impl Default for Hash {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Hash {
+    pub fn new() -> Self {
+        // Use the same seed as hash joins for consistency
+        let random_state =
+            RandomState::with_seeds('H' as u64, 'A' as u64, 'S' as u64, 'H' as 
u64);

Review Comment:
   The seed values ('H', 'A', 'S', 'H') are hardcoded here, yet the comment 
above says they must match the seed used by hash joins. Consider defining them 
once as module-level constants so the two cannot drift apart and are easier to 
maintain.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to