rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2212827518


##########
datafusion/core/tests/fuzz_cases/sort_fuzz.rs:
##########
@@ -377,3 +388,335 @@ fn make_staggered_i32_utf8_batches(len: usize) -> 
Vec<RecordBatch> {
 
     batches
 }
+
+#[tokio::test]
+async fn test_sort_with_limited_memory() -> Result<()> {
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    let record_batch_size = pool_size / 16;
+
+    // Basic test with a lot of groups that cannot all fit in memory and 1 
record batch
+    // from each spill file is too much memory
+    let spill_count =
+        run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+            pool_size,
+            task_ctx,
+            number_of_record_batches: 100,
+            get_size_of_record_batch_to_generate: Box::pin(move |_| 
record_batch_size),
+            memory_behavior: Default::default(),
+        })
+        .await?;
+
+    let total_spill_files_size = spill_count * record_batch_size;
+    assert!(
+        total_spill_files_size > pool_size,
+        "Total spill files size {total_spill_files_size} should be greater 
than pool size {pool_size}",
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch() 
-> Result<()>
+{
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+        pool_size,
+        task_ctx,
+        number_of_record_batches: 100,
+        get_size_of_record_batch_to_generate: Box::pin(move |i| {
+            if i % 25 == 1 {
+                pool_size / 4
+            } else {
+                16 * KB
+            }
+        }),
+        memory_behavior: Default::default(),
+    })
+    .await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn 
test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_changing_memory_reservation(
+) -> Result<()> {
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+        pool_size,
+        task_ctx,
+        number_of_record_batches: 100,
+        get_size_of_record_batch_to_generate: Box::pin(move |i| {
+            if i % 25 == 1 {
+                pool_size / 4
+            } else {
+                16 * KB
+            }
+        }),
+        memory_behavior: 
MemoryBehavior::TakeAllMemoryAndReleaseEveryNthBatch(10),
+    })
+    .await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn 
test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_take_all_memory(
+) -> Result<()> {
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+        pool_size,
+        task_ctx,
+        number_of_record_batches: 100,
+        get_size_of_record_batch_to_generate: Box::pin(move |i| {
+            if i % 25 == 1 {
+                pool_size / 4
+            } else {
+                16 * KB
+            }
+        }),
+        memory_behavior: MemoryBehavior::TakeAllMemoryAtTheBeginning,
+    })
+    .await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_large_record_batch() -> Result<()> {
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    // Test that the merge degree of multi level merge sort cannot be fixed 
size when there is not enough memory
+    run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+        pool_size,
+        task_ctx,
+        number_of_record_batches: 100,
+        get_size_of_record_batch_to_generate: Box::pin(move |_| pool_size / 4),
+        memory_behavior: Default::default(),
+    })
+    .await?;
+
+    Ok(())
+}
+
+struct RunSortTestWithLimitedMemoryArgs {

Review Comment:
   Moved to a separate test for fuzz-testing spilling in memory-constrained environments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to