itsjunetime commented on code in PR #12135:
URL: https://github.com/apache/datafusion/pull/12135#discussion_r1761651689


##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -167,55 +186,95 @@ impl SchemaAdapter for DefaultSchemaAdapter {
 
 /// The SchemaMapping struct holds a mapping from the file schema to the table 
schema
 /// and any necessary type conversions that need to be applied.
+///
+/// This needs both the projected table schema and full table schema because 
its different
+/// functions have different needs. The `map_batch` function is only used by 
the ParquetOpener to
+/// produce a RecordBatch which has the projected schema, since that's the 
schema which is supposed
+/// to come out of the execution of this query. `map_partial_batch`, however, 
is used to
 #[derive(Debug)]
 pub struct SchemaMapping {
-    /// The schema of the table. This is the expected schema after conversion 
and it should match the schema of the query result.
-    table_schema: SchemaRef,
-    /// Mapping from field index in `table_schema` to index in projected 
file_schema
+    /// The schema of the table. This is the expected schema after conversion 
and it should match
+    /// the schema of the query result.
+    projected_table_schema: SchemaRef,
+    /// Mapping from field index in `projected_table_schema` to index in 
projected file_schema.
+    /// They are Options instead of just plain `usize`s because the table 
could have fields that
+    /// don't exist in the file.
     field_mappings: Vec<Option<usize>>,
+    /// The entire table schema, as opposed to the projected_table_schema 
(which only contains the
+    /// columns that we are projecting out of this query). This contains all 
fields in the table,
+    /// regardless of if they will be projected out or not.
+    table_schema: SchemaRef,
 }
 
 impl SchemaMapper for SchemaMapping {
-    /// Adapts a `RecordBatch` to match the `table_schema` using the stored 
mapping and conversions.
+    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the 
stored mapping and
+    /// conversions. The produced RecordBatch has a schema that contains only 
the projected
+    /// columns, so if one needs a RecordBatch with a schema that references 
columns which are not
+    /// in the projected, it would be better to use `map_partial_batch`
     fn map_batch(&self, batch: RecordBatch) -> 
datafusion_common::Result<RecordBatch> {
         let batch_rows = batch.num_rows();
         let batch_cols = batch.columns().to_vec();
 
         let cols = self
-            .table_schema
+            .projected_table_schema
+            // go through each field in the projected schema
             .fields()
             .iter()
+            // and zip it with the index that maps fields from the projected 
table schema to the
+            // projected file schema in `batch`
             .zip(&self.field_mappings)
-            .map(|(field, file_idx)| match file_idx {
-                Some(batch_idx) => cast(&batch_cols[*batch_idx], 
field.data_type()),
-                None => Ok(new_null_array(field.data_type(), batch_rows)),
+            // and for each one...
+            .map(|(field, file_idx)| {
+                file_idx.map_or_else(
+                    // If this field only exists in the table, and not in the 
file, then we know
+                    // that it's null, so just return that.
+                    || Ok(new_null_array(field.data_type(), batch_rows)),
+                    // However, if it does exist in both, then try to cast it 
to the correct output
+                    // type
+                    |batch_idx| cast(&batch_cols[batch_idx], 
field.data_type()),
+                )
             })
             .collect::<datafusion_common::Result<Vec<_>, _>>()?;
 
         // Necessary to handle empty batches
         let options = 
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
 
-        let schema = self.table_schema.clone();
+        let schema = self.projected_table_schema.clone();
         let record_batch = RecordBatch::try_new_with_options(schema, cols, 
&options)?;
         Ok(record_batch)
     }
 
+    /// Adapts a [`RecordBatch`]'s schema into one that has all the correct 
output types and only
+    /// contains the fields that exist in both the file schema and table 
schema.
     fn map_partial_batch(
         &self,
         batch: RecordBatch,
     ) -> datafusion_common::Result<RecordBatch> {
         let batch_cols = batch.columns().to_vec();
         let schema = batch.schema();
 
-        let mut cols = vec![];
-        let mut fields = vec![];
-        for (i, f) in schema.fields().iter().enumerate() {
-            let table_field = self.table_schema.field_with_name(f.name());
-            if let Ok(tf) = table_field {
-                cols.push(cast(&batch_cols[i], tf.data_type())?);
-                fields.push(tf.clone());
-            }
-        }
+        // for each field in the batch's schema...
+        let (cols, fields) = schema
+            .fields()
+            .iter()
+            .zip(batch_cols.iter())
+            .flat_map(|(field, batch_col)| {
+                self.table_schema
+                    // try to get the same field from the table schema that we 
have stored in self
+                    .field_with_name(field.name())
+                    // and if we don't have it, that's fine, ignore it. This 
*is* an unexpected error
+                    // if we do run into it (since a file schema should be 
wholly a subset of the table
+                    // schema) but these errors should've been resolved by 
now. I guess.

Review Comment:
   Thanks for pointing this out - I've updated the comment to reflect this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to