alamb commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2093618262


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -339,6 +344,35 @@ impl SkipAggregationProbe {
 /// │ 2 │ 2     │ 3.0 │    │ 2 │ 2     │ 3.0 │                   └────────────┘
 /// └─────────────────┘    └─────────────────┘
 /// ```
+///
+/// # Blocked approach for intermediate results
+///
+/// An important optimization for [`group_values`] and [`accumulators`]
+/// is to manage such intermediate results using the blocked approach.
+///
+/// In the original method, intermediate results are managed within a single large block

Review Comment:
   Is it possible (not in this PR) to eventually remove the original, single-block approach?
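   To make the distinction in the first hunk concrete, here is a minimal Rust sketch of blocked storage. It assumes a hypothetical `BlockedValues<T>` type (not DataFusion's actual implementation) that keeps per-group values in fixed-size `Vec<T>` blocks and maps a flat group index to a `(block_id, offset)` pair. One motivation for this layout is that growing one large `Vec` eventually reallocates and copies every existing value, while appending a new block leaves earlier blocks untouched.

```rust
/// Hypothetical blocked storage for per-group values (illustrative only):
/// values live in fixed-size blocks instead of one large `Vec<T>`.
pub struct BlockedValues<T> {
    blocks: Vec<Vec<T>>,
    block_size: usize,
}

impl<T> BlockedValues<T> {
    pub fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be positive");
        Self { blocks: Vec::new(), block_size }
    }

    /// Appends the value for a new group and returns its flat group index.
    pub fn push(&mut self, value: T) -> usize {
        let block_size = self.block_size;
        // Start a new block when there is none yet or the last one is full.
        let need_new_block = match self.blocks.last() {
            Some(block) => block.len() == block_size,
            None => true,
        };
        if need_new_block {
            self.blocks.push(Vec::with_capacity(block_size));
        }
        let block_id = self.blocks.len() - 1;
        let last = self.blocks.last_mut().expect("at least one block exists");
        last.push(value);
        block_id * block_size + (last.len() - 1)
    }

    /// Maps a flat group index to (block id, offset within that block).
    pub fn locate(&self, group_index: usize) -> (usize, usize) {
        (group_index / self.block_size, group_index % self.block_size)
    }

    /// Looks up the value for a group, or `None` if the index is out of range.
    pub fn get(&self, group_index: usize) -> Option<&T> {
        let (block_id, offset) = self.locate(group_index);
        self.blocks.get(block_id)?.get(offset)
    }

    pub fn num_blocks(&self) -> usize {
        self.blocks.len()
    }
}
```

   With `block_size = 2`, the third group lands at offset 0 of block 1, so a full block can later be handed off on its own without copying the groups stored in other blocks.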



##########
datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs:
##########
@@ -205,20 +284,462 @@ where
                     Some(_) => self.null_group.take(),
                     None => None,
                 };
-                let mut split = self.values.split_off(n);
-                std::mem::swap(&mut self.values, &mut split);
+
+                let single_block = self.values.last_mut().unwrap();
+                let mut split = single_block.split_off(n);
+                mem::swap(single_block, &mut split);
                 build_primitive(split, null_group)
             }
+
+            // ===============================================
+            // Emitting in blocked mode
+            // ===============================================
+            EmitTo::NextBlock => {
+                let (total_num_groups, block_size) = if !self.is_emitting() {
+                    // Similar as `EmitTo:All`, we will clear the old index infos(like `map`)

Review Comment:
   Is this still a TODO item?
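   The hunk above contrasts the two emit paths. The following is a simplified sketch, assuming plain `Vec<T>` values and a `VecDeque` of blocks; the function names `emit_first_n` and `emit_next_block` are hypothetical, and DataFusion's real code also tracks the null group and the hash-table index. On a single block, `EmitTo::First(n)` uses `split_off` plus `mem::swap` to hand back the first `n` values; in blocked mode, one whole block is emitted at a time.

```rust
use std::collections::VecDeque;
use std::mem;

/// Emits the first `n` values from a single contiguous block and keeps the rest.
/// Mirrors the `split_off` + `mem::swap` pattern in the hunk above.
/// Panics if `n > single_block.len()` (the behavior of `Vec::split_off`).
pub fn emit_first_n<T>(single_block: &mut Vec<T>, n: usize) -> Vec<T> {
    // `split_off(n)` leaves `[0, n)` in `single_block` and returns `[n, len)`.
    let mut split = single_block.split_off(n);
    // Swap so `single_block` keeps the remainder and `split` holds the prefix.
    mem::swap(single_block, &mut split);
    split
}

/// Emits the oldest whole block in blocked mode, or `None` once all are drained.
pub fn emit_next_block<T>(blocks: &mut VecDeque<Vec<T>>) -> Option<Vec<T>> {
    blocks.pop_front()
}
```

   Note that `split_off(n)` copies the `len - n` remaining values into a fresh allocation, whereas `pop_front` moves a block out without touching the others. The trade-off is that the blocked path only emits whole blocks, not an arbitrary prefix of exactly `n` groups.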



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -639,6 +686,53 @@ pub(crate) fn create_group_accumulator(
     }
 }
 
+/// Check if we can enable the blocked optimization for `GroupValues` and `GroupsAccumulator`s.
+/// The blocked optimization will be enabled when:
+///   - When `enable_aggregation_blocked_groups` is true(default to true)
+///   - It is not streaming aggregation(because blocked mode can't support Emit::first(exact n))
+///   - The spilling is disabled(still need to consider more to support it efficiently)
+///   - The accumulator is not empty(I am still not sure about logic in this case)
+///   - [`GroupValues::supports_blocked_groups`] and all [`GroupsAccumulator::supports_blocked_groups`] are true
+///
+/// [`GroupValues::supports_blocked_groups`]: crate::aggregates::group_values::GroupValues::supports_blocked_groups
+/// [`GroupsAccumulator::supports_blocked_groups`]: datafusion_expr::GroupsAccumulator::supports_blocked_groups
+///
+// TODO: support blocked optimization in streaming, spilling, and maybe empty accumulators case?

Review Comment:
   Yes, I think the goal should be to support blocked optimizations in all these cases (so we can remove the existing code).
   
   I can file tickets to help organize this work (obviously you don't have to do so).
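   The enablement conditions listed in the third hunk's doc comment amount to a single conjunction. Below is a minimal sketch, assuming a hypothetical `BlockedCheckInputs` struct whose fields stand in for the `enable_aggregation_blocked_groups` option, the streaming and spilling state, and the `supports_blocked_groups` results; the real function works on DataFusion's own config and plan types, not this struct.

```rust
/// Hypothetical inputs to the enablement check; field names are illustrative,
/// not DataFusion's actual types.
#[derive(Clone)]
pub struct BlockedCheckInputs {
    pub enable_aggregation_blocked_groups: bool,
    pub is_streaming: bool,
    pub spill_enabled: bool,
    pub group_values_supports_blocked: bool,
    /// One entry per accumulator: does it support blocked groups?
    pub accumulators_support_blocked: Vec<bool>,
}

/// Returns true only when every condition from the doc comment holds.
pub fn can_enable_blocked_groups(inputs: &BlockedCheckInputs) -> bool {
    inputs.enable_aggregation_blocked_groups
        // Streaming aggregation needs EmitTo::First(n), which blocks can't serve.
        && !inputs.is_streaming
        // Spilling is not yet supported in blocked mode.
        && !inputs.spill_enabled
        // At least one accumulator must be present.
        && !inputs.accumulators_support_blocked.is_empty()
        // Group values and every accumulator must opt in.
        && inputs.group_values_supports_blocked
        && inputs.accumulators_support_blocked.iter().all(|&supported| supported)
}
```

   Once streaming, spilling, and the empty-accumulator case are supported, the corresponding terms would simply drop out of the conjunction.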



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

