alamb commented on code in PR #17444:
URL: https://github.com/apache/datafusion/pull/17444#discussion_r2333837528


##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1188,29 +1189,123 @@ impl ExecutionPlan for HashJoinExec {
     }
 }
 
-/// Compute min/max bounds for each column in the given arrays
-fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<ColumnBounds>> {
-    arrays
-        .iter()
-        .map(|array| {
-            if array.is_empty() {
-                // Return NULL values for empty arrays
-                return Ok(ColumnBounds::new(
-                    ScalarValue::try_from(array.data_type())?,
-                    ScalarValue::try_from(array.data_type())?,
-                ));
+/// Accumulator for collecting min/max bounds from build-side data during hash 
join.
+///
+/// This struct encapsulates the logic for progressively computing column 
bounds
+/// (minimum and maximum values) for a specific join key expression as batches
+/// are processed during the build phase of a hash join.
+///
+/// The bounds are used for dynamic filter pushdown optimization, where filters
+/// based on the actual data ranges can be pushed down to the probe side to
+/// eliminate unnecessary data early.
+struct CollectLeftAccumulator {
+    /// The physical expression to evaluate for each batch
+    expr: Arc<dyn PhysicalExpr>,
+    /// Accumulator for tracking the minimum value across all batches
+    min: MinAccumulator,

Review Comment:
   I think using the min/max accumulators is a great idea
   
   It may also help the symptoms @LiaCastaneda is reporting in 
   - https://github.com/apache/datafusion/issues/17486
   
   As I believe I remember there is a better optimized implementation for Lists 
in those accumulators than what was here previously



##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1254,24 +1350,48 @@ async fn collect_left_input(
     // This operation performs 2 steps at once:
     // 1. creates a [JoinHashMap] of all batches from the stream
     // 2. stores the batches in a vector.
-    let initial = (Vec::new(), 0, metrics, reservation);
-    let (batches, num_rows, metrics, mut reservation) = left_stream
-        .try_fold(initial, |mut acc, batch| async {
+    let initial = BuildSideState::try_new(
+        metrics,
+        reservation,
+        on_left.clone(),
+        &schema,
+        should_compute_bounds,
+    )?;
+
+    let state = left_stream
+        .try_fold(initial, |mut state, batch| async move {
+            // Update accumulators if computing bounds
+            if let Some(ref mut accumulators) = state.bounds_accumulators {
+                for accumulator in accumulators {
+                    accumulator.update_batch(&batch)?;
+                }
+            }
+
+            // Decide if we spill or not
             let batch_size = get_record_batch_memory_size(&batch);
             // Reserve memory for incoming batch
-            acc.3.try_grow(batch_size)?;
+            state.reservation.try_grow(batch_size)?;

Review Comment:
   I really like this `state.reservation` rather than `acc.2.`



##########
datafusion/physical-plan/Cargo.toml:
##########
@@ -53,7 +53,7 @@ datafusion-common = { workspace = true, default-features = 
true }
 datafusion-common-runtime = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
-datafusion-functions-aggregate-common = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }

Review Comment:
   I worry about bringing in all of the aggregate functions for all physical 
plans
   
   I would personally prefer to have Min/Max accumulators moved into 
`datafusion-functions-aggregate-common` and avoid this new dependency



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to