korowa commented on code in PR #13995: URL: https://github.com/apache/datafusion/pull/13995#discussion_r1903949142
########## datafusion/core/src/dataframe/mod.rs: ########## @@ -2743,6 +2754,143 @@ mod tests { Ok(()) } + // test for https://github.com/apache/datafusion/issues/13949 + async fn run_test_with_spill_pool_if_necessary( Review Comment: I suppose it'll be better to move this test to other aggregate tests in `datafusion/physical-plan/src/mod.rs` ########## datafusion/physical-plan/src/aggregates/row_hash.rs: ########## @@ -522,7 +527,7 @@ impl GroupedHashAggregateStream { let spill_state = SpillState { spills: vec![], spill_expr, - spill_schema: Arc::clone(&agg_schema), + spill_schema: partial_agg_schema, Review Comment: It seems like the issue was related only to `AggregateMode::Single[Partitioned]` cases, since for both Final and FinalPartitioned, there is a [reassignment](https://github.com/apache/datafusion/blob/487b952cf1a748cc79724638f13e66761a6665e2/datafusion/physical-plan/src/aggregates/row_hash.rs#L969) right before spilling (the new value is a schema for Partial output which is exactly group_by + state fields). Perhaps we can remove this reassignment now and rely on original spill_schema value set on stream creation (before removing it, we need to ensure that spill schema will be equal to intermediate result schema for any aggregation mode which supports spilling)? ########## datafusion/physical-plan/src/aggregates/row_hash.rs: ########## @@ -802,6 +807,45 @@ impl RecordBatchStream for GroupedHashAggregateStream { } } +// fix https://github.com/apache/datafusion/issues/13949 +/// Builds a **partial aggregation** schema by combining the group columns and +/// the accumulator state columns produced by each aggregate expression. +/// +/// # Why Partial Aggregation Schema Is Needed +/// +/// In a multi-stage (partial/final) aggregation strategy, each partial-aggregate +/// operator produces *intermediate* states (e.g., partial sums, counts) rather +/// than final scalar values. These extra columns do **not** exist in the original +/// input schema (which may be something like `[colA, colB, ...]`). Instead, +/// each aggregator adds its own internal state columns (e.g., `[acc_state_1, acc_state_2, ...]`). +/// +/// Therefore, when we spill these intermediate states or pass them to another +/// aggregation operator, we must use a schema that includes both the group +/// columns **and** the partial-state columns. Otherwise, using the original input +/// schema to read partial states will result in a column-count mismatch error. +/// +/// This helper function constructs such a schema: +/// `[group_col_1, group_col_2, ..., state_col_1, state_col_2, ...]` +/// so that partial aggregation data can be handled consistently. +fn build_partial_agg_schema( Review Comment: Perhaps instead of the new helper we could reuse [aggregates::create_schema](https://github.com/apache/datafusion/blob/487b952cf1a748cc79724638f13e66761a6665e2/datafusion/physical-plan/src/aggregates/mod.rs#L895)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org