adriangb commented on code in PR #15261:
URL: https://github.com/apache/datafusion/pull/15261#discussion_r1997707832
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -118,35 +114,25 @@ impl DatafusionArrowPredicate {
/// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
pub fn try_new(
candidate: FilterCandidate,
- schema: &Schema,
metadata: &ParquetMetaData,
rows_pruned: metrics::Count,
rows_matched: metrics::Count,
time: metrics::Time,
- schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Self> {
- let schema = Arc::new(schema.project(&candidate.projection)?);
- let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
-
- // ArrowPredicate::evaluate is passed columns in the order they appear in the file
- // If the predicate has multiple columns, we therefore must project the columns based
- // on the order they appear in the file
- let projection = match candidate.projection.len() {
- 0 | 1 => vec![],
- 2.. => remap_projection(&candidate.projection),
Review Comment:
I believe the SchemaAdapter machinery now does this work.
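For anyone skimming: a standalone sketch of what the removed `remap_projection` helper did. The parquet reader hands `ArrowPredicate::evaluate` columns in file order, so when a predicate references columns in a different order the projection has to be inverted. This is an illustration of the idea, not the datafusion source:

```rust
fn remap_projection(src: &[usize]) -> Vec<usize> {
    let len = src.len();
    // Positions of `src` sorted by the file-column index they reference,
    // i.e. the order in which the reader will actually yield the columns.
    let mut sorted_indexes: Vec<usize> = (0..len).collect();
    sorted_indexes.sort_unstable_by_key(|&i| src[i]);
    // Invert the permutation: for each requested column, find where it
    // landed in file order.
    let mut projection: Vec<usize> = (0..len).collect();
    projection.sort_unstable_by_key(|&i| sorted_indexes[i]);
    projection
}

fn main() {
    // Predicate wants file columns [3, 1]; the reader yields them as
    // [1, 3], so the batch must be reordered with [1, 0].
    assert_eq!(remap_projection(&[3, 1]), vec![1, 0]);
    // Already in file order: identity projection.
    assert_eq!(remap_projection(&[0, 2]), vec![0, 1]);
}
```

With the per-filter `SchemaAdapter` built from the candidate's projection, this reordering falls out of the schema mapping itself.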
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -118,35 +114,25 @@ impl DatafusionArrowPredicate {
/// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
pub fn try_new(
Review Comment:
`DatafusionArrowPredicate` is `pub(crate)` so this should be okay to edit
##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -276,14 +276,8 @@ mod tests {
]);
let adapter =
DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
- let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
- assert_eq!(indices, vec![0]);
-
- let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
-
- // Mapping fails because it tries to fill in a non-nullable column with nulls
- let err = mapper.map_batch(file_batch).unwrap_err().to_string();
- assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}");
+ let err = adapter.map_schema(&file_schema).unwrap_err().to_string();
+ assert_eq!(err, "Error during planning: Column a is missing from the file schema, cannot be generated, and is non-nullable");
Review Comment:
It's nice that this fails earlier 😄
##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -347,12 +341,5 @@ mod tests {
Ok(RecordBatch::try_new(schema, new_columns).unwrap())
}
-
- fn map_partial_batch(
- &self,
- batch: RecordBatch,
- ) -> datafusion_common::Result<RecordBatch> {
- self.map_batch(batch)
- }
Review Comment:
The basic idea is to get rid of `map_partial_batch` by creating a specialized SchemaAdapter for each filter that will be evaluated. This results in fewer methods and, I think, cleans up some duplicated functionality between `SchemaAdapter` and filter pushdown (`remap_projection`, etc.)
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -537,12 +464,20 @@ pub fn build_row_filter(
// `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`]
let predicates = split_conjunction(expr);
+ let file_schema = Arc::new(file_schema.clone());
+ let table_schema = Arc::new(table_schema.clone());
Review Comment:
It would have been nice to just change the caller to pass in an `Arc<>`, but that would be a breaking change. This seems cheap enough not to worry too much about it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]