adriangb commented on issue #15220:
URL: https://github.com/apache/datafusion/issues/15220#issuecomment-2727534085

   Here's an example of how this functionality currently breaks with a custom 
SchemaAdapter:
   
   <details>
   
   <summary>Example</summary>
   
   
   ```rust
   use std::any::Any;
   use std::sync::Arc;
   
   use arrow::array::{BooleanArray, RecordBatch, StringArray};
   use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use async_trait::async_trait;
   
   use datafusion::catalog::{Session, TableProvider};
   use datafusion::common::{DFSchema, Result};
   use datafusion::datasource::listing::PartitionedFile;
   use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
   use datafusion::datasource::schema_adapter::{SchemaAdapter, 
SchemaAdapterFactory, SchemaMapper};
   use datafusion::execution::context::SessionContext;
   use datafusion::execution::object_store::ObjectStoreUrl;
   use datafusion::logical_expr::utils::conjunction;
   use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
   use datafusion::parquet::arrow::ArrowWriter;
   use datafusion::physical_plan::ExecutionPlan;
   use datafusion::prelude::lit;
   use futures::StreamExt;
   use object_store::memory::InMemory;
   use object_store::path::Path;
   use object_store::{ObjectStore, PutPayload};
   
   #[tokio::main]
   async fn main() -> Result<()> {
       let (table_schema, batch) = create_sample_data();
   
       let store = InMemory::new();
       let buf = {
           let mut buf = vec![];
   
           let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), 
None).expect("creating writer");
   
           writer.write(&batch).expect("Writing batch");
           writer.close().unwrap();
           buf
       };
       let path = Path::from("example.parquet");
       let payload = PutPayload::from_bytes(buf.into());
       store.put(&path, payload).await?;
   
       let ctx = SessionContext::new();
       ctx.runtime_env()
           .register_object_store(ObjectStoreUrl::parse("memory://")?.as_ref(), 
Arc::new(store));
   
       let table_provider = 
Arc::new(ExampleTableProvider::new(table_schema.clone(), false));
       ctx.register_table("data", table_provider)?;
       let batches = ctx
           .sql("SELECT name FROM data WHERE NOT is_admin")
           .await?
           .collect()
           .await?;
       arrow::util::pretty::print_batches(&batches)?;
   
       ctx.deregister_table("data")?;
       let table_provider = Arc::new(ExampleTableProvider::new(table_schema, 
true));
       ctx.register_table("data", table_provider)?;
       let batches = ctx
           .sql("SELECT name FROM data WHERE NOT is_admin")
           .await?
           .collect()
           .await?;
       arrow::util::pretty::print_batches(&batches)?;
   
       Ok(())
   }
   
   /// Builds the table schema used by the example and the record batch that is
   /// written to the Parquet file.
   ///
   /// The table schema has two non-nullable columns, `name` and `is_admin`; the
   /// returned batch (and therefore the file) only contains `name`.
   fn create_sample_data() -> (SchemaRef, RecordBatch) {
       // The file schema omits `is_admin`, which exists only in the table schema.
       let file_schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
       let names = StringArray::from(vec!["Alice", "Bob"]);
       let batch = RecordBatch::try_new(file_schema, vec![Arc::new(names)]).unwrap();

       let table_schema = Arc::new(Schema::new(vec![
           Field::new("name", DataType::Utf8, false),
           Field::new("is_admin", DataType::Boolean, false),
       ]));

       (table_schema, batch)
   }
   
   /// Custom TableProvider that scans Parquet files through a custom
   /// SchemaAdapter (see its `scan` implementation).
   #[derive(Debug)]
   struct ExampleTableProvider {
       // Table schema reported to the query engine.
       schema: SchemaRef,
       // Forwarded to `ParquetSource::with_pushdown_filters` when scanning.
       pushdown_filters: bool,
   }
   
   impl ExampleTableProvider {
       /// Creates a provider exposing `schema`; `pushdown_filters` selects whether
       /// the Parquet source is configured with filter pushdown.
       fn new(schema: SchemaRef, pushdown_filters: bool) -> Self {
           Self {
               schema,
               pushdown_filters,
           }
       }
   }
   
   /// Table provider implementation that reads every object in the `memory://`
   /// object store as Parquet, using `CustomSchemaAdapterFactory` to adapt file
   /// batches to the table schema.
   #[async_trait]
   impl TableProvider for ExampleTableProvider {
       fn as_any(&self) -> &dyn Any {
           self
       }

       /// Returns the table schema given at construction.
       fn schema(&self) -> SchemaRef {
           Arc::clone(&self.schema)
       }

       fn table_type(&self) -> TableType {
           TableType::Base
       }

       /// Reports every filter as `Inexact`, so filters are passed to `scan` while
       /// the engine still re-applies them on top of the scan output.
       fn supports_filters_pushdown(
           &self,
           filters: &[&Expr],
       ) -> Result<Vec<TableProviderFilterPushDown>> {
           Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
       }

       /// Builds a Parquet scan over all objects in the `memory://` store.
       ///
       /// * `state` - session used to plan the predicate and look up the object store.
       /// * `projection` - optional column indices, forwarded to the scan config.
       /// * `filters` - AND-ed together into the Parquet predicate; `true` when empty.
       /// * `limit` - optional row limit, forwarded to the scan config.
       ///
       /// # Errors
       ///
       /// Fails if the predicate cannot be planned, the object store is not
       /// registered, or listing the store returns an error.
       async fn scan(
           &self,
           state: &dyn Session,
           projection: Option<&Vec<usize>>,
           filters: &[Expr],
           limit: Option<usize>,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           let schema = Arc::clone(&self.schema);
           let df_schema = DFSchema::try_from(Arc::clone(&schema))?;

           // The predicate is expressed against the *table* schema, which may
           // contain columns the file does not have.
           let predicate = conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true));
           let filter = state.create_physical_expr(predicate, &df_schema)?;

           let parquet_source = ParquetSource::default()
               .with_predicate(Arc::clone(&schema), filter)
               .with_pushdown_filters(self.pushdown_filters)
               .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory));

           // Parse the URL once and reuse it for both the store lookup and the scan.
           let object_store_url = ObjectStoreUrl::parse("memory://")?;
           let store = state.runtime_env().object_store(&object_store_url)?;

           // Propagate listing failures instead of silently scanning fewer files.
           let mut files = vec![];
           let mut listing = store.list(None);
           while let Some(file) = listing.next().await {
               files.push(file?);
           }

           let file_group = files
               .iter()
               .map(|file| {
                   PartitionedFile::new(
                       file.location.clone(),
                       u64::try_from(file.size).expect("fits in a u64"),
                   )
               })
               .collect();

           let file_scan_config =
               FileScanConfig::new(object_store_url, schema, Arc::new(parquet_source))
                   .with_projection(projection.cloned())
                   .with_limit(limit)
                   .with_file_group(file_group);

           Ok(file_scan_config.build())
       }
   }
   
   /// Schema mapper that fills in the `is_admin` column missing from the file
   /// with `false` for every row (rather than nulls).
   #[derive(Debug)]
   struct CustomSchemaMapper;

   impl SchemaMapper for CustomSchemaMapper {
       /// Appends a non-nullable `is_admin` column of all-`false` values to `batch`
       /// and returns it under the two-column (`name`, `is_admin`) schema.
       ///
       /// # Errors
       ///
       /// Fails if the resulting columns do not match that schema, e.g. when
       /// `batch` does not consist of exactly one `Utf8` column.
       fn map_batch(
           &self,
           batch: RecordBatch,
       ) -> datafusion::common::Result<RecordBatch> {
           // One value per input row; a fixed-length array would make
           // `RecordBatch::try_new` fail for any batch that is not exactly 2 rows.
           let is_admin = BooleanArray::from(vec![false; batch.num_rows()]);
           let mut new_columns = batch.columns().to_vec();
           new_columns.push(Arc::new(is_admin));
           let new_schema = Schema::new(vec![
               Field::new("name", DataType::Utf8, false),
               Field::new("is_admin", DataType::Boolean, false),
           ]);
           let new_batch = RecordBatch::try_new(Arc::new(new_schema), new_columns)?;
           Ok(new_batch)
       }

       /// Returns `batch` unchanged; no `is_admin` column is added here.
       fn map_partial_batch(
           &self,
           batch: RecordBatch,
       ) -> datafusion::common::Result<RecordBatch> {
           Ok(batch)
       }
   }
   
   /// Schema adapter that reads every file column as-is and delegates batch
   /// rewriting to `CustomSchemaMapper`.
   #[derive(Debug)]
   struct CustomSchemaAdapter;

   impl SchemaAdapter for CustomSchemaAdapter {
       /// Maps a column index to the same index in the file, or `None` when the
       /// index is past the end of the file schema.
       fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
           let file_column_count = file_schema.fields().len();
           (index < file_column_count).then_some(index)
       }

       /// Returns a `CustomSchemaMapper` together with a projection selecting
       /// every column of `file_schema` in order.
       fn map_schema(
           &self,
           file_schema: &Schema,
       ) -> datafusion::common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
           let mapper: Arc<dyn SchemaMapper> = Arc::new(CustomSchemaMapper);
           let projection: Vec<usize> = (0..file_schema.fields().len()).collect();
           Ok((mapper, projection))
       }
   }
   
   /// Factory that always produces a `CustomSchemaAdapter`.
   #[derive(Debug)]
   struct CustomSchemaAdapterFactory;
   
   impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
       /// Creates a `CustomSchemaAdapter`; both schema arguments are ignored.
       fn create(&self, _projected_table_schema: SchemaRef, _table_schema: 
SchemaRef) -> Box<dyn SchemaAdapter> {
           Box::new(CustomSchemaAdapter)
       }
   }
   ```
   
   </details>
   
   The point is that predicate pushdown assumes that any column that is in the 
table schema but not the file schema can be treated as if it is all nulls, 
which is only true for the default SchemaAdapter.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to