suibianwanwank commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2006221885


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -644,10 +737,72 @@ impl RecordBatchStore {
     }
 }
 
+struct TopKDynamicFilterSource {
+    /// The TopK heap that provides the current filters
+    heap: Arc<RwLock<TopKHeap>>,
+    /// The sort expressions used to create the TopK
+    expr: Arc<[PhysicalSortExpr]>,
+}
+
+impl std::fmt::Debug for TopKDynamicFilterSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TopKDynamicFilterSource")
+            .field("expr", &self.expr)
+            .finish()
+    }
+}
+
+impl DynamicFilterSource for TopKDynamicFilterSource {
+    fn current_filters(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+        let heap_guard = self.heap.read().map_err(|_| {
+            DataFusionError::Internal(
+                "Failed to acquire read lock on TopK heap".to_string(),
+            )
+        })?;
+
+        // Get the threshold values for all sort expressions
+        let Some(thresholds) = heap_guard.get_threshold_values(&self.expr)? 
else {
+            return Ok(vec![]); // No thresholds available yet
+        };
+
+        // Create filter expressions for each threshold
+        let mut filters: Vec<Arc<dyn PhysicalExpr>> =
+            Vec::with_capacity(thresholds.len());
+
+        for threshold in thresholds {
+            // Skip null threshold values - can't create a meaningful filter
+            if threshold.value.is_null() {
+                continue;
+            }
+
+            // Create the appropriate operator based on sort order
+            let op = if threshold.sort_options.descending {
+                // For descending sort, we want col > threshold (exclude 
smaller values)
+                Operator::Gt
+            } else {
+                // For ascending sort, we want col < threshold (exclude larger 
values)
+                Operator::Lt
+            };
+
+            let comparison = Arc::new(BinaryExpr::new(
+                Arc::clone(&threshold.expr),
+                op,
+                lit(threshold.value.clone()),
+            ));
+
+            // TODO: handle nulls first/last?

Review Comment:
   This transformation might work:
   **For nulls-first =>** `(threshold.value is not null) and (threshold.expr is null or comparison)`
   **For nulls-last =>** `comparison` — the comparison already implies `threshold.expr is not null`, because comparing a null value yields null, which the filter treats as false.
   Since this part of the code already ensures that threshold.value is not null:
   ```rust
   // Skip null threshold values - can't create a meaningful filter  
   if threshold.value.is_null() {  
       continue;  
   }  
   ```
   we might be able to simplify the nulls-first case to `(threshold.expr is null or comparison)`.
   WDYT? :)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to