waynexia commented on code in PR #10208:
URL: https://github.com/apache/datafusion/pull/10208#discussion_r1582037662
##########
datafusion/expr/src/aggregate_function.rs:
##########
@@ -320,10 +320,12 @@ impl AggregateFunction {
pub fn signature(&self) -> Signature {
// note: the physical expression must accept the type returned by this
function or the execution panics.
match self {
- AggregateFunction::Count =>
Signature::variadic_any(Volatility::Immutable),
- AggregateFunction::ApproxDistinct
- | AggregateFunction::Grouping
- | AggregateFunction::ArrayAgg => Signature::any(1,
Volatility::Immutable),
+ AggregateFunction::Count | AggregateFunction::Grouping => {
+ Signature::variadic_any(Volatility::Immutable)
+ }
Review Comment:
To my understanding, this is the key change in the user-faced behavior of
this PR, supporting `grouping()` over multiple columns.
##########
datafusion/physical-expr/src/aggregate/grouping.rs:
##########
@@ -65,6 +82,12 @@ impl AggregateExpr for Grouping {
Ok(Field::new(&self.name, DataType::Int32, self.nullable))
}
+ fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+ not_impl_err!(
+ "physical plan is not yet implemented for GROUPING aggregate
function"
+ )
+ }
Review Comment:
Can we implement `Accumulator` for `GroupingGroupsAccumulator` and then
implement this method?
##########
datafusion/physical-expr/src/aggregate/grouping.rs:
##########
@@ -96,8 +113,172 @@ impl PartialEq<dyn Any> for Grouping {
self.name == x.name
&& self.data_type == x.data_type
&& self.nullable == x.nullable
- && self.expr.eq(&x.expr)
+ && self.exprs.len() == x.exprs.len()
+ && self
+ .exprs
+ .iter()
+ .zip(x.exprs.iter())
+ .all(|(expr1, expr2)| expr1.eq(expr2))
})
.unwrap_or(false)
}
}
+
+#[derive(Debug)]
+struct GroupingGroupsAccumulator {
+ /// Grouping columns' indices in grouping set
+ indices: Vec<usize>,
+
+ /// Mask per group.
+ ///
+ /// Note this is an i32 and not a u32 (or usize) because the
+ /// output type of grouping is `DataType::Int32`. Thus by using `i32`
+ /// for the grouping, the output [`Int32Array`] can be created
+ /// without copy.
+ masks: Vec<i32>,
+}
+
+impl GroupingGroupsAccumulator {
+ pub fn new(
+ grouping_exprs: &[Arc<dyn PhysicalExpr>],
+ group_by_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+ ) -> Result<Self> {
+ macro_rules! downcast_column {
+ ($EXPR:expr) => {{
+ if let Some(column) = $EXPR.as_any().downcast_ref::<Column>() {
+ column
+ } else {
+ return Err(DataFusionError::Execution(
+ "Grouping only supports grouping set which only
contains Column Expr".to_string(),
+ ));
+ }
+ }}
+ }
+
+ // collect column indices of group_by_exprs, only Column Expr
+ let mut group_by_column_indices =
Vec::with_capacity(group_by_exprs.len());
+ for (group_by_expr, _) in group_by_exprs.iter() {
+ let column = downcast_column!(group_by_expr);
+ group_by_column_indices.push(column.index());
+ }
+
+ // collect grouping_exprs' indices in group_by_exprs list, eg:
+ // SQL: SELECT c1, c2, grouping(c2, c1) FROM t GROUP BY ROLLUP(c1, c2);
+ // group_by_exprs: [c1, c2]
+ // grouping_exprs: [c2, c1]
+ // indices: [1, 0]
+ let mut indices = Vec::with_capacity(grouping_exprs.len());
+ for grouping_expr in grouping_exprs {
+ let column = downcast_column!(grouping_expr);
+ indices.push(find_grouping_column_index(
+ &group_by_column_indices,
+ column.index(),
+ )?);
+ }
+
+ Ok(Self {
+ indices,
+ masks: vec![],
+ })
+ }
+}
+
+fn find_grouping_column_index(
+ group_by_column_indices: &[usize],
+ grouping_column_index: usize,
+) -> Result<usize> {
+ for (i, group_by_column_index) in
group_by_column_indices.iter().enumerate() {
+ if grouping_column_index == *group_by_column_index {
+ return Ok(i);
+ }
+ }
+ Err(DataFusionError::Execution(
+ "Not found grouping column in group by columns".to_string(),
+ ))
+}
+
+fn compute_mask(indices: &[usize], grouping_set: &[bool]) -> i32 {
+ let mut mask = 0;
+ for (i, index) in indices.iter().rev().enumerate() {
+ if grouping_set[*index] {
+ mask |= 1 << i;
+ }
+ }
+ mask
+}
+
+impl GroupsAccumulator for GroupingGroupsAccumulator {
Review Comment:
From the comment of `GroupsAccumulator`
https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.GroupsAccumulator.html#notes-on-implementing-groupaccumulator:
>All aggregates must first implement the simpler
[Accumulator](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html)
trait, which handles state for a single group. Implementing GroupsAccumulator
is optional and is harder to implement than Accumulator, but can be much faster
for queries with many group values.
I suppose this grouping group accumulator can also follow this to implement
`Accumulator` as well
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]