adriangb commented on code in PR #15261:
URL: https://github.com/apache/datafusion/pull/15261#discussion_r1997707832
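For context on the first comment below: the removed `remap_projection` helper reordered predicate columns, because the Parquet reader hands `ArrowPredicate::evaluate` the requested columns in the order they appear in the file, not in the order the predicate listed them. The following is a hedged sketch of that index remapping, reconstructed from the comment in the removed code — not the exact DataFusion source:

```rust
/// Sketch: given the file-column indices a predicate references, in the
/// predicate's own order (e.g. `[4, 1, 3]`), compute the indices that
/// reorder a file-ordered batch back into the predicate's order.
fn remap_projection(src: &[usize]) -> Vec<usize> {
    let len = src.len();

    // Indices that sort the predicate's columns into file order.
    let mut sorted_indexes: Vec<usize> = (0..len).collect();
    sorted_indexes.sort_unstable_by_key(|&i| src[i]);

    // Invert that permutation: for each predicate column, find where it
    // landed in the file-ordered batch.
    let mut projection: Vec<usize> = (0..len).collect();
    projection.sort_unstable_by_key(|&i| sorted_indexes[i]);
    projection
}

fn main() {
    // Predicate wants file columns 4, 1, 3 (in that order); the reader
    // yields them file-ordered as [col1, col3, col4], so indices
    // [2, 0, 1] restore the predicate's order.
    assert_eq!(remap_projection(&[4, 1, 3]), vec![2, 0, 1]);
    // Zero or one column needs no reordering beyond the identity.
    assert_eq!(remap_projection(&[7]), vec![0]);
}
```

The review's point is that this bookkeeping no longer needs to live in the row filter, since the `SchemaAdapter` machinery already maps file-ordered columns to the expected schema.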
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -118,35 +114,25 @@ impl DatafusionArrowPredicate {
     /// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
     pub fn try_new(
         candidate: FilterCandidate,
-        schema: &Schema,
         metadata: &ParquetMetaData,
         rows_pruned: metrics::Count,
         rows_matched: metrics::Count,
         time: metrics::Time,
-        schema_mapping: Arc<dyn SchemaMapper>,
     ) -> Result<Self> {
-        let schema = Arc::new(schema.project(&candidate.projection)?);
-        let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
-
-        // ArrowPredicate::evaluate is passed columns in the order they appear in the file
-        // If the predicate has multiple columns, we therefore must project the columns based
-        // on the order they appear in the file
-        let projection = match candidate.projection.len() {
-            0 | 1 => vec![],
-            2.. => remap_projection(&candidate.projection),

Review Comment:
   I believe the SchemaAdapter machinery now does this work.

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -118,35 +114,25 @@ impl DatafusionArrowPredicate {
     /// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
     pub fn try_new(

Review Comment:
   `DatafusionArrowPredicate` is `pub(crate)`, so this should be okay to edit.

##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -276,14 +276,8 @@ mod tests {
         ]);
         let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
-        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
-        assert_eq!(indices, vec![0]);
-
-        let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
-
-        // Mapping fails because it tries to fill in a non-nullable column with nulls
-        let err = mapper.map_batch(file_batch).unwrap_err().to_string();
-        assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}");
+        let err = adapter.map_schema(&file_schema).unwrap_err().to_string();
+        assert_eq!(err, "Error during planning: Column a is missing from the file schema, cannot be generated, and is non-nullable");

Review Comment:
   It's nice that this fails earlier 😄

##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -347,12 +341,5 @@ mod tests {
             Ok(RecordBatch::try_new(schema, new_columns).unwrap())
         }
-
-        fn map_partial_batch(
-            &self,
-            batch: RecordBatch,
-        ) -> datafusion_common::Result<RecordBatch> {
-            self.map_batch(batch)
-        }

Review Comment:
   The basic idea is to get rid of `map_partial_batch` by creating a specialized SchemaAdapter for each filter that will be evaluated. This results in fewer methods, and I think it cleans up some duplicated functionality between `SchemaAdapter` and filter pushdown (`remap_projection`, etc.).

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -537,12 +464,20 @@ pub fn build_row_filter(
     // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`]
     let predicates = split_conjunction(expr);

+    let file_schema = Arc::new(file_schema.clone());
+    let table_schema = Arc::new(table_schema.clone());

Review Comment:
   It would have been nice to just change the caller to pass in an `Arc<>`, but that would be a breaking change; this clone seems cheap enough not to worry about.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org