adriangb commented on code in PR #16583:
URL: https://github.com/apache/datafusion/pull/16583#discussion_r2180848279


##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -67,8 +69,62 @@ pub enum SchemaSource {
 
 /// Configuration for creating a [`ListingTable`]
 ///
+/// # Schema Evolution Support
 ///
-#[derive(Debug, Clone)]
+/// This configuration supports schema evolution through the optional
+/// [`SchemaAdapterFactory`]. You might want to override the default factory 
when:
+///
+/// - **Reading files with evolving schemas**: When your data files have been 
written
+///   over time with different but compatible schemas (e.g., added columns, 
renamed fields)
+/// - **Type coercion requirements**: When you need custom logic for 
converting between
+///   different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
+/// - **Column mapping**: When files have different column names or ordering 
than
+///   your expected table schema
+/// - **Backwards compatibility**: When newer table schemas need to read older 
file
+///   formats gracefully

Review Comment:
   These seem a little too general to me. The default schema adapter handles 
column ordering and the last point is generic / rehashes the whole point of a 
schema adapter.
   
   
   ```suggestion
   /// - **Type coercion requirements**: When you need custom logic for 
converting between
   ///   different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
   /// - **Column mapping**: You need to map columns with a legacy name to a 
new name
   /// - **Custom handling of missing columns**: By default they are filled in 
with nulls, but you may e.g. want to fill them in with `0` or `""`.
   ```



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -123,25 +176,72 @@ impl ListingTableConfig {
     ///
     /// If the schema is provided, it must contain only the fields in the file
     /// without the table partitioning columns.
+    ///
+    /// # Example: Specifying Table Schema
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTableConfig, 
ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # use arrow::datatypes::{Schema, Field, DataType};
+    /// # let table_paths = 
ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// # let listing_options = 
ListingOptions::new(Arc::new(ParquetFormat::default()));
+    /// let schema = Arc::new(Schema::new(vec![
+    ///     Field::new("id", DataType::Int64, false),
+    ///     Field::new("name", DataType::Utf8, true),
+    /// ]));
+    ///
+    /// let config = ListingTableConfig::new(table_paths)
+    ///     .with_listing_options(listing_options)  // Set options first
+    ///     .with_schema(schema);                    // Then set schema
+    /// ```
     pub fn with_schema(self, schema: SchemaRef) -> Self {
+        // Note: We preserve existing options state, but downstream code may 
expect
+        // options to be set. Consider calling with_listing_options() or 
infer_options()
+        // before operations that require options to be present.
+        debug_assert!(
+            self.options.is_some() || cfg!(test),
+            "ListingTableConfig::with_schema called without options set. \
+             Consider calling with_listing_options() or infer_options() first 
to avoid panics in downstream code."
+        );
+
         Self {
-            table_paths: self.table_paths,
             file_schema: Some(schema),
-            options: self.options,
             schema_source: SchemaSource::Specified,
+            ..self

Review Comment:
   Is this related to this change, or is it a drive-by?



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -123,25 +176,72 @@ impl ListingTableConfig {
     ///
     /// If the schema is provided, it must contain only the fields in the file
     /// without the table partitioning columns.
+    ///
+    /// # Example: Specifying Table Schema
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTableConfig, 
ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # use arrow::datatypes::{Schema, Field, DataType};
+    /// # let table_paths = 
ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// # let listing_options = 
ListingOptions::new(Arc::new(ParquetFormat::default()));
+    /// let schema = Arc::new(Schema::new(vec![
+    ///     Field::new("id", DataType::Int64, false),
+    ///     Field::new("name", DataType::Utf8, true),
+    /// ]));
+    ///
+    /// let config = ListingTableConfig::new(table_paths)
+    ///     .with_listing_options(listing_options)  // Set options first
+    ///     .with_schema(schema);                    // Then set schema
+    /// ```
     pub fn with_schema(self, schema: SchemaRef) -> Self {
+        // Note: We preserve existing options state, but downstream code may 
expect
+        // options to be set. Consider calling with_listing_options() or 
infer_options()
+        // before operations that require options to be present.
+        debug_assert!(
+            self.options.is_some() || cfg!(test),
+            "ListingTableConfig::with_schema called without options set. \
+             Consider calling with_listing_options() or infer_options() first 
to avoid panics in downstream code."
+        );
+
         Self {
-            table_paths: self.table_paths,
             file_schema: Some(schema),
-            options: self.options,
             schema_source: SchemaSource::Specified,
+            ..self
         }
     }
 
     /// Add `listing_options` to [`ListingTableConfig`]
     ///
     /// If not provided, format and other options are inferred via
     /// [`Self::infer_options`].
+    ///
+    /// # Example: Configuring Parquet Files with Custom Options
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTableConfig, 
ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # let table_paths = 
ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
+    ///     .with_file_extension(".parquet")
+    ///     .with_collect_stat(true);
+    ///
+    /// let config = ListingTableConfig::new(table_paths)
+    ///     .with_listing_options(options);  // Configure file format and 
options
+    /// ```
     pub fn with_listing_options(self, listing_options: ListingOptions) -> Self 
{
+        // Note: This method properly sets options, but be aware that 
downstream
+        // methods like infer_schema() and try_new() require both schema and 
options
+        // to be set to function correctly.
+        debug_assert!(
+            !self.table_paths.is_empty() || cfg!(test),
+            "ListingTableConfig::with_listing_options called without 
table_paths set. \
+             Consider calling new() or new_with_multi_paths() first to 
establish table paths."
+        );
+
         Self {
-            table_paths: self.table_paths,
-            file_schema: self.file_schema,
             options: Some(listing_options),
-            schema_source: self.schema_source,
+            ..self
         }
     }

Review Comment:
   Same question as above



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -302,11 +405,58 @@ impl ListingTableConfig {
                     file_schema: self.file_schema,
                     options: Some(options),
                     schema_source: self.schema_source,
+                    schema_adapter_factory: self.schema_adapter_factory,
                 })
             }
             None => config_err!("No `ListingOptions` set for inferring 
schema"),
         }
     }
+
+    /// Set the [`SchemaAdapterFactory`] for the [`ListingTable`]
+    ///
+    /// The schema adapter factory is used to create schema adapters that can
+    /// handle schema evolution and type conversions when reading files with
+    /// different schemas than the table schema.
+    ///
+    /// If not provided, a default schema adapter factory will be used.
+    ///
+    /// # Example: Custom Schema Adapter for Type Coercion
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTableConfig, 
ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, 
SchemaAdapter};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
+    /// #
+    /// # #[derive(Debug)]
+    /// # struct MySchemaAdapterFactory;
+    /// # impl SchemaAdapterFactory for MySchemaAdapterFactory {
+    /// #     fn create(&self, _projected_table_schema: SchemaRef, 
_file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+    /// #         unimplemented!()
+    /// #     }
+    /// # }

