alamb commented on code in PR #17245:
URL: https://github.com/apache/datafusion/pull/17245#discussion_r2286227253
##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -121,13 +122,37 @@ pub struct TopK {
     /// Common sort prefix between the input and the sort expressions to allow early exit optimization
     common_sort_prefix: Arc<[PhysicalSortExpr]>,
     /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    filter: TopKDynamicFilters,
     /// If true, indicates that all rows of subsequent batches are guaranteed
     /// to be greater (by byte order, after row conversion) than the top K,
     /// which means the top K won't change and the computation can be finished early.
     pub(crate) finished: bool,
 }
 
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {
+    /// The current *global* threshold for the dynamic filter.
+    /// This is shared across all partitions and is updated by any of them.
+    /// Stored as row bytes for efficient comparison.
+    threshold_row: Arc<ArcSwapOption<Vec<u8>>>,

Review Comment:
As I was going through this PR, it seems to me that the expr should only be updated when we know the threshold row has changed -- but currently there is no synchronization between them 🤔

##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -319,19 +343,89 @@ impl TopK {
     /// (a > 2 OR (a = 2 AND b < 3))
     /// ```
     fn update_filter(&mut self) -> Result<()> {
-        let Some(filter) = &self.filter else {
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        let Some(max_row) = self.heap.max() else {
             return Ok(());
         };
-        let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
+
+        let new_threshold_row = &max_row.row;
+
+        // Extract scalar values BEFORE acquiring lock to reduce critical section
+        let thresholds = match self.heap.get_threshold_values(&self.expr)? {
+            Some(t) => t,
+            None => return Ok(()),
+        };
+
+        // Fast path: check if the current value in topk is better than what is
+        // currently set in the filter with a read only lock
+        let needs_update = self
+            .filter
+            .threshold_row
+            .read()
+            .as_ref()
+            .map(|current_row| {
+                // new < current means new threshold is more selective
+                new_threshold_row < current_row
+            })
+            .unwrap_or(true); // No current threshold, so we need to set one
+
+        // exit early if the current values are better
+        if !needs_update {
             return Ok(());
+        }
+
+        // Build the filter expression OUTSIDE any synchronization
+        let predicate = Self::build_filter_expression(&self.expr, thresholds)?;
+        let new_threshold = new_threshold_row.to_vec();
+
+        // update the threshold. Since there was a lock gap, we must check if it is still the best
+        // may have changed while we were building the expression without the lock
+        let mut current_threshold = self.filter.threshold_row.write();
+        let old_threshold = current_threshold.take();
+
+        // Update filter if we successfully updated the threshold
+        // (or if there was no previous threshold and we're the first)
+        match old_threshold {
+            Some(old_threshold) => {
+                // new threshold is still better than the old one
+                if new_threshold.as_slice() < old_threshold.as_slice() {
+                    *current_threshold = Some(new_threshold);

Review Comment:
Specifically, I think this logic is missing from https://github.com/apache/datafusion/pull/16433 -- if another thread updated to a new value, but the new value wasn't as good, we need to update it again.

##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -121,13 +122,37 @@ pub struct TopK {
     /// Common sort prefix between the input and the sort expressions to allow early exit optimization
     common_sort_prefix: Arc<[PhysicalSortExpr]>,
     /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    filter: TopKDynamicFilters,
     /// If true, indicates that all rows of subsequent batches are guaranteed
     /// to be greater (by byte order, after row conversion) than the top K,
     /// which means the top K won't change and the computation can be finished early.
     pub(crate) finished: bool,
 }
 
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {
+    /// The current *global* threshold for the dynamic filter.
+    /// This is shared across all partitions and is updated by any of them.
+    /// Stored as row bytes for efficient comparison.
+    threshold_row: Arc<ArcSwapOption<Vec<u8>>>,

Review Comment:
Specifically, I am thinking that there is a race condition if one thread decides to update `threshold_row`, but another thread has subsequently updated it. The first thread could still update the filter to a worse value of `threshold_row` 🤔

##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -319,19 +342,84 @@ impl TopK {
     /// (a > 2 OR (a = 2 AND b < 3))
     /// ```
     fn update_filter(&mut self) -> Result<()> {
-        let Some(filter) = &self.filter else {
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        let Some(max_row) = self.heap.max() else {
             return Ok(());
         };
-        let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
-            return Ok(());
+
+        let new_threshold_row = &max_row.row;
+
+        // Extract scalar values BEFORE acquiring lock to reduce critical section
+        let thresholds = match self.heap.get_threshold_values(&self.expr)? {
+            Some(t) => t,
+            None => return Ok(()),
         };
 
+        // Extract filter expression reference before entering critical section
+        let filter_expr = Arc::clone(&self.filter.expr);
+
+        // Check if we need to update the threshold (lock-free read)
+        let current_threshold = self.filter.threshold_row.load();
+        let needs_update = match current_threshold.as_ref() {
+            Some(current_row) => {
+                // new < current means new threshold is more selective
+                current_row.as_slice().cmp(new_threshold_row) == Ordering::Greater
+            }
+            None => true, // No current threshold, so we need to set one
+        };
+
+        // Only proceed if we need to update
+        if needs_update {
+            // Build the filter expression OUTSIDE any synchronization
+            let predicate = Self::build_filter_expression(&self.expr, thresholds)?;
+            let new_threshold_arc = Arc::new(new_threshold_row.to_vec());
+
+            // Atomically update the threshold using compare-and-swap
+            let old_threshold = self.filter.threshold_row.compare_and_swap(
+                &current_threshold,
+                Some(Arc::clone(&new_threshold_arc)),
+            );
+
+            // Only update filter if we successfully updated the threshold
+            // (or if there was no previous threshold and we're the first)
+            let should_update_filter =
+                match (old_threshold.as_ref(), current_threshold.as_ref()) {
+                    // We successfully swapped
+                    (Some(old), Some(expected)) if Arc::ptr_eq(old, expected) => true,
+                    // We were the first to set it
+                    (None, None) => true,
+                    // Another thread updated before us, check if our threshold is still better
+                    (Some(actual_old), _) => {
+                        actual_old.as_slice().cmp(new_threshold_row) == Ordering::Greater

Review Comment:
If another thread updated concurrently, but the updated value wasn't as good, that means we should also update the current threshold again, right?
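A minimal, self-contained sketch of the "read lock on the fast path, then write lock and recheck" pattern discussed in this thread, under simplifying assumptions: the hypothetical `SharedFilter` struct stands in for `TopKDynamicFilters`, a `String` stands in for the physical predicate, the hypothetical `build_predicate` stands in for `build_filter_expression`, and a plain byte slice stands in for the row-encoded threshold (smaller bytes are more selective, matching the `new < current` convention in the diff). Keeping the threshold and the predicate behind one lock means they are always updated together, which is the synchronization the first comment asks for. The sketch uses `std::sync::RwLock`, whose lock methods return a `Result` (for poisoning), hence the `unwrap()` calls.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for `TopKDynamicFilters`: the threshold and the
/// predicate derived from it sit behind one lock, so readers never see a
/// predicate that disagrees with the threshold.
#[derive(Debug, Default)]
struct SharedFilter {
    /// Row-encoded threshold; lexicographically smaller bytes are assumed to be
    /// more selective (the `new < current` convention from the diff).
    threshold_row: Option<Vec<u8>>,
    /// Placeholder for the physical predicate built from `threshold_row`.
    predicate: Option<String>,
}

/// Hypothetical stand-in for `build_filter_expression`; assumed to be costly,
/// so it is called without holding any lock.
fn build_predicate(threshold: &[u8]) -> String {
    format!("row < {:?}", threshold)
}

/// Publishes `new_row` if it is more selective than the shared threshold.
/// Returns `true` if this call installed it.
fn update_filter(shared: &RwLock<SharedFilter>, new_row: &[u8]) -> bool {
    // Fast path: a read lock is enough to reject a candidate that is no better.
    {
        let state = shared.read().unwrap();
        if let Some(current) = state.threshold_row.as_deref() {
            if new_row >= current {
                return false;
            }
        }
    } // read lock released here

    // Build the predicate outside any lock.
    let predicate = build_predicate(new_row);

    // Slow path: take the write lock and re-check, since another thread may
    // have installed a different threshold while no lock was held.
    let mut state = shared.write().unwrap();
    let still_better = state
        .threshold_row
        .as_deref()
        .map_or(true, |current| new_row < current);
    if still_better {
        // Threshold and predicate change together under the same write lock.
        state.threshold_row = Some(new_row.to_vec());
        state.predicate = Some(predicate);
    }
    still_better
}

fn main() {
    let shared = Arc::new(RwLock::new(SharedFilter::default()));

    // Eight "partitions" race to publish candidates 10..=17 in a scrambled order.
    let handles: Vec<_> = (0u8..8)
        .map(|i| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let candidate = vec![10 + (i * 7) % 8];
                update_filter(&shared, &candidate);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    // Whatever the interleaving, the most selective candidate wins and the
    // predicate always matches the stored threshold.
    let state = shared.read().unwrap();
    assert_eq!(state.threshold_row.as_deref(), Some(&[10u8][..]));
    assert_eq!(state.predicate.as_deref(), Some("row < [10]"));
    println!("{:?}", &*state);
}
```

Because the recheck happens while the write lock is held, no other thread can change the threshold between the comparison and the store: a thread that finds a better threshold installed during the gap backs off, and one that finds a worse threshold installed during the gap still overwrites it. That second case is the one the review comments above raise about the compare-and-swap version.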
##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -319,19 +343,89 @@ impl TopK {
     /// (a > 2 OR (a = 2 AND b < 3))
     /// ```
     fn update_filter(&mut self) -> Result<()> {
-        let Some(filter) = &self.filter else {
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        let Some(max_row) = self.heap.max() else {
             return Ok(());
         };
-        let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
+
+        let new_threshold_row = &max_row.row;
+
+        // Extract scalar values BEFORE acquiring lock to reduce critical section
+        let thresholds = match self.heap.get_threshold_values(&self.expr)? {
+            Some(t) => t,
+            None => return Ok(()),
+        };
+
+        // Fast path: check if the current value in topk is better than what is
+        // currently set in the filter with a read only lock
+        let needs_update = self
+            .filter
+            .threshold_row
+            .read()
+            .as_ref()
+            .map(|current_row| {
+                // new < current means new threshold is more selective
+                new_threshold_row < current_row
+            })
+            .unwrap_or(true); // No current threshold, so we need to set one
+
+        // exit early if the current values are better
+        if !needs_update {
             return Ok(());
+        }
+
+        // Build the filter expression OUTSIDE any synchronization
+        let predicate = Self::build_filter_expression(&self.expr, thresholds)?;
+        let new_threshold = new_threshold_row.to_vec();
+
+        // update the threshold. Since there was a lock gap, we must check if it is still the best
+        // may have changed while we were building the expression without the lock
+        let mut current_threshold = self.filter.threshold_row.write();

Review Comment:
I rewrote the section here that used arc_swap to use a more classic "get a read lock to check on the fast path, and then get a write lock and recheck to update" pattern -- I think it avoids the race condition.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org