2010YOUY01 commented on code in PR #13995: URL: https://github.com/apache/datafusion/pull/13995#discussion_r1903119766
########## datafusion/core/src/dataframe/mod.rs: ########## @@ -2743,6 +2753,110 @@ mod tests { Ok(()) } + // test for https://github.com/apache/datafusion/issues/13949 + async fn run_test_with_spill_pool_if_necessary(pool_size: usize) -> Result<()> { + fn create_record_batch( + schema: &Arc<Schema>, + data: (Vec<u32>, Vec<f64>), + ) -> Result<RecordBatch> { + Ok(RecordBatch::try_new( + Arc::clone(schema), + vec![ + Arc::new(UInt32Array::from(data.0)), + Arc::new(Float64Array::from(data.1)), + ], + )?) + } + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::UInt32, false), + Field::new("b", DataType::Float64, false), + ])); + + let batches = vec![ + create_record_batch(&schema, (vec![2, 3, 4, 4], vec![1.0, 2.0, 3.0, 4.0]))?, + create_record_batch(&schema, (vec![2, 3, 4, 4], vec![1.0, 2.0, 3.0, 4.0]))?, + ]; + let plan: Arc<dyn ExecutionPlan> = + Arc::new(MemoryExec::try_new(&[batches], schema.clone(), None)?); + + let grouping_set = PhysicalGroupBy::new( + vec![(physical_col("a", &schema)?, "a".to_string())], + vec![], + vec![vec![false]], + ); + + let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![ + Arc::new( + AggregateExprBuilder::new( + datafusion_functions_aggregate::min_max::min_udaf(), + vec![physical_col("b", &schema)?], + ) + .schema(schema.clone()) + .alias("MIN(b)") + .build()?, + ), + Arc::new( + AggregateExprBuilder::new( + datafusion_functions_aggregate::min_max::max_udaf(), + vec![physical_col("b", &schema)?], + ) + .schema(schema.clone()) + .alias("MAX(b)") + .build()?, + ), + ]; + + let single_aggregate = Arc::new(AggregateExec::try_new( + AggregateMode::Single, + grouping_set, + aggregates, + vec![None, None], + plan, + schema.clone(), + )?); + + let batch_size = 2; + let memory_pool = Arc::new(FairSpillPool::new(pool_size)); + let task_ctx = Arc::new( + TaskContext::default() + .with_session_config(SessionConfig::new().with_batch_size(batch_size)) + .with_runtime(Arc::new( + RuntimeEnvBuilder::new() + 
.with_memory_pool(memory_pool) + .build()?, + )), + ); + + let result = + common::collect(single_aggregate.execute(0, Arc::clone(&task_ctx))?).await?; + Review Comment: I suggest adding an assertion here to verify that spilling actually happened in the test cases that are expected to spill, for example: ```rust let metrics = single_aggregate.metrics(); // ...and assert some metrics inside like 'spill count' is > 0 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org