kosiew commented on code in PR #16642:
URL: https://github.com/apache/datafusion/pull/16642#discussion_r2182297728


##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -520,10 +520,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
-        Ok(
-            FilterDescription::new_with_child_count(self.children().len())
-                .all_parent_filters_unsupported(parent_filters),
-        )
+        // Default implementation: mark all filters as unsupported for all children
+        let mut desc = FilterDescription::new();
+        for _child in self.children() {
+            let child_filters = parent_filters
+                .iter()
+                .map(|f| PredicateSupport::Unsupported(Arc::clone(f)))
+                .collect();
+            desc = desc.with_child(ChildFilterDescription {
+                parent_filters: child_filters,
+                self_filters: vec![],
+            });
+        }

Review Comment:
   Opportunity to reduce repetition (DRY).
   
   This loop is effectively a broadcast of "unsupported" to every child. That's exactly what `FilterDescription::from_children` could do if you passed a helper that always returns `Unsupported`.



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -291,18 +136,8 @@ impl<T> FilterPushdownPropagation<T> {
         }
     }
 
-    /// Create a new [`FilterPushdownPropagation`] that tells the parent node
-    /// that none of the parent filters were not pushed down.
-    pub fn unsupported(parent_filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
-        let unsupported = PredicateSupports::all_unsupported(parent_filters);
-        Self {
-            filters: unsupported,
-            updated_node: None,
-        }
-    }
-
     /// Create a new [`FilterPushdownPropagation`] with the specified filter support.
-    pub fn with_filters(filters: PredicateSupports) -> Self {
+    pub fn with_filters(filters: Vec<PredicateSupport>) -> Self {

Review Comment:
   The old `.unsupported(...)` helper was very explicit. Now, to produce an "all unsupported" result we must write:
   
   ```rust
   FilterPushdownPropagation::with_filters(
       filters.into_iter().map(PredicateSupport::Unsupported).collect()
   )
   ```
   
   While this is idiomatic Rust, it forces the developer to think about the mechanism (map, collect) rather than the intent ("all of these filters were rejected").
   
   How about adding a helper function that makes the intent crystal clear at the call site:
   
   ```rust
   FilterPushdownPropagation::all_rejected(filters)
   ```
   
   This is better because:
    * It's self-documenting. The name of the function tells you exactly what's happening.
    * It's higher-level. It allows developers to work at the level of "what they want to do" rather than "how to do it."
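   A minimal sketch of what such a helper could look like. The types here are simplified stand-ins so the example is self-contained (`String` replaces `Arc<dyn PhysicalExpr>`), and the `all_rejected` name is only a suggestion, not an existing API:
   
   ```rust
   // Stand-ins for the datafusion types; the real PredicateSupport variants
   // wrap Arc<dyn PhysicalExpr>.
   #[derive(Debug)]
   enum PredicateSupport {
       Supported(String),
       Unsupported(String),
   }
   
   struct FilterPushdownPropagation {
       filters: Vec<PredicateSupport>,
   }
   
   impl FilterPushdownPropagation {
       fn with_filters(filters: Vec<PredicateSupport>) -> Self {
           Self { filters }
       }
   
       /// Proposed (hypothetical) helper: state the intent, "all of these
       /// filters were rejected", in one call.
       fn all_rejected(parent_filters: Vec<String>) -> Self {
           Self::with_filters(
               parent_filters
                   .into_iter()
                   .map(PredicateSupport::Unsupported)
                   .collect(),
           )
       }
   }
   
   fn main() {
       let prop = FilterPushdownPropagation::all_rejected(vec!["a > 1".to_string()]);
       assert!(matches!(prop.filters[0], PredicateSupport::Unsupported(_)));
   }
   ```
   
   The call site then reads at the level of intent, with the map/collect mechanism hidden behind the name.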



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -317,24 +152,76 @@ impl<T> FilterPushdownPropagation<T> {
 }
 
 #[derive(Debug, Clone)]
-struct ChildFilterDescription {
+pub struct ChildFilterDescription {
     /// Description of which parent filters can be pushed down into this node.
     /// Since we need to transmit filter pushdown results back to this node's parent
     /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
     /// We do this using a [`PredicateSupport`] which simplifies manipulating supported/unsupported filters.
-    parent_filters: PredicateSupports,
+    pub(crate) parent_filters: Vec<PredicateSupport>,
     /// Description of which filters this node is pushing down to its children.
     /// Since this is not transmitted back to the parents we can have variable sized inner arrays
     /// instead of having to track supported/unsupported.
-    self_filters: Vec<Arc<dyn PhysicalExpr>>,
+    pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
 }
 
 impl ChildFilterDescription {
-    fn new() -> Self {
-        Self {
-            parent_filters: PredicateSupports::new(vec![]),
-            self_filters: vec![],
+    /// Build a child filter description by analyzing which parent filters can be pushed to a specific child.
+    ///
+    /// See [`FilterDescription::from_children`] for more details
+    pub fn from_child(
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        child: &Arc<dyn crate::ExecutionPlan>,
+    ) -> Result<Self> {
+        let child_schema = child.schema();
+
+        // Get column names from child schema for quick lookup
+        let child_column_names: HashSet<&str> = child_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();

Review Comment:
   We could be redundantly rebuilding the entire `HashSet` of column names for the exact same `ExecutionPlan` node multiple times during the optimization process.
   
   Here's a scenario to illustrate:
   
   Imagine a complex query plan. A specific node, say `ParquetScan(file1.parquet)`, exists as a shared reference (`Arc<dyn ExecutionPlan>`) and might be evaluated in different contexts as the optimizer walks the plan tree.
   
   1. First encounter: The optimizer analyzes a `FilterNode` that has `ParquetScan(file1.parquet)` as a child. It calls `ChildFilterDescription::from_child`. This function iterates through the schema of the parquet scan and builds a `HashSet` of its column names for the first time.
   
   2. Second encounter: Later in the same optimization pass, another node, perhaps a `JoinNode`, also needs to analyze what can be pushed down to that very same `ParquetScan(file1.parquet)` instance.
   
   Without caching, `from_child` would be called again for the same `ParquetScan` node, and it would rebuild the exact same `HashSet` of column names from scratch.
   
   Perhaps consider implementing a caching mechanism?
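   One possible shape for such a cache, sketched with stand-in types (`PlanNode` replaces `dyn ExecutionPlan`, and `ColumnNameCache` is a hypothetical name, not an existing datafusion API). It keys on the node's `Arc` pointer identity, so it is only valid while those `Arc`s stay alive, i.e. within a single optimization pass:
   
   ```rust
   use std::collections::{HashMap, HashSet};
   use std::sync::Arc;
   
   // Stand-in for Arc<dyn ExecutionPlan>; real code would expose a schema.
   struct PlanNode {
       column_names: Vec<String>,
   }
   
   // Hypothetical per-pass cache: node pointer -> shared column-name set.
   #[derive(Default)]
   struct ColumnNameCache {
       by_node: HashMap<*const PlanNode, Arc<HashSet<String>>>,
   }
   
   impl ColumnNameCache {
       /// Return the column-name set for `node`, building it at most once
       /// per pass no matter how many parents ask about this node.
       fn columns_for(&mut self, node: &Arc<PlanNode>) -> Arc<HashSet<String>> {
           let key = Arc::as_ptr(node);
           Arc::clone(self.by_node.entry(key).or_insert_with(|| {
               Arc::new(node.column_names.iter().cloned().collect())
           }))
       }
   }
   
   fn main() {
       let scan = Arc::new(PlanNode {
           column_names: vec!["a".into(), "b".into()],
       });
       let mut cache = ColumnNameCache::default();
       let first = cache.columns_for(&scan);  // builds the HashSet
       let second = cache.columns_for(&scan); // reuses the same Arc
       assert!(Arc::ptr_eq(&first, &second));
       assert!(first.contains("a"));
   }
   ```
   
   Because the key is a raw pointer, the cache must be dropped at the end of the pass; it must not outlive the plan tree it was built against.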



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -346,14 +233,48 @@ pub struct FilterDescription {
     child_filter_descriptions: Vec<ChildFilterDescription>,
 }
 
+impl Default for FilterDescription {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl FilterDescription {
-    pub fn new_with_child_count(num_children: usize) -> Self {
+    /// Create a new empty FilterDescription
+    pub fn new() -> Self {
         Self {
-            child_filter_descriptions: vec![ChildFilterDescription::new(); num_children],
+            child_filter_descriptions: vec![],
+        }
+    }
+
+    /// Add a child filter description
+    pub fn with_child(mut self, child: ChildFilterDescription) -> Self {
+        self.child_filter_descriptions.push(child);
+        self
+    }
+
+    /// Build a filter description by analyzing which parent filters can be pushed to each child.
+    /// This method automatically determines filter routing based on column analysis:
+    /// - If all columns referenced by a filter exist in a child's schema, it can be pushed down
+    /// - Otherwise, it cannot be pushed down to that child
+    pub fn from_children(
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        children: &[&Arc<dyn crate::ExecutionPlan>],
+    ) -> Result<Self> {
+        let mut desc = Self::new();
+
+        // For each child, create a ChildFilterDescription
+        for child in children {
+            desc = desc.with_child(ChildFilterDescription::from_child(
+                parent_filters.clone(),

Review Comment:
    Cloning a `Vec<Arc<...>>` means:
      1. Allocating a new `Vec` on the heap with the same capacity.
      2. Iterating through every `Arc` in the original `Vec` and cloning it (bumping its ref-count) into the new `Vec`.
   
   In a "wide plan" with many children (e.g., a UNION of 50 tables), this function would create 50 new `Vec`s and perform 50 * N `Arc` clones, where N is the number of parent filters.
   
   Could we instead loop by index and share a single clone, or better yet, refactor `from_child` to take a shared reference?
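   A sketch of the shared-reference variant, with `Expr` standing in for `Arc<dyn PhysicalExpr>` so the example is self-contained:
   
   ```rust
   use std::sync::Arc;
   
   // Stand-in for Arc<dyn PhysicalExpr>.
   type Expr = Arc<String>;
   
   // `from_child` borrows the parent filters; it clones individual Arcs
   // (a cheap ref-count bump) only when building this child's description,
   // never the caller's whole Vec.
   fn from_child(parent_filters: &[Expr]) -> Vec<Expr> {
       parent_filters.iter().map(Arc::clone).collect()
   }
   
   // `from_children` passes the same slice to every child, so a wide plan
   // avoids the per-child `parent_filters.clone()` entirely.
   fn from_children(parent_filters: &[Expr], num_children: usize) -> Vec<Vec<Expr>> {
       (0..num_children)
           .map(|_| from_child(parent_filters))
           .collect()
   }
   
   fn main() {
       let filters = vec![Arc::new("a > 1".to_string())];
       let descs = from_children(&filters, 50); // wide plan: 50 children
       assert_eq!(descs.len(), 50);
       // Every child shares the same underlying expression allocation.
       assert!(Arc::ptr_eq(&descs[0][0], &filters[0]));
   }
   ```
   
   The real `from_child` also takes the child node and wraps each expression in `PredicateSupport`; the borrowing pattern is the same either way.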



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -83,163 +85,6 @@ impl PredicateSupport {
     }
 }
 

Review Comment:
   Adding a few paragraphs of module-level documentation at the top would shorten the onboarding curve. Something along these lines:
   
   1. Optimizer asks parent for a plan: The FilterPushdown optimizer rule visits a parent node and calls `parent.describe_filters(...)` on it. The parent inspects its own logic and its children's schemas to create and return a `FilterDescription`. This `FilterDescription` is the parent's proposed plan for which filters can be pushed to which children.
      * (This is a refinement of your step 1: the optimizer calls the method, the plan doesn't call it on its own.)
   
   2. Optimizer executes the pushdown: The optimizer takes the `FilterDescription` from the parent and iterates through the children. For each child, it recursively runs the pushdown logic, passing along the specific set of filters that the parent's plan designated for that child.
      * (This clarifies your step 2: the runtime doesn't use `from_child` to push. `from_child` is a constructor used back in step 1 to help build the `FilterDescription`. The "push" is the optimizer's recursive traversal.)
   
   3. Optimizer gathers results and informs parent: After the recursive pushdown completes for all children, the optimizer collects the results and bundles them into a `ChildPushdownResult` struct. It then calls `parent.handle_child_pushdown_result(...)`, passing this bundle. This is the parent's chance to react to whether the pushdown into its children was successful.
   
   4. Parent responds to optimizer: Inside `handle_child_pushdown_result`, the parent node decides what to do. If the children couldn't handle some filters, the parent might need to re-introduce a `FilterExec` node above itself. It communicates its decision back to the optimizer by returning a `FilterPushdownPropagation` struct, which tells the optimizer which filters (if any) still need to be handled and whether the parent node itself needs to be replaced.
      * (This refines your step 3: the children's results are bundled and given to the parent, which then replies to the optimizer with a `FilterPushdownPropagation`.)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

