@@ -0,0 +1,868 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//   http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//! [`NestedStructSchemaAdapter`] and [`NestedStructSchemaAdapterFactory`] to 
adapt file-level record batches to a table schema.
+//! Adapter provides a method of translating the RecordBatches that come out 
of the
+//! physical format into how they should be used by DataFusion.  For instance, 
a schema
+//! can be stored external to a parquet file that maps parquet logical types 
to arrow types.
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use datafusion_common::Result;
+use std::collections::HashMap;
+use std::sync::Arc;
+use crate::schema_adapter::{
+    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, 
+use arrow::array::{Array, ArrayRef, StructArray};
+use arrow::compute::cast;
+use arrow::record_batch::{RecordBatch, RecordBatchOptions};
+use datafusion_common::arrow::array::new_null_array;
+/// Factory for creating [`NestedStructSchemaAdapter`]
+/// This factory creates schema adapters that properly handle schema evolution
+/// for nested struct fields, allowing new fields to be added to struct columns
+/// over time.
+#[derive(Debug, Clone, Default)]
+pub struct NestedStructSchemaAdapterFactory;
+impl SchemaAdapterFactory for NestedStructSchemaAdapterFactory {
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(NestedStructSchemaAdapter::new(
+            projected_table_schema,
+            table_schema,
+        ))
+    }
+impl NestedStructSchemaAdapterFactory {
+    /// Create a new factory for mapping batches from a file schema to a table
+    /// schema with support for nested struct evolution.
+    ///
+    /// This is a convenience method that handles nested struct fields 
+    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Self.create(Arc::clone(&table_schema), table_schema)
+    }
+    /// Determines if a schema contains nested struct fields that would benefit
+    /// from special handling during schema evolution
+    pub fn has_nested_structs(schema: &Schema) -> bool {
+        schema
+            .fields()
+            .iter()
+            .any(|field| matches!(field.data_type(), DataType::Struct(_)))
+    }
+    /// Create an appropriate schema adapter based on schema characteristics.
+    /// Returns a NestedStructSchemaAdapter if the projected schema contains 
nested structs,
+    /// otherwise returns a DefaultSchemaAdapter.
+    pub fn create_adapter(
+        projected_table_schema: SchemaRef,
+        table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        // Use nested adapter if target has nested structs
+        if Self::has_nested_structs(table_schema.as_ref()) {
+            NestedStructSchemaAdapterFactory.create(projected_table_schema, 
+        } else {
+            // Default case for simple schemas
+            DefaultSchemaAdapterFactory.create(projected_table_schema, 
+        }
+    }
+/// A SchemaAdapter that handles schema evolution for nested struct types
+#[derive(Debug, Clone)]
+pub struct NestedStructSchemaAdapter {
+    /// The schema for the table, projected to include only the fields being 
output (projected) by the
+    /// associated ParquetSource
+    projected_table_schema: SchemaRef,
+    /// The entire table schema for the table we're using this to adapt.
+    ///
+    /// This is used to evaluate any filters pushed down into the scan
+    /// which may refer to columns that are not referred to anywhere
+    /// else in the plan.
+    table_schema: SchemaRef,
+/// Adapt the source schema fields to match the target schema while preserving
+/// nested struct fields and handling field additions/removals
+/// The helper function adapt_fields creates a HashMap from the source fields 
for each call.
+/// If this function is called frequently or on large schemas, consider 
whether the
+/// performance overhead is acceptable or if caching/optimizing the lookup 
could be beneficial.
+fn adapt_fields(source_fields: &Fields, target_fields: &Fields) -> Vec<Field> {
+    let mut adapted_fields = Vec::new();
+    let source_map: HashMap<_, _> = source_fields
+        .iter()
+        .map(|f| (f.name().as_str(), f))
+        .collect();
+    for target_field in target_fields {
+        match source_map.get(target_field.name().as_str()) {
+            Some(source_field) => {
+                match (source_field.data_type(), target_field.data_type()) {
+                    // Recursively adapt nested struct fields
+                    (
+                        DataType::Struct(source_children),
+                        DataType::Struct(target_children),
+                    ) => {
+                        let adapted_children =
+                            adapt_fields(source_children, target_children);
+                        adapted_fields.push(Field::new(
+                            target_field.name(),
+                            DataType::Struct(adapted_children.into()),
+                            target_field.is_nullable(),
+                        ));
+                    }
+                    // If types match exactly, keep source field
+                    _ if source_field.data_type() == target_field.data_type() 
=> {
+                        adapted_fields.push(source_field.as_ref().clone());
+                    }
+                    // Types don't match - use target field definition
+                    _ => {
+                        adapted_fields.push(target_field.as_ref().clone());
+                    }
+                }
+            }
+            // Field doesn't exist in source - add from target
+            None => {
+                adapted_fields.push(target_field.as_ref().clone());
+            }
+        }
+    }
+    adapted_fields
+impl NestedStructSchemaAdapter {
+    /// Create a new NestedStructSchemaAdapter with the target schema
+    pub fn new(projected_table_schema: SchemaRef, table_schema: SchemaRef) -> 
Self {
+        Self {
+            projected_table_schema,
+            table_schema,
+        }
+    }
+    pub fn projected_table_schema(&self) -> &Schema {
+        self.projected_table_schema.as_ref()
+    }
+    pub fn table_schema(&self) -> &Schema {
+        self.table_schema.as_ref()
+    }
+    // Takes a source schema and transforms it to match the structure of the 
target schema.
+    fn adapt_schema(&self, source_schema: SchemaRef) -> Result<SchemaRef> {
+        let adapted_fields =
+            adapt_fields(source_schema.fields(), self.table_schema.fields());
+        Ok(Arc::new(Schema::new_with_metadata(
+            adapted_fields,
+            self.table_schema.metadata().clone(),
+        )))
+    }
+    /// Create a schema mapping that can transform data from source schema to 
target schema
+    fn create_schema_mapping(
+        &self,
+        source_schema: &Schema,
+        target_schema: &Schema,
+    ) -> Result<Arc<dyn SchemaMapper>> {
+        // Map field names between schemas
+        let mut field_mappings = Vec::new();
+        for target_field in target_schema.fields() {
+            let index = source_schema.index_of(target_field.name());
+            field_mappings.push(index.ok());
+        }
+        // Create our custom NestedStructSchemaMapping
+        let mapping = NestedStructSchemaMapping::new(
+            Arc::new(target_schema.clone()), // projected_table_schema
+            field_mappings,                  // field_mappings
+        );
+        Ok(Arc::new(mapping))
+    }
+impl SchemaAdapter for NestedStructSchemaAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
+        let field_name = self.table_schema.field(index).name();
+        file_schema.index_of(field_name).ok()
+    }
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        // Adapt the file schema to match the target schema structure
+        let adapted_schema = self.adapt_schema(Arc::new(file_schema.clone()))?;
+        // Create a mapper that can transform data from file schema to the 
adapted schema
+        let mapper = self.create_schema_mapping(file_schema, &adapted_schema)?;
+        // Collect column indices to project from the file
+        let mut projection = Vec::new();
+        for field_name in file_schema.fields().iter().map(|f| f.name()) {
+            if let Ok(idx) = file_schema.index_of(field_name) {
+                projection.push(idx);
+            }
+        }
+        Ok((mapper, projection))
+    }
+/// A SchemaMapping implementation specifically for nested structs
+struct NestedStructSchemaMapping {
+    /// The schema for the table, projected to include only the fields being 
+    projected_table_schema: SchemaRef,
+    /// Field mappings from projected table to file schema
+    field_mappings: Vec<Option<usize>>,
+impl NestedStructSchemaMapping {
+    /// Create a new nested struct schema mapping
+    pub fn new(
+        projected_table_schema: SchemaRef,
+        field_mappings: Vec<Option<usize>>,
+    ) -> Self {
+        Self {
+            projected_table_schema,
+            field_mappings,
+        }
+    }
+/// Maps a `RecordBatch` to a new `RecordBatch` according to the schema 
mapping defined in `NestedStructSchemaMapping`.
+/// # Arguments
+/// * `batch` - The input `RecordBatch` to be mapped.
+/// # Returns
+/// A `Result` containing the new `RecordBatch` with columns adapted according 
to the schema mapping, or an error if the mapping fails.
+/// # Behavior
+/// - For each field in the projected table schema, the corresponding column 
in the input batch is adapted.
+/// - If a field does not exist in the input batch, a null array of the 
appropriate data type and length is created and used in the output batch.
+/// - If a field exists in the input batch, the column is adapted to handle 
potential nested struct adaptation.
+/// # Errors
+/// Returns an error if the column adaptation fails or if the new 
`RecordBatch` cannot be created.
+impl SchemaMapper for NestedStructSchemaMapping {
+    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        let batch_rows = batch.num_rows();
+        let batch_cols = batch.columns().to_vec();
+        let cols = self
+            .projected_table_schema
+            .fields()
+            .iter()
+            .zip(&self.field_mappings)
+            .map(|(field, file_idx)| {
+                file_idx.map_or_else(
+                    // If field doesn't exist in file, return null array
+                    || Ok(new_null_array(field.data_type(), batch_rows)),
+                    // If field exists, handle potential nested struct 
+                    |batch_idx| adapt_column(&batch_cols[batch_idx], field),
+                )
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+        // Create record batch with adapted columns
+        let options = 
+        let schema = Arc::clone(&self.projected_table_schema);
+        let record_batch = RecordBatch::try_new_with_options(schema, cols, 
+        Ok(record_batch)
+    }
+// Helper methods for the NestedStructSchemaMapping
+/// Adapt a column to match the target field type, handling nested structs 
+fn adapt_column(source_col: &ArrayRef, target_field: &Field) -> 
Result<ArrayRef> {
+    match target_field.data_type() {
+        DataType::Struct(target_fields) => {
+            // For struct arrays, we need to handle them specially
+            if let Some(struct_array) = 
+            {
+                // Create a vector to store field-array pairs with the correct 
+                let mut children: Vec<(Arc<Field>, Arc<dyn Array>)> = 
+                let num_rows = source_col.len();
+                // For each field in the target schema
+                for target_child_field in target_fields {
+                    // Create Arc<Field> directly (not Arc<Arc<Field>>)
+                    let field_arc = Arc::clone(target_child_field);
+                    // Try to find corresponding field in source
+                    match 
struct_array.column_by_name(target_child_field.name()) {
+                        Some(source_child_col) => {
+                            // Field exists in source, adapt it
+                            let adapted_child =
+                                adapt_column(source_child_col, 
+                            children.push((field_arc, adapted_child));
+                        }
+                        None => {
+                            // Field doesn't exist in source, add null array
+                            children.push((
+                                field_arc,
+                                new_null_array(target_child_field.data_type(), 
+                            ));
+                        }
+                    }
+                }
+                // Create new struct array with all target fields
+                let struct_array = StructArray::from(children);
+                Ok(Arc::new(struct_array))
+            } else {
+                // Not a struct array, but target expects struct - return nulls
+                Ok(new_null_array(target_field.data_type(), source_col.len()))
+            }
+        }
+        // For non-struct types, just cast
+        _ => Ok(cast(source_col, target_field.data_type())?),
+    }
+mod tests {
+    use super::*;
+    use arrow::array::{
+        Array, Int32Array, StringBuilder, StructArray, 
+    };
+    use arrow::datatypes::{DataType, TimeUnit};
+    #[test]
+    fn test_nested_struct_evolution() -> Result<()> {
+        // Create source and target schemas using helper functions
+        let source_schema = create_basic_nested_schema();
+        let target_schema = create_deep_nested_schema();
+        let adapter =
+            NestedStructSchemaAdapter::new(target_schema.clone(), 
+        let adapted = adapter.adapt_schema(source_schema)?;
+        // Verify the adapted schema matches target
+        assert_eq!(adapted.fields(), target_schema.fields());
+        Ok(())
+    }
+    /// Helper function to create a basic schema with a simple nested struct
+    fn create_basic_nested_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            create_additional_info_field(false), // without reason field
+        ]))
+    }
+    /// Helper function to create an enhanced schema with deeper nested structs
+    fn create_deep_nested_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            create_additional_info_field(true), // with reason field
+        ]))
+    }
+    /// Helper function to create the additionalInfo field with or without the 
reason subfield
+    fn create_additional_info_field(with_reason: bool) -> Field {
+        let mut field_children = vec![
+            Field::new("location", DataType::Utf8, true),
+            Field::new(
+                "timestamp_utc",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+        ];
+        // Add the reason field if requested (for target schema)
+        if with_reason {
+            field_children.push(create_reason_field());
+        }
+        Field::new(
+            "additionalInfo",
+            DataType::Struct(field_children.into()),
+            true,
+        )
+    }
+    /// Helper function to create the reason nested field with its details 
+    fn create_reason_field() -> Field {
+        Field::new(
+            "reason",
+            DataType::Struct(
+                vec![
+                    Field::new("_level", DataType::Float64, true),
+                    // Inline the details field creation
+                    Field::new(
+                        "details",
+                        DataType::Struct(
+                            vec![
+                                Field::new("rurl", DataType::Utf8, true),
+                                Field::new("s", DataType::Float64, true),
+                                Field::new("t", DataType::Utf8, true),
+                            ]
+                            .into(),
+                        ),
+                        true,
+                    ),
+                ]
+                .into(),
+            ),
+            true,
+        )
+    }
+    #[test]
+    fn test_map_schema() -> Result<()> {
+        // Create source schema with a subset of fields
+        let source_schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+            Field::new(
+                "metadata",
+                DataType::Struct(
+                    vec![
+                        Field::new("created", DataType::Utf8, true),
+                        Field::new("modified", DataType::Utf8, true),
+                    ]
+                    .into(),
+                ),
+                true,
+            ),
+        ]);
+        // Create target schema with additional/different fields
+        let target_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+            Field::new(
+                "metadata",
+                DataType::Struct(
+                    vec![
+                        Field::new("created", DataType::Utf8, true),
+                        Field::new("modified", DataType::Utf8, true),
+                        Field::new("version", DataType::Int64, true), // Added 
+                    ]
+                    .into(),
+                ),
+                true,
+            ),
+            Field::new("description", DataType::Utf8, true), // Added field
+        ]));
+        let adapter =
+            NestedStructSchemaAdapter::new(target_schema.clone(), 
+        let (_, projection) = adapter.map_schema(&source_schema)?;
+        // Verify projection contains all columns from source schema
+        assert_eq!(projection.len(), 3);
+        assert_eq!(projection, vec![0, 1, 2]);
+        // Verify adapted schema separately
+        let adapted = adapter.adapt_schema(Arc::new(source_schema))?;
+        assert_eq!(adapted.fields().len(), 4); // Should have all target fields
+        // Check if description field exists
+        let description_idx = adapted.index_of("description");
+        assert!(description_idx.is_ok(), "Should have description field");
+        // Check nested struct has the new field
+        let metadata_idx = adapted.index_of("metadata").unwrap();
+        let metadata_field = adapted.field(metadata_idx);
+        if let DataType::Struct(fields) = metadata_field.data_type() {
+            assert_eq!(fields.len(), 3); // Should have all 3 fields including 
+            // Find version field in the Fields collection
+            let version_exists = fields.iter().any(|f| f.name() == "version");
+            assert!(
+                version_exists,
+                "Should have version field in metadata struct"
+            );
+        } else {
+            panic!("Expected struct type for metadata field");
+        }
+        Ok(())
+    }
+    #[test]
+    fn test_create_appropriate_adapter() -> Result<()> {
+        // Setup test schemas
+        let simple_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+            Field::new("age", DataType::Int16, true),
+        ]));
+        let nested_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new(
+                "metadata",
+                DataType::Struct(
+                    vec![
+                        Field::new("created", DataType::Utf8, true),
+                        Field::new("modified", DataType::Utf8, true),
+                    ]
+                    .into(),
+                ),
+                true,
+            ),
+        ]));
+        // Create source schema with missing field in struct
+        let source_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new(
+                "metadata",
+                DataType::Struct(
+                    vec![
+                        Field::new("created", DataType::Utf8, true),
+                        // "modified" field is missing
+                    ]
+                    .into(),
+                ),
+                true,
+            ),
+        ]));
+        // Test has_nested_structs detection
+        assert!(!NestedStructSchemaAdapterFactory::has_nested_structs(
+            &simple_schema
+        ));
+        assert!(NestedStructSchemaAdapterFactory::has_nested_structs(
+            &nested_schema
+        ));
+        // Test DefaultSchemaAdapter fails with nested schema evolution
+        let default_adapter = DefaultSchemaAdapterFactory
+            .create(nested_schema.clone(), nested_schema.clone());
+        let default_result = default_adapter.map_schema(&source_schema);
+        assert!(default_result.is_err());
+        if let Err(e) = default_result {
+            assert!(
+                format!("{}", e).contains("Cannot cast file schema field 
+                "Expected casting error, got: {e}"
+            );
+        }
+        // Test NestedStructSchemaAdapter handles the same case successfully
+        let nested_adapter = NestedStructSchemaAdapterFactory
+            .create(nested_schema.clone(), nested_schema.clone());
+        assert!(nested_adapter.map_schema(&source_schema).is_ok());
+        // Test factory selects appropriate adapter based on schema
+        let complex_adapter = NestedStructSchemaAdapterFactory::create_adapter(
+            nested_schema.clone(),
+            nested_schema.clone(),
+        );
+        // Verify complex_adapter can handle schema evolution
+        assert!(
+            complex_adapter.map_schema(&source_schema).is_ok(),
+            "Complex adapter should handle schema with missing fields"
+        );
+        Ok(())
+    }
+    #[test]
+    fn test_adapt_simple_to_nested_schema() -> Result<()> {
+        // Simple source schema with flat fields
+        let source_schema = create_flat_schema();
+        // Target schema with nested struct fields
+        let target_schema = create_nested_schema();
+        // Create mapping with our adapter - should handle missing nested 
+        let nested_adapter =
+            NestedStructSchemaAdapter::new(target_schema.clone(), 
+        let adapted = nested_adapter.adapt_schema(source_schema.clone())?;
+        // Verify structure of adapted schema
+        assert_eq!(adapted.fields().len(), 2); // Should have id and user_info
+        // Check that user_info is a struct
+        if let Ok(idx) = adapted.index_of("user_info") {
+            let user_info_field = adapted.field(idx);
+            assert!(matches!(user_info_field.data_type(), 
+            if let DataType::Struct(fields) = user_info_field.data_type() {
+                assert_eq!(fields.len(), 3); // Should have name, created_at, 
and settings
+                // Check that settings field exists and is a struct
+                let settings_idx = fields.iter().position(|f| f.name() == 
+                assert!(settings_idx.is_some(), "Settings field should exist");
+                let settings_field = &fields[settings_idx.unwrap()];
+                assert!(matches!(settings_field.data_type(), 
+                if let DataType::Struct(settings_fields) = 
settings_field.data_type() {
+                    assert_eq!(settings_fields.len(), 2); // Should have theme 
and notifications
+                    // Verify field names within settings
+                    let theme_exists =
+                        settings_fields.iter().any(|f| f.name() == "theme");
+                    let notif_exists =
+                        settings_fields.iter().any(|f| f.name() == 
+                    assert!(theme_exists, "Settings should contain theme 
+                    assert!(notif_exists, "Settings should contain 
notifications field");
+                } else {
+                    panic!("Expected struct type for settings field");
+                }
+            } else {
+                panic!("Expected struct type for user_info field");
+            }
+        } else {
+            panic!("Expected user_info field in adapted schema");
+        }
+        // Test mapper creation
+        let (_mapper, projection) = nested_adapter.map_schema(&source_schema)?;
+        // Verify the mapper was created successfully and projection includes 
expected columns
+        assert_eq!(projection.len(), source_schema.fields().len());
+        // Or check against the adapted schema we already confirmed is correct
+        assert_eq!(adapted.fields().len(), 2);
+        Ok(())
+    }
+    fn create_nested_schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new(

