eejbyfeldt commented on code in PR #12565:
URL: https://github.com/apache/datafusion/pull/12565#discussion_r1769514218


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -578,7 +579,13 @@ impl GroupedHashAggregateStream {
 /// [`GroupsAccumulatorAdapter`] if not.
 pub(crate) fn create_group_accumulator(
     agg_expr: &AggregateFunctionExpr,
+    group_by: &PhysicalGroupBy,
 ) -> Result<Box<dyn GroupsAccumulator>> {
+    // GROUPING is a special fxn that exposes info about group organization
+    if let Some(grouping) = agg_expr.fun().inner().as_any().downcast_ref::<Grouping>() {
+        let args = agg_expr.all_expressions().args;
+        return grouping.create_grouping_accumulator(&args, &group_by.expr);
+    }

Review Comment:
   If we need special handling like this, it seems to me that we should
consider just making `Grouping` a built-in.

   Or we should probably make it more generic so it can be used to implement
some other function. But since the input is just the bitmask and the output
is the same, I wonder if there are any conceivable functions that could not
just be implemented as a transformation on a built-in grouping function.
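
To make the "transformation on a bitmask" idea concrete, here is a minimal sketch. It is not code from the PR: `grouping_from_mask` and its parameters are hypothetical, and it assumes that bit `i` of the per-row mask is 1 when GROUP BY expression `i` is aggregated away in the current grouping set.

```rust
/// Hypothetical helper (not part of DataFusion): derive the value of
/// GROUPING(args) from a per-row grouping-set bitmask.
///
/// Assumptions made for this sketch:
/// - bit `i` of `group_mask` is 1 when GROUP BY expression `i` is NOT part
///   of the current grouping set (i.e. it is aggregated away);
/// - `arg_indices[k]` is the position of the k-th GROUPING argument in the
///   GROUP BY list, and every index is below 64;
/// - at most 32 arguments are passed, matching the limit in the PR.
fn grouping_from_mask(group_mask: u64, arg_indices: &[usize]) -> u32 {
    let mut out = 0u32;
    for &idx in arg_indices {
        // The leftmost argument ends up in the most significant bit,
        // following the usual SQL definition of GROUPING.
        out = (out << 1) | ((group_mask >> idx) & 1) as u32;
    }
    out
}
```

Under these assumptions, for `GROUP BY ROLLUP(a, b)` with `a` at bit 0 and `b` at bit 1, the grouping sets `(a, b)`, `(a)` and `()` have masks `0b00`, `0b10` and `0b11`, and `grouping_from_mask(mask, &[0, 1])` gives 0, 1 and 3 respectively.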



##########
datafusion/functions-aggregate/src/grouping.rs:
##########
@@ -59,9 +73,55 @@ impl Grouping {
     /// Create a new GROUPING aggregate function.
     pub fn new() -> Self {
         Self {
-            signature: Signature::any(1, Volatility::Immutable),
+            signature: Signature::variadic_any(Volatility::Immutable),
         }
     }
+
+    /// Create an accumulator for GROUPING(grouping_args) in a GROUP BY over group_exprs
+    /// A special creation function is necessary because GROUPING has unusual input requirements.
+    pub fn create_grouping_accumulator(
+        &self,
+        grouping_args: &[Arc<dyn PhysicalExpr>],
+        group_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+    ) -> Result<Box<dyn GroupsAccumulator>> {
+        if grouping_args.len() > 32 {
+            return plan_err!(
+                "GROUPING is supported for up to 32 columns. Consider another \
+                GROUPING statement if you need to aggregate over more columns."
+            );
+        }
+        // The PhysicalExprs of grouping_exprs must be Column PhysicalExpr. Because if
+        // the group by PhysicalExpr in SQL is non-Column PhysicalExpr, then there is
+        // a ProjectionExec before AggregateExec to convert the non-column PhysicalExpr
+        // to Column PhysicalExpr.
+        let column_index =
+            |expr: &Arc<dyn PhysicalExpr>| match expr.as_any().downcast_ref::<Column>() {
+                Some(column) => Ok(column.index()),
+                None => internal_err!("Grouping doesn't support expr: {}", expr),
+            };

Review Comment:
   This is only true when the optimizer rule `CommonSubexprEliminate` is
enabled. It does not seem acceptable to depend on optimizer rules for
correctness/basic support.
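
One possible way to avoid that dependency, sketched here with a toy expression type rather than DataFusion's `PhysicalExpr`, is to locate each GROUPING argument among the GROUP BY expressions by structural equality instead of downcasting to `Column`. `ToyExpr` and `grouping_arg_positions` are hypothetical names used only for illustration; this is not what the PR does.

```rust
/// Toy stand-in for a physical expression; NOT DataFusion's `PhysicalExpr`.
#[derive(Debug, Clone, PartialEq)]
enum ToyExpr {
    Column(usize),
    Add(Box<ToyExpr>, Box<ToyExpr>),
}

/// Hypothetical lookup: resolve each GROUPING argument to its position in
/// the GROUP BY list by comparing expressions structurally, so a non-column
/// expression such as `a + b` resolves without a preceding projection.
fn grouping_arg_positions(
    grouping_args: &[ToyExpr],
    group_exprs: &[ToyExpr],
) -> Result<Vec<usize>, String> {
    grouping_args
        .iter()
        .map(|arg| {
            group_exprs
                .iter()
                .position(|g| g == arg)
                .ok_or_else(|| {
                    format!("GROUPING argument {:?} is not a GROUP BY expression", arg)
                })
        })
        // Collecting into `Result` stops at the first argument that fails.
        .collect()
}
```

With this shape, an argument that does not appear in the GROUP BY list produces an error instead of silently relying on an earlier rewrite having turned it into a column.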



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

