berkaysynnada commented on code in PR #13133:
URL: https://github.com/apache/datafusion/pull/13133#discussion_r1822658565


##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -327,16 +407,96 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
         self.loser_tree_adjusted = true;
     }
 
-    /// Attempts to update the loser tree, following winner replacement, if possible
+    /// Resets the poll count by incrementing the reset epoch.
+    fn reset_poll_counts(&mut self) {
+        self.current_reset_epoch += 1;
+    }
+
+    /// Handles tie-breaking logic during the adjustment of the loser tree.
+    ///
+    /// When comparing elements from multiple partitions in the `update_loser_tree` process, a tie can occur
+    /// between the current winner and a challenger. This function is invoked when such a tie needs to be
+    /// resolved according to the round-robin tie-breaker mode.
+    ///
+    /// If round-robin tie-breaking is not active, it is enabled, and the poll counts for all elements are reset.
+    /// The function then compares the poll counts of the current winner and the challenger:
+    /// - If the winner remains at the top after the final comparison, it increments the winner's poll count.
+    /// - If the challenger has a lower poll count than the current winner, the challenger becomes the new winner.
+    /// - If the poll counts are equal but the challenger's index is smaller, the challenger is preferred.
+    ///
+    /// # Parameters
+    /// - `cmp_node`: The index of the comparison node in the loser tree where the tie-breaking is happening.
+    /// - `winner`: A mutable reference to the current winner, which may be updated based on the tie-breaking result.
+    /// - `challenger`: The index of the challenger being compared against the winner.
+    ///
+    /// This function ensures fair selection among elements with equal values when tie-breaking mode is enabled,
+    /// aiming to balance the polling across different partitions.
+    #[inline]
+    fn handle_tie(&mut self, cmp_node: usize, winner: &mut usize, challenger: usize) {
+        if !self.round_robin_tie_breaker_mode {
+            self.round_robin_tie_breaker_mode = true;
+            // Reset poll count for tie-breaker
+            self.reset_poll_counts();
+        }
+        // Update poll count if the winner survives in the final match
+        if *winner == self.loser_tree[0] {
+            self.update_poll_count_on_the_same_value(*winner);
+            if self.is_poll_count_gt(*winner, challenger) {
+                self.update_winner(cmp_node, winner, challenger);
+            }
+        } else if challenger < *winner {
+            // If the winner doesn't survive in the final match, it means the value has changed.
+            // The polls count are outdated (because the value advanced) but not yet cleaned-up at this point.
+            // Given the value is equal, we choose the smaller index if the value is the same.
+            self.update_winner(cmp_node, winner, challenger);

Review Comment:
   @jayzhan211 This comment confused me. This line is executed when the
challenger's index is smaller and the poll counts are equal. Does it also pass
through here when the winner does not survive because its value has changed?
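The tie-handling rule in the `handle_tie` doc comment can be modelled outside the loser tree. The sketch below is a simplified, hypothetical `TieBreaker`, not DataFusion's code. It assumes poll counts are stored per partition together with the epoch in which they were written, so `reset_poll_counts` becomes an O(1) epoch bump, and it assumes `is_poll_count_gt` is a strict greater-than on the two counts. The `winner_survived` flag stands in for the `*winner == self.loser_tree[0]` check.

```rust
/// Hypothetical, simplified model of the tie rule in `handle_tie`
/// (not DataFusion's implementation).
struct TieBreaker {
    /// Per-partition `(epoch, count)`; a count from an older epoch is stale.
    poll_counts: Vec<(u64, usize)>,
    current_epoch: u64,
    round_robin_mode: bool,
}

impl TieBreaker {
    fn new(num_partitions: usize) -> Self {
        Self {
            poll_counts: vec![(0, 0); num_partitions],
            current_epoch: 0,
            round_robin_mode: false,
        }
    }

    /// O(1) reset: bumping the epoch invalidates every stored count at once.
    fn reset_poll_counts(&mut self) {
        self.current_epoch += 1;
    }

    /// Counts written in an older epoch read as zero.
    fn poll_count(&self, idx: usize) -> usize {
        let (epoch, count) = self.poll_counts[idx];
        if epoch == self.current_epoch { count } else { 0 }
    }

    fn record_poll(&mut self, idx: usize) {
        let count = self.poll_count(idx) + 1;
        self.poll_counts[idx] = (self.current_epoch, count);
    }

    /// Resolves a tie between two partitions with equal sort values and
    /// returns the index that should win. `winner_survived` stands in for
    /// the `*winner == self.loser_tree[0]` check in the diff.
    fn handle_tie(&mut self, winner: usize, challenger: usize, winner_survived: bool) -> usize {
        if !self.round_robin_mode {
            // First tie seen: switch modes and start counting from zero.
            self.round_robin_mode = true;
            self.reset_poll_counts();
        }
        if winner_survived {
            // Assumed: `is_poll_count_gt` is a strict `>` on the counts.
            self.record_poll(winner);
            if self.poll_count(winner) > self.poll_count(challenger) {
                return challenger;
            }
            winner
        } else if challenger < winner {
            // Counts are stale here, so fall back to the smaller index.
            challenger
        } else {
            winner
        }
    }
}
```

Under these assumptions, partition 0 winning a tie bumps its count to 1, so the next tie against partition 1 (count 0) hands the win to partition 1; that alternation is what spreads polling across partitions. In the non-surviving branch, this model never consults the poll counts, which is the behaviour the question above is probing.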



##########
datafusion/physical-plan/benches/spm.rs:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::record_batch::RecordBatch;
+use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_plan::memory::MemoryExec;
+use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion_physical_plan::{collect, ExecutionPlan};
+
+use criterion::async_executor::FuturesExecutor;
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+
+fn generate_spm_for_round_robin_tie_breaker(

Review Comment:
   I have suggested that @jayzhan211 take the first steps toward creating
operator-specific benchmarks. I believe there is already a goal for this (I
recall an older issue about it). Perhaps we should extract these
benchmarks from core and port them here, @alamb?
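Judging by its name, the benchmark targets inputs where many partitions carry equal sort keys, which is exactly where the tie-breaker decides which input gets polled next. As a std-only illustration of the behaviour such a benchmark exercises, the hypothetical `merge_order` helper below merges already-sorted partitions with a linear scan (not a loser tree), never resets poll counts, and reports which partition each emitted row came from under lowest-index versus round-robin tie-breaking.

```rust
/// Hypothetical std-only k-way merge over already-sorted partitions.
/// Returns, for each emitted row, the index of the partition it came from.
/// `round_robin == false`: ties go to the lowest partition index.
/// `round_robin == true`: ties go to the partition polled least so far,
/// then to the lowest index.
fn merge_order(partitions: &[Vec<i64>], round_robin: bool) -> Vec<usize> {
    let mut cursors = vec![0usize; partitions.len()];
    let mut polls = vec![0usize; partitions.len()];
    let mut order = Vec::new();
    loop {
        let mut best: Option<usize> = None;
        for (i, part) in partitions.iter().enumerate() {
            // Skip exhausted partitions.
            let Some(&v) = part.get(cursors[i]) else { continue };
            best = match best {
                None => Some(i),
                Some(b) => {
                    let bv = partitions[b][cursors[b]];
                    // Strict comparisons keep the lower index on a full tie.
                    if v < bv || (v == bv && round_robin && polls[i] < polls[b]) {
                        Some(i)
                    } else {
                        Some(b)
                    }
                }
            };
        }
        // All partitions exhausted.
        let Some(b) = best else { break };
        order.push(b);
        cursors[b] += 1;
        polls[b] += 1;
    }
    order
}
```

With two partitions of `[1, 1]`, lowest-index tie-breaking drains partition 0 first (`[0, 0, 1, 1]`) while round-robin interleaves (`[0, 1, 0, 1]`); with distinct keys such as `[1, 3]` and `[2, 4]`, both modes produce the same order.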



##########
datafusion/physical-plan/src/sorts/sort_preserving_merge.rs:
##########
@@ -326,18 +329,77 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
+    use arrow_array::Int64Array;
     use arrow_schema::SchemaRef;
     use datafusion_common::{assert_batches_eq, assert_contains, DataFusionError};
     use datafusion_common_runtime::SpawnedTask;
     use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
     use datafusion_execution::RecordBatchStream;
     use datafusion_physical_expr::expressions::Column;
     use datafusion_physical_expr::EquivalenceProperties;
     use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
     use futures::{FutureExt, Stream, StreamExt};
+    use hashbrown::HashMap;
     use tokio::time::timeout;
 
+    fn generate_task_ctx_for_round_robin_tie_breaker() -> Result<Arc<TaskContext>> {
+        let mut pool_per_consumer = HashMap::new();
+        // Bytes from 660_000 to 30_000_000 (or even more) are all valid limits
+        pool_per_consumer.insert("RepartitionExec[0]".to_string(), 10_000_000);
+        pool_per_consumer.insert("RepartitionExec[1]".to_string(), 10_000_000);
+
+        let runtime = RuntimeEnvBuilder::new()
+            // Random large number for total mem limit, we only care about RepartitionExec only
+            .with_memory_limit_per_consumer(2_000_000_000, 1.0, pool_per_consumer)
+            .build_arc()?;
+        let config = SessionConfig::new();
+        let task_ctx = TaskContext::default()
+            .with_runtime(runtime)
+            .with_session_config(config);
+        Ok(Arc::new(task_ctx))
+    }
+    fn generate_spm_for_round_robin_tie_breaker() -> Result<Arc<SortPreservingMergeExec>>
+    {
+        let target_batch_size = 12500;

Review Comment:
   These numbers and the memory limit in these tests are correlated constants
and can’t be adjusted independently. Could we encapsulate or link them somehow
to emphasize this dependency, @jayzhan211?
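One way to make the dependency explicit is to keep the correlated values in one place and derive the per-consumer pool map from them. The module and constant names below are hypothetical; the 12_500 batch size, the 10_000_000 per-consumer limit, the two `RepartitionExec` consumers and the 660_000 lower bound come from the test, and the sketch uses `std::collections::HashMap` with `usize` values where the test uses `hashbrown::HashMap`.

```rust
/// Hypothetical home for the correlated constants of the round-robin
/// tie-breaker tests; changing one value means revisiting the others.
mod rr_tie_breaker_params {
    use std::collections::HashMap;

    /// Batch size used to generate the test input.
    pub const TARGET_BATCH_SIZE: usize = 12_500;

    /// Number of `RepartitionExec` consumers that receive a memory limit.
    pub const NUM_REPARTITION_CONSUMERS: usize = 2;

    /// Per-consumer limit in bytes. Per the test's comment, limits from
    /// 660_000 bytes up to at least 30_000_000 work for TARGET_BATCH_SIZE.
    pub const REPARTITION_MEM_LIMIT: usize = 10_000_000;

    /// Smallest limit reported to work with TARGET_BATCH_SIZE.
    pub const MIN_REPARTITION_MEM_LIMIT: usize = 660_000;

    // Compile-time guard: lowering the limit below the known-good floor
    // fails the build instead of silently changing what the test checks.
    const _: () = assert!(REPARTITION_MEM_LIMIT >= MIN_REPARTITION_MEM_LIMIT);

    /// Builds the per-consumer map, e.g. "RepartitionExec[0]" -> limit.
    pub fn repartition_pool_per_consumer() -> HashMap<String, usize> {
        (0..NUM_REPARTITION_CONSUMERS)
            .map(|i| (format!("RepartitionExec[{i}]"), REPARTITION_MEM_LIMIT))
            .collect()
    }
}
```

The guard only encodes the lower bound the test comment states; the upper end is left open because the comment says larger limits ("or even more") also work.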



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
