alamb commented on code in PR #12135:
URL: https://github.com/apache/datafusion/pull/12135#discussion_r1760068055


##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -95,17 +101,29 @@ pub trait SchemaMapper: Debug + Send + Sync {
 /// Basic implementation of [`SchemaAdapterFactory`] that maps columns by name
 /// and casts columns to the expected type.
 #[derive(Clone, Debug, Default)]
-pub struct DefaultSchemaAdapterFactory {}
+pub struct DefaultSchemaAdapterFactory;
 
 impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
-    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
-        Box::new(DefaultSchemaAdapter { table_schema })
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(DefaultSchemaAdapter {
+            projected_table_schema,
+            table_schema,
+        })
     }
 }
 
+// This SchemaAdapter requires both the table schema and the projected table schema because of the
+// needs of the [`SchemaMapping`] it creates. Read its documentation for more details

Review Comment:
   ```suggestion
    /// This SchemaAdapter requires both the table schema and the projected table schema because of the
    /// needs of the [`SchemaMapping`] it creates. Read its documentation for more details
   ```



##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -167,55 +186,95 @@ impl SchemaAdapter for DefaultSchemaAdapter {
 
 /// The SchemaMapping struct holds a mapping from the file schema to the table schema
 /// and any necessary type conversions that need to be applied.
+///
+/// This needs both the projected table schema and full table schema because its different
+/// functions have different needs. The `map_batch` function is only used by the ParquetOpener to
+/// produce a RecordBatch which has the projected schema, since that's the schema which is supposed
+/// to come out of the execution of this query. `map_partial_batch`, however, is used to
 #[derive(Debug)]
 pub struct SchemaMapping {
-    /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
-    table_schema: SchemaRef,
-    /// Mapping from field index in `table_schema` to index in projected file_schema
+    /// The schema of the table. This is the expected schema after conversion and it should match
+    /// the schema of the query result.
+    projected_table_schema: SchemaRef,
+    /// Mapping from field index in `projected_table_schema` to index in projected file_schema.
+    /// They are Options instead of just plain `usize`s because the table could have fields that
+    /// don't exist in the file.
     field_mappings: Vec<Option<usize>>,
+    /// The entire table schema, as opposed to the projected_table_schema (which only contains the
+    /// columns that we are projecting out of this query). This contains all fields in the table,
+    /// regardless of if they will be projected out or not.
+    table_schema: SchemaRef,
 }
 
 impl SchemaMapper for SchemaMapping {
-    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
+    /// conversions. The produced RecordBatch has a schema that contains only the projected
+    /// columns, so if one needs a RecordBatch with a schema that references columns which are not
+    /// in the projected, it would be better to use `map_partial_batch`
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
         let batch_rows = batch.num_rows();
         let batch_cols = batch.columns().to_vec();
 
         let cols = self
-            .table_schema
+            .projected_table_schema
+            // go through each field in the projected schema
             .fields()
             .iter()
+            // and zip it with the index that maps fields from the projected table schema to the
+            // projected file schema in `batch`
             .zip(&self.field_mappings)
-            .map(|(field, file_idx)| match file_idx {
-                Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
-                None => Ok(new_null_array(field.data_type(), batch_rows)),
+            // and for each one...
+            .map(|(field, file_idx)| {
+                file_idx.map_or_else(
+                    // If this field only exists in the table, and not in the file, then we know
+                    // that it's null, so just return that.
+                    || Ok(new_null_array(field.data_type(), batch_rows)),
+                    // However, if it does exist in both, then try to cast it to the correct output
+                    // type
+                    |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
+                )
             })
             .collect::<datafusion_common::Result<Vec<_>, _>>()?;
 
         // Necessary to handle empty batches
         let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
 
-        let schema = self.table_schema.clone();
+        let schema = self.projected_table_schema.clone();
         let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
         Ok(record_batch)
     }
 
+    /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
+    /// contains the fields that exist in both the file schema and table schema.

Review Comment:
   ```suggestion
        /// contains the fields that exist in both the file schema and table schema.
       ///
       /// Unlike `map_batch` this method also preserves the columns that 
       /// may not appear in the final output (`projected_table_schema`) but may
       /// appear in push down predicates
   ```
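
For anyone skimming the thread, here is a minimal standalone sketch of the distinction described in the suggestion above. It uses plain arrow-rs with made-up function names (`project_like_map_batch`, `project_like_map_partial_batch`), not the actual DataFusion `SchemaMapping` code: a `map_batch`-style projection keeps only the projected columns (null-filling anything missing from the file), while a `map_partial_batch`-style projection keeps every file column that also exists in the full table schema, so a predicate-only column like `b` survives for the row filter.

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef, Int32Array};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Sketch of the `map_batch` idea: produce exactly the projected table schema,
/// casting matching file columns and null-filling columns missing from the file.
fn project_like_map_batch(
    projected_table_schema: &SchemaRef,
    batch: &RecordBatch,
) -> Result<RecordBatch, ArrowError> {
    let cols = projected_table_schema
        .fields()
        .iter()
        .map(|field| match batch.column_by_name(field.name()) {
            Some(col) => cast(col, field.data_type()),
            None => Ok(new_null_array(field.data_type(), batch.num_rows())),
        })
        .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
    RecordBatch::try_new(Arc::clone(projected_table_schema), cols)
}

/// Sketch of the `map_partial_batch` idea: keep every batch column that also
/// exists in the *full* table schema, cast to the table's type, and drop
/// columns the table does not know about.
fn project_like_map_partial_batch(
    table_schema: &SchemaRef,
    batch: &RecordBatch,
) -> Result<RecordBatch, ArrowError> {
    let batch_schema = batch.schema();
    let mut fields = Vec::new();
    let mut cols = Vec::new();
    for (field, col) in batch_schema.fields().iter().zip(batch.columns()) {
        if let Ok(table_field) = table_schema.field_with_name(field.name()) {
            cols.push(cast(col, table_field.data_type())?);
            fields.push(table_field.clone());
        }
    }
    RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
}

fn main() -> Result<(), ArrowError> {
    // The file batch carries both `a` and `b`; the query projects only `a`,
    // but a pushed-down filter still needs `b`.
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]));
    let batch = RecordBatch::try_new(
        file_schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
            Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef,
        ],
    )?;

    let projected: SchemaRef =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let full: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Int64, true),
    ]));

    // Only the projected column comes out of the `map_batch`-style mapping...
    assert_eq!(project_like_map_batch(&projected, &batch)?.num_columns(), 1);
    // ...while the `map_partial_batch`-style mapping also keeps `b`.
    assert_eq!(project_like_map_partial_batch(&full, &batch)?.num_columns(), 2);
    Ok(())
}
```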



##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -81,16 +81,11 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--Projection: t_pushdown.a
-03)----Filter: t_pushdown.b > Int32(2)
-04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
+02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: b@1 > 2, projection=[a@0]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]
+03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]

Review Comment:
    ![so-beautiful](https://github.com/user-attachments/assets/d301cd75-92db-47d9-8a76-e22aa49a388e)



##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -81,16 +81,11 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--Projection: t_pushdown.a
-03)----Filter: t_pushdown.b > Int32(2)
-04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
+02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: b@1 > 2, projection=[a@0]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]

Review Comment:
    A minor nit is we could remove the reference to https://github.com/apache/datafusion/issues/4028
   
   So remove this line (I think there are a few in the test):
   
   > # once https://github.com/apache/datafusion/issues/4028 is fixed



##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -167,55 +186,95 @@ impl SchemaAdapter for DefaultSchemaAdapter {
 
 /// The SchemaMapping struct holds a mapping from the file schema to the table schema
 /// and any necessary type conversions that need to be applied.
+///
+/// This needs both the projected table schema and full table schema because its different
+/// functions have different needs. The `map_batch` function is only used by the ParquetOpener to
+/// produce a RecordBatch which has the projected schema, since that's the schema which is supposed
+/// to come out of the execution of this query. `map_partial_batch`, however, is used to
 #[derive(Debug)]
 pub struct SchemaMapping {
-    /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
-    table_schema: SchemaRef,
-    /// Mapping from field index in `table_schema` to index in projected file_schema
+    /// The schema of the table. This is the expected schema after conversion and it should match
+    /// the schema of the query result.
+    projected_table_schema: SchemaRef,
+    /// Mapping from field index in `projected_table_schema` to index in projected file_schema.
+    /// They are Options instead of just plain `usize`s because the table could have fields that
+    /// don't exist in the file.
     field_mappings: Vec<Option<usize>>,
+    /// The entire table schema, as opposed to the projected_table_schema (which only contains the
+    /// columns that we are projecting out of this query). This contains all fields in the table,
+    /// regardless of if they will be projected out or not.
+    table_schema: SchemaRef,
 }
 
 impl SchemaMapper for SchemaMapping {
-    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
+    /// conversions. The produced RecordBatch has a schema that contains only the projected
+    /// columns, so if one needs a RecordBatch with a schema that references columns which are not
+    /// in the projected, it would be better to use `map_partial_batch`
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
         let batch_rows = batch.num_rows();
         let batch_cols = batch.columns().to_vec();
 
         let cols = self
-            .table_schema
+            .projected_table_schema
+            // go through each field in the projected schema
             .fields()
             .iter()
+            // and zip it with the index that maps fields from the projected table schema to the
+            // projected file schema in `batch`
             .zip(&self.field_mappings)
-            .map(|(field, file_idx)| match file_idx {
-                Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
-                None => Ok(new_null_array(field.data_type(), batch_rows)),
+            // and for each one...
+            .map(|(field, file_idx)| {
+                file_idx.map_or_else(
+                    // If this field only exists in the table, and not in the file, then we know
+                    // that it's null, so just return that.
+                    || Ok(new_null_array(field.data_type(), batch_rows)),
+                    // However, if it does exist in both, then try to cast it to the correct output
+                    // type
+                    |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
+                )
             })
             .collect::<datafusion_common::Result<Vec<_>, _>>()?;
 
         // Necessary to handle empty batches
         let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
 
-        let schema = self.table_schema.clone();
+        let schema = self.projected_table_schema.clone();
         let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
         Ok(record_batch)
     }
 
+    /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
+    /// contains the fields that exist in both the file schema and table schema.
     fn map_partial_batch(
         &self,
         batch: RecordBatch,
     ) -> datafusion_common::Result<RecordBatch> {
         let batch_cols = batch.columns().to_vec();
         let schema = batch.schema();
 
-        let mut cols = vec![];
-        let mut fields = vec![];
-        for (i, f) in schema.fields().iter().enumerate() {
-            let table_field = self.table_schema.field_with_name(f.name());
-            if let Ok(tf) = table_field {
-                cols.push(cast(&batch_cols[i], tf.data_type())?);
-                fields.push(tf.clone());
-            }
-        }
+        // for each field in the batch's schema...
+        let (cols, fields) = schema
+            .fields()
+            .iter()
+            .zip(batch_cols.iter())
+            .flat_map(|(field, batch_col)| {
+                self.table_schema
+                    // try to get the same field from the table schema that we have stored in self
+                    .field_with_name(field.name())
+                    // and if we don't have it, that's fine, ignore it. This *is* an unexpected error
+                    // if we do run into it (since a file schema should be wholly a subset of the table
+                    // schema) but these errors should've been resolved by now. I guess.

Review Comment:
    I don't think it is expected.

    It would happen in the case when a parquet file has *more* columns than the table schema.

    I think this can happen if you only create a table with a subset of the columns:

    ```sql
    CREATE EXTERNAL TABLE foo(a int)
    STORED AS PARQUET
    LOCATION 'file_with_both_a_and_b.parquet';
   ```
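
(In that case the new `map_partial_batch` code above simply skips the extra file column rather than erroring: the `flat_map` drops any field that `field_with_name` cannot find in the table schema.)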



##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -167,55 +186,95 @@ impl SchemaAdapter for DefaultSchemaAdapter {
 
 /// The SchemaMapping struct holds a mapping from the file schema to the table schema
 /// and any necessary type conversions that need to be applied.
+///
+/// This needs both the projected table schema and full table schema because its different
+/// functions have different needs. The `map_batch` function is only used by the ParquetOpener to
+/// produce a RecordBatch which has the projected schema, since that's the schema which is supposed
+/// to come out of the execution of this query. `map_partial_batch`, however, is used to

Review Comment:
   This comment appears to stop mid sentence



##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -95,17 +101,29 @@ pub trait SchemaMapper: Debug + Send + Sync {
 /// Basic implementation of [`SchemaAdapterFactory`] that maps columns by name
 /// and casts columns to the expected type.
 #[derive(Clone, Debug, Default)]
-pub struct DefaultSchemaAdapterFactory {}
+pub struct DefaultSchemaAdapterFactory;
 
 impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
-    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
-        Box::new(DefaultSchemaAdapter { table_schema })
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(DefaultSchemaAdapter {
+            projected_table_schema,
+            table_schema,
+        })
     }
 }
 
+// This SchemaAdapter requires both the table schema and the projected table schema because of the
+// needs of the [`SchemaMapping`] it creates. Read its documentation for more details
 #[derive(Clone, Debug)]
 pub(crate) struct DefaultSchemaAdapter {
-    /// Schema for the table
+    /// The schema for the table, projected to include only the fields being projected by the
+    /// associated ParquetExec

Review Comment:
   ```suggestion
        /// The schema for the table, projected to include only the fields being output (projected) by the
       /// associated ParquetExec
   ```



##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -95,17 +101,29 @@ pub trait SchemaMapper: Debug + Send + Sync {
 /// Basic implementation of [`SchemaAdapterFactory`] that maps columns by name
 /// and casts columns to the expected type.
 #[derive(Clone, Debug, Default)]
-pub struct DefaultSchemaAdapterFactory {}
+pub struct DefaultSchemaAdapterFactory;
 
 impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
-    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
-        Box::new(DefaultSchemaAdapter { table_schema })
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(DefaultSchemaAdapter {
+            projected_table_schema,
+            table_schema,
+        })
     }
 }
 
+// This SchemaAdapter requires both the table schema and the projected table schema because of the
+// needs of the [`SchemaMapping`] it creates. Read its documentation for more details
 #[derive(Clone, Debug)]
 pub(crate) struct DefaultSchemaAdapter {
-    /// Schema for the table
+    /// The schema for the table, projected to include only the fields being projected by the
+    /// associated ParquetExec
+    projected_table_schema: SchemaRef,
+    /// The entire table schema for the table we're using this to adapt.

Review Comment:
   ```suggestion
       /// The entire table schema for the table we're using this to adapt.
       ///
       /// This is used to evaluate any filters pushed down into the scan
       /// which may refer to columns that are not referred to anywhere
       /// else in the plan.
   ```
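
(For context, the `parquet_filter_pushdown.slt` change earlier in this PR shows exactly that case: `select a from t_pushdown where b > 2` scans with `projection=[a]` while the pushed-down predicate still refers to `b`.)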



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

