Friede80 opened a new issue, #13949:
URL: https://github.com/apache/datafusion/issues/13949

   ### Describe the bug
   
   A `GroupedHashAggregateStream` fails when a query uses multiple aggregations and a spill is triggered.
   
   A query like `SELECT MIN(b), MAX(b) FROM table GROUP BY a` results in the following error:
   ```
   ArrowError(InvalidArgumentError("number of columns(3) must match number of fields(2) in schema"), None)
   ```
   
   The problematic schema is captured here: 
https://github.com/apache/datafusion/blob/9b5995fa024d95c19e1270447e13f3c9dd428c69/datafusion/physical-plan/src/aggregates/row_hash.rs#L967-L969
   And the error gets thrown from `emit` here: 
https://github.com/apache/datafusion/blob/9b5995fa024d95c19e1270447e13f3c9dd428c69/datafusion/physical-plan/src/aggregates/row_hash.rs#L950
   The code there takes the schema of the input batch and reuses it for the intermediate aggregate data that is spilled to disk. These schemas are clearly not going to match, but I can't quite grok where exactly things have gone wrong.
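   
   The counts in the error message are consistent with that reading. The sketch below is a simplified model, not DataFusion code: `Aggregate`, `spilled_column_count`, and `check_schema` are hypothetical names, and it assumes `MIN` and `MAX` each keep a single intermediate state column. Under that assumption the spilled batch carries one group column plus two state columns, while the input schema has only two fields (`a` and `b`).
   
   ```rust
   /// Hypothetical description of one aggregate in the query.
   struct Aggregate {
       name: &'static str,
       /// Number of intermediate state columns (assumed to be 1 for MIN and MAX).
       state_columns: usize,
   }
   
   /// Columns in the intermediate (spilled) batch: one per group-by
   /// expression plus every aggregate's state columns.
   fn spilled_column_count(group_by: &[&str], aggregates: &[Aggregate]) -> usize {
       group_by.len() + aggregates.iter().map(|a| a.state_columns).sum::<usize>()
   }
   
   /// Mimics the shape of the column-count check that rejects the batch.
   fn check_schema(num_columns: usize, num_fields: usize) -> Result<(), String> {
       if num_columns != num_fields {
           return Err(format!(
               "number of columns({}) must match number of fields({}) in schema",
               num_columns, num_fields
           ));
       }
       Ok(())
   }
   
   fn main() {
       // Input schema from the reproduction: fields `a` and `b`.
       let input_fields = ["a", "b"];
       let group_by = ["a"];
       let aggregates = [
           Aggregate { name: "MIN(b)", state_columns: 1 },
           Aggregate { name: "MAX(b)", state_columns: 1 },
       ];
       for agg in &aggregates {
           println!("{} keeps {} state column(s)", agg.name, agg.state_columns);
       }
   
       // Reusing the input schema for the spilled batch compares 3 columns to 2 fields.
       let spilled = spilled_column_count(&group_by, &aggregates);
       match check_schema(spilled, input_fields.len()) {
           Ok(()) => println!("schemas line up"),
           Err(e) => println!("{}", e),
       }
   }
   ```
   
   In this model a single aggregate would give two columns against two fields and the check would pass, which may be why the problem only shows up with multiple aggregations.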
   
   ### To Reproduce
   
   
   Below is a minimal example which should be analogous to a query like `SELECT 
MIN(b), MAX(b) FROM table GROUP BY a`. The `batch_size` and memory pool size 
are set small to trigger a spill.
   
   ```rust
   use arrow::array::{Float64Array, RecordBatch, UInt32Array};
   use arrow_schema::{DataType, Field, Schema};
   use datafusion::execution::runtime_env::RuntimeEnvBuilder;
   use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
   use datafusion::{
       execution::{memory_pool::FairSpillPool, TaskContext},
       physical_expr::aggregate::AggregateExprBuilder,
       physical_plan::{
           aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
           common,
           expressions::col,
           memory::MemoryExec,
           udaf::AggregateFunctionExpr,
           ExecutionPlan,
       },
       prelude::SessionConfig,
   };
   use datafusion_common::Result;
   use std::sync::Arc;
   
   #[tokio::test]
   async fn reproduce_spill_schema_error() -> Result<()> {
       let schema = Arc::new(Schema::new(vec![
           Field::new("a", DataType::UInt32, false),
           Field::new("b", DataType::Float64, false),
       ]));
   
       let batches = vec![
           RecordBatch::try_new(
               schema.clone(),
               vec![
                   Arc::new(UInt32Array::from(vec![2, 3, 4, 4])),
                   Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
               ],
           )?,
           RecordBatch::try_new(
               schema.clone(),
               vec![
                   Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
                   Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
               ],
           )?,
       ];
       let plan: Arc<dyn ExecutionPlan> =
           Arc::new(MemoryExec::try_new(&[batches], schema.clone(), None)?);
   
       let grouping_set = PhysicalGroupBy::new(
           vec![(col("a", &schema)?, "a".to_string())],
           vec![],
           vec![vec![false]],
       );
   
       let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![
           Arc::new(
               AggregateExprBuilder::new(min_udaf(), vec![col("b", &schema)?])
                   .schema(schema.clone())
                   .alias("MIN(b)")
                   .build()?,
           ),
           Arc::new(
               AggregateExprBuilder::new(max_udaf(), vec![col("b", &schema)?])
                   .schema(schema.clone())
                   .alias("MAX(b)")
                   .build()?,
           ),
       ];
   
       let single_aggregate = Arc::new(AggregateExec::try_new(
           AggregateMode::Single,
           grouping_set,
           aggregates,
           vec![None, None],
           plan,
           schema.clone(),
       )?);
   
       let batch_size = 2;
       let memory_pool = Arc::new(FairSpillPool::new(1600));
       let task_ctx = Arc::new(
           TaskContext::default()
               .with_session_config(SessionConfig::new().with_batch_size(batch_size))
               .with_runtime(Arc::new(
                   RuntimeEnvBuilder::new()
                       .with_memory_pool(memory_pool)
                       .build()?,
               )),
       );
   
       let _result =
           common::collect(single_aggregate.execute(0, Arc::clone(&task_ctx))?).await?;
       Ok(())
   }
   ```
   
   ### Expected behavior
   
   I expect the test to complete successfully.
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

