alamb commented on code in PR #17444:
URL: https://github.com/apache/datafusion/pull/17444#discussion_r2333837528
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1188,29 +1189,123 @@ impl ExecutionPlan for HashJoinExec {
     }
 }
 
-/// Compute min/max bounds for each column in the given arrays
-fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<ColumnBounds>> {
-    arrays
-        .iter()
-        .map(|array| {
-            if array.is_empty() {
-                // Return NULL values for empty arrays
-                return Ok(ColumnBounds::new(
-                    ScalarValue::try_from(array.data_type())?,
-                    ScalarValue::try_from(array.data_type())?,
-                ));
+/// Accumulator for collecting min/max bounds from build-side data during hash join.
+///
+/// This struct encapsulates the logic for progressively computing column bounds
+/// (minimum and maximum values) for a specific join key expression as batches
+/// are processed during the build phase of a hash join.
+///
+/// The bounds are used for dynamic filter pushdown optimization, where filters
+/// based on the actual data ranges can be pushed down to the probe side to
+/// eliminate unnecessary data early.
+struct CollectLeftAccumulator {
+    /// The physical expression to evaluate for each batch
+    expr: Arc<dyn PhysicalExpr>,
+    /// Accumulator for tracking the minimum value across all batches
+    min: MinAccumulator,

Review Comment:
   I think using the min/max accumulators is a great idea.
   
   It may also help with the symptoms @LiaCastaneda is reporting in
   - https://github.com/apache/datafusion/issues/17486
   
   As I recall, those accumulators have a better-optimized implementation for Lists than what was here previously.

##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1254,24 +1350,48 @@ async fn collect_left_input(
     // This operation performs 2 steps at once:
     // 1. creates a [JoinHashMap] of all batches from the stream
     // 2. stores the batches in a vector.
-    let initial = (Vec::new(), 0, metrics, reservation);
-    let (batches, num_rows, metrics, mut reservation) = left_stream
-        .try_fold(initial, |mut acc, batch| async {
+    let initial = BuildSideState::try_new(
+        metrics,
+        reservation,
+        on_left.clone(),
+        &schema,
+        should_compute_bounds,
+    )?;
+
+    let state = left_stream
+        .try_fold(initial, |mut state, batch| async move {
+            // Update accumulators if computing bounds
+            if let Some(ref mut accumulators) = state.bounds_accumulators {
+                for accumulator in accumulators {
+                    accumulator.update_batch(&batch)?;
+                }
+            }
+
+            // Decide if we spill or not
             let batch_size = get_record_batch_memory_size(&batch);
             // Reserve memory for incoming batch
-            acc.3.try_grow(batch_size)?;
+            state.reservation.try_grow(batch_size)?;

Review Comment:
   I really like `state.reservation` rather than `acc.3`.

##########
datafusion/physical-plan/Cargo.toml:
##########
@@ -53,7 +53,7 @@ datafusion-common = { workspace = true, default-features = true }
 datafusion-common-runtime = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
-datafusion-functions-aggregate-common = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }

Review Comment:
   I worry about bringing in all of the aggregate functions for all physical plans.
   
   I would personally prefer to have the Min/Max accumulators moved into `datafusion-functions-aggregate-common` and avoid this new dependency.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
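
Editorial note on the pattern discussed in the first two review comments: the following is a minimal, std-only sketch, not the PR's implementation. It assumes plain `Vec<i64>` values in place of record batches and uses `Iterator::try_fold` as a stand-in for the stream `try_fold` in the diff. `FoldState`, `update_batch`, and `collect_bounds` are hypothetical names chosen for illustration. It shows the two ideas together: a named state struct replacing a positional tuple, and min/max bounds updated incrementally as each batch arrives.

```rust
/// Hypothetical stand-in for a build-side state struct: named fields
/// instead of a positional tuple like `(Vec::new(), 0, metrics, reservation)`.
#[derive(Debug, Default)]
struct FoldState {
    /// Every batch seen so far, kept for later use.
    batches: Vec<Vec<i64>>,
    /// Total number of values across all batches.
    num_rows: usize,
    /// Running (min, max); stays `None` until a non-empty batch arrives.
    bounds: Option<(i64, i64)>,
}

impl FoldState {
    /// Fold one batch into the running bounds, then store the batch.
    fn update_batch(mut self, batch: Vec<i64>) -> Result<Self, String> {
        for &v in &batch {
            self.bounds = Some(match self.bounds {
                Some((lo, hi)) => (lo.min(v), hi.max(v)),
                None => (v, v),
            });
        }
        self.num_rows = self
            .num_rows
            .checked_add(batch.len())
            .ok_or_else(|| "row count overflow".to_string())?;
        self.batches.push(batch);
        Ok(self)
    }
}

/// Drive the fold: each step reads `state.bounds`, not `acc.2` or `acc.3`.
fn collect_bounds(batches: Vec<Vec<i64>>) -> Result<FoldState, String> {
    batches
        .into_iter()
        .try_fold(FoldState::default(), |state, batch| state.update_batch(batch))
}

fn main() {
    let state = collect_bounds(vec![vec![5, 3], vec![], vec![9, -2]]).unwrap();
    println!(
        "batches = {}, rows = {}, bounds = {:?}",
        state.batches.len(),
        state.num_rows,
        state.bounds
    );
}
```

Because the fields are named, adding or reordering one does not silently change what an index such as `.3` refers to, which is the readability gain the second comment points at.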
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org