adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2153281386


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -846,8 +846,10 @@ pub struct SortExec {
     common_sort_prefix: Vec<PhysicalSortExpr>,
     /// Cache holding plan properties like equivalences, output partitioning 
etc.
     cache: PlanProperties,
-    /// Filter matching the state of the sort for dynamic filter pushdown
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    /// Filter matching the state of the sort for dynamic filter pushdown.
+    /// If `fetch` is `Some`, this will also be set and a TopK operator may be 
used.
+    /// If `fetch` is `None`, this will be `None`.
+    filter: Option<TopKDynamicFilters>,

Review Comment:
   I feel like there's some further refactoring that could happen here, e.g. 
splitting up `SortExec`, but I'm leaving that for another day.



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1158,7 +1153,10 @@ impl ExecutionPlan for SortExec {
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
-                    self.filter.clone(),
+                    self.filter
+                        .as_ref()
+                        .expect("Filter should be set when fetch is Some")
+                        .clone(),

Review Comment:
   I refactored so that the `TopK` struct always expects this parameter, which 
better reflects the reality of execution. But since the filter is strangely 
tied to the `fetch` param (it is only set when `fetch` is `Some`), I am doing 
an `expect` assertion here. It should never fail at runtime.



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -935,11 +940,6 @@ impl SortExec {
         }
     }
 
-    pub fn with_filter(mut self, filter: Arc<DynamicFilterPhysicalExpr>) -> 
Self {
-        self.filter = Some(filter);
-        self
-    }

Review Comment:
   This was unused and had slipped through the cracks. I can make a new PR to 
just remove these methods.



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -319,13 +341,88 @@ impl TopK {
     /// (a > 2 OR (a = 2 AND b < 3))
     /// ```
     fn update_filter(&mut self) -> Result<()> {
-        let Some(filter) = &self.filter else {
-            return Ok(());
-        };
         let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? 
else {
             return Ok(());
         };
 
+        // Are the new thresholds more selective than our existing ones?
+        let should_update = {
+            if let Some(current) = self.filter.thresholds.write().as_mut() {
+                assert!(current.len() == thresholds.len());
+                // Check if new thresholds are more selective than current ones
+                let mut more_selective = false;
+                for ((current_value, new_value), sort_expr) in
+                    current.iter().zip(thresholds.iter()).zip(self.expr.iter())
+                {
+                    // Handle null cases
+                    let (current_is_null, new_is_null) =
+                        (current_value.is_null(), new_value.is_null());
+
+                    match (current_is_null, new_is_null) {
+                        (true, true) => {
+                            // Both null, continue checking next values
+                        }
+                        (true, false) => {
+                            // Current is null, new is not null
+                            // For nulls_first: null < non-null, so new value 
is less selective
+                            // For nulls_last: null > non-null, so new value 
is more selective
+                            more_selective = !sort_expr.options.nulls_first;
+                            break;
+                        }
+                        (false, true) => {
+                            // Current is not null, new is null
+                            // For nulls_first: non-null > null, so new value 
is more selective
+                            // For nulls_last: non-null < null, so new value 
is less selective
+                            more_selective = sort_expr.options.nulls_first;
+                            break;
+                        }
+                        (false, false) => {
+                            // Neither is null, compare values
+                            match current_value.partial_cmp(new_value) {
+                                Some(ordering) => {
+                                    match ordering {
+                                        Ordering::Equal => {
+                                            // Continue checking next values
+                                        }
+                                        Ordering::Less => {
+                                            // For descending sort: new > 
current means more selective
+                                            // For ascending sort: new > 
current means less selective
+                                            more_selective = 
sort_expr.options.descending;
+                                            break;
+                                        }
+                                        Ordering::Greater => {
+                                            // For descending sort: new < 
current means less selective
+                                            // For ascending sort: new < 
current means more selective
+                                            more_selective =
+                                                !sort_expr.options.descending;
+                                            break;
+                                        }
+                                    }
+                                }
+                                None => {
+                                    // If values can't be compared, don't 
update
+                                    more_selective = false;
+                                    break;

Review Comment:
   Not sure about this case (`partial_cmp` returning `None`, i.e. the values 
can't be compared) 🤔



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to