alamb commented on code in PR #15520:
URL: https://github.com/apache/datafusion/pull/15520#discussion_r2025546904
##########
datafusion/core/tests/memory_limit/mod.rs:
##########
@@ -524,6 +526,93 @@ async fn test_external_sort_zero_merge_reservation() {
     assert!(spill_count > 0);
 }
 
+// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
+// ------------------------------------------------------------------
+
+// Create a new `SessionContext` with specified disk limit and memory pool limit
+async fn setup_context(
+    disk_limit: u64,
+    memory_pool_limit: usize,
+) -> Result<SessionContext> {
+    let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
+
+    let disk_manager = Arc::try_unwrap(disk_manager)
+        .expect("DiskManager should be a single instance")
+        .with_max_temp_directory_size(disk_limit)?;
+
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit)))
+        .build_arc()
+        .unwrap();
+
+    let runtime = Arc::new(RuntimeEnv {
+        memory_pool: runtime.memory_pool.clone(),
+        disk_manager: Arc::new(disk_manager),
+        cache_manager: runtime.cache_manager.clone(),
+        object_store_registry: runtime.object_store_registry.clone(),
+    });
+
+    let config = SessionConfig::new()
+        .with_sort_spill_reservation_bytes(10 * 1024 * 1024) // 10MB
+        .with_target_partitions(1);
+
+    Ok(SessionContext::new_with_config_rt(config, runtime))
+}
+
+/// If the spilled bytes exceed the disk limit (specified by
+/// `max_temp_directory_size` in `DiskManager`), the query should fail
+#[tokio::test]
+async fn test_disk_spill_limit_reached() -> Result<()> {
+    let ctx = setup_context(10 * 1024 * 1024, 60 * 1024 * 1024).await?;
+
+    let df = ctx
+        .sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1")
+        .await
+        .unwrap();
+
+    let err = df.collect().await.unwrap_err();
+    assert_contains!(
+        err.to_string(),
+        "The used disk space during the spilling process has exceeded the allowable limit"
+    );
+
+    Ok(())
+}
+
+/// An external query should succeed if the spilled bytes are less than the disk limit.
+/// Also verify that after the query finishes, all the disk usage accounted for by
+/// temp files is cleaned up.
+#[tokio::test]
+async fn test_disk_spill_limit_not_reached() -> Result<()> {
+    let disk_spill_limit = 100 * 1024 * 1024; // 100MB

Review Comment:
   do we really need to generate 100MB to test temporary file space? Could we perhaps lower this to something less resource intensive like 1MB (and reduce the argument to `generate_series`)?

##########
datafusion/physical-plan/src/spill/in_progress_spill_file.rs:
##########
@@ -70,6 +75,11 @@ impl InProgressSpillFile {
         }
         if let Some(writer) = &mut self.writer {
             let (spilled_rows, spilled_bytes) = writer.write(batch)?;
+            if let Some(in_progress_file) = &mut self.in_progress_file {
+                in_progress_file.update_disk_usage()?;

Review Comment:
   It is quite nice that this is encapsulated as part of `InProgressSpillFile`
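
To make the encapsulation pattern concrete, here is a minimal, self-contained sketch — not the PR's actual implementation. `DiskQuota` and `TrackedSpillFile` are hypothetical stand-ins for the `DiskManager` accounting and `InProgressSpillFile`; the point is that the append path itself refreshes the accounted on-disk size, so no caller can skip the `max_temp_directory_size` check:

use std::fs::File;
use std::io::{self, Write};

/// Hypothetical stand-in for the DiskManager's temp-directory accounting.
struct DiskQuota {
    used: u64,
    limit: u64, // plays the role of `max_temp_directory_size`
}

impl DiskQuota {
    /// Account for `grown` additional bytes, failing once the limit is exceeded.
    fn grow(&mut self, grown: u64) -> io::Result<()> {
        self.used += grown;
        if self.used > self.limit {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "used disk space during spilling exceeded the allowable limit",
            ));
        }
        Ok(())
    }
}

/// Hypothetical stand-in for `InProgressSpillFile`: it owns both the file
/// and the bookkeeping, mirroring how `update_disk_usage()` is called right
/// after `writer.write(batch)` in the diff above.
struct TrackedSpillFile {
    file: File,
    accounted: u64, // bytes already reported to the quota
}

impl TrackedSpillFile {
    /// Append data, then immediately re-account the file's on-disk size.
    fn append(&mut self, data: &[u8], quota: &mut DiskQuota) -> io::Result<()> {
        self.file.write_all(data)?;
        let actual = self.file.metadata()?.len();
        quota.grow(actual.saturating_sub(self.accounted))?;
        self.accounted = actual;
        Ok(())
    }
}

Measuring growth from the file metadata, rather than from the length of the input slice, charges the limit with whatever actually lands on disk (e.g. encoded IPC data). That is what makes the encapsulation nice: the limit is enforced on every write without the spilling operators having to know about it.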