alamb commented on code in PR #15520:
URL: https://github.com/apache/datafusion/pull/15520#discussion_r2025546904


##########
datafusion/core/tests/memory_limit/mod.rs:
##########
@@ -524,6 +526,93 @@ async fn test_external_sort_zero_merge_reservation() {
     assert!(spill_count > 0);
 }
 
+// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
+// ------------------------------------------------------------------
+
+// Create a new `SessionContext` with specified disk limit and memory pool limit
+async fn setup_context(
+    disk_limit: u64,
+    memory_pool_limit: usize,
+) -> Result<SessionContext> {
+    let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
+
+    let disk_manager = Arc::try_unwrap(disk_manager)
+        .expect("DiskManager should be a single instance")
+        .with_max_temp_directory_size(disk_limit)?;
+
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit)))
+        .build_arc()
+        .unwrap();
+
+    let runtime = Arc::new(RuntimeEnv {
+        memory_pool: runtime.memory_pool.clone(),
+        disk_manager: Arc::new(disk_manager),
+        cache_manager: runtime.cache_manager.clone(),
+        object_store_registry: runtime.object_store_registry.clone(),
+    });
+
+    let config = SessionConfig::new()
+        .with_sort_spill_reservation_bytes(10 * 1024 * 1024) // 10MB
+        .with_target_partitions(1);
+
+    Ok(SessionContext::new_with_config_rt(config, runtime))
+}
+
+/// If the spilled bytes exceed the disk limit, the query should fail
+/// (specified by `max_temp_directory_size` in `DiskManager`)
+#[tokio::test]
+async fn test_disk_spill_limit_reached() -> Result<()> {
+    let ctx = setup_context(10 * 1024 * 1024, 60 * 1024 * 1024).await?;
+
+    let df = ctx
+        .sql("select * from generate_series(1, 1000000000000) as t1(v1) order 
by v1")
+        .await
+        .unwrap();
+
+    let err = df.collect().await.unwrap_err();
+    assert_contains!(
+    err.to_string(),
+    "The used disk space during the spilling process has exceeded the 
allowable limit"
+    );
+
+    Ok(())
+}
+
+/// External query should succeed, if the spilled bytes are less than the disk limit
+/// Also verify that after the query is finished, all the disk usage accounted 
by
+/// tempfiles are cleaned up.
+#[tokio::test]
+async fn test_disk_spill_limit_not_reached() -> Result<()> {
+    let disk_spill_limit = 100 * 1024 * 1024; // 100MB

Review Comment:
   do we really need to generate 100MB to test temporary file space? Could we 
perhaps lower this to something less resource intensive like 1MB (and reduce 
the argument to `generate_series`)?



##########
datafusion/physical-plan/src/spill/in_progress_spill_file.rs:
##########
@@ -70,6 +75,11 @@ impl InProgressSpillFile {
         }
         if let Some(writer) = &mut self.writer {
             let (spilled_rows, spilled_bytes) = writer.write(batch)?;
+            if let Some(in_progress_file) = &mut self.in_progress_file {
+                in_progress_file.update_disk_usage()?;

Review Comment:
   It is quite nice that this is encapsulated as part of `InProgressSpillFile` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to