jonathanc-n commented on code in PR #17632: URL: https://github.com/apache/datafusion/pull/17632#discussion_r2357425738
########## datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs: ########## @@ -167,24 +183,112 @@ impl SharedBoundsAccumulator { }; Self { inner: Mutex::new(SharedBoundsState { - bounds: Vec::with_capacity(expected_calls), + bounds: Vec::with_capacity(total_partitions), + completed_partitions: HashSet::new(), + filter_optimized: false, }), - barrier: Barrier::new(expected_calls), + total_partitions, dynamic_filter, on_right, + config_options: Arc::new(ConfigOptions::default()), } } - /// Create a filter expression from individual partition bounds using OR logic. - /// - /// This creates a filter where each partition's bounds form a conjunction (AND) - /// of column range predicates, and all partitions are combined with OR. - /// - /// For example, with 2 partitions and 2 columns: - /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1) - /// OR - /// (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1)) - pub(crate) fn create_filter_from_partition_bounds( + /// Create hash expression for the join keys: hash(col1, col2, ...) 
+ fn create_hash_expression(&self) -> Result<Arc<dyn PhysicalExpr>> { + // Use the same random state as RepartitionExec for consistent partitioning + // This ensures hash(row) % num_partitions produces the same partition assignment + // as the original repartitioning operation + let hash_udf = Arc::new(ScalarUDF::from(Hash::new_with_random_state( + REPARTITION_RANDOM_STATE, + ))); + + // Create the hash expression using ScalarFunctionExpr + Ok(Arc::new(ScalarFunctionExpr::new( + "hash", + hash_udf, + self.on_right.clone(), + Field::new("hash_result", DataType::UInt64, false).into(), + Arc::clone(&self.config_options), + ))) + } + + /// Create a bounds predicate for a single partition: (col >= min AND col <= max) for all columns + fn create_partition_bounds_predicate( + &self, + partition_bounds: &PartitionBounds, + ) -> Result<Arc<dyn PhysicalExpr>> { + let mut column_predicates = Vec::with_capacity(partition_bounds.len()); + + for (col_idx, right_expr) in self.on_right.iter().enumerate() { + if let Some(column_bounds) = partition_bounds.get_column_bounds(col_idx) { + // Create predicate: col >= min AND col <= max + let min_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::GtEq, + lit(column_bounds.min.clone()), + )) as Arc<dyn PhysicalExpr>; + let max_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::LtEq, + lit(column_bounds.max.clone()), + )) as Arc<dyn PhysicalExpr>; + let range_expr = + Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) + as Arc<dyn PhysicalExpr>; + column_predicates.push(range_expr); + } + } + + // Combine all column predicates for this partition with AND + if column_predicates.is_empty() { Review Comment: If we return `lit(true)` here, it will nullify the entire filter once all the partition predicates are `OR`ed together in the final phase. Maybe we can return an `Option` instead and skip any partition whose bounds predicate is `None`. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org