kosiew commented on code in PR #16642: URL: https://github.com/apache/datafusion/pull/16642#discussion_r2182297728
########## datafusion/physical-plan/src/execution_plan.rs: ##########

@@ -520,10 +520,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
-        Ok(
-            FilterDescription::new_with_child_count(self.children().len())
-                .all_parent_filters_unsupported(parent_filters),
-        )
+        // Default implementation: mark all filters as unsupported for all children
+        let mut desc = FilterDescription::new();
+        for _child in self.children() {
+            let child_filters = parent_filters
+                .iter()
+                .map(|f| PredicateSupport::Unsupported(Arc::clone(f)))
+                .collect();
+            desc = desc.with_child(ChildFilterDescription {
+                parent_filters: child_filters,
+                self_filters: vec![],
+            });
+        }

Review Comment:
Opportunity to reduce repetition (DRY). This loop is effectively a broadcast of "unsupported" to every child. That's exactly what `FilterDescription::from_children` could do if you passed it a helper that always returns `Unsupported`.

########## datafusion/physical-plan/src/filter_pushdown.rs: ##########

@@ -291,18 +136,8 @@ impl<T> FilterPushdownPropagation<T> {
         }
     }
 
-    /// Create a new [`FilterPushdownPropagation`] that tells the parent node
-    /// that none of the parent filters were not pushed down.
-    pub fn unsupported(parent_filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
-        let unsupported = PredicateSupports::all_unsupported(parent_filters);
-        Self {
-            filters: unsupported,
-            updated_node: None,
-        }
-    }
-
     /// Create a new [`FilterPushdownPropagation`] with the specified filter support.
-    pub fn with_filters(filters: PredicateSupports) -> Self {
+    pub fn with_filters(filters: Vec<PredicateSupport>) -> Self {

Review Comment:
The old `.unsupported(...)` helper was very explicit.
Now, to produce an "all unsupported" result we must write:

```rust
FilterPushdownPropagation::with_filters(
    filters.into_iter().map(PredicateSupport::Unsupported).collect()
)
```

While this is idiomatic Rust, it forces the developer to think about the mechanism (`map`, `collect`) rather than the intent ("all of these filters were rejected"). How about adding a helper function that makes the intent crystal clear at the call site?

```rust
FilterPushdownPropagation::all_rejected(filters)
```

This is better because:
* It's self-documenting. The name of the function tells you exactly what's happening.
* It's higher-level. It allows developers to work at the level of "what they want to do" rather than "how to do it."

########## datafusion/physical-plan/src/filter_pushdown.rs: ##########

@@ -317,24 +152,76 @@ impl<T> FilterPushdownPropagation<T> {
 }
 
 #[derive(Debug, Clone)]
-struct ChildFilterDescription {
+pub struct ChildFilterDescription {
     /// Description of which parent filters can be pushed down into this node.
     /// Since we need to transmit filter pushdown results back to this node's parent
     /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
     /// We do this using a [`PredicateSupport`] which simplifies manipulating supported/unsupported filters.
-    parent_filters: PredicateSupports,
+    pub(crate) parent_filters: Vec<PredicateSupport>,
     /// Description of which filters this node is pushing down to its children.
     /// Since this is not transmitted back to the parents we can have variable sized inner arrays
     /// instead of having to track supported/unsupported.
-    self_filters: Vec<Arc<dyn PhysicalExpr>>,
+    pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
 }
 
 impl ChildFilterDescription {
-    fn new() -> Self {
-        Self {
-            parent_filters: PredicateSupports::new(vec![]),
-            self_filters: vec![],
+    /// Build a child filter description by analyzing which parent filters can be pushed to a specific child.
+    ///
+    /// See [`FilterDescription::from_children`] for more details
+    pub fn from_child(
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        child: &Arc<dyn crate::ExecutionPlan>,
+    ) -> Result<Self> {
+        let child_schema = child.schema();
+
+        // Get column names from child schema for quick lookup
+        let child_column_names: HashSet<&str> = child_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();

Review Comment:
We could be redundantly rebuilding the entire `HashSet` of column names for the exact same `ExecutionPlan` node multiple times during the optimization process. Here's a scenario to illustrate: imagine a complex query plan in which a specific node, say `ParquetScan(file1.parquet)`, exists as a shared reference (`Arc<dyn ExecutionPlan>`) and might be evaluated in different contexts as the optimizer walks the plan tree.

1. First encounter: the optimizer analyzes a `FilterNode` that has `ParquetScan(file1.parquet)` as a child. It calls `ChildFilterDescription::from_child`, which iterates through the schema of the Parquet scan and builds a `HashSet` of its column names for the first time.
2. Second encounter: later in the same optimization pass, another node, perhaps a `JoinNode`, also needs to analyze what can be pushed down to that very same `ParquetScan(file1.parquet)` instance. Without caching, `from_child` is called again for the same node and rebuilds the exact same `HashSet` of column names from scratch.

Perhaps consider implementing a caching mechanism?
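A minimal sketch of what such a cache could look like, keyed on the pointer identity of the shared node so that one `Arc` encountered from several parents is only analyzed once. `PlanNode`, `ColumnNameCache`, and the `misses` counter are all hypothetical stand-ins for illustration, not types from this PR:

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

// Simplified stand-in for an ExecutionPlan node; in DataFusion the
// column names would come from `child.schema()`.
struct PlanNode {
    columns: Vec<String>,
}

// Hypothetical cache keyed by the node's pointer identity, so the same
// Arc<PlanNode> shared across the plan tree is only analyzed once.
#[derive(Default)]
struct ColumnNameCache {
    by_node: HashMap<*const PlanNode, Arc<HashSet<String>>>,
    misses: usize, // counts how many times a set was actually rebuilt
}

impl ColumnNameCache {
    fn column_names(&mut self, node: &Arc<PlanNode>) -> Arc<HashSet<String>> {
        let key = Arc::as_ptr(node);
        if let Some(set) = self.by_node.get(&key) {
            return Arc::clone(set); // cache hit: no schema walk
        }
        self.misses += 1;
        let set: Arc<HashSet<String>> =
            Arc::new(node.columns.iter().cloned().collect());
        self.by_node.insert(key, Arc::clone(&set));
        set
    }
}

fn main() {
    let scan = Arc::new(PlanNode { columns: vec!["a".into(), "b".into()] });
    let mut cache = ColumnNameCache::default();

    // First encounter (e.g. from a filter parent): builds the set.
    let s1 = cache.column_names(&scan);
    // Second encounter (e.g. from a join parent): served from the cache.
    let s2 = cache.column_names(&scan);

    assert!(s1.contains("a") && s2.contains("b"));
    assert_eq!(cache.misses, 1); // the HashSet was only built once
}
```

The optimizer rule (rather than the node) would be the natural owner of such a cache, since it lives for exactly one optimization pass.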
########## datafusion/physical-plan/src/filter_pushdown.rs: ##########

@@ -346,14 +233,48 @@ pub struct FilterDescription {
     child_filter_descriptions: Vec<ChildFilterDescription>,
 }
 
+impl Default for FilterDescription {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl FilterDescription {
-    pub fn new_with_child_count(num_children: usize) -> Self {
+    /// Create a new empty FilterDescription
+    pub fn new() -> Self {
         Self {
-            child_filter_descriptions: vec![ChildFilterDescription::new(); num_children],
+            child_filter_descriptions: vec![],
+        }
+    }
+
+    /// Add a child filter description
+    pub fn with_child(mut self, child: ChildFilterDescription) -> Self {
+        self.child_filter_descriptions.push(child);
+        self
+    }
+
+    /// Build a filter description by analyzing which parent filters can be pushed to each child.
+    /// This method automatically determines filter routing based on column analysis:
+    /// - If all columns referenced by a filter exist in a child's schema, it can be pushed down
+    /// - Otherwise, it cannot be pushed down to that child
+    pub fn from_children(
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        children: &[&Arc<dyn crate::ExecutionPlan>],
+    ) -> Result<Self> {
+        let mut desc = Self::new();
+
+        // For each child, create a ChildFilterDescription
+        for child in children {
+            desc = desc.with_child(ChildFilterDescription::from_child(
+                parent_filters.clone(),

Review Comment:
Cloning a `Vec<Arc<...>>` means:
1. Allocating a new `Vec` on the heap with the same capacity.
2. Iterating through every `Arc` in the original `Vec` and cloning it (bumping its ref-count) into the new `Vec`.

In a "wide plan" with many children (e.g., a UNION of 50 tables), this function would create 50 new `Vec`s and perform 50 * N `Arc` clones, where N is the number of parent filters. Could we instead loop by index and share a single clone, or better yet refactor `from_child` to take a shared reference?
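A sketch of the shared-reference variant, assuming drastically simplified stand-ins (`Filter` as a string, `ChildDesc`, `from_child`, `from_children`) rather than the PR's real types. Taking `&[Arc<Filter>]` removes the per-child `Vec` clone made only to satisfy a by-value signature; each child still bumps the ref-count of the `Arc`s it keeps, but no intermediate `Vec` is allocated per call site:

```rust
use std::sync::Arc;

// Stand-in for Arc<dyn PhysicalExpr>.
type Filter = String;

// Stand-in for ChildFilterDescription.
struct ChildDesc {
    parent_filters: Vec<Arc<Filter>>,
}

// By borrowing a slice, callers with many children never clone the outer
// Vec per child; only the Arcs a child actually stores are cloned.
fn from_child(parent_filters: &[Arc<Filter>]) -> ChildDesc {
    ChildDesc {
        parent_filters: parent_filters.iter().map(Arc::clone).collect(),
    }
}

fn from_children(parent_filters: &[Arc<Filter>], num_children: usize) -> Vec<ChildDesc> {
    // One shared borrow across all children; no `parent_filters.clone()` here.
    (0..num_children).map(|_| from_child(parent_filters)).collect()
}

fn main() {
    let filters = vec![
        Arc::new("a = 1".to_string()),
        Arc::new("b > 2".to_string()),
    ];
    // e.g. a UNION of 50 inputs
    let descs = from_children(&filters, 50);
    assert_eq!(descs.len(), 50);
    assert_eq!(descs[0].parent_filters.len(), 2);
    // 50 children plus the original Vec each hold a reference.
    assert_eq!(Arc::strong_count(&filters[0]), 51);
}
```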
########## datafusion/physical-plan/src/filter_pushdown.rs: ##########

@@ -83,163 +85,6 @@ impl PredicateSupport {
     }
 }

Review Comment:
Adding a few paragraphs at the top will shorten the onboarding curve:

1. Optimizer asks the parent for a plan: the `FilterPushdown` optimizer rule visits a parent node and calls `parent.describe_filters(...)`. The parent inspects its own logic and its children's schemas to create and return a `FilterDescription`. This `FilterDescription` is the parent's proposed plan for which filters can be pushed to which children.
   * (This is a refinement of your step 1: the optimizer calls the method; the plan doesn't call it on its own.)
2. Optimizer executes the pushdown: the optimizer takes the `FilterDescription` from the parent and iterates through the children. For each child, it recursively runs the pushdown logic, passing along the specific set of filters that the parent's plan designated for that child.
   * (This clarifies your step 2: the runtime doesn't use `from_child` to push. `from_child` is a constructor used back in step 1 to help build the `FilterDescription`. The "push" is the optimizer's recursive traversal.)
3. Optimizer gathers results and informs the parent: after the recursive pushdown completes for all children, the optimizer collects the results, bundles them into a `ChildPushdownResult` struct, and calls `parent.handle_child_pushdown_result(...)` with that bundle. This is the parent's chance to react to whether the pushdown into its children was successful.
4. Parent responds to the optimizer: inside `handle_child_pushdown_result`, the parent node decides what to do. If the children couldn't handle some filters, the parent might need to re-introduce a `FilterExec` node above itself. It communicates its decision back to the optimizer by returning a `FilterPushdownPropagation` struct.
This struct tells the optimizer which filters (if any) still need to be handled and whether the parent node itself needs to be replaced.
   * (This refines your step 3: the children's results are bundled and given to the parent, which then replies to the optimizer with a `FilterPushdownPropagation`.)
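The four-step control flow above can be sketched end to end with toy types: filters are the single column name they reference, `Support` only mirrors `PredicateSupport` in spirit, and routing is plain column-membership. None of the names below are the PR's actual API; this is purely an illustration of the handshake between optimizer and parent:

```rust
// Toy stand-in for PredicateSupport.
#[derive(Debug, Clone, PartialEq)]
enum Support {
    Supported,
    Unsupported,
}

// Toy stand-in for an ExecutionPlan child: just its output columns.
struct Node {
    columns: Vec<&'static str>,
}

// Step 1: the parent's "describe_filters": propose routing each parent
// filter to a child based on whether the child's schema covers it.
fn describe(parent_filters: &[&str], child: &Node) -> Vec<Support> {
    parent_filters
        .iter()
        .map(|col| {
            if child.columns.iter().any(|c| c == col) {
                Support::Supported
            } else {
                Support::Unsupported
            }
        })
        .collect()
}

// Steps 3-4: the parent's "handle_child_pushdown_result": keep every
// filter the child rejected, so a FilterExec above could re-apply it.
fn filters_to_keep<'a>(parent_filters: &[&'a str], child_result: &[Support]) -> Vec<&'a str> {
    parent_filters
        .iter()
        .zip(child_result)
        .filter(|(_, s)| **s == Support::Unsupported)
        .map(|(f, _)| *f)
        .collect()
}

fn main() {
    let child = Node { columns: vec!["a"] };
    let parent_filters = ["a", "b"];

    // Step 1: the optimizer asks the parent to describe the routing.
    let plan = describe(&parent_filters, &child);
    assert_eq!(plan, vec![Support::Supported, Support::Unsupported]);

    // Step 2 would recurse into the child with the "a" filter only.
    // Steps 3-4: the parent learns "b" was not absorbed and must keep it.
    assert_eq!(filters_to_keep(&parent_filters, &plan), vec!["b"]);
}
```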