kosiew commented on code in PR #16583:
URL: https://github.com/apache/datafusion/pull/16583#discussion_r2184169486


##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -894,6 +1047,81 @@ impl ListingTable {
         self.schema_source
     }
 
+    /// Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
+    ///
+    /// The schema adapter factory is used to create schema adapters that can
+    /// handle schema evolution and type conversions when reading files with
+    /// different schemas than the table schema.
+    ///
+    /// # Example: Adding Schema Evolution Support
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
+    /// # #[derive(Debug)]
+    /// # struct EvolutionAdapterFactory;
+    /// # impl SchemaAdapterFactory for EvolutionAdapterFactory {
+    /// #     fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+    /// #         unimplemented!()
+    /// #     }
+    /// # }
+    /// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// # let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+    /// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
+    /// # let config = ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema);
+    /// # let table = ListingTable::try_new(config).unwrap();
+    /// let table_with_evolution = table
+    ///     .with_schema_adapter_factory(Arc::new(EvolutionAdapterFactory));
+    /// ```
+    pub fn with_schema_adapter_factory(
+        self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self
+        }
+    }
+
+    /// Get the [`SchemaAdapterFactory`] for this table
+    pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.as_ref()
+    }
+
+    /// Creates a schema adapter for mapping between file and table schemas
+    ///
+    /// Uses the configured schema adapter factory if available, otherwise falls back
+    /// to the default implementation.
+    fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
+        let table_schema = self.schema();
+        match &self.schema_adapter_factory {
+            Some(factory) => {
+                factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema))
+            }
+            None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
+        }
+    }
+
+    /// Creates a file source and applies schema adapter factory if available
+    fn create_file_source_with_schema_adapter(&self) -> Result<Arc<dyn 
FileSource>> {
+        let mut source = self.options.format.file_source();
+        // Apply schema adapter to source if available
+        //
+        // NOTE: This may layer the ListingTable's schema adapter factory on top of any
+        // existing factory that the file source already has. The composition semantics are:
+        // 1. The file format's existing adapter (if any) handles format-specific schema mapping
+        // 2. Our adapter handles table-level schema evolution requirements
+        //
+        // This layering is intentional but may need adjustment if the underlying source
+        // already handles the same schema evolution cases we're trying to address.
+        if let Some(factory) = &self.schema_adapter_factory {
+            source = source.with_schema_adapter_factory(Arc::clone(factory))?;
+        }
+        Ok(source)

Review Comment:
   Yep.
   
   The SchemaAdapter on ListingTable is used for both adapting statistics and adapting batches.
   
   Here's a breakdown of how it's used:
   
   1. Adapting statistics: in the list_files_for_scan method, a schema_adapter is created to map the column statistics from each file's schema to the table's schema. This keeps the statistics consistent with the overall table schema, especially when schema evolution is involved.
   
   2. Adapting batches: in the scan method, the schema_adapter_factory is passed to the FileSource via create_file_source_with_schema_adapter. The FileFormat then uses this FileSource to build the physical execution plan; the FileSource reads the data from the files and uses the provided SchemaAdapter to adapt the RecordBatches to the table's schema. (A minimal wiring sketch follows this list.)


