rkrishn7 commented on code in PR #17632:
URL: https://github.com/apache/datafusion/pull/17632#discussion_r2356601560


##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -167,24 +175,107 @@ impl SharedBoundsAccumulator {
         };
         Self {
             inner: Mutex::new(SharedBoundsState {
-                bounds: Vec::with_capacity(expected_calls),
+                bounds: Vec::with_capacity(total_partitions),
+                completed_partitions: HashSet::new(),
+                filter_optimized: false,
             }),
-            barrier: Barrier::new(expected_calls),
+            total_partitions,
             dynamic_filter,
             on_right,
         }
     }
 
-    /// Create a filter expression from individual partition bounds using OR logic.
-    ///
-    /// This creates a filter where each partition's bounds form a conjunction (AND)
-    /// of column range predicates, and all partitions are combined with OR.
-    ///
-    /// For example, with 2 partitions and 2 columns:
-    /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1)
-    ///  OR
-    ///  (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1))
-    pub(crate) fn create_filter_from_partition_bounds(
+    /// Create hash expression for the join keys: hash(col1, col2, ...)
+    fn create_hash_expression(&self) -> Result<Arc<dyn PhysicalExpr>> {
+        // Use the hash function with the same random state as hash joins for consistency
+        let hash_udf = Arc::new(ScalarUDF::from(Hash::new()));
+
+        // Create the hash expression using ScalarFunctionExpr
+        Ok(Arc::new(ScalarFunctionExpr::new(
+            "hash",
+            hash_udf,
+            self.on_right.clone(),

Review Comment:
   Don't we want to specify the `RandomState` here?
   
   Which partition is "correct" for a row (and thus whether a given partition's bounds apply to it) is decided by repartitioning, since the build side builds its hash tables and independent bounds per repartitioned partition.
   
   So I would think we need to use the same random state as the repartition operator, so that the `hash(...) % n != partition_id` portion of the filter picks the same partition that repartitioning did. Otherwise we may check a row against another partition's bounds and get an incorrect result?
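
To make the concern concrete, here is a minimal, self-contained sketch (not DataFusion code). It assumes the filter has the shape `hash(key) % n != partition_id OR key within that partition's bounds` for every partition. `toy_hash`, `partition_of`, `build_bounds` and `passes_filter` are hypothetical names, and `toy_hash` is a deliberately trivial stand-in for a seeded hash, not the real `RandomState`.

```rust
/// Toy stand-in for a seeded hash function (an assumption for illustration,
/// NOT the hash used by DataFusion's repartition operator).
fn toy_hash(seed: u64, key: u64) -> u64 {
    key.wrapping_add(seed)
}

/// Route a key to one of `n` partitions, as a hash repartition would.
fn partition_of(seed: u64, key: u64, n: u64) -> usize {
    (toy_hash(seed, key) % n) as usize
}

/// Compute per-partition (min, max) bounds over the build-side keys.
/// A partition that receives no keys keeps `None`.
fn build_bounds(seed: u64, keys: &[u64], n: u64) -> Vec<Option<(u64, u64)>> {
    let mut bounds: Vec<Option<(u64, u64)>> = vec![None; n as usize];
    for &k in keys {
        let p = partition_of(seed, k, n);
        bounds[p] = Some(match bounds[p] {
            None => (k, k),
            Some((lo, hi)) => (lo.min(k), hi.max(k)),
        });
    }
    bounds
}

/// Evaluate the filter: for every partition p,
/// (hash(key) % n != p) OR (key lies within the bounds of p).
fn passes_filter(filter_seed: u64, key: u64, bounds: &[Option<(u64, u64)>]) -> bool {
    let n = bounds.len() as u64;
    bounds.iter().enumerate().all(|(p, b)| {
        partition_of(filter_seed, key, n) != p
            || matches!(b, Some((lo, hi)) if *lo <= key && key <= *hi)
    })
}

fn main() {
    let build_keys: [u64; 4] = [2, 4, 101, 103];
    let n: u64 = 2;
    let repartition_seed: u64 = 0;
    let bounds = build_bounds(repartition_seed, &build_keys, n);

    // Same seed as repartitioning: every build-side key passes its own bounds.
    let all_pass = build_keys
        .iter()
        .all(|&k| passes_filter(repartition_seed, k, &bounds));

    // Different seed: keys are checked against another partition's bounds.
    let dropped = build_keys
        .iter()
        .filter(|&&k| !passes_filter(1, k, &bounds))
        .count();

    println!("same seed, all pass: {all_pass}; different seed, dropped: {dropped}");
}
```

With matching seeds the filter never rejects a key that is actually on the build side. With a mismatched seed, a key such as `2` is routed to the partition whose bounds are `[101, 103]` and is rejected, which is the false negative I am worried about.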



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
