alamb commented on code in PR #19345:
URL: https://github.com/apache/datafusion/pull/19345#discussion_r2624086682


##########
docs/source/library-user-guide/upgrading.md:
##########
@@ -490,6 +490,24 @@ If you were using a custom `SchemaAdapterFactory` for schema adaptation (e.g., d
 
 See the [default column values example](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_data_source/default_column_values.rs)
 for how to implement a custom `PhysicalExprAdapterFactory`.
 
+### `SchemaAdapter` and `SchemaAdapterFactory` completely removed
+
+The following symbols have been completely removed from DataFusion:
+
+- `SchemaAdapter` trait
+- `SchemaAdapterFactory` trait
+- `SchemaMapper` trait
+- `SchemaMapping` struct
+- `DefaultSchemaAdapterFactory` struct
+
+These types were previously used to adapt record batch schemas during file reading.

Review Comment:
   This is likely to cause non-trivial pain for anyone who uses the `SchemaAdapter` during the upgrade.
   
   However, I am not sure if leaving the code in but disconnected would be any better.
   
   Thus I think we should go with this PR, and we can help with some more write-ups when we start testing the upgrade with downstream crates (like delta.rs).
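   
   For anyone hitting this during an upgrade, here is a minimal sketch of the replacement pattern, adapted from the `UppercasePhysicalExprAdapter` in the deleted integration test below (the struct names here are illustrative): instead of rewriting record batches after the read, the adapter rewrites physical expressions so column references resolve against the physical file schema.
   
   ```rust
   use std::sync::Arc;
   
   use arrow_schema::SchemaRef;
   use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
   use datafusion_common::Result;
   use datafusion_physical_expr::expressions::Column;
   use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
   use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
   
   /// Illustrative factory: produces one adapter per file being scanned
   #[derive(Debug)]
   struct MyExprAdapterFactory;
   
   impl PhysicalExprAdapterFactory for MyExprAdapterFactory {
       fn create(
           &self,
           _logical_file_schema: SchemaRef,
           physical_file_schema: SchemaRef,
       ) -> Arc<dyn PhysicalExprAdapter> {
           Arc::new(MyExprAdapter { physical_file_schema })
       }
   }
   
   /// Illustrative adapter: remaps column references onto the file's actual schema
   #[derive(Debug)]
   struct MyExprAdapter {
       physical_file_schema: SchemaRef,
   }
   
   impl PhysicalExprAdapter for MyExprAdapter {
       fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
           expr.transform(|e| {
               if let Some(column) = e.as_any().downcast_ref::<Column>() {
                   // Example mapping: table-schema names are uppercase, file names are lowercase
                   let file_name = column.name().to_lowercase();
                   if let Ok(idx) = self.physical_file_schema.index_of(&file_name) {
                       return Ok(Transformed::yes(
                           Arc::new(Column::new(&file_name, idx)) as Arc<dyn PhysicalExpr>,
                       ));
                   }
               }
               Ok(Transformed::no(e))
           })
           .data()
       }
   }
   ```
   
   The factory then gets wired into the scan, e.g. through `FileScanConfigBuilder::with_expr_adapter` (the deleted test passes `None` there), rather than through the removed `SchemaAdapterFactory` hook.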



##########
datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs:
##########
@@ -1,752 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-
-use arrow::array::RecordBatch;
-
-use arrow_schema::{DataType, Field, Schema, SchemaRef};
-use bytes::{BufMut, BytesMut};
-use datafusion::common::Result;
-use datafusion::config::{ConfigOptions, TableParquetOptions};
-use datafusion::datasource::listing::PartitionedFile;
-#[cfg(feature = "parquet")]
-use datafusion::datasource::physical_plan::ParquetSource;
-use datafusion::datasource::physical_plan::{
-    ArrowSource, CsvSource, FileSource, JsonSource,
-};
-use datafusion::logical_expr::{col, lit};
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
-use datafusion_common::config::CsvOptions;
-use datafusion_common::record_batch;
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::ColumnStatistics;
-use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
-use datafusion_datasource::schema_adapter::{
-    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
-};
-
-use datafusion::assert_batches_eq;
-use datafusion_datasource::source::DataSourceExec;
-use datafusion_datasource::TableSchema;
-use datafusion_execution::object_store::ObjectStoreUrl;
-use datafusion_expr::Expr;
-use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::planner::logical2physical;
-use datafusion_physical_expr::projection::ProjectionExprs;
-use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
-use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use object_store::{memory::InMemory, path::Path, ObjectStore};
-use parquet::arrow::ArrowWriter;
-
-async fn write_parquet(batch: RecordBatch, store: Arc<dyn ObjectStore>, path: &str) {
-    write_batches_to_parquet(&[batch], store, path).await;
-}
-
-/// Write RecordBatches to a Parquet file with each batch in its own row group.
-async fn write_batches_to_parquet(
-    batches: &[RecordBatch],
-    store: Arc<dyn ObjectStore>,
-    path: &str,
-) -> usize {
-    let mut out = BytesMut::new().writer();
-    {
-        let mut writer =
-            ArrowWriter::try_new(&mut out, batches[0].schema(), None).unwrap();
-        for batch in batches {
-            writer.write(batch).unwrap();
-            writer.flush().unwrap();
-        }
-        writer.finish().unwrap();
-    }
-    let data = out.into_inner().freeze();
-    let file_size = data.len();
-    store.put(&Path::from(path), data.into()).await.unwrap();
-    file_size
-}
-
-/// A schema adapter factory that transforms column names to uppercase
-#[derive(Debug, PartialEq)]
-struct UppercaseAdapterFactory {}
-
-impl SchemaAdapterFactory for UppercaseAdapterFactory {
-    fn create(
-        &self,
-        projected_table_schema: SchemaRef,
-        _table_schema: SchemaRef,
-    ) -> Box<dyn SchemaAdapter> {
-        Box::new(UppercaseAdapter {
-            table_schema: projected_table_schema,
-        })
-    }
-}
-
-/// Schema adapter that transforms column names to uppercase
-#[derive(Debug)]
-struct UppercaseAdapter {
-    table_schema: SchemaRef,
-}
-
-impl SchemaAdapter for UppercaseAdapter {
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-        let field = self.table_schema.field(index);
-        let uppercase_name = field.name().to_uppercase();
-        file_schema
-            .fields()
-            .iter()
-            .position(|f| f.name().to_uppercase() == uppercase_name)
-    }
-
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::new();
-
-        // Map each field in the table schema to the corresponding field in the file schema
-        for table_field in self.table_schema.fields() {
-            let uppercase_name = table_field.name().to_uppercase();
-            if let Some(pos) = file_schema
-                .fields()
-                .iter()
-                .position(|f| f.name().to_uppercase() == uppercase_name)
-            {
-                projection.push(pos);
-            }
-        }
-
-        let mapper = UppercaseSchemaMapper {
-            output_schema: self.output_schema(),
-            projection: projection.clone(),
-        };
-
-        Ok((Arc::new(mapper), projection))
-    }
-}
-
-impl UppercaseAdapter {
-    fn output_schema(&self) -> SchemaRef {
-        let fields: Vec<Field> = self
-            .table_schema
-            .fields()
-            .iter()
-            .map(|f| {
-                Field::new(
-                    f.name().to_uppercase().as_str(),
-                    f.data_type().clone(),
-                    f.is_nullable(),
-                )
-            })
-            .collect();
-
-        Arc::new(Schema::new(fields))
-    }
-}
-
-#[derive(Debug)]
-struct UppercaseSchemaMapper {
-    output_schema: SchemaRef,
-    projection: Vec<usize>,
-}
-
-impl SchemaMapper for UppercaseSchemaMapper {
-    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        let columns = self
-            .projection
-            .iter()
-            .map(|&i| batch.column(i).clone())
-            .collect::<Vec<_>>();
-        Ok(RecordBatch::try_new(self.output_schema.clone(), columns)?)
-    }
-
-    fn map_column_statistics(
-        &self,
-        stats: &[ColumnStatistics],
-    ) -> Result<Vec<ColumnStatistics>> {
-        Ok(self
-            .projection
-            .iter()
-            .map(|&i| stats.get(i).cloned().unwrap_or_default())
-            .collect())
-    }
-}
-
-/// A physical expression adapter factory that maps uppercase column names to lowercase
-#[derive(Debug)]
-struct UppercasePhysicalExprAdapterFactory;
-
-impl PhysicalExprAdapterFactory for UppercasePhysicalExprAdapterFactory {
-    fn create(
-        &self,
-        _logical_file_schema: SchemaRef,
-        physical_file_schema: SchemaRef,
-    ) -> Arc<dyn PhysicalExprAdapter> {
-        Arc::new(UppercasePhysicalExprAdapter {
-            physical_file_schema,
-        })
-    }
-}
-
-#[derive(Debug)]
-struct UppercasePhysicalExprAdapter {
-    physical_file_schema: SchemaRef,
-}
-
-impl PhysicalExprAdapter for UppercasePhysicalExprAdapter {
-    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
-        expr.transform(|e| {
-            if let Some(column) = e.as_any().downcast_ref::<Column>() {
-                // Map uppercase column name (from logical schema) to lowercase (in physical file)
-                let lowercase_name = column.name().to_lowercase();
-                if let Ok(idx) = self.physical_file_schema.index_of(&lowercase_name) {
-                    return Ok(Transformed::yes(
-                        Arc::new(Column::new(&lowercase_name, idx))
-                            as Arc<dyn PhysicalExpr>,
-                    ));
-                }
-            }
-            Ok(Transformed::no(e))
-        })
-        .data()
-    }
-}
-
-#[derive(Clone)]
-struct ParquetTestCase {
-    table_schema: TableSchema,
-    batches: Vec<RecordBatch>,
-    predicate: Option<Expr>,
-    projection: Option<ProjectionExprs>,
-    push_down_filters: bool,
-}
-
-impl ParquetTestCase {
-    fn new(table_schema: TableSchema, batches: Vec<RecordBatch>) -> Self {
-        Self {
-            table_schema,
-            batches,
-            predicate: None,
-            projection: None,
-            push_down_filters: true,
-        }
-    }
-
-    fn push_down_filters(mut self, pushdown_filters: bool) -> Self {
-        self.push_down_filters = pushdown_filters;
-        self
-    }
-
-    fn with_predicate(mut self, predicate: Expr) -> Self {
-        self.predicate = Some(predicate);
-        self
-    }
-
-    fn with_projection(mut self, projection: ProjectionExprs) -> Self {
-        self.projection = Some(projection);
-        self
-    }
-
-    async fn execute(self) -> Result<Vec<RecordBatch>> {
-        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-        let store_url = ObjectStoreUrl::parse("memory://").unwrap();
-        let path = "test.parquet";
-        let file_size =
-            write_batches_to_parquet(&self.batches, store.clone(), path).await;
-
-        let ctx = SessionContext::new();
-        ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
-
-        let mut table_options = TableParquetOptions::default();
-        // controlled via ConfigOptions flag; ParquetSources ORs them so if either is true then pushdown is enabled
-        table_options.global.pushdown_filters = false;
-        let mut file_source = Arc::new(
-            ParquetSource::new(self.table_schema.table_schema().clone())
-                .with_table_parquet_options(table_options),
-        ) as Arc<dyn FileSource>;
-
-        if let Some(projection) = self.projection {
-            file_source = file_source.try_pushdown_projection(&projection)?.unwrap();
-        }
-
-        if let Some(predicate) = &self.predicate {
-            let filter_expr =
-                logical2physical(predicate, self.table_schema.table_schema());
-            let mut config = ConfigOptions::default();
-            config.execution.parquet.pushdown_filters = self.push_down_filters;
-            let result = file_source.try_pushdown_filters(vec![filter_expr], &config)?;
-            file_source = result.updated_node.unwrap();
-        }
-
-        let config = FileScanConfigBuilder::new(store_url.clone(), file_source)
-            .with_file(PartitionedFile::new(path, file_size as u64)) // size 0 for test
-            .with_expr_adapter(None)
-            .build();
-
-        let exec = DataSourceExec::from_data_source(config);
-        let task_ctx = ctx.task_ctx();
-        let stream = exec.execute(0, task_ctx)?;
-        datafusion::physical_plan::common::collect(stream).await
-    }
-}
-
-/// Test reading and filtering a Parquet file where the table schema is flipped (c, b, a) vs. the physical file schema (a, b, c)

Review Comment:
   Are these scenarios covered elsewhere? I feel like (now) we could write these all as .slt tests.
   
   Looks like some of it is covered here:
   
   https://github.com/apache/datafusion/blob/main/datafusion/sqllogictest/test_files/schema_evolution.slt
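   
   For example, the "flipped column order" case from this file could look roughly like this as an slt test (a hypothetical sketch, not copied from schema_evolution.slt; paths and table names are made up):
   
   ```
   # Write a file whose physical column order is (a, b, c)
   statement ok
   COPY (SELECT 1 AS a, 2 AS b, 3 AS c)
   TO 'test_files/scratch/schema_adapter/flipped.parquet'
   STORED AS PARQUET;
   
   # Declare the table schema in the flipped order (c, b, a)
   statement ok
   CREATE EXTERNAL TABLE flipped (c BIGINT, b BIGINT, a BIGINT)
   STORED AS PARQUET
   LOCATION 'test_files/scratch/schema_adapter/flipped.parquet';
   
   query III
   SELECT c, b, a FROM flipped;
   ----
   3 2 1
   ```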



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
