alamb commented on code in PR #12135:
URL: https://github.com/apache/datafusion/pull/12135#discussion_r1729945409
##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -51,69 +51,67 @@ use object_store::{ObjectMeta, ObjectStore};
/// - the table provider can filter the table partition values with this expression
/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
/// was performed
-pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
+pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
Review Comment:
👍
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -826,28 +833,46 @@ impl TableProvider for ListingTable {
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
- let support: Vec<_> = filters
+ Ok(filters
.iter()
.map(|filter| {
if expr_applicable_for_cols(
&self
.options
.table_partition_cols
.iter()
- .map(|x| x.0.clone())
+ .map(|col| col.0.as_str())
.collect::<Vec<_>>(),
filter,
) {
// if filter can be handled by partition pruning, it is exact
- TableProviderFilterPushDown::Exact
- } else {
- // otherwise, we still might be able to handle the filter with file
- // level mechanisms such as Parquet row group pruning.
- TableProviderFilterPushDown::Inexact
+ return TableProviderFilterPushDown::Exact;
}
+
+ #[cfg(feature = "parquet")]
Review Comment:
Using a `cfg` / compile time ifdef here has two downsides:
1. It means user-defined `FileFormat`s cannot support the same features as parquet
2. I think it makes the code harder to read

As an alternative, perhaps we could make this a method on the `FileFormat` trait?
For example, rather than
```rust
#[cfg(feature = "parquet")]
```
Maybe this could look like
```rust
else {
self.format.supports_filters_pushdown(expr)
}
```
And then add a function like this to the `FileFormat` trait
```rust
/// returns the ability of the format to push down the specified filter
fn supports_filters_pushdown(
&self,
options: &ConfigOptions,
filters: &Expr,
) -> Result<TableProviderFilterPushDown> {
```
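A default implementation could keep existing `FileFormat` impls compiling. A minimal sketch (my assumption: the fallback mirrors today's `Inexact` branch, since file-level mechanisms such as Parquet row group pruning may still apply):
```rust
fn supports_filters_pushdown(
    &self,
    _options: &ConfigOptions,
    _filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
    // assumed default: keep the current behavior of falling back to
    // inexact, file-level filtering for formats that do not override this
    Ok(TableProviderFilterPushDown::Inexact)
}
```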
##########
datafusion/physical-expr/src/expressions/binary.rs:
##########
@@ -133,12 +134,15 @@ impl std::fmt::Display for BinaryExpr {
}
/// Invoke a boolean kernel on a pair of arrays
-macro_rules! boolean_op {
- ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
- let ll = as_boolean_array($LEFT).expect("boolean_op failed to downcast array");
- let rr = as_boolean_array($RIGHT).expect("boolean_op failed to downcast array");
- Ok(Arc::new($OP(&ll, &rr)?))
- }};
+#[inline]
Review Comment:
This is a nice improvement, though also I think unrelated to the specific PR
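For reference, a sketch of what the macro-to-function conversion might look like (the hunk is truncated here, so the exact signature in the PR may differ):
```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray};
use arrow::error::ArrowError;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::Result;

/// Invoke a boolean kernel on a pair of arrays
#[inline]
fn boolean_op(
    left: &ArrayRef,
    right: &ArrayRef,
    op: impl FnOnce(&BooleanArray, &BooleanArray) -> Result<BooleanArray, ArrowError>,
) -> Result<Arc<dyn Array>> {
    let ll = as_boolean_array(left).expect("boolean_op failed to downcast left array");
    let rr = as_boolean_array(right).expect("boolean_op failed to downcast right array");
    Ok(Arc::new(op(ll, rr)?))
}
```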
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -826,28 +833,46 @@ impl TableProvider for ListingTable {
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
- let support: Vec<_> = filters
+ Ok(filters
.iter()
.map(|filter| {
if expr_applicable_for_cols(
&self
.options
.table_partition_cols
.iter()
- .map(|x| x.0.clone())
+ .map(|col| col.0.as_str())
Review Comment:
this is a nice improvement to avoid clones
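As a standalone illustration of the difference (made-up types, not the PR's):
```rust
fn main() {
    let table_partition_cols: Vec<(String, u8)> = vec![("year".into(), 0), ("month".into(), 0)];

    // before: allocates a fresh String per partition column
    let owned: Vec<String> = table_partition_cols.iter().map(|x| x.0.clone()).collect();

    // after: borrows the existing Strings, no per-element allocation
    let borrowed: Vec<&str> = table_partition_cols.iter().map(|col| col.0.as_str()).collect();

    assert_eq!(owned, borrowed);
}
```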
##########
datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs:
##########
@@ -268,90 +259,183 @@ impl<'a> FilterCandidateBuilder<'a> {
/// * `Ok(Some(candidate))` if the expression can be used as an ArrowFilter
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
/// * `Err(e)` if an error occurs while building the candidate
- pub fn build(
- mut self,
- metadata: &ParquetMetaData,
- ) -> Result<Option<FilterCandidate>> {
- let expr = self.expr.clone().rewrite(&mut self).data()?;
-
- if self.non_primitive_columns || self.projected_columns {
- Ok(None)
- } else {
- let required_bytes =
- size_of_columns(&self.required_column_indices, metadata)?;
- let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
-
- Ok(Some(FilterCandidate {
- expr,
- required_bytes,
- can_use_index,
- projection: self.required_column_indices.into_iter().collect(),
- }))
+ pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
+ let Some((projection, rewritten_expr)) = non_pushdown_columns(
+ Arc::clone(&self.expr),
+ self.file_schema,
+ self.table_schema,
+ )?
+ else {
+ return Ok(None);
+ };
+
+ let required_bytes = size_of_columns(&self.required_column_indices, metadata)?;
+ let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
+
+ Ok(Some(FilterCandidate {
+ expr: rewritten_expr,
+ required_bytes,
+ can_use_index,
+ projection,
+ }))
+ }
+}
+
+// a struct that implements TreeNodeRewriter to traverse a PhysicalExpr tree structure to determine
+// if any column references in the expression would prevent it from being predicate-pushed-down.
+// if non_primitive_columns || projected_columns, it can't be pushed down.
+// can't be reused between calls to `rewrite`; each construction must be used only once.
+struct PushdownChecker<'schema> {
+ /// Does the expression require any non-primitive columns (like structs)?
+ non_primitive_columns: bool,
+ /// Does the expression reference any columns that are in the table
+ /// schema but not in the file schema?
+ projected_columns: bool,
+ // the indices of all the columns found within the given expression which exist inside the given
+ // [`file_schema`]
+ required_column_indices: BTreeSet<usize>,
+ file_schema: &'schema Schema,
+ table_schema: &'schema Schema,
+}
+
+impl<'schema> PushdownChecker<'schema> {
+ fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
+ if let Ok(idx) = self.file_schema.index_of(column_name) {
+ self.required_column_indices.insert(idx);
+
+ if DataType::is_nested(self.file_schema.field(idx).data_type()) {
+ self.non_primitive_columns = true;
+ return Some(TreeNodeRecursion::Jump);
+ }
+ } else if self.table_schema.index_of(column_name).is_err() {
+ // If the column does not exist in the (un-projected) table schema then
+ // it must be a projected column.
+ self.projected_columns = true;
+ return Some(TreeNodeRecursion::Jump);
}
+
+ None
}
}
-/// Implement the `TreeNodeRewriter` trait for `FilterCandidateBuilder` that
-/// walks the expression tree and rewrites it in preparation of becoming
-/// `FilterCandidate`.
-impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {
+impl<'schema> TreeNodeRewriter for PushdownChecker<'schema> {
type Node = Arc<dyn PhysicalExpr>;
- /// Called before visiting each child
fn f_down(
&mut self,
node: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
if let Some(column) = node.as_any().downcast_ref::<Column>() {
- if let Ok(idx) = self.file_schema.index_of(column.name()) {
- self.required_column_indices.insert(idx);
-
- if DataType::is_nested(self.file_schema.field(idx).data_type()) {
- self.non_primitive_columns = true;
- return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
- }
- } else if self.table_schema.index_of(column.name()).is_err() {
- // If the column does not exist in the (un-projected) table schema then
- // it must be a projected column.
- self.projected_columns = true;
- return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
+ if let Some(recursion) = self.check_single_column(column.name()) {
+ return Ok(Transformed::new(node, false, recursion));
}
}
Ok(Transformed::no(node))
}
- /// After visiting all children, rewrite column references to nulls if
- /// they are not in the file schema
fn f_up(
&mut self,
expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
- // if the expression is a column, is it in the file schema?
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
if self.file_schema.field_with_name(column.name()).is_err() {
- // Replace the column reference with a NULL (using the type from the table schema)
- // e.g. `column = 'foo'` is rewritten be transformed to `NULL = 'foo'`
- //
- // See comments on `FilterCandidateBuilder` for more information
- return match self.table_schema.field_with_name(column.name()) {
- Ok(field) => {
+ // the column expr must be in the table schema
+ return self
+ .table_schema
+ .field_with_name(column.name())
+ .and_then(|field| {
// return the null value corresponding to the data type
let null_value = ScalarValue::try_from(field.data_type())?;
- Ok(Transformed::yes(Arc::new(Literal::new(null_value))))
- }
- Err(e) => {
- // If the column is not in the table schema, should throw the error
- arrow_err!(e)
- }
- };
+ Ok(Transformed::yes(Arc::new(Literal::new(null_value)) as _))
+ })
+ // If the column is not in the table schema, should throw the error
+ .map_err(|e| arrow_datafusion_err!(e));
}
}
Ok(Transformed::no(expr))
}
}
+type ProjectionAndExpr = (Vec<usize>, Arc<dyn PhysicalExpr>);
+
+// Checks if a given expression can be pushed down into `ParquetExec` as opposed to being evaluated
+// post-parquet-scan in a `FilterExec`. If it can be pushed down, this returns None. If it can't be
+// pushed down, this returns the content of [`PushdownChecker::required_column_indices`],
+// transformed into a [`Vec`].
+pub fn non_pushdown_columns(
+ expr: Arc<dyn PhysicalExpr>,
+ file_schema: &Schema,
+ table_schema: &Schema,
+) -> Result<Option<ProjectionAndExpr>> {
+ let mut checker = PushdownChecker {
Review Comment:
minor: making a `PushdownChecker::new()` type function would make this easier to read I think and reduce the repetition in `would_column_prevent_pushdown`
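e.g. a sketch of such a constructor, built from the fields shown above (hypothetical, untested against the PR):
```rust
impl<'schema> PushdownChecker<'schema> {
    fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> Self {
        Self {
            non_primitive_columns: false,
            projected_columns: false,
            required_column_indices: BTreeSet::default(),
            file_schema,
            table_schema,
        }
    }
}
```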
##########
datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs:
##########
@@ -268,90 +259,183 @@ impl<'a> FilterCandidateBuilder<'a> {
/// * `Ok(Some(candidate))` if the expression can be used as an ArrowFilter
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
/// * `Err(e)` if an error occurs while building the candidate
- pub fn build(
- mut self,
- metadata: &ParquetMetaData,
- ) -> Result<Option<FilterCandidate>> {
- let expr = self.expr.clone().rewrite(&mut self).data()?;
-
- if self.non_primitive_columns || self.projected_columns {
- Ok(None)
- } else {
- let required_bytes =
- size_of_columns(&self.required_column_indices, metadata)?;
- let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
-
- Ok(Some(FilterCandidate {
- expr,
- required_bytes,
- can_use_index,
- projection: self.required_column_indices.into_iter().collect(),
- }))
+ pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
+ let Some((projection, rewritten_expr)) = non_pushdown_columns(
+ Arc::clone(&self.expr),
+ self.file_schema,
+ self.table_schema,
+ )?
+ else {
+ return Ok(None);
+ };
+
+ let required_bytes = size_of_columns(&self.required_column_indices, metadata)?;
+ let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
+
+ Ok(Some(FilterCandidate {
+ expr: rewritten_expr,
+ required_bytes,
+ can_use_index,
+ projection,
+ }))
+ }
+}
+
+// a struct that implements TreeNodeRewriter to traverse a PhysicalExpr tree structure to determine
+// if any column references in the expression would prevent it from being predicate-pushed-down.
+// if non_primitive_columns || projected_columns, it can't be pushed down.
+// can't be reused between calls to `rewrite`; each construction must be used only once.
+struct PushdownChecker<'schema> {
+ /// Does the expression require any non-primitive columns (like structs)?
+ non_primitive_columns: bool,
+ /// Does the expression reference any columns that are in the table
+ /// schema but not in the file schema?
+ projected_columns: bool,
+ // the indices of all the columns found within the given expression which exist inside the given
+ // [`file_schema`]
+ required_column_indices: BTreeSet<usize>,
+ file_schema: &'schema Schema,
+ table_schema: &'schema Schema,
+}
+
+impl<'schema> PushdownChecker<'schema> {
+ fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
Review Comment:
I wonder if this was a free function would it be easier to use in `would_column_prevent_pushdown` (rather than setting a field) -- something like
```rust
/// Return true if this column could be used in a pushdown expression, false otherwise
fn check_single_column(
column_name: &str,
file_schema: &Schema,
table_schema: &Schema,
) -> bool {
...
}
```
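A hedged completion of that sketch, assuming the same semantics as the struct version above (note it drops the `required_column_indices` bookkeeping, as the suggestion implies):
```rust
/// Return true if this column could be used in a pushdown expression, false otherwise
fn check_single_column(column_name: &str, file_schema: &Schema, table_schema: &Schema) -> bool {
    match file_schema.index_of(column_name) {
        // nested (non-primitive) columns cannot be pushed down
        Ok(idx) => !DataType::is_nested(file_schema.field(idx).data_type()),
        // missing from the file schema is fine (it is rewritten to NULL later),
        // but missing from the table schema too means a projected column
        Err(_) => table_schema.index_of(column_name).is_ok(),
    }
}
```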
##########
datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs:
##########
@@ -268,90 +259,183 @@ impl<'a> FilterCandidateBuilder<'a> {
/// * `Ok(Some(candidate))` if the expression can be used as an ArrowFilter
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
/// * `Err(e)` if an error occurs while building the candidate
- pub fn build(
- mut self,
- metadata: &ParquetMetaData,
- ) -> Result<Option<FilterCandidate>> {
- let expr = self.expr.clone().rewrite(&mut self).data()?;
-
- if self.non_primitive_columns || self.projected_columns {
- Ok(None)
- } else {
- let required_bytes =
- size_of_columns(&self.required_column_indices, metadata)?;
- let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
-
- Ok(Some(FilterCandidate {
- expr,
- required_bytes,
- can_use_index,
- projection: self.required_column_indices.into_iter().collect(),
- }))
+ pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
+ let Some((projection, rewritten_expr)) = non_pushdown_columns(
+ Arc::clone(&self.expr),
+ self.file_schema,
+ self.table_schema,
+ )?
+ else {
+ return Ok(None);
+ };
+
+ let required_bytes = size_of_columns(&self.required_column_indices, metadata)?;
+ let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
+
+ Ok(Some(FilterCandidate {
+ expr: rewritten_expr,
+ required_bytes,
+ can_use_index,
+ projection,
+ }))
+ }
+}
+
+// a struct that implements TreeNodeRewriter to traverse a PhysicalExpr tree structure to determine
+// if any column references in the expression would prevent it from being predicate-pushed-down.
+// if non_primitive_columns || projected_columns, it can't be pushed down.
+// can't be reused between calls to `rewrite`; each construction must be used only once.
+struct PushdownChecker<'schema> {
+ /// Does the expression require any non-primitive columns (like structs)?
+ non_primitive_columns: bool,
+ /// Does the expression reference any columns that are in the table
+ /// schema but not in the file schema?
+ projected_columns: bool,
+ // the indices of all the columns found within the given expression which exist inside the given
+ // [`file_schema`]
+ required_column_indices: BTreeSet<usize>,
+ file_schema: &'schema Schema,
+ table_schema: &'schema Schema,
+}
+
+impl<'schema> PushdownChecker<'schema> {
+ fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
+ if let Ok(idx) = self.file_schema.index_of(column_name) {
+ self.required_column_indices.insert(idx);
+
+ if DataType::is_nested(self.file_schema.field(idx).data_type()) {
+ self.non_primitive_columns = true;
+ return Some(TreeNodeRecursion::Jump);
+ }
+ } else if self.table_schema.index_of(column_name).is_err() {
+ // If the column does not exist in the (un-projected) table schema then
+ // it must be a projected column.
+ self.projected_columns = true;
+ return Some(TreeNodeRecursion::Jump);
}
+
+ None
}
}
-/// Implement the `TreeNodeRewriter` trait for `FilterCandidateBuilder` that
-/// walks the expression tree and rewrites it in preparation of becoming
-/// `FilterCandidate`.
-impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {
+impl<'schema> TreeNodeRewriter for PushdownChecker<'schema> {
type Node = Arc<dyn PhysicalExpr>;
- /// Called before visiting each child
fn f_down(
&mut self,
node: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
if let Some(column) = node.as_any().downcast_ref::<Column>() {
- if let Ok(idx) = self.file_schema.index_of(column.name()) {
- self.required_column_indices.insert(idx);
-
- if DataType::is_nested(self.file_schema.field(idx).data_type()) {
- self.non_primitive_columns = true;
- return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
- }
- } else if self.table_schema.index_of(column.name()).is_err() {
- // If the column does not exist in the (un-projected) table schema then
- // it must be a projected column.
- self.projected_columns = true;
- return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
+ if let Some(recursion) = self.check_single_column(column.name()) {
+ return Ok(Transformed::new(node, false, recursion));
}
}
Ok(Transformed::no(node))
}
- /// After visiting all children, rewrite column references to nulls if
- /// they are not in the file schema
fn f_up(
&mut self,
expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
- // if the expression is a column, is it in the file schema?
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
if self.file_schema.field_with_name(column.name()).is_err() {
- // Replace the column reference with a NULL (using the type from the table schema)
- // e.g. `column = 'foo'` is rewritten be transformed to `NULL = 'foo'`
- //
- // See comments on `FilterCandidateBuilder` for more information
- return match self.table_schema.field_with_name(column.name()) {
- Ok(field) => {
+ // the column expr must be in the table schema
+ return self
+ .table_schema
+ .field_with_name(column.name())
+ .and_then(|field| {
// return the null value corresponding to the data type
let null_value = ScalarValue::try_from(field.data_type())?;
- Ok(Transformed::yes(Arc::new(Literal::new(null_value))))
- }
- Err(e) => {
- // If the column is not in the table schema, should throw the error
- arrow_err!(e)
- }
- };
+ Ok(Transformed::yes(Arc::new(Literal::new(null_value)) as _))
+ })
+ // If the column is not in the table schema, should throw the error
+ .map_err(|e| arrow_datafusion_err!(e));
}
}
Ok(Transformed::no(expr))
}
}
+type ProjectionAndExpr = (Vec<usize>, Arc<dyn PhysicalExpr>);
+
+// Checks if a given expression can be pushed down into `ParquetExec` as opposed to being evaluated
+// post-parquet-scan in a `FilterExec`. If it can be pushed down, this returns None. If it can't be
Review Comment:
this comment and name seem backwards to me (but maybe I am confused) -- the function seems to return `Some(...)` if the expression *could* be pushed down
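e.g. a hypothetical rename, delegating to the existing function so behavior is unchanged:
```rust
/// Returns `Some((projection, rewritten_expr))` when `expr` *can* be pushed down
fn pushdown_columns(
    expr: Arc<dyn PhysicalExpr>,
    file_schema: &Schema,
    table_schema: &Schema,
) -> Result<Option<ProjectionAndExpr>> {
    // same body as today's `non_pushdown_columns`; only the name changes
    non_pushdown_columns(expr, file_schema, table_schema)
}
```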
##########
datafusion/core/src/datasource/physical_plan/parquet/mod.rs:
##########
@@ -685,10 +686,12 @@ impl ExecutionPlan for ParquetExec {
partition_index: usize,
ctx: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- let projection = match self.base_config.file_column_projection_indices() {
- Some(proj) => proj,
- None => (0..self.base_config.file_schema.fields().len()).collect(),
- };
+ let projection = self
Review Comment:
Just confirming my understanding: these are "drive by" cleanups (not related to the functionality in the PR)?
##########
datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs:
##########
@@ -268,90 +259,183 @@ impl<'a> FilterCandidateBuilder<'a> {
/// * `Ok(Some(candidate))` if the expression can be used as an ArrowFilter
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
/// * `Err(e)` if an error occurs while building the candidate
- pub fn build(
- mut self,
- metadata: &ParquetMetaData,
- ) -> Result<Option<FilterCandidate>> {
- let expr = self.expr.clone().rewrite(&mut self).data()?;
-
- if self.non_primitive_columns || self.projected_columns {
- Ok(None)
- } else {
- let required_bytes =
- size_of_columns(&self.required_column_indices, metadata)?;
- let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
-
- Ok(Some(FilterCandidate {
- expr,
- required_bytes,
- can_use_index,
- projection: self.required_column_indices.into_iter().collect(),
- }))
+ pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
+ let Some((projection, rewritten_expr)) = non_pushdown_columns(
+ Arc::clone(&self.expr),
+ self.file_schema,
+ self.table_schema,
+ )?
+ else {
+ return Ok(None);
+ };
+
+ let required_bytes = size_of_columns(&self.required_column_indices, metadata)?;
+ let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
+
+ Ok(Some(FilterCandidate {
+ expr: rewritten_expr,
+ required_bytes,
+ can_use_index,
+ projection,
+ }))
+ }
+}
+
+// a struct that implements TreeNodeRewriter to traverse a PhysicalExpr tree structure to determine
+// if any column references in the expression would prevent it from being predicate-pushed-down.
+// if non_primitive_columns || projected_columns, it can't be pushed down.
+// can't be reused between calls to `rewrite`; each construction must be used only once.
+struct PushdownChecker<'schema> {
+ /// Does the expression require any non-primitive columns (like structs)?
+ non_primitive_columns: bool,
+ /// Does the expression reference any columns that are in the table
+ /// schema but not in the file schema?
+ projected_columns: bool,
+ // the indices of all the columns found within the given expression which exist inside the given
+ // [`file_schema`]
+ required_column_indices: BTreeSet<usize>,
+ file_schema: &'schema Schema,
+ table_schema: &'schema Schema,
+}
+
+impl<'schema> PushdownChecker<'schema> {
+ fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
+ if let Ok(idx) = self.file_schema.index_of(column_name) {
+ self.required_column_indices.insert(idx);
+
+ if DataType::is_nested(self.file_schema.field(idx).data_type()) {
+ self.non_primitive_columns = true;
+ return Some(TreeNodeRecursion::Jump);
+ }
+ } else if self.table_schema.index_of(column_name).is_err() {
+ // If the column does not exist in the (un-projected) table schema then
+ // it must be a projected column.
+ self.projected_columns = true;
+ return Some(TreeNodeRecursion::Jump);
}
+
+ None
}
}
-/// Implement the `TreeNodeRewriter` trait for `FilterCandidateBuilder` that
-/// walks the expression tree and rewrites it in preparation of becoming
-/// `FilterCandidate`.
-impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {
+impl<'schema> TreeNodeRewriter for PushdownChecker<'schema> {
type Node = Arc<dyn PhysicalExpr>;
- /// Called before visiting each child
fn f_down(
&mut self,
node: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
if let Some(column) = node.as_any().downcast_ref::<Column>() {
- if let Ok(idx) = self.file_schema.index_of(column.name()) {
- self.required_column_indices.insert(idx);
-
- if DataType::is_nested(self.file_schema.field(idx).data_type()) {
- self.non_primitive_columns = true;
- return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
- }
- } else if self.table_schema.index_of(column.name()).is_err() {
- // If the column does not exist in the (un-projected) table schema then
- // it must be a projected column.
- self.projected_columns = true;
- return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
+ if let Some(recursion) = self.check_single_column(column.name()) {
+ return Ok(Transformed::new(node, false, recursion));
}
}
Ok(Transformed::no(node))
}
- /// After visiting all children, rewrite column references to nulls if
- /// they are not in the file schema
fn f_up(
&mut self,
expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
- // if the expression is a column, is it in the file schema?
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
if self.file_schema.field_with_name(column.name()).is_err() {
- // Replace the column reference with a NULL (using the type from the table schema)
- // e.g. `column = 'foo'` is rewritten be transformed to `NULL = 'foo'`
- //
- // See comments on `FilterCandidateBuilder` for more information
- return match self.table_schema.field_with_name(column.name()) {
- Ok(field) => {
+ // the column expr must be in the table schema
+ return self
+ .table_schema
+ .field_with_name(column.name())
+ .and_then(|field| {
// return the null value corresponding to the data type
let null_value = ScalarValue::try_from(field.data_type())?;
- Ok(Transformed::yes(Arc::new(Literal::new(null_value))))
- }
- Err(e) => {
- // If the column is not in the table schema, should throw the error
- arrow_err!(e)
- }
- };
+ Ok(Transformed::yes(Arc::new(Literal::new(null_value)) as _))
+ })
+ // If the column is not in the table schema, should throw the error
+ .map_err(|e| arrow_datafusion_err!(e));
}
}
Ok(Transformed::no(expr))
}
}
+type ProjectionAndExpr = (Vec<usize>, Arc<dyn PhysicalExpr>);
+
+// Checks if a given expression can be pushed down into `ParquetExec` as opposed to being evaluated
+// post-parquet-scan in a `FilterExec`. If it can be pushed down, this returns None. If it can't be
+// pushed down, this returns the content of [`PushdownChecker::required_column_indices`],
+// transformed into a [`Vec`].
+pub fn non_pushdown_columns(
Review Comment:
does this need to be `pub`?
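If it is only used within the crate, it could presumably be narrowed, e.g.:
```rust
// hypothetical: hide from the public API if there are no external users
pub(crate) fn non_pushdown_columns(
    expr: Arc<dyn PhysicalExpr>,
    file_schema: &Schema,
    table_schema: &Schema,
) -> Result<Option<ProjectionAndExpr>> {
    // ...body unchanged...
}
```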