rluvaton opened a new issue, #15530: URL: https://github.com/apache/datafusion/issues/15530
### Describe the bug When using an aggregate exec in single mode, and it spills, and the group by expressions are not the first expressions from the previous plan, there will be a schema mismatch. ### To Reproduce ```rust #[cfg(test)] mod tests { use std::fmt::{Display, Formatter}; use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::common::Result; use datafusion::execution::memory_pool::FairSpillPool; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::TaskContext; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::physical_expr::aggregate::AggregateExprBuilder; use datafusion::physical_expr::expressions::{lit, Column}; use datafusion::physical_plan::aggregates::{PhysicalGroupBy, AggregateExec, AggregateMode}; use datafusion::physical_plan::common::collect; use datafusion::physical_plan::ExecutionPlan; use rand::{random, thread_rng, Rng}; use std::sync::Arc; use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec}; use parking_lot::RwLock; #[tokio::test] async fn test_debug() -> Result<()> { let scan_schema = Arc::new(Schema::new(vec![ Field::new("col_0", DataType::Int64, true), Field::new("col_1", DataType::Utf8, true), Field::new("col_2", DataType::Utf8, true), Field::new("col_3", DataType::Utf8, true), Field::new("col_4", DataType::Utf8, true), Field::new("col_5", DataType::Int32, true), Field::new("col_6", DataType::Utf8, true), Field::new("col_7", DataType::Utf8, true), Field::new("col_8", DataType::Utf8, true), ])); let group_by = PhysicalGroupBy::new_single(vec![ (Arc::new(Column::new("col_1", 1)), "col_1".to_string()), (Arc::new(Column::new("col_7", 7)), "col_7".to_string()), (Arc::new(Column::new("col_0", 0)), "col_0".to_string()), (Arc::new(Column::new("col_8", 8)), "col_8".to_string()), ]); fn generate_int64_array() -> ArrayRef { Arc::new(Int64Array::from_iter_values( (0..8192).map(|_| 
random::<i64>()), )) } fn generate_int32_array() -> ArrayRef { Arc::new(Int32Array::from_iter_values( (0..8192).map(|_| random::<i32>()), )) } fn generate_string_array() -> ArrayRef { Arc::new(StringArray::from( (0..8192) .map(|_| -> String { thread_rng() .sample_iter::<char, _>(rand::distributions::Standard) .take(10) .collect() }) .collect::<Vec<_>>(), )) } fn generate_record_batch(schema: &SchemaRef) -> Result<RecordBatch> { RecordBatch::try_new( Arc::clone(&schema), vec![ generate_int64_array(), generate_string_array(), generate_string_array(), generate_string_array(), generate_string_array(), generate_int32_array(), generate_string_array(), generate_string_array(), generate_string_array(), ], ) .map_err(|err| err.into()) } let aggregate_expressions = vec![Arc::new( AggregateExprBuilder::new(sum_udaf(), vec![lit(1i64)]) .schema(Arc::clone(&scan_schema)) .alias("SUM(1i64)") .build()?, )]; #[derive(Debug)] struct Generator { index: usize, count: usize, schema: SchemaRef, } impl Display for Generator { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "Generator") } } impl LazyBatchGenerator for Generator { fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> { if self.index > self.count { return Ok(None); } let batch = generate_record_batch(&self.schema)?; self.index += 1; Ok(Some(batch)) } } let generator = Generator { index: 0, count: 10, schema: Arc::clone(&scan_schema), }; let plan: Arc<dyn ExecutionPlan> = Arc::new(LazyMemoryExec::try_new(Arc::clone(&scan_schema), vec![Arc::new(RwLock::new(generator))])?); let single_aggregate = Arc::new(AggregateExec::try_new( AggregateMode::Single, group_by, aggregate_expressions.clone(), vec![None; aggregate_expressions.len()], plan, Arc::clone(&scan_schema), )?); let memory_pool = Arc::new(FairSpillPool::new(10006216)); let task_ctx = Arc::new( TaskContext::default().with_runtime(Arc::new( RuntimeEnvBuilder::new() .with_memory_pool(memory_pool) .build()?, )), ); let res = 
collect(single_aggregate.execute(0, Arc::clone(&task_ctx))?).await; match res { Ok(_) => println!("Success"), Err(e) => { println!("Error: {}", e); return Err(e); }, } Ok(()) } } ``` The following error happens: ``` Error: Internal error: PhysicalExpr Column references column 'col_7' at index 7 (zero-based) but input schema only has 5 columns: ["col_1", "col_7", "col_0", "col_8", "SUM(1i64)[sum]"] backtrace: 0: std::backtrace_rs::backtrace::libunwind::trace ... 1: std::backtrace_rs::backtrace::trace_unsynchronized at ... 2: std::backtrace::Backtrace::create at ... 3: datafusion_common::error::DataFusionError::get_back_trace at <crates>/datafusion-common-46.0.1/src/error.rs:473:30 4: datafusion_physical_expr::expressions::column::Column::bounds_check at <crates>/datafusion-physical-expr-46.0.1/src/expressions/column.rs:147:13 5: <datafusion_physical_expr::expressions::column::Column as datafusion_physical_expr_common::physical_expr::PhysicalExpr>::evaluate at <crates>/datafusion-physical-expr-46.0.1/src/expressions/column.rs:126:9 6: datafusion_physical_plan::aggregates::evaluate_group_by::{{closure}} at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/mod.rs:1305:25 7: core::iter::adapters::map::map_try_fold::{{closure}} at ... 8-21: std stuff 22: core::iter::traits::iterator::Iterator::collect at ... 
23: datafusion_physical_plan::aggregates::evaluate_group_by at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/mod.rs:1301:32 24: datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream::group_aggregate_batch at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/row_hash.rs:821:13 25: <datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as futures_core::stream::Stream>::poll_next at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/row_hash.rs:697:29 26-28: futures stuff 29: datafusion_physical_plan::common::collect::{{closure}} at <crates>/datafusion-physical-plan-46.0.1/src/common.rs:45:36 30: datafusion_pg::repro_bug_agg::tests::test_debug::{{closure}} at ./src/repro_bug_agg.rs:145:99 31-51: std and tokio stuff 52: datafusion_pg::repro_bug_agg::tests::test_debug at ./src/repro_bug_agg.rs:147:9 53: datafusion_pg::repro_bug_agg::tests::test_debug::{{closure}} at ./src/repro_bug_agg.rs:23:30 . This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker ``` ### Expected behavior Execution should not fail. ### Additional context The spill schema is: ``` spill_state.spill_schema: Schema { fields: [ Field { name: "col_1", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "col_7", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "col_0", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "col_8", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "SUM(1i64)[sum]", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, } ``` The issue is that the spill schema is the output schema of the intermediate results, while the group by expressions are unchanged and still index into the input schema; because a column expression points to an index rather than a name, the 
index it refers to no longer exists in the spill schema. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org