Review Comment:
   I think the example can just show setting the DefaultSchemaAdapterFactory



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -894,6 +1047,81 @@ impl ListingTable {
         self.schema_source
     }
 
+    /// Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
+    ///
+    /// The schema adapter factory is used to create schema adapters that can
+    /// handle schema evolution and type conversions when reading files with
+    /// different schemas than the table schema.
+    ///
+    /// # Example: Adding Schema Evolution Support
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTable, 
ListingTableConfig, ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, 
SchemaAdapter};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
+    /// # #[derive(Debug)]
+    /// # struct EvolutionAdapterFactory;
+    /// # impl SchemaAdapterFactory for EvolutionAdapterFactory {
+    /// #     fn create(&self, _projected_table_schema: SchemaRef, 
_file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+    /// #         unimplemented!()
+    /// #     }
+    /// # }

Review Comment:
   Same as above, we can use DefaultSchemaAdapterFactory in the examples



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1169,8 +1399,10 @@ impl ListingTable {
             self.options.collect_stat,
             inexact_stats,
         )?;
-        let (schema_mapper, _) = 
DefaultSchemaAdapterFactory::from_schema(self.schema())
-            .map_schema(self.file_schema.as_ref())?;
+
+        let schema_adapter = self.create_schema_adapter();
+        let (schema_mapper, _) = 
schema_adapter.map_schema(self.file_schema.as_ref())?;

Review Comment:
   Part of what smells wrong



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1169,8 +1399,10 @@ impl ListingTable {
             self.options.collect_stat,
             inexact_stats,
         )?;
-        let (schema_mapper, _) = 
DefaultSchemaAdapterFactory::from_schema(self.schema())
-            .map_schema(self.file_schema.as_ref())?;
+
+        let schema_adapter = self.create_schema_adapter();
+        let (schema_mapper, _) = 
schema_adapter.map_schema(self.file_schema.as_ref())?;

Review Comment:
   I guess this is just for the column statistics?



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -67,8 +69,62 @@ pub enum SchemaSource {
 
 /// Configuration for creating a [`ListingTable`]
 ///
+/// # Schema Evolution Support
 ///
-#[derive(Debug, Clone)]
+/// This configuration supports schema evolution through the optional
+/// [`SchemaAdapterFactory`]. You might want to override the default factory 
when:
+///
+/// - **Reading files with evolving schemas**: When your data files have been 
written
+///   over time with different but compatible schemas (e.g., added columns, 
renamed fields)
+/// - **Type coercion requirements**: When you need custom logic for 
converting between
+///   different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
+/// - **Column mapping**: When files have different column names or ordering 
than
+///   your expected table schema
+/// - **Backwards compatibility**: When newer table schemas need to read older 
file
+///   formats gracefully
+///
+/// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which 
handles
+/// basic schema compatibility cases.
+///
+/// # Complete Example: Schema Evolution Setup
+/// ```rust
+/// # use std::sync::Arc;
+/// # use datafusion::datasource::listing::{ListingTableConfig, 
ListingOptions, ListingTableUrl};
+/// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+/// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, 
SchemaAdapter};
+/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
+/// #
+/// # // Custom schema adapter for handling schema evolution
+/// # #[derive(Debug)]
+/// # struct EvolutionSchemaAdapterFactory;
+/// # impl SchemaAdapterFactory for EvolutionSchemaAdapterFactory {
+/// #     fn create(&self, projected_table_schema: SchemaRef, file_schema: 
SchemaRef) -> Box<dyn SchemaAdapter> {
+/// #         unimplemented!("Custom schema adapter implementation")
+/// #     }
+/// # }
+/// #

Review Comment:
   I don't think this should be here. It belongs alongside the `SchemaAdapter` 
trait / might already be there.



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -894,6 +1047,81 @@ impl ListingTable {
         self.schema_source
     }
 
+    /// Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
+    ///
+    /// The schema adapter factory is used to create schema adapters that can
+    /// handle schema evolution and type conversions when reading files with
+    /// different schemas than the table schema.
+    ///
+    /// # Example: Adding Schema Evolution Support
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTable, 
ListingTableConfig, ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, 
SchemaAdapter};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
+    /// # #[derive(Debug)]
+    /// # struct EvolutionAdapterFactory;
+    /// # impl SchemaAdapterFactory for EvolutionAdapterFactory {
+    /// #     fn create(&self, _projected_table_schema: SchemaRef, 
_file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+    /// #         unimplemented!()
+    /// #     }
+    /// # }
+    /// # let table_path = 
ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// # let options = 
ListingOptions::new(Arc::new(ParquetFormat::default()));
+    /// # let schema = Arc::new(Schema::new(vec![Field::new("id", 
DataType::Int64, false)]));
+    /// # let config = 
ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema);
+    /// # let table = ListingTable::try_new(config).unwrap();
+    /// let table_with_evolution = table
+    ///     .with_schema_adapter_factory(Arc::new(EvolutionAdapterFactory));
+    /// ```
+    pub fn with_schema_adapter_factory(
+        self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self
+        }
+    }
+
+    /// Get the [`SchemaAdapterFactory`] for this table
+    pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn 
SchemaAdapterFactory>> {
+        self.schema_adapter_factory.as_ref()
+    }
+
+    /// Creates a schema adapter for mapping between file and table schemas
+    ///
+    /// Uses the configured schema adapter factory if available, otherwise 
falls back
+    /// to the default implementation.
+    fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
+        let table_schema = self.schema();
+        match &self.schema_adapter_factory {
+            Some(factory) => {
+                factory.create(Arc::clone(&table_schema), 
Arc::clone(&table_schema))
+            }
+            None => 
DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
+        }
+    }
+
+    /// Creates a file source and applies schema adapter factory if available
+    fn create_file_source_with_schema_adapter(&self) -> Result<Arc<dyn 
FileSource>> {
+        let mut source = self.options.format.file_source();
+        // Apply schema adapter to source if available
+        //
+        // NOTE: This may layer the ListingTable's schema adapter factory on 
top of any
+        // existing factory that the file source already has. The composition 
semantics are:
+        // 1. The file format's existing adapter (if any) handles 
format-specific schema mapping
+        // 2. Our adapter handles table-level schema evolution requirements
+        //
+        // This layering is intentional but may need adjustment if the 
underlying source
+        // already handles the same schema evolution cases we're trying to 
address.
+        if let Some(factory) = &self.schema_adapter_factory {
+            source = source.with_schema_adapter_factory(Arc::clone(factory))?;
+        }
+        Ok(source)

Review Comment:
   Hmm this smells wrong to me. I don't think we should need two layers of 
schema adapter. Is it possible that the one on `ListingTable` is used just for 
statistics? I.e. we have:
   - A SchemaAdapter on the source that is used to adapt the batches as they 
flow out
   - A SchemaAdapter on ListingTable to adapt the statistics if they are 
collected ahead of time



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -894,6 +1047,81 @@ impl ListingTable {
         self.schema_source
     }
 
+    /// Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
+    ///
+    /// The schema adapter factory is used to create schema adapters that can
+    /// handle schema evolution and type conversions when reading files with
+    /// different schemas than the table schema.
+    ///
+    /// # Example: Adding Schema Evolution Support
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTable, 
ListingTableConfig, ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, 
SchemaAdapter};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
+    /// # #[derive(Debug)]
+    /// # struct EvolutionAdapterFactory;
+    /// # impl SchemaAdapterFactory for EvolutionAdapterFactory {
+    /// #     fn create(&self, _projected_table_schema: SchemaRef, 
_file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+    /// #         unimplemented!()
+    /// #     }
+    /// # }
+    /// # let table_path = 
ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// # let options = 
ListingOptions::new(Arc::new(ParquetFormat::default()));
+    /// # let schema = Arc::new(Schema::new(vec![Field::new("id", 
DataType::Int64, false)]));
+    /// # let config = 
ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema);
+    /// # let table = ListingTable::try_new(config).unwrap();
+    /// let table_with_evolution = table
+    ///     .with_schema_adapter_factory(Arc::new(EvolutionAdapterFactory));
+    /// ```
+    pub fn with_schema_adapter_factory(
+        self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self
+        }
+    }
+
+    /// Get the [`SchemaAdapterFactory`] for this table
+    pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn 
SchemaAdapterFactory>> {
+        self.schema_adapter_factory.as_ref()
+    }
+
+    /// Creates a schema adapter for mapping between file and table schemas
+    ///
+    /// Uses the configured schema adapter factory if available, otherwise 
falls back
+    /// to the default implementation.
+    fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
+        let table_schema = self.schema();
+        match &self.schema_adapter_factory {
+            Some(factory) => {
+                factory.create(Arc::clone(&table_schema), 
Arc::clone(&table_schema))

Review Comment:
   Not this PR's fault, but this is such a sad, unfortunate API... why in the 
world do we pass in the same thing twice!?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to