kosiew commented on code in PR #13995: URL: https://github.com/apache/datafusion/pull/13995#discussion_r1903698138
########## datafusion/core/src/dataframe/mod.rs: ########## @@ -2743,6 +2753,110 @@ mod tests { Ok(()) } + // test for https://github.com/apache/datafusion/issues/13949 + async fn run_test_with_spill_pool_if_necessary(pool_size: usize) -> Result<()> { + fn create_record_batch( + schema: &Arc<Schema>, + data: (Vec<u32>, Vec<f64>), + ) -> Result<RecordBatch> { + Ok(RecordBatch::try_new( + Arc::clone(schema), + vec![ + Arc::new(UInt32Array::from(data.0)), + Arc::new(Float64Array::from(data.1)), + ], + )?) + } + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::UInt32, false), + Field::new("b", DataType::Float64, false), + ])); + + let batches = vec![ + create_record_batch(&schema, (vec![2, 3, 4, 4], vec![1.0, 2.0, 3.0, 4.0]))?, + create_record_batch(&schema, (vec![2, 3, 4, 4], vec![1.0, 2.0, 3.0, 4.0]))?, + ]; + let plan: Arc<dyn ExecutionPlan> = + Arc::new(MemoryExec::try_new(&[batches], schema.clone(), None)?); + + let grouping_set = PhysicalGroupBy::new( + vec![(physical_col("a", &schema)?, "a".to_string())], + vec![], + vec![vec![false]], + ); + + let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![ + Arc::new( + AggregateExprBuilder::new( + datafusion_functions_aggregate::min_max::min_udaf(), + vec![physical_col("b", &schema)?], + ) + .schema(schema.clone()) + .alias("MIN(b)") + .build()?, + ), + Arc::new( + AggregateExprBuilder::new( + datafusion_functions_aggregate::min_max::max_udaf(), + vec![physical_col("b", &schema)?], + ) + .schema(schema.clone()) + .alias("MAX(b)") + .build()?, + ), + ]; + + let single_aggregate = Arc::new(AggregateExec::try_new( + AggregateMode::Single, + grouping_set, + aggregates, + vec![None, None], + plan, + schema.clone(), + )?); + + let batch_size = 2; + let memory_pool = Arc::new(FairSpillPool::new(pool_size)); + let task_ctx = Arc::new( + TaskContext::default() + .with_session_config(SessionConfig::new().with_batch_size(batch_size)) + .with_runtime(Arc::new( + RuntimeEnvBuilder::new() + .with_memory_pool(memory_pool) + .build()?, + )), + ); + + let result = + common::collect(single_aggregate.execute(0, Arc::clone(&task_ctx))?).await?; + Review Comment: Thanks @2010YOUY01 for the review and suggestions. I have implemented both. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org