korowa commented on code in PR #13995:
URL: https://github.com/apache/datafusion/pull/13995#discussion_r1903949142


##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -2743,6 +2754,143 @@ mod tests {
         Ok(())
     }
 
+    // test for https://github.com/apache/datafusion/issues/13949
+    async fn run_test_with_spill_pool_if_necessary(

Review Comment:
   I think it would be better to move this test next to the other aggregate 
tests in `datafusion/physical-plan/src/mod.rs`



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -522,7 +527,7 @@ impl GroupedHashAggregateStream {
         let spill_state = SpillState {
             spills: vec![],
             spill_expr,
-            spill_schema: Arc::clone(&agg_schema),
+            spill_schema: partial_agg_schema,

Review Comment:
   It seems the issue affected only the `AggregateMode::Single[Partitioned]` 
cases: for both `Final` and `FinalPartitioned` there is a 
[reassignment](https://github.com/apache/datafusion/blob/487b952cf1a748cc79724638f13e66761a6665e2/datafusion/physical-plan/src/aggregates/row_hash.rs#L969)
 right before spilling, where the new value is the schema of the Partial 
output, i.e. exactly group_by + state fields. Perhaps we can remove this 
reassignment now and rely on the original `spill_schema` value set at stream 
creation? Before removing it, we need to ensure that the spill schema equals 
the intermediate-result schema for every aggregation mode that supports 
spilling.
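
To illustrate why the output schema and the spill schema can differ, here is a 
minimal sketch. It uses simplified stand-in types (`Field`, `Aggregate`, 
`final_schema`, `partial_schema` are all hypothetical names, not the DataFusion 
or Arrow APIs) and assumes an `avg`-style aggregate that keeps two state 
columns:

```rust
// Simplified stand-in types; these are NOT the DataFusion or Arrow APIs.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: &'static str,
}

fn field(name: &str, data_type: &'static str) -> Field {
    Field { name: name.to_string(), data_type }
}

/// Hypothetical description of a single aggregate expression.
struct Aggregate {
    /// Column emitted once the aggregate is finalized.
    output: Field,
    /// Intermediate accumulator columns (what a spill has to store).
    state: Vec<Field>,
}

/// Output-style schema: group columns + one column per aggregate.
fn final_schema(group_by: &[Field], aggs: &[Aggregate]) -> Vec<Field> {
    group_by
        .iter()
        .cloned()
        .chain(aggs.iter().map(|a| a.output.clone()))
        .collect()
}

/// Intermediate schema: group columns + every state column.
fn partial_schema(group_by: &[Field], aggs: &[Aggregate]) -> Vec<Field> {
    group_by
        .iter()
        .cloned()
        .chain(aggs.iter().flat_map(|a| a.state.iter().cloned()))
        .collect()
}

/// Builds both schemas for `SELECT a, avg(b) ... GROUP BY a` (assumed example).
fn example() -> (Vec<Field>, Vec<Field>) {
    let group_by = vec![field("a", "Int32")];
    let aggs = vec![Aggregate {
        output: field("avg(b)", "Float64"),
        state: vec![
            field("avg(b)[count]", "UInt64"),
            field("avg(b)[sum]", "Float64"),
        ],
    }];
    (final_schema(&group_by, &aggs), partial_schema(&group_by, &aggs))
}

fn main() {
    let (output, spill) = example();
    // The two schemas have different column counts, so spilled state cannot
    // be read back using the output-style schema.
    println!("output: {} columns, spill: {} columns", output.len(), spill.len());
}
```

In this sketch a spill written with the state layout cannot be read back with 
the output layout, which is the kind of column-count mismatch the doc comment 
in this PR describes.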



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -802,6 +807,45 @@ impl RecordBatchStream for GroupedHashAggregateStream {
     }
 }
 
+// fix https://github.com/apache/datafusion/issues/13949
+/// Builds a **partial aggregation** schema by combining the group columns and
+/// the accumulator state columns produced by each aggregate expression.
+///
+/// # Why Partial Aggregation Schema Is Needed
+///
+/// In a multi-stage (partial/final) aggregation strategy, each partial-aggregate
+/// operator produces *intermediate* states (e.g., partial sums, counts) rather
+/// than final scalar values. These extra columns do **not** exist in the original
+/// input schema (which may be something like `[colA, colB, ...]`). Instead,
+/// each aggregator adds its own internal state columns (e.g., `[acc_state_1, acc_state_2, ...]`).
+///
+/// Therefore, when we spill these intermediate states or pass them to another
+/// aggregation operator, we must use a schema that includes both the group
+/// columns **and** the partial-state columns. Otherwise, using the original input
+/// schema to read partial states will result in a column-count mismatch error.
+///
+/// This helper function constructs such a schema:
+/// `[group_col_1, group_col_2, ..., state_col_1, state_col_2, ...]`
+/// so that partial aggregation data can be handled consistently.
+fn build_partial_agg_schema(

Review Comment:
   Perhaps instead of the new helper we could reuse 
[aggregates::create_schema](https://github.com/apache/datafusion/blob/487b952cf1a748cc79724638f13e66761a6665e2/datafusion/physical-plan/src/aggregates/mod.rs#L895)?
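
A rough sketch of that idea, under the assumption that a single mode-aware 
schema builder already exists. The names below (`Mode`, `Aggregate`, 
`create_schema_sketch`, `spill_schema`) are hypothetical stand-ins with plain 
string columns, not the real `aggregates::create_schema` signature:

```rust
// Hypothetical stand-ins; the real DataFusion types and signatures differ.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Mode {
    Partial,
    Final,
    FinalPartitioned,
    Single,
    SinglePartitioned,
}

struct Aggregate {
    /// Name of the finalized output column.
    output: String,
    /// Names of the intermediate state columns.
    state: Vec<String>,
}

/// One mode-aware builder: state columns for `Partial`, output columns otherwise.
fn create_schema_sketch(group_by: &[String], aggs: &[Aggregate], mode: Mode) -> Vec<String> {
    let mut cols = group_by.to_vec();
    match mode {
        Mode::Partial => {
            for agg in aggs {
                cols.extend(agg.state.iter().cloned());
            }
        }
        Mode::Final | Mode::FinalPartitioned | Mode::Single | Mode::SinglePartitioned => {
            for agg in aggs {
                cols.push(agg.output.clone());
            }
        }
    }
    cols
}

/// The spill schema is the `Partial` schema, whatever mode the stream runs in,
/// so no separate helper is needed.
fn spill_schema(group_by: &[String], aggs: &[Aggregate]) -> Vec<String> {
    create_schema_sketch(group_by, aggs, Mode::Partial)
}

/// Assumed example input: `GROUP BY a` with one `avg(b)` aggregate.
fn sample() -> (Vec<String>, Vec<Aggregate>) {
    let group_by = vec!["a".to_string()];
    let aggs = vec![Aggregate {
        output: "avg(b)".to_string(),
        state: vec!["avg(b)[count]".to_string(), "avg(b)[sum]".to_string()],
    }];
    (group_by, aggs)
}

fn main() {
    let (group_by, aggs) = sample();
    println!("Single output: {:?}", create_schema_sketch(&group_by, &aggs, Mode::Single));
    println!("spill schema:  {:?}", spill_schema(&group_by, &aggs));
}
```

The point of the sketch is only that the spill schema can be obtained by asking 
the existing builder for the `Partial` layout; whether the real function can be 
called this way from `row_hash.rs` would need to be checked.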



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

