2010YOUY01 commented on code in PR #13995:
URL: https://github.com/apache/datafusion/pull/13995#discussion_r1903119766


##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -2743,6 +2753,110 @@ mod tests {
         Ok(())
     }
 
+    // test for https://github.com/apache/datafusion/issues/13949
+    async fn run_test_with_spill_pool_if_necessary(pool_size: usize) -> 
Result<()> {
+        fn create_record_batch(
+            schema: &Arc<Schema>,
+            data: (Vec<u32>, Vec<f64>),
+        ) -> Result<RecordBatch> {
+            Ok(RecordBatch::try_new(
+                Arc::clone(schema),
+                vec![
+                    Arc::new(UInt32Array::from(data.0)),
+                    Arc::new(Float64Array::from(data.1)),
+                ],
+            )?)
+        }
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::UInt32, false),
+            Field::new("b", DataType::Float64, false),
+        ]));
+
+        let batches = vec![
+            create_record_batch(&schema, (vec![2, 3, 4, 4], vec![1.0, 2.0, 
3.0, 4.0]))?,
+            create_record_batch(&schema, (vec![2, 3, 4, 4], vec![1.0, 2.0, 
3.0, 4.0]))?,
+        ];
+        let plan: Arc<dyn ExecutionPlan> =
+            Arc::new(MemoryExec::try_new(&[batches], schema.clone(), None)?);
+
+        let grouping_set = PhysicalGroupBy::new(
+            vec![(physical_col("a", &schema)?, "a".to_string())],
+            vec![],
+            vec![vec![false]],
+        );
+
+        let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![
+            Arc::new(
+                AggregateExprBuilder::new(
+                    datafusion_functions_aggregate::min_max::min_udaf(),
+                    vec![physical_col("b", &schema)?],
+                )
+                .schema(schema.clone())
+                .alias("MIN(b)")
+                .build()?,
+            ),
+            Arc::new(
+                AggregateExprBuilder::new(
+                    datafusion_functions_aggregate::min_max::max_udaf(),
+                    vec![physical_col("b", &schema)?],
+                )
+                .schema(schema.clone())
+                .alias("MAX(b)")
+                .build()?,
+            ),
+        ];
+
+        let single_aggregate = Arc::new(AggregateExec::try_new(
+            AggregateMode::Single,
+            grouping_set,
+            aggregates,
+            vec![None, None],
+            plan,
+            schema.clone(),
+        )?);
+
+        let batch_size = 2;
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        let task_ctx = Arc::new(
+            TaskContext::default()
+                
.with_session_config(SessionConfig::new().with_batch_size(batch_size))
+                .with_runtime(Arc::new(
+                    RuntimeEnvBuilder::new()
+                        .with_memory_pool(memory_pool)
+                        .build()?,
+                )),
+        );
+
+        let result =
+            common::collect(single_aggregate.execute(0, 
Arc::clone(&task_ctx))?).await?;
+

Review Comment:
   I suggest to add an assertion here to make sure spilling actually happened 
for certain test cases. Like:
   ```rust
           let metrics = single_aggregate.metrics();
           // ...and assert some metrics inside like 'spill count' is > 0
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to