alamb commented on code in PR #15295: URL: https://github.com/apache/datafusion/pull/15295#discussion_r2100961322
########## datafusion/datasource/src/nested_schema_adapter.rs: ########## @@ -0,0 +1,943 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`NestedStructSchemaAdapter`] and [`NestedStructSchemaAdapterFactory`] to adapt file-level record batches to a table schema. +//! +//! Adapter provides a method of translating the RecordBatches that come out of the +//! physical format into how they should be used by DataFusion. For instance, a schema +//! can be stored external to a parquet file that maps parquet logical types to arrow types. 
+ +use arrow::datatypes::{DataType::Struct, Field, Schema, SchemaRef}; +use datafusion_common::{ColumnStatistics, Result}; +use std::sync::Arc; + +use crate::schema_adapter::{ + create_field_mapping, DefaultSchemaAdapterFactory, SchemaAdapter, + SchemaAdapterFactory, SchemaMapper, +}; +use arrow::array::{Array, ArrayRef, StructArray}; +use arrow::compute::cast; +use arrow::record_batch::{RecordBatch, RecordBatchOptions}; +use datafusion_common::arrow::array::new_null_array; + +/// Factory for creating [`NestedStructSchemaAdapter`] +/// +/// This factory creates schema adapters that properly handle schema evolution +/// for nested struct fields, allowing new fields to be added to struct columns +/// over time. +#[derive(Debug, Clone, Default)] +pub struct NestedStructSchemaAdapterFactory; + +impl SchemaAdapterFactory for NestedStructSchemaAdapterFactory { + fn create( + &self, + projected_table_schema: SchemaRef, + table_schema: SchemaRef, + ) -> Box<dyn SchemaAdapter> { + Box::new(NestedStructSchemaAdapter::new( + projected_table_schema, + table_schema, + )) + } +} + +impl NestedStructSchemaAdapterFactory { + /// Create a new factory for mapping batches from a file schema to a table + /// schema with support for nested struct evolution. + /// + /// This is a convenience method that handles nested struct fields properly. + pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> { + Self.create(Arc::clone(&table_schema), table_schema) + } + + /// Determines if a schema contains nested struct fields that would benefit + /// from special handling during schema evolution + pub fn has_nested_structs(schema: &Schema) -> bool { + schema + .fields() + .iter() + .any(|field| matches!(field.data_type(), Struct(_))) + } + + /// Create an appropriate schema adapter based on schema characteristics. + /// Returns a NestedStructSchemaAdapter if the projected schema contains nested structs, + /// otherwise returns a DefaultSchemaAdapter. 
+ pub fn create_adapter( + projected_table_schema: SchemaRef, + table_schema: SchemaRef, + ) -> Box<dyn SchemaAdapter> { + // Use nested adapter if target has nested structs + if Self::has_nested_structs(table_schema.as_ref()) { + NestedStructSchemaAdapterFactory.create(projected_table_schema, table_schema) + } else { + // Default case for simple schemas + DefaultSchemaAdapterFactory.create(projected_table_schema, table_schema) + } + } +} + +/// A SchemaAdapter that handles schema evolution for nested struct types +#[derive(Debug, Clone)] +pub struct NestedStructSchemaAdapter { Review Comment: People have requested something like this for other DataFusion operations (such as `CAST` and coercing structs to other types) -- I wonder if there is some way we make the logic more reusable 🤔 Something like separate the schema mapping structure and actual logic out of the datasource crate ########## datafusion/core/src/datasource/listing/table.rs: ########## @@ -1178,6 +1207,31 @@ impl ListingTable { } } +/// Extension trait for FileSource to allow schema evolution support +pub trait FileSourceExt { Review Comment: this is unfortunate (that we have something here that depends on parquet). Maybe we can add a `with_schema_adapter_factory` directly to `FileSource` 🤔 ########## datafusion/core/src/datasource/listing/table.rs: ########## @@ -123,9 +127,23 @@ impl ListingTableConfig { table_paths: self.table_paths, file_schema: self.file_schema, options: Some(listing_options), + schema_adapter_factory: self.schema_adapter_factory, } } + /// Add a schema adapter factory to the [`ListingTableConfig`] + /// + /// Schema adapters handle schema evolution over time, allowing the table to adapt + /// to changes in file schemas. This is particularly useful for handling nested fields + /// in formats like Parquet where the schema may evolve. 
+ pub fn with_schema_adapter_factory( Review Comment: at a high level, it makes a lot of sense to provide the schema adapter factory to the listing table -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@datafusion.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscribe@datafusion.apache.org For additional commands, e-mail: github-help@datafusion.apache.org