jayzhan211 commented on code in PR #13133:
URL: https://github.com/apache/datafusion/pull/13133#discussion_r1822769095
##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -327,16 +407,96 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
self.loser_tree_adjusted = true;
}
- /// Attempts to update the loser tree, following winner replacement, if possible
+ /// Resets the poll count by incrementing the reset epoch.
+ fn reset_poll_counts(&mut self) {
+ self.current_reset_epoch += 1;
+ }
+
+ /// Handles tie-breaking logic during the adjustment of the loser tree.
+ ///
+ /// When comparing elements from multiple partitions in the `update_loser_tree` process, a tie can occur
+ /// between the current winner and a challenger. This function is invoked when such a tie needs to be
+ /// resolved according to the round-robin tie-breaker mode.
+ ///
+ /// If round-robin tie-breaking is not active, it is enabled, and the poll counts for all elements are reset.
+ /// The function then compares the poll counts of the current winner and the challenger:
+ /// - If the winner remains at the top after the final comparison, it increments the winner's poll count.
+ /// - If the challenger has a lower poll count than the current winner, the challenger becomes the new winner.
+ /// - If the poll counts are equal but the challenger's index is smaller, the challenger is preferred.
+ ///
+ /// # Parameters
+ /// - `cmp_node`: The index of the comparison node in the loser tree where the tie-breaking is happening.
+ /// - `winner`: A mutable reference to the current winner, which may be updated based on the tie-breaking result.
+ /// - `challenger`: The index of the challenger being compared against the winner.
+ ///
+ /// This function ensures fair selection among elements with equal values when tie-breaking mode is enabled,
+ /// aiming to balance the polling across different partitions.
+ #[inline]
+ fn handle_tie(&mut self, cmp_node: usize, winner: &mut usize, challenger: usize) {
+ if !self.round_robin_tie_breaker_mode {
+ self.round_robin_tie_breaker_mode = true;
+ // Reset poll count for tie-breaker
+ self.reset_poll_counts();
+ }
+ // Update poll count if the winner survives in the final match
+ if *winner == self.loser_tree[0] {
+ self.update_poll_count_on_the_same_value(*winner);
+ if self.is_poll_count_gt(*winner, challenger) {
+ self.update_winner(cmp_node, winner, challenger);
+ }
+ } else if challenger < *winner {
+ // If the winner doesn't survive in the final match, it means the value has changed.
+ // The polls count are outdated (because the value advanced) but not yet cleaned-up at this point.
+ // Given the value is equal, we choose the smaller index if the value is the same.
+ self.update_winner(cmp_node, winner, challenger);
Review Comment:
> This line is executed when the challenger index is smaller and poll counts are equal

Poll counts are not necessarily equal here. When we have a *new winner*, it holds a different value than in the previous tie-breaker round; if it held the same value, we would still be choosing streams in round-robin order, from the smaller index to the larger one.
When execution reaches this line, the new winner has the same value as the challenger but a different value from the original winner (`self.loser_tree[0]`). In this case we only need to compare indices: this is the start of a new tie-breaker round, so the poll counts do not change the result.
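To make the two branches of `handle_tie` easier to follow, here is a minimal, self-contained Rust model of them. It is a sketch under stated assumptions, not the PR's code: `PollCounts`, `resolve_tie`, and the `counts`/`epochs` vectors are hypothetical names; the epoch logic assumes that bumping `current_reset_epoch` works by lazily treating any count written in an older epoch as zero; `prev_winner` stands in for `self.loser_tree[0]`; and the `round_robin_tie_breaker_mode` flag and the loser-tree update itself are left out.

```rust
/// Hypothetical sketch (not DataFusion's actual fields): per-stream poll
/// counts that can all be reset in O(1) by bumping an epoch counter.
struct PollCounts {
    counts: Vec<usize>,
    epochs: Vec<usize>,
    current_epoch: usize,
}

impl PollCounts {
    fn new(n: usize) -> Self {
        Self { counts: vec![0; n], epochs: vec![0; n], current_epoch: 0 }
    }

    /// Analogous to `reset_poll_counts`: every existing count becomes stale.
    fn reset(&mut self) {
        self.current_epoch += 1;
    }

    /// A count written in an older epoch reads as zero.
    fn get(&self, idx: usize) -> usize {
        if self.epochs[idx] == self.current_epoch { self.counts[idx] } else { 0 }
    }

    /// Increments a count, clearing it first if it is stale.
    fn increment(&mut self, idx: usize) {
        if self.epochs[idx] != self.current_epoch {
            self.counts[idx] = 0;
            self.epochs[idx] = self.current_epoch;
        }
        self.counts[idx] += 1;
    }
}

/// Simplified model of the two branches of `handle_tie`; returns the stream
/// that wins this comparison. `prev_winner` plays the role of
/// `self.loser_tree[0]` (an assumption of this sketch).
fn resolve_tie(counts: &mut PollCounts, prev_winner: usize, winner: usize, challenger: usize) -> usize {
    if winner == prev_winner {
        // The previous winner survived: charge it one more poll, and hand the
        // win to the challenger if the winner has now been polled more.
        counts.increment(winner);
        if counts.get(winner) > counts.get(challenger) { challenger } else { winner }
    } else if challenger < winner {
        // New tie round: the counts are stale, so only the index decides.
        challenger
    } else {
        winner
    }
}

fn main() {
    let mut counts = PollCounts::new(3);
    // Stream 0 won last time and ties with stream 1: after one more poll it
    // has been polled more than stream 1, so stream 1 takes over.
    assert_eq!(resolve_tie(&mut counts, 0, 0, 1), 1);
    // A different stream now holds the winning value: the smaller index wins.
    assert_eq!(resolve_tie(&mut counts, 0, 2, 1), 1);
    // Resetting makes every count read as zero again.
    counts.reset();
    assert_eq!(counts.get(0), 0);
    println!("ok");
}
```

In this model `reset` costs O(1) no matter how many partitions there are, which is presumably why the diff resets poll counts by incrementing an epoch rather than clearing a vector.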
--
This is an automated message from the Apache Git Service.