jayzhan211 commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2026105522


##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -433,6 +437,80 @@ impl ExecutionPlan for FilterExec {
         }
         try_embed_projection(projection, self)
     }
+
+    fn filter_pushdown_request(
+        &self,
+        filters: &[Arc<dyn PhysicalExpr>],
+    ) -> Result<Vec<FilterPushdownAllowed>> {
+        // Note: we don't have to worry about / deal with the projection here 
because
+        // `FilterExec`'s projection can only remove columns, not add them.
+        // Thus if a filter was valid applied to our output it should be valid 
applied to our input.
+        // We do however need to remap the columns.
+        let input_schema = self.input.schema();
+        let filters = filters
+            .iter()
+            .map(|f| reassign_predicate_columns(Arc::clone(f), &input_schema, 
false))
+            .collect::<Result<Vec<_>>>()?;
+        Ok(filters
+            .into_iter()
+            .map(FilterPushdownAllowed::Allowed)
+            .collect())
+    }
+
+    fn filters_for_pushdown(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+        Ok(split_conjunction(self.predicate())
+            .iter()
+            .map(|f| Arc::clone(f))
+            .collect())
+    }
+
+    fn with_filter_pushdown_result(
+        self: Arc<Self>,
+        own_filters_result: &[FilterSupport],
+        parent_filters_remaining: &[Arc<dyn PhysicalExpr>],
+    ) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
+        // Only keep filters who's index maps to the pushdown result 
Unsupported
+        let filters_for_pushdown = self.filters_for_pushdown()?;
+        let new_filters = filters_for_pushdown
+            .iter()
+            .zip(own_filters_result.iter())
+            .filter_map(|(f, p)| {
+                if matches!(p, FilterSupport::HandledExact) {
+                    // Exact pushdown support means we keep discard filter
+                    None
+                } else {
+                    // Otherwise we still have to apply it
+                    Some(Arc::clone(f))
+                }
+            })
+            // Combine that with any leftover filters from parents that our 
children couldn't handle
+            .chain(parent_filters_remaining.iter().map(Arc::clone));
+
+        let new_predicate = conjunction(new_filters);
+
+        if new_predicate.eq(&lit(true)) && self.projection.is_none() {
+            // We can remove ourselves from the execution tree
+            Ok(Some(ExecutionPlanFilterPushdownResult::new(
+                Arc::clone(&self.input),
+                vec![FilterSupport::HandledExact; 
parent_filters_remaining.len()],
+            )))
+        } else {
+            Ok(Some(ExecutionPlanFilterPushdownResult {
+                inner: Arc::new(Self {
+                    predicate: new_predicate,
+                    input: Arc::clone(&self.input),
+                    metrics: self.metrics.clone(),
+                    default_selectivity: self.default_selectivity,
+                    cache: self.cache.clone(),

Review Comment:
   If `predicate` is updated, should we update the `cache` too? Here it is cloned unchanged from the old node, which was built for the old predicate.



##########
datafusion/physical-plan/src/dynamic_filters.rs:
##########
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    any::Any,
+    fmt::Display,
+    hash::Hash,
+    sync::{Arc, RwLock},
+};
+
+use datafusion_common::{
+    tree_node::{Transformed, TransformedResult, TreeNode},
+    Result,
+};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr::{utils::conjunction, PhysicalExpr};
+use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
+
+/// A source of dynamic runtime filters.
+///
+/// During query execution, operators implementing this trait can provide
+/// filter expressions that other operators can use to dynamically prune data.
+///
+/// See `TopKDynamicFilterSource` in datafusion/physical-plan/src/topk/mod.rs 
for examples.
+pub trait DynamicFilterSource:
+    Send + Sync + std::fmt::Debug + DynEq + DynHash + Display + 'static
+{
+    /// Take a snapshot of the current state of filtering, returning a 
non-dynamic PhysicalExpr.
+    /// This is used to e.g. serialize dynamic filters across the wire or to 
pass them into systems
+    /// that won't use the `PhysicalExpr` API (e.g. matching on the concrete 
types of the expressions like `PruningPredicate` does).
+    /// For example, it is expected that this returns a relatively simple 
expression such as `col1 > 5` for a TopK operator or
+    /// `col2 IN (1, 2, ... N)` for a HashJoin operator.
+    fn snapshot_current_filters(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>>;
+
+    fn as_any(&self) -> &dyn Any;
+}
+
+impl PartialEq for dyn DynamicFilterSource {
+    fn eq(&self, other: &Self) -> bool {
+        self.dyn_eq(other.as_any())
+    }
+}
+
+impl Eq for dyn DynamicFilterSource {}
+
+/// A wrapper around a [`DynamicFilterSource`] that allows it to be used as a 
physical expression.
+/// This will call [`DynamicFilterSource::snapshot_current_filters`] to get 
the current filters for each call to
+/// [`PhysicalExpr::evaluate`], [`PhysicalExpr::data_type`], and 
[`PhysicalExpr::nullable`].
+/// It also implements [`PhysicalExpr::snapshot`] by forwarding the call to 
[`DynamicFilterSource::snapshot_current_filters`].
+#[derive(Debug)]
+pub struct DynamicFilterPhysicalExpr {
+    /// The original children of this PhysicalExpr, if any.
+    /// This is necessary because the dynamic filter may be initialized with a 
placeholder (e.g. `lit(true)`)
+    /// and later remapped to the actual expressions that are being filtered.
+    /// But we need to know the children (e.g. columns referenced in the 
expression) ahead of time to evaluate the expression correctly.
+    children: Vec<Arc<dyn PhysicalExpr>>,
+    /// If any of the children were remapped / modified (e.g. to adjust for 
projections) we need to keep track of the new children
+    /// so that when we update `current()` in subsequent iterations we can 
re-apply the replacements.
+    remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
+    /// The source of dynamic filters.
+    inner: Arc<dyn DynamicFilterSource>,
+    /// For testing purposes track the data type and nullability to make sure 
they don't change.
+    /// If they do, there's a bug in the implementation.
+    /// But this can have overhead in production, so it's only included in our 
tests.
+    data_type: Arc<RwLock<Option<arrow::datatypes::DataType>>>,
+    nullable: Arc<RwLock<Option<bool>>>,
+}
+
+impl Hash for DynamicFilterPhysicalExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.inner.dyn_hash(state);
+        self.children.dyn_hash(state);
+        self.remapped_children.dyn_hash(state);
+    }
+}
+
+impl PartialEq for DynamicFilterPhysicalExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.inner.dyn_eq(other.inner.as_any())
+            && self.children == other.children
+            && self.remapped_children == other.remapped_children
+    }
+}
+
+impl Eq for DynamicFilterPhysicalExpr {}
+
+impl Display for DynamicFilterPhysicalExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DynamicFilterPhysicalExpr [ {} ]", self.inner)
+    }
+}
+
+impl DynamicFilterPhysicalExpr {
+    pub fn new(
+        children: Vec<Arc<dyn PhysicalExpr>>,
+        inner: Arc<dyn DynamicFilterSource>,
+    ) -> Self {
+        Self {
+            children,
+            remapped_children: None, // Initially no remapped children
+            inner,
+            data_type: Arc::new(RwLock::new(None)),
+            nullable: Arc::new(RwLock::new(None)),
+        }
+    }
+
+    fn current(&self) -> Result<Arc<dyn PhysicalExpr>> {
+        let current = conjunction(self.inner.snapshot_current_filters()?);
+        if let Some(remapped_children) = &self.remapped_children {
+            // Remap children to the current children
+            // of the expression.
+            current
+                .transform_up(|expr| {
+                    // Check if this is any of our original children
+                    if let Some(pos) = self
+                        .children
+                        .iter()
+                        .position(|c| c.as_ref() == expr.as_ref())
+                    {
+                        // If so, remap it to the current children
+                        // of the expression.
+                        let new_child = Arc::clone(&remapped_children[pos]);
+                        Ok(Transformed::yes(new_child))
+                    } else {
+                        // Otherwise, just return the expression
+                        Ok(Transformed::no(expr))
+                    }
+                })
+                .data()
+        } else {
+            Ok(current)
+        }
+    }
+}
+
+impl PhysicalExpr for DynamicFilterPhysicalExpr {

Review Comment:
   `PhysicalExpr` implementations usually live in the `physical-expr` crate rather than in `physical-plan`.



##########
datafusion/physical-plan/src/sorts/sort_filters.rs:
##########
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    fmt::Display,
+    hash::{Hash, Hasher},
+    sync::{Arc, RwLock},
+};
+
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::Operator;
+use datafusion_physical_expr::{
+    expressions::{is_not_null, is_null, lit, BinaryExpr},
+    LexOrdering, PhysicalExpr,
+};
+
+use crate::dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSource};
+
+/// Pushdown of dynamic filters from sort + limit operators (aka `TopK`) is 
used to speed up queries
+/// such as `SELECT * FROM table ORDER BY col DESC LIMIT 10` by pushing down 
the
+/// threshold values for the sort columns to the data source.
+/// That is, the TopK operator will keep track of the top 10 values for the 
sort
+/// and before a new file is opened its statistics will be checked against the
+/// threshold values to determine if the file can be skipped and predicate 
pushdown
+/// will use these to skip rows during the scan.
+///
+/// For example, imagine this data gets created if multiple sources with clock 
skews,
+/// network delays, etc. are writing data and you don't do anything fancy to 
guarantee
+/// perfect sorting by `timestamp` (i.e. you naively write out the data to 
Parquet, maybe do some compaction, etc.).
+/// The point is that 99% of yesterday's files have a `timestamp` smaller than 
99% of today's files
+/// but there may be a couple seconds of overlap between files.
+/// To be concrete, let's say this is our data:
+//
+// | file | min | max |
+// |------|-----|-----|
+// | 1    | 1   | 10  |
+// | 2    | 9   | 19  |
+// | 3    | 20  | 31  |
+// | 4    | 30  | 35  |
+//
+// Ideally a [`TableProvider`] is able to use file level stats or other 
methods to roughly order the files
+// within each partition / file group such that we start with the newest / 
largest `timestamp`s.
+// If this is not possible the optimization still works but is less efficient 
and harder to visualize,
+// so for this example let's assume that we process 1 file at a time and we 
started with file 4.
+// After processing file 4 let's say we have 10 values in our TopK heap, the 
smallest of which is 30.
+// The TopK operator will then push down the filter `timestamp < 30` down the 
tree of [`ExecutionPlan`]s
+// and if the data source supports dynamic filter pushdown it will accept a 
reference to this [`DynamicPhysicalExprSource`]
+// and when it goes to open file 3 it will ask the 
[`DynamicPhysicalExprSource`] for the current filters.
+// Since file 3 may contain values larger than 30 we cannot skip it entirely,
+// but scanning it may still be more efficient due to page pruning and other 
optimizations.
+// Once we get to file 2 however we can skip it entirely because we know that 
all values in file 2 are smaller than 30.
+// The same goes for file 1.
+// So this optimization just saved us 50% of the work of scanning the data.
+#[derive(Debug)]
+pub struct SortDynamicFilterSource {
+    /// Sort expressions
+    expr: LexOrdering,
+    /// Current threshold values
+    thresholds: Arc<RwLock<Vec<Option<ScalarValue>>>>,
+}
+
+impl Hash for SortDynamicFilterSource {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        // Hash the pointers to the thresholds
+        let thresholds = Arc::as_ptr(&self.thresholds) as usize;
+        thresholds.hash(state);
+    }
+}
+
+impl PartialEq for SortDynamicFilterSource {
+    fn eq(&self, other: &Self) -> bool {
+        Arc::ptr_eq(&self.thresholds, &other.thresholds)
+    }
+}
+
+impl Eq for SortDynamicFilterSource {}
+
+impl SortDynamicFilterSource {
+    pub fn new(expr: LexOrdering) -> Self {
+        let thresholds = Arc::new(RwLock::new(vec![None; expr.len()]));
+        Self { expr, thresholds }
+    }
+
+    pub fn update_values(&self, new_values: &[ScalarValue]) -> Result<()> {
+        let replace = {
+            let thresholds = self.thresholds.read().map_err(|_| {
+                datafusion_common::DataFusionError::Execution(

Review Comment:
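   Consider using the `exec_err!()` macro family here (`exec_datafusion_err!()` when only the error value is needed, as inside `map_err`) instead of constructing `DataFusionError::Execution` by hand.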



##########
datafusion/physical-optimizer/src/filter_pushdown.rs:
##########
@@ -0,0 +1,501 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion_common::{config::ConfigOptions, Result};
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_plan::{
+    execution_plan::{
+        ExecutionPlanFilterPushdownResult, FilterPushdownAllowed, 
FilterSupport,
+    },
+    with_new_children_if_necessary, ExecutionPlan,
+};
+
+use crate::PhysicalOptimizerRule;
+
+/// The state of filter pushdown support for a given filter.
+#[derive(Clone, Copy, Debug)]
+enum ChildPushdownState {
+    /// A child said it can handle the filter exactly.
+    ChildExact,
+    /// A child exists and took a look at the filter.
+    /// It may partially handle it or not handle it at all.
+    /// The parent still needs to re-apply the filter.
+    ChildInexact,
+    /// No child exists, there is no one to handle the filter.
+    /// This is the default / initial state.
+    NoChild,
+}
+
+impl ChildPushdownState {
+    /// Combine the current state with another state.
+    /// This is used to combine the results of multiple children.
+    fn combine_with_other(&self, other: &FilterSupport) -> ChildPushdownState {
+        match (other, self) {
+            (FilterSupport::HandledExact, ChildPushdownState::NoChild) => {
+                ChildPushdownState::ChildExact
+            }
+            (FilterSupport::HandledExact, ChildPushdownState::ChildInexact) => 
{
+                ChildPushdownState::ChildInexact
+            }
+            (FilterSupport::Unhandled, ChildPushdownState::NoChild) => {
+                ChildPushdownState::ChildInexact
+            }
+            (FilterSupport::Unhandled, ChildPushdownState::ChildExact) => {
+                ChildPushdownState::ChildInexact
+            }
+            (FilterSupport::Unhandled, ChildPushdownState::ChildInexact) => {
+                ChildPushdownState::ChildInexact
+            }
+            (FilterSupport::HandledExact, ChildPushdownState::ChildExact) => {
+                // If both are exact, keep it as exact
+                ChildPushdownState::ChildExact
+            }
+        }
+    }
+}
+
+/// See [`pushdown_filters`] for more details.
+fn push_down_into_children(
+    node: &Arc<dyn ExecutionPlan>,
+    filters: &[Arc<dyn PhysicalExpr>],
+) -> Result<ExecutionPlanFilterPushdownResult> {
+    let children = node.children();
+    let mut new_children = Vec::with_capacity(children.len());
+    let mut filter_pushdown_result = vec![ChildPushdownState::NoChild; 
filters.len()];
+    for child in children {
+        if let Some(result) = pushdown_filters(child, filters)? {
+            new_children.push(result.inner);
+            for (idx, support) in result.support.iter().enumerate() {
+                filter_pushdown_result[idx] =
+                    filter_pushdown_result[idx].combine_with_other(support)
+            }
+        } else {
+            new_children.push(Arc::clone(child));
+        }
+    }
+    let support = filter_pushdown_result
+        .iter()
+        .map(|s| match s {
+            ChildPushdownState::ChildExact => FilterSupport::HandledExact,
+            ChildPushdownState::ChildInexact => FilterSupport::Unhandled,
+            ChildPushdownState::NoChild => FilterSupport::Unhandled,
+        })
+        .collect::<Vec<_>>();
+    let node = with_new_children_if_necessary(Arc::clone(node), new_children)?;
+    Ok(ExecutionPlanFilterPushdownResult::new(node, support))
+}
+
+/// Recursively a collection of filters down through the execution plan tree 
in a depth-first manner.
+///
+/// For each filter we try to push it down to children as far down as 
possible, keeping track of if the children
+/// can handle the filter or not.
+///
+/// If a child can handle the filter, we mark it as handled exact and parent 
nodes (including the source of the filter)
+/// can decide to discard it / not re-apply it themselves.
+/// If a child cannot handle the filter or may return false positives (aka 
"inexact" handling) we mark it as handled inexact.
+/// If a child does not allow filter pushdown at all (e.g. an aggregation 
node) we keep recursing but clear the current set of filters
+/// we are pushing down.
+///
+/// As we recurse back up the tree we combine the results of the children to 
determine if the overall result is exact or inexact:
+/// - For nodes with a single child we just take the child's result.
+/// - For nodes with multiple children we combine the results of the children 
to determine if the overall result is exact or inexact.
+///   We do this by checking if all children are exact (we return exact up) or 
if any child is inexact (we return inexact).
+/// - If a node has no children this is equivalent to inexact handling (there 
is no child to handle the filter).
+///
+/// See [`FilterPushdown`] for more details on how this works in practice.
+fn pushdown_filters(
+    node: &Arc<dyn ExecutionPlan>,
+    parent_filters: &[Arc<dyn PhysicalExpr>],
+) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
+    // Gather the filters from the current node.
+    // These are the filters the current node "owns" or "produces" and wants 
to push down.
+    let node_filters = node.filters_for_pushdown()?;
+    // Check which nodes from parents this node is okay with us trying to push 
down to it's children.
+    let parent_pushdown_request_result = 
node.filter_pushdown_request(parent_filters)?;
+    // Do some index masking so that we only ever call nodes with the filters 
relevant to them / that they're allowed to touch.
+    // But we still need to reconstruct the full result for our caller.
+    let parent_filter_for_pushdown_indices = parent_pushdown_request_result
+        .iter()
+        .enumerate()
+        .filter_map(|(i, s)| {
+            if matches!(s, FilterPushdownAllowed::Allowed(_)) {
+                Some(i)
+            } else {
+                None
+            }
+        })
+        .collect::<Vec<_>>();
+    let parent_filters_to_push_down = parent_filter_for_pushdown_indices
+        .iter()
+        .map(|&i| Arc::clone(&parent_filters[i]))
+        .collect::<Vec<_>>();
+    let all_filters_to_push_down = node_filters
+        .iter()
+        .chain(parent_filters_to_push_down.iter())
+        .map(Arc::clone)
+        .collect::<Vec<_>>();
+    // Push down into children
+    let child_pushdown_result = push_down_into_children(node, 
&all_filters_to_push_down)?;
+    let mut node = child_pushdown_result.inner;
+    // A bit more index masking to construct the final result for our caller.
+    let node_filters_pushdown_result =
+        child_pushdown_result.support[..node_filters.len()].to_vec();
+    let mut parent_filter_pushdown_result =
+        vec![FilterSupport::Unhandled; parent_filters.len()];
+    for (parent_filter_idx, support) in parent_filter_for_pushdown_indices
+        .iter()
+        .zip(child_pushdown_result.support[node_filters.len()..].iter())
+    {
+        parent_filter_pushdown_result[*parent_filter_idx] = *support;
+    }
+    // Collect the remaining unhandled parent filters
+    let unhandled_parent_filter_indices = (0..parent_filters.len())
+        .filter(|&i| matches!(parent_filter_pushdown_result[i], 
FilterSupport::Unhandled))
+        .collect::<Vec<_>>();
+    let unhandled_parent_filters = unhandled_parent_filter_indices
+        .iter()
+        .map(|&i| Arc::clone(&parent_filters[i]))
+        .collect::<Vec<_>>();
+    // Check if the node can handle the filters
+    if let Some(result) = Arc::clone(&node).with_filter_pushdown_result(
+        &node_filters_pushdown_result,
+        &unhandled_parent_filters,
+    )? {
+        node = result.inner;
+        for (parent_filter_index, support) in
+            unhandled_parent_filter_indices.iter().zip(result.support)
+        {
+            parent_filter_pushdown_result[*parent_filter_index] = support;
+        }
+    }
+    Ok(Some(ExecutionPlanFilterPushdownResult::new(
+        node,
+        parent_filter_pushdown_result,
+    )))
+}
+
+/// A physical optimizer rule that pushes down filters in the execution plan.
+/// For example, consider the following plan:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │  filters = [ id=1]   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// └──────────────────────┘
+/// ```
+///
+/// Our goal is to move the `id = 1` filter from the `FilterExec` node to the 
`DataSourceExec` node.
+/// If this filter is selective it can avoid massive amounts of data being 
read from the source (the projection is `*` so all matching columns are read).
+/// In this simple case we:
+/// 1. Enter the recursion with no filters.
+/// 2. We find the `FilterExec` node and it tells us that it has a filter (see 
[`ExecutionPlan::filters_for_pushdown`] and 
`datafusion::physical_plan::filter::FilterExec`).
+/// 3. We recurse down into it's children (the `DataSourceExec` node) now 
carrying the filters `[id = 1]`.
+/// 4. The `DataSourceExec` node tells us that it can handle the filter and we 
mark it as handled exact (see 
[`ExecutionPlan::push_down_filters_from_parents`]).
+/// 5. Since the `DataSourceExec` node has no children we recurse back up the 
tree.
+/// 6. We now tell the `FilterExec` node that it has a child that can handle 
the filter and we mark it as handled exact (see 
[`ExecutionPlan::with_filter_pushdown_result`]).
+///    The `FilterExec` node can now return a new execution plan, either a 
copy of itself without that filter or if has no work left to do it can even 
return the child node directly.
+/// 7. We recurse back up to `CoalesceBatchesExec` and do nothing there since 
it had no filters to push down.
+///
+/// The new plan looks like:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///           │
+///           ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+//  │    projection = *    │
+//  │   filters = [ id=1]  │
+/// └──────────────────────┘
+/// ```
+///
+/// Let's consider a more complex example involving a `ProjectionExec` node in 
betweeen the `FilterExec` and `DataSourceExec` nodes that creates a new column 
that the filter depends on.
+///
+/// ```text
+// ┌──────────────────────┐
+// │ CoalesceBatchesExec  │
+// └──────────────────────┘
+//             │
+//             ▼
+// ┌──────────────────────┐
+// │      FilterExec      │
+// │    filters =         │
+// │     [cost>50,id=1]   │
+// └──────────────────────┘
+//             │
+//             ▼
+// ┌──────────────────────┐
+// │    ProjectionExec    │
+// │ cost = price * 1.2   │
+// └──────────────────────┘
+//             │
+//             ▼
+// ┌──────────────────────┐
+// │    DataSourceExec    │
+// │    projection = *    │
+// └──────────────────────┘
+/// ```
+///
+/// We want to push down the filters `[id=1]` to the [`DataSourceExec`] node, 
but can't push down `[cost>50]` because it requires the `ProjectionExec` node 
to be executed first:
+///
+/// ```text
+// ┌──────────────────────┐
+// │ CoalesceBatchesExec  │
+// └──────────────────────┘
+//             │
+//             ▼
+// ┌──────────────────────┐
+// │      FilterExec      │
+// │    filters =         │
+// │     [cost>50]        │
+// └──────────────────────┘
+//             │
+//             ▼
+// ┌──────────────────────┐
+// │    ProjectionExec    │
+// │ cost = price * 1.2   │
+// └──────────────────────┘
+//             │
+//             ▼
+// ┌──────────────────────┐
+// │    DataSourceExec    │
+// │    projection = *    │
+// │   filters = [ id=1]  │
+// └──────────────────────┘
+/// ```
+///
+/// There are also cases where we may be able to push down filters within a 
subtree but not the entire tree.
+/// A good exmaple of this is aggreagation nodes:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ ProjectionExec       │
+/// │ projection = *       │
+/// └──────────────────────┘
+///           │
+///           ▼
+/// ┌──────────────────────┐
+/// │ FilterExec           │
+/// │ filters = [sum > 10] │
+/// └──────────────────────┘
+///           │
+///           ▼
+/// ┌───────────────────────┐
+/// │     AggregateExec     │
+/// │    group by = [id]    │
+/// │    aggregate =        │
+/// │      [sum(price)]     │
+/// └───────────────────────┘
+///           │
+///           ▼
+/// ┌──────────────────────┐
+/// │ FilterExec           │
+/// │ filters = [id=1]     │
+/// └──────────────────────┘
+///          │
+///          ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec       │
+/// │ projection = *       │
+/// └──────────────────────┘
+/// ```
+///
+/// The transformation here is to push down the `[id=1]` filter to the 
`DataSourceExec` node:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ ProjectionExec       │
+/// │ projection = *       │
+/// └──────────────────────┘
+///           │
+///           ▼
+/// ┌──────────────────────┐
+/// │ FilterExec           │
+/// │ filters = [sum > 10] │
+/// └──────────────────────┘
+///           │
+///           ▼
+/// ┌───────────────────────┐
+/// │     AggregateExec     │
+/// │    group by = [id]    │
+/// │    aggregate =        │
+/// │      [sum(price)]     │
+/// └───────────────────────┘
+///           │
+///           ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec       │
+/// │ projection = *       │
+/// │ filters = [id=1]     │
+/// └──────────────────────┘
+/// ```
+///
+/// The point here is that:
+/// 1. We cannot push down `sum > 10` through the `AggregateExec` node into 
the `DataSourceExec` node.
+///    Any filters above the `AggregateExec` node are not pushed down.
+///    This is determined by calling 
[`ExecutionPlan::supports_filter_pushdown`] on the `AggregateExec` node.
+/// 2. We need to keep recursing into the tree so that we can discover the 
other `FilterExec` node and push down the `[id=1]` filter.
+///
+/// It is also possible to push down filters through joins and from joins.
+/// For example, a hash join where we build a hash table of the left side and 
probe the right side
+/// (ignoring why we would choose this order, typically it depends on the size 
of each table, etc.).
+///
+/// ```text
+///              ┌─────────────────────┐
+///              │     FilterExec      │
+///              │ filters =           │
+///              │  [d.size > 100]     │
+///              └─────────────────────┘
+///                         │
+///                         │
+///              ┌──────────▼──────────┐
+///              │                     │
+///              │    HashJoinExec     │
+///              │ [u.dept@hash(d.id)] │
+///              │                     │
+///              └─────────────────────┘
+///                         │
+///            ┌────────────┴────────────┐
+/// ┌──────────▼──────────┐   ┌──────────▼──────────┐
+/// │   DataSourceExec    │   │   DataSourceExec    │
+/// │  alias [users as u] │   │  alias [dept as d]  │
+/// │                     │   │                     │
+/// └─────────────────────┘   └─────────────────────┘
+/// ```
+///
+/// There are two pushdowns we can do here:
+/// 1. Push down the `[d.size > 100]` filter through the `HashJoinExec` node 
to the `DataSourceExec` node for the `departments` table.
+/// 2. Push down the hash table state from the `HashJoinExec` node to the 
`DataSourceExec` node to avoid reading
+///    rows from teh `users` table that will be eliminated by the join.

Review Comment:
   ```suggestion
   ///    rows from the `users` table that will be eliminated by the join.
   ```



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -584,6 +584,22 @@ impl DataSource for FileScanConfig {
             ) as _
         }))
     }
+
+    fn push_down_filters(
+        &self,
+        filters: &[Arc<dyn PhysicalExpr>],

Review Comment:
   ```suggestion
           filters: &[PhysicalExprRef],
   ```



##########
datafusion/physical-expr/src/utils/mod.rs:
##########
@@ -47,6 +47,22 @@ pub fn split_conjunction(
     split_impl(Operator::And, predicate, vec![])
 }
 
+/// Create a conjunction of the given predicates.
+/// If the input is empty, return a literal true.
+/// If the input contains a single predicate, return the predicate.
+/// Otherwise, return a conjunction of the predicates (e.g. `a AND b AND c`).
+pub fn conjunction(
+    predicates: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
+) -> Arc<dyn PhysicalExpr> {
+    predicates
+        .into_iter()
+        .fold(None, |acc, predicate| match acc {
+            None => Some(predicate),
+            Some(acc) => Some(Arc::new(BinaryExpr::new(acc, Operator::And, 
predicate))),
+        })
+        .unwrap_or_else(|| crate::expressions::lit(true))

Review Comment:
   I think we can check the length of `predicates` before calling this function,
so that it does not need to create `lit(true)` for the empty case.



##########
datafusion/physical-plan/src/sorts/sort_filters.rs:
##########
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    fmt::Display,
+    hash::{Hash, Hasher},
+    sync::{Arc, RwLock},
+};
+
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::Operator;
+use datafusion_physical_expr::{
+    expressions::{is_not_null, is_null, lit, BinaryExpr},
+    LexOrdering, PhysicalExpr,
+};
+
+use crate::dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSource};
+
+/// Pushdown of dynamic filters from sort + limit operators (aka `TopK`) is 
used to speed up queries
+/// such as `SELECT * FROM table ORDER BY col DESC LIMIT 10` by pushing down 
the
+/// threshold values for the sort columns to the data source.
+/// That is, the TopK operator will keep track of the top 10 values for the 
sort
+/// and before a new file is opened its statistics will be checked against the
+/// threshold values to determine if the file can be skipped and predicate 
pushdown
+/// will use these to skip rows during the scan.
+///
+/// For example, imagine this data gets created if multiple sources with clock 
skews,
+/// network delays, etc. are writing data and you don't do anything fancy to 
guarantee
+/// perfect sorting by `timestamp` (i.e. you naively write out the data to 
Parquet, maybe do some compaction, etc.).
+/// The point is that 99% of yesterday's files have a `timestamp` smaller than 
99% of today's files
+/// but there may be a couple seconds of overlap between files.
+/// To be concrete, let's say this is our data:
+//
+// | file | min | max |
+// |------|-----|-----|
+// | 1    | 1   | 10  |
+// | 2    | 9   | 19  |
+// | 3    | 20  | 31  |
+// | 4    | 30  | 35  |
+//
+// Ideally a [`TableProvider`] is able to use file level stats or other 
methods to roughly order the files
+// within each partition / file group such that we start with the newest / 
largest `timestamp`s.
+// If this is not possible the optimization still works but is less efficient 
and harder to visualize,
+// so for this example let's assume that we process 1 file at a time and we 
started with file 4.
+// After processing file 4 let's say we have 10 values in our TopK heap, the 
smallest of which is 30.
+// The TopK operator will then push down the filter `timestamp < 30` down the 
tree of [`ExecutionPlan`]s
+// and if the data source supports dynamic filter pushdown it will accept a 
reference to this [`DynamicPhysicalExprSource`]
+// and when it goes to open file 3 it will ask the 
[`DynamicPhysicalExprSource`] for the current filters.
+// Since file 3 may contain values larger than 30 we cannot skip it entirely,
+// but scanning it may still be more efficient due to page pruning and other 
optimizations.
+// Once we get to file 2 however we can skip it entirely because we know that 
all values in file 2 are smaller than 30.
+// The same goes for file 1.
+// So this optimization just saved us 50% of the work of scanning the data.
+#[derive(Debug)]
+pub struct SortDynamicFilterSource {
+    /// Sort expressions
+    expr: LexOrdering,
+    /// Current threshold values
+    thresholds: Arc<RwLock<Vec<Option<ScalarValue>>>>,

Review Comment:
   Do we need `Arc<RwLock<T>>`? I think we will only have a single instance, so
it is safe to update the values.



##########
datafusion/physical-plan/src/dynamic_filters.rs:
##########
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    any::Any,
+    fmt::Display,
+    hash::Hash,
+    sync::{Arc, RwLock},
+};
+
+use datafusion_common::{
+    tree_node::{Transformed, TransformedResult, TreeNode},
+    Result,
+};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr::{utils::conjunction, PhysicalExpr};
+use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
+
+/// A source of dynamic runtime filters.
+///
+/// During query execution, operators implementing this trait can provide
+/// filter expressions that other operators can use to dynamically prune data.
+///
+/// See `TopKDynamicFilterSource` in datafusion/physical-plan/src/topk/mod.rs 
for examples.
+pub trait DynamicFilterSource:
+    Send + Sync + std::fmt::Debug + DynEq + DynHash + Display + 'static

Review Comment:
   Do we need `'static`?



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -584,6 +584,22 @@ impl DataSource for FileScanConfig {
             ) as _
         }))
     }
+
+    fn push_down_filters(
+        &self,
+        filters: &[Arc<dyn PhysicalExpr>],
+    ) -> Result<Option<DataSourceFilterPushdownResult>> {
+        if let Some(file_source_result) = 
self.file_source.push_down_filters(filters)? {
+            let mut new_self = self.clone();

Review Comment:
   We could try to avoid the clone if possible:
    
   ```rust
       fn push_down_filters(
           self: Arc<Self>,
           filters: &[Arc<dyn PhysicalExpr>],
       ) -> Result<Option<DataSourceFilterPushdownResult>> {
           if let Some(file_source_result) = 
self.file_source.push_down_filters(filters)? {
   
               let mut inner = Arc::into_inner(self).unwrap();
               inner.file_source = file_source_result.inner;
   
               Ok(Some(DataSourceFilterPushdownResult {
                   inner: Arc::new(inner) as Arc<dyn DataSource>,
                   support: file_source_result.support,
               }))
           } else {
               Ok(None)
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org


Reply via email to