jayzhan211 commented on code in PR #13133:
URL: https://github.com/apache/datafusion/pull/13133#discussion_r1818373156


##########
datafusion/physical-plan/src/sorts/sort_preserving_merge.rs:
##########
@@ -326,18 +343,87 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
+    use arrow_array::Int64Array;
     use arrow_schema::SchemaRef;
     use datafusion_common::{assert_batches_eq, assert_contains, 
DataFusionError};
     use datafusion_common_runtime::SpawnedTask;
     use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
     use datafusion_execution::RecordBatchStream;
     use datafusion_physical_expr::expressions::Column;
     use datafusion_physical_expr::EquivalenceProperties;
     use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
     use futures::{FutureExt, Stream, StreamExt};
+    use hashbrown::HashMap;
     use tokio::time::timeout;
 
+    fn generate_task_ctx_for_round_robin_tie_breaker() -> 
Result<Arc<TaskContext>> {
+        let mut pool_per_consumer = HashMap::new();
+        // Bytes from 660_000 to 30_000_000 (or even more) are all valid limits
+        pool_per_consumer.insert("RepartitionExec[0]".to_string(), 10_000_000);
+        pool_per_consumer.insert("RepartitionExec[1]".to_string(), 10_000_000);
+
+        let runtime = RuntimeEnvBuilder::new()
+            // Random large number for total mem limit, we only care about 
RepartitionExec only
+            .with_memory_limit_per_consumer(2_000_000_000, 1.0, 
pool_per_consumer)
+            .build_arc()?;
+        let config = SessionConfig::new();
+        let task_ctx = TaskContext::default()
+            .with_runtime(runtime)
+            .with_session_config(config);
+        Ok(Arc::new(task_ctx))
+    }
+    fn generate_spm_for_round_robin_tie_breaker(
+        enable_round_robin_repartition: bool,
+    ) -> Result<Arc<SortPreservingMergeExec>> {
+        let target_batch_size = 12500;
+        let row_size = 12500;
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"); 
row_size]));
+        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![0; row_size]));
+        let rb = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", 
c)]).unwrap();
+
+        let rbs = (0..1024).map(|_| rb.clone()).collect::<Vec<_>>();
+
+        let schema = rb.schema();
+        let sort = vec![
+            PhysicalSortExpr {
+                expr: col("b", &schema).unwrap(),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("c", &schema).unwrap(),
+                options: Default::default(),
+            },
+        ];
+
+        let exec = MemoryExec::try_new(&[rbs], schema, None).unwrap();
+        let repartition_exec =
+            RepartitionExec::try_new(Arc::new(exec), 
Partitioning::RoundRobinBatch(2))?;
+        let coalesce_batches_exec =
+            CoalesceBatchesExec::new(Arc::new(repartition_exec), 
target_batch_size);
+        let spm = SortPreservingMergeExec::new(sort, 
Arc::new(coalesce_batches_exec))
+            .with_round_robin_repartition(enable_round_robin_repartition);
+        Ok(Arc::new(spm))
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_round_robin_tie_breaker_success() -> Result<()> {
+        let task_ctx = generate_task_ctx_for_round_robin_tie_breaker()?;
+        let spm = generate_spm_for_round_robin_tie_breaker(true)?;
+        let _collected = collect(spm, task_ctx).await.unwrap();
+        Ok(())
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_round_robin_tie_breaker_fail() -> Result<()> {

Review Comment:
   > Only the repartition operator can grow memory consumption indefinitely, 
SPM's memory consumption should not grow
   
   Yes, this is why we only measure the memory consumption on Repartition, and 
the test here ensures the memory will not exceed a certain limit if round-robin 
is enabled.
   
   > I don't understand why it requires change to MemoryPool to add a 
per-reservation limit, looks like it's also possible to trigger OOM without 
change.
   
   It is really hard to measure the memory change of repartition if we measure 
the total memory (repartition + SPM) together, since the SPM memory usage grows 
to the sum of batches at some point, which dominates the memory usage over 
Repartition. It is easier to measure repartition only, which is what we care 
about.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to