adriangb commented on code in PR #16461: URL: https://github.com/apache/datafusion/pull/16461#discussion_r2162548231
########## datafusion/physical-expr/src/schema_rewriter.rs: ########## @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Physical expression schema rewriting utilities + +use std::sync::Arc; + +use arrow::compute::can_cast_types; +use arrow::datatypes::{FieldRef, Schema}; +use datafusion_common::{ + exec_err, + tree_node::{Transformed, TransformedResult, TreeNode}, + Result, ScalarValue, +}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + +use crate::expressions::{self, CastExpr, Column}; + +/// Builder for rewriting physical expressions to match different schemas. +/// +/// # Example +/// +/// ```rust +/// use datafusion_physical_expr::schema_rewriter::PhysicalExprSchemaRewriter; +/// use arrow::datatypes::Schema; +/// +/// # fn example( +/// # predicate: std::sync::Arc<dyn datafusion_physical_expr_common::physical_expr::PhysicalExpr>, +/// # physical_file_schema: &Schema, +/// # logical_file_schema: &Schema, +/// # ) -> datafusion_common::Result<()> { +/// let rewriter = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema); +/// let adapted_predicate = rewriter.rewrite(predicate)?; +/// # Ok(()) +/// # } +/// ``` +pub struct PhysicalExprSchemaRewriter<'a> { + physical_file_schema: &'a Schema, + logical_file_schema: &'a Schema, + partition_fields: Vec<FieldRef>, + partition_values: Vec<ScalarValue>, +} + +impl<'a> PhysicalExprSchemaRewriter<'a> { + /// Create a new schema rewriter with the given schemas + pub fn new( + physical_file_schema: &'a Schema, + logical_file_schema: &'a Schema, + ) -> Self { + Self { + physical_file_schema, + logical_file_schema, + partition_fields: Vec::new(), + partition_values: Vec::new(), + } + } + + /// Add partition columns and their corresponding values + /// + /// When a column reference matches a partition field, it will be replaced + /// with the corresponding literal value from partition_values. + pub fn with_partition_columns( + mut self, + partition_fields: Vec<FieldRef>, + partition_values: Vec<ScalarValue>, + ) -> Self { + self.partition_fields = partition_fields; + self.partition_values = partition_values; + self + } + + /// Rewrite the given physical expression to match the target schema + /// + /// This method applies the following transformations: + /// 1. Replaces partition column references with literal values + /// 2. Handles missing columns by inserting null literals + /// 3. Casts columns when logical and physical schemas have different types + pub fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> { + expr.transform(|expr| self.rewrite_expr(expr)).data() + } + + fn rewrite_expr( + &self, + expr: Arc<dyn PhysicalExpr>, + ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> { + if let Some(column) = expr.as_any().downcast_ref::<Column>() { + return self.rewrite_column(Arc::clone(&expr), column); + } + + Ok(Transformed::no(expr)) + } + + fn rewrite_column( + &self, + expr: Arc<dyn PhysicalExpr>, + column: &Column, + ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> { + // Get the logical field for this column + let logical_field = match self.logical_file_schema.field_with_name(column.name()) + { + Ok(field) => field, + Err(e) => { + // If the column is a partition field, we can use the partition value + if let Some(partition_value) = self.get_partition_value(column.name()) { + return Ok(Transformed::yes(expressions::lit(partition_value))); + } + // If the column is not found in the logical schema and is not a partition value, return an error + // This should probably never be hit unless something upstream broke, but nontheless it's better + // for us to return a handleable error than to panic / do something unexpected. + return Err(e.into()); + } + }; + + // Check if the column exists in the physical schema + let physical_column_index = + match self.physical_file_schema.index_of(column.name()) { + Ok(index) => index, + Err(_) => { + if !logical_field.is_nullable() { + return exec_err!( + "Non-nullable column '{}' is missing from the physical schema", + column.name() + ); + } + // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do. + // TODO: do we need to sync this with what the `SchemaAdapter` actually does? Review Comment: For my own sake: the point here is that SchemaAdapter is a trait so it can do arbitrary things not just replace with nulls (that's what the default implementation does). It's super annoying to have to implement SchemaAdapterFactory and all of the 3 IIRC traits. I'd propose what we do here is just add hooks that follow the TreeNode APIs and make it a builder style struct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org