itsjunetime commented on code in PR #12135:
URL: https://github.com/apache/datafusion/pull/12135#discussion_r1737074972
##########
datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs:
##########
@@ -268,90 +259,183 @@ impl<'a> FilterCandidateBuilder<'a> {
/// * `Ok(Some(candidate))` if the expression can be used as an ArrowFilter
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
/// * `Err(e)` if an error occurs while building the candidate
- pub fn build(
- mut self,
- metadata: &ParquetMetaData,
- ) -> Result<Option<FilterCandidate>> {
- let expr = self.expr.clone().rewrite(&mut self).data()?;
-
- if self.non_primitive_columns || self.projected_columns {
- Ok(None)
- } else {
- let required_bytes =
- size_of_columns(&self.required_column_indices, metadata)?;
- let can_use_index = columns_sorted(&self.required_column_indices,
metadata)?;
-
- Ok(Some(FilterCandidate {
- expr,
- required_bytes,
- can_use_index,
- projection: self.required_column_indices.into_iter().collect(),
- }))
+ pub fn build(self, metadata: &ParquetMetaData) ->
Result<Option<FilterCandidate>> {
+ let Some((projection, rewritten_expr)) = non_pushdown_columns(
+ Arc::clone(&self.expr),
+ self.file_schema,
+ self.table_schema,
+ )?
+ else {
+ return Ok(None);
+ };
+
+ let required_bytes = size_of_columns(&self.required_column_indices,
metadata)?;
+ let can_use_index = columns_sorted(&self.required_column_indices,
metadata)?;
+
+ Ok(Some(FilterCandidate {
+ expr: rewritten_expr,
+ required_bytes,
+ can_use_index,
+ projection,
+ }))
+ }
+}
+
+// A struct that implements `TreeNodeRewriter` to traverse a `PhysicalExpr`
tree and determine
+// whether any column references in the expression would prevent it from being
predicate-pushed-down.
+// If `non_primitive_columns || projected_columns`, it can't be pushed down.
+// It can't be reused between calls to `rewrite`; each constructed instance
must be used only once.
+struct PushdownChecker<'schema> {
+ /// Does the expression require any non-primitive columns (like structs)?
+ non_primitive_columns: bool,
+ /// Does the expression reference any columns that are in the table
+ /// schema but not in the file schema?
+ projected_columns: bool,
+ // the indices of all the columns found within the given expression which
exist inside the given
+ // [`file_schema`]
+ required_column_indices: BTreeSet<usize>,
+ file_schema: &'schema Schema,
+ table_schema: &'schema Schema,
+}
+
+impl<'schema> PushdownChecker<'schema> {
+ fn check_single_column(&mut self, column_name: &str) ->
Option<TreeNodeRecursion> {
+ if let Ok(idx) = self.file_schema.index_of(column_name) {
+ self.required_column_indices.insert(idx);
+
+ if DataType::is_nested(self.file_schema.field(idx).data_type()) {
+ self.non_primitive_columns = true;
+ return Some(TreeNodeRecursion::Jump);
+ }
+ } else if self.table_schema.index_of(column_name).is_err() {
+ // If the column does not exist in the (un-projected) table schema
then
+ // it must be a projected column.
+ self.projected_columns = true;
+ return Some(TreeNodeRecursion::Jump);
}
+
+ None
}
}
-/// Implement the `TreeNodeRewriter` trait for `FilterCandidateBuilder` that
-/// walks the expression tree and rewrites it in preparation of becoming
-/// `FilterCandidate`.
-impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {
+impl<'schema> TreeNodeRewriter for PushdownChecker<'schema> {
type Node = Arc<dyn PhysicalExpr>;
- /// Called before visiting each child
fn f_down(
&mut self,
node: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
if let Some(column) = node.as_any().downcast_ref::<Column>() {
- if let Ok(idx) = self.file_schema.index_of(column.name()) {
- self.required_column_indices.insert(idx);
-
- if
DataType::is_nested(self.file_schema.field(idx).data_type()) {
- self.non_primitive_columns = true;
- return Ok(Transformed::new(node, false,
TreeNodeRecursion::Jump));
- }
- } else if self.table_schema.index_of(column.name()).is_err() {
- // If the column does not exist in the (un-projected) table
schema then
- // it must be a projected column.
- self.projected_columns = true;
- return Ok(Transformed::new(node, false,
TreeNodeRecursion::Jump));
+ if let Some(recursion) = self.check_single_column(column.name()) {
+ return Ok(Transformed::new(node, false, recursion));
}
}
Ok(Transformed::no(node))
}
- /// After visiting all children, rewrite column references to nulls if
- /// they are not in the file schema
fn f_up(
&mut self,
expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
- // if the expression is a column, is it in the file schema?
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
if self.file_schema.field_with_name(column.name()).is_err() {
- // Replace the column reference with a NULL (using the type
from the table schema)
- // e.g. `column = 'foo'` is rewritten be transformed to `NULL
= 'foo'`
- //
- // See comments on `FilterCandidateBuilder` for more
information
- return match self.table_schema.field_with_name(column.name()) {
- Ok(field) => {
+ // the column expr must be in the table schema
+ return self
+ .table_schema
+ .field_with_name(column.name())
+ .and_then(|field| {
// return the null value corresponding to the data type
let null_value =
ScalarValue::try_from(field.data_type())?;
-
Ok(Transformed::yes(Arc::new(Literal::new(null_value))))
- }
- Err(e) => {
- // If the column is not in the table schema, should
throw the error
- arrow_err!(e)
- }
- };
+ Ok(Transformed::yes(Arc::new(Literal::new(null_value))
as _))
+ })
+ // If the column is not in the table schema, should throw
the error
+ .map_err(|e| arrow_datafusion_err!(e));
}
}
Ok(Transformed::no(expr))
}
}
+type ProjectionAndExpr = (Vec<usize>, Arc<dyn PhysicalExpr>);
+
+// Checks if a given expression can be pushed down into `ParquetExec` as
opposed to being evaluated
+// post-parquet-scan in a `FilterExec`. If it can't be pushed down, this
returns None. If it can be
+// pushed down, this returns the content of
[`PushdownChecker::required_column_indices`]
+// transformed into a [`Vec`], along with the rewritten expression.
+pub fn non_pushdown_columns(
+ expr: Arc<dyn PhysicalExpr>,
+ file_schema: &Schema,
+ table_schema: &Schema,
+) -> Result<Option<ProjectionAndExpr>> {
+ let mut checker = PushdownChecker {
Review Comment:
Done in
https://github.com/apache/datafusion/pull/12135/commits/755fd69cca3d8112a1ad26eaae14f62f026422ee
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]