alamb commented on code in PR #12135:
URL: https://github.com/apache/datafusion/pull/12135#discussion_r1760068055
##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -95,17 +101,29 @@ pub trait SchemaMapper: Debug + Send + Sync {
/// Basic implementation of [`SchemaAdapterFactory`] that maps columns by name
/// and casts columns to the expected type.
#[derive(Clone, Debug, Default)]
-pub struct DefaultSchemaAdapterFactory {}
+pub struct DefaultSchemaAdapterFactory;
impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
- fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
- Box::new(DefaultSchemaAdapter { table_schema })
+ fn create(
+ &self,
+ projected_table_schema: SchemaRef,
+ table_schema: SchemaRef,
+ ) -> Box<dyn SchemaAdapter> {
+ Box::new(DefaultSchemaAdapter {
+ projected_table_schema,
+ table_schema,
+ })
}
}
+// This SchemaAdapter requires both the table schema and the projected table schema because of the
+// needs of the [`SchemaMapping`] it creates. Read its documentation for more details
Review Comment:
```suggestion
/// This SchemaAdapter requires both the table schema and the projected table schema because of the
/// needs of the [`SchemaMapping`] it creates. Read its documentation for more details
```
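
For illustration only (not part of the review): a minimal sketch of how the new two-schema `create` call might be used. Only the `create(projected_table_schema, table_schema)` signature comes from the diff above; the `datafusion::datasource::schema_adapter` import path and the example schemas are assumptions.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};

fn main() {
    // Full table schema: everything the table knows about, including columns
    // that only appear in pushed-down filters (here, `b`).
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]));
    // Projected table schema: only the columns the plan actually outputs.
    let projected_table_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    // The factory now takes both schemas so the SchemaMapping it builds can
    // serve both `map_batch` (projected output) and `map_partial_batch`
    // (filter evaluation).
    let _adapter =
        DefaultSchemaAdapterFactory.create(projected_table_schema, table_schema);
}
```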
##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -167,55 +186,95 @@ impl SchemaAdapter for DefaultSchemaAdapter {
/// The SchemaMapping struct holds a mapping from the file schema to the table schema
/// and any necessary type conversions that need to be applied.
+///
+/// This needs both the projected table schema and full table schema because its different
+/// functions have different needs. The `map_batch` function is only used by the ParquetOpener to
+/// produce a RecordBatch which has the projected schema, since that's the schema which is supposed
+/// to come out of the execution of this query. `map_partial_batch`, however, is used to
#[derive(Debug)]
pub struct SchemaMapping {
- /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
- table_schema: SchemaRef,
- /// Mapping from field index in `table_schema` to index in projected file_schema
+ /// The schema of the table. This is the expected schema after conversion and it should match
+ /// the schema of the query result.
+ projected_table_schema: SchemaRef,
+ /// Mapping from field index in `projected_table_schema` to index in projected file_schema.
+ /// They are Options instead of just plain `usize`s because the table could have fields that
+ /// don't exist in the file.
field_mappings: Vec<Option<usize>>,
+ /// The entire table schema, as opposed to the projected_table_schema (which only contains the
+ /// columns that we are projecting out of this query). This contains all fields in the table,
+ /// regardless of if they will be projected out or not.
+ table_schema: SchemaRef,
}
impl SchemaMapper for SchemaMapping {
- /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+ /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
+ /// conversions. The produced RecordBatch has a schema that contains only the projected
+ /// columns, so if one needs a RecordBatch with a schema that references columns which are not
+ /// in the projected, it would be better to use `map_partial_batch`
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
let batch_rows = batch.num_rows();
let batch_cols = batch.columns().to_vec();
let cols = self
- .table_schema
+ .projected_table_schema
+ // go through each field in the projected schema
.fields()
.iter()
+ // and zip it with the index that maps fields from the projected table schema to the
+ // projected file schema in `batch`
.zip(&self.field_mappings)
- .map(|(field, file_idx)| match file_idx {
- Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
- None => Ok(new_null_array(field.data_type(), batch_rows)),
+ // and for each one...
+ .map(|(field, file_idx)| {
+ file_idx.map_or_else(
+ // If this field only exists in the table, and not in the file, then we know
+ // that it's null, so just return that.
+ || Ok(new_null_array(field.data_type(), batch_rows)),
+ // However, if it does exist in both, then try to cast it to the correct output
+ // type
+ |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
+ )
})
.collect::<datafusion_common::Result<Vec<_>, _>>()?;
// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
- let schema = self.table_schema.clone();
+ let schema = self.projected_table_schema.clone();
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
+ /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
+ /// contains the fields that exist in both the file schema and table schema.
Review Comment:
```suggestion
    /// contains the fields that exist in both the file schema and table schema.
    ///
    /// Unlike `map_batch` this method also preserves the columns that
    /// may not appear in the final output (`projected_table_schema`) but may
    /// appear in push down predicates
```
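
Also for illustration (not from the PR): a hypothetical sketch of the behaviour the suggested doc describes. It assumes `SchemaAdapter::map_schema` (not shown in this hunk) returns a `SchemaMapper` plus a projection, and that the module lives at `datafusion::datasource::schema_adapter`; the schemas and data are made up. The point is that `map_partial_batch` keeps `b`, so a pushed-down predicate like `b > 2` can still be evaluated even though the projected output only contains `a`.

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
use datafusion::error::Result;

fn main() -> Result<()> {
    // The table has `a` and `b`; the query only outputs `a`, but the
    // pushed-down predicate `b > 2` still needs `b`.
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]));
    let projected_table_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));

    let adapter = DefaultSchemaAdapterFactory
        .create(projected_table_schema, Arc::clone(&table_schema));

    // Pretend the file has the same two columns as the table.
    let file_batch = RecordBatch::try_new(
        Arc::clone(&table_schema),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
            Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef,
        ],
    )?;

    // `map_schema` (assumed here, not part of this hunk) hands back the mapper.
    let (mapper, _projection) = adapter.map_schema(&table_schema)?;

    // `map_partial_batch` keeps `b`, so the row filter evaluating `b > 2`
    // still has the column it needs.
    let partial = mapper.map_partial_batch(file_batch)?;
    assert_eq!(partial.num_columns(), 2);
    Ok(())
}
```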
##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -81,16 +81,11 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
----
logical_plan
01)Sort: t_pushdown.a ASC NULLS LAST
-02)--Projection: t_pushdown.a
-03)----Filter: t_pushdown.b > Int32(2)
-04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
+02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
physical_plan
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: b@1 > 2, projection=[a@0]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]
+03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]
Review Comment:

##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -81,16 +81,11 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
----
logical_plan
01)Sort: t_pushdown.a ASC NULLS LAST
-02)--Projection: t_pushdown.a
-03)----Filter: t_pushdown.b > Int32(2)
-04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
+02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
physical_plan
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: b@1 > 2, projection=[a@0]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]
Review Comment:
A minor nit: we could remove the reference to
https://github.com/apache/datafusion/issues/4028
by removing this line (I think there are a few in the test):
> # once https://github.com/apache/datafusion/issues/4028 is fixed
##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -167,55 +186,95 @@ impl SchemaAdapter for DefaultSchemaAdapter {
/// The SchemaMapping struct holds a mapping from the file schema to the table schema
/// and any necessary type conversions that need to be applied.
+///
+/// This needs both the projected table schema and full table schema because its different
+/// functions have different needs. The `map_batch` function is only used by the ParquetOpener to
+/// produce a RecordBatch which has the projected schema, since that's the schema which is supposed
+/// to come out of the execution of this query. `map_partial_batch`, however, is used to
#[derive(Debug)]
pub struct SchemaMapping {
- /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
- table_schema: SchemaRef,
- /// Mapping from field index in `table_schema` to index in projected file_schema
+ /// The schema of the table. This is the expected schema after conversion and it should match
+ /// the schema of the query result.
+ projected_table_schema: SchemaRef,
+ /// Mapping from field index in `projected_table_schema` to index in projected file_schema.
+ /// They are Options instead of just plain `usize`s because the table could have fields that
+ /// don't exist in the file.
field_mappings: Vec<Option<usize>>,
+ /// The entire table schema, as opposed to the projected_table_schema (which only contains the
+ /// columns that we are projecting out of this query). This contains all fields in the table,
+ /// regardless of if they will be projected out or not.
+ table_schema: SchemaRef,
}
impl SchemaMapper for SchemaMapping {
- /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+ /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
+ /// conversions. The produced RecordBatch has a schema that contains only the projected
+ /// columns, so if one needs a RecordBatch with a schema that references columns which are not
+ /// in the projected, it would be better to use `map_partial_batch`
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
let batch_rows = batch.num_rows();
let batch_cols = batch.columns().to_vec();
let cols = self
- .table_schema
+ .projected_table_schema
+ // go through each field in the projected schema
.fields()
.iter()
+ // and zip it with the index that maps fields from the projected table schema to the
+ // projected file schema in `batch`
.zip(&self.field_mappings)
- .map(|(field, file_idx)| match file_idx {
- Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
- None => Ok(new_null_array(field.data_type(), batch_rows)),
+ // and for each one...
+ .map(|(field, file_idx)| {
+ file_idx.map_or_else(
+ // If this field only exists in the table, and not in the file, then we know
+ // that it's null, so just return that.
+ || Ok(new_null_array(field.data_type(), batch_rows)),
+ // However, if it does exist in both, then try to cast it to the correct output
+ // type
+ |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
+ )
})
.collect::<datafusion_common::Result<Vec<_>, _>>()?;
// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
- let schema = self.table_schema.clone();
+ let schema = self.projected_table_schema.clone();
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
+ /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
+ /// contains the fields that exist in both the file schema and table schema.
fn map_partial_batch(
&self,
batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch> {
let batch_cols = batch.columns().to_vec();
let schema = batch.schema();
- let mut cols = vec![];
- let mut fields = vec![];
- for (i, f) in schema.fields().iter().enumerate() {
- let table_field = self.table_schema.field_with_name(f.name());
- if let Ok(tf) = table_field {
- cols.push(cast(&batch_cols[i], tf.data_type())?);
- fields.push(tf.clone());
- }
- }
+ // for each field in the batch's schema...
+ let (cols, fields) = schema
+ .fields()
+ .iter()
+ .zip(batch_cols.iter())
+ .flat_map(|(field, batch_col)| {
+ self.table_schema
+ // try to get the same field from the table schema that we have stored in self
+ .field_with_name(field.name())
+ // and if we don't have it, that's fine, ignore it. This *is* an unexpected error
+ // if we do run into it (since a file schema should be wholly a subset of the table
+ // schema) but these errors should've been resolved by now. I guess.
Review Comment:
I don't think it is expected.
It would happen in the case when a parquet file has *more* columns than the table schema.
I think this can happen if you only create a table with a subset of the columns:
```sql
CREATE EXTERNAL TABLE foo(a int)
STORED AS PARQUET
LOCATION 'file_with_both_a_and_b.parquet';
```
##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -167,55 +186,95 @@ impl SchemaAdapter for DefaultSchemaAdapter {
/// The SchemaMapping struct holds a mapping from the file schema to the table schema
/// and any necessary type conversions that need to be applied.
+///
+/// This needs both the projected table schema and full table schema because its different
+/// functions have different needs. The `map_batch` function is only used by the ParquetOpener to
+/// produce a RecordBatch which has the projected schema, since that's the schema which is supposed
+/// to come out of the execution of this query. `map_partial_batch`, however, is used to
Review Comment:
This comment appears to stop mid-sentence.
##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -95,17 +101,29 @@ pub trait SchemaMapper: Debug + Send + Sync {
/// Basic implementation of [`SchemaAdapterFactory`] that maps columns by name
/// and casts columns to the expected type.
#[derive(Clone, Debug, Default)]
-pub struct DefaultSchemaAdapterFactory {}
+pub struct DefaultSchemaAdapterFactory;
impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
- fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
- Box::new(DefaultSchemaAdapter { table_schema })
+ fn create(
+ &self,
+ projected_table_schema: SchemaRef,
+ table_schema: SchemaRef,
+ ) -> Box<dyn SchemaAdapter> {
+ Box::new(DefaultSchemaAdapter {
+ projected_table_schema,
+ table_schema,
+ })
}
}
+// This SchemaAdapter requires both the table schema and the projected table schema because of the
+// needs of the [`SchemaMapping`] it creates. Read its documentation for more details
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
- /// Schema for the table
+ /// The schema for the table, projected to include only the fields being projected by the
+ /// associated ParquetExec
Review Comment:
```suggestion
    /// The schema for the table, projected to include only the fields being output (projected) by the
    /// associated ParquetExec
```
##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -95,17 +101,29 @@ pub trait SchemaMapper: Debug + Send + Sync {
/// Basic implementation of [`SchemaAdapterFactory`] that maps columns by name
/// and casts columns to the expected type.
#[derive(Clone, Debug, Default)]
-pub struct DefaultSchemaAdapterFactory {}
+pub struct DefaultSchemaAdapterFactory;
impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
- fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
- Box::new(DefaultSchemaAdapter { table_schema })
+ fn create(
+ &self,
+ projected_table_schema: SchemaRef,
+ table_schema: SchemaRef,
+ ) -> Box<dyn SchemaAdapter> {
+ Box::new(DefaultSchemaAdapter {
+ projected_table_schema,
+ table_schema,
+ })
}
}
+// This SchemaAdapter requires both the table schema and the projected table schema because of the
+// needs of the [`SchemaMapping`] it creates. Read its documentation for more details
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
- /// Schema for the table
+ /// The schema for the table, projected to include only the fields being projected by the
+ /// associated ParquetExec
+ projected_table_schema: SchemaRef,
+ /// The entire table schema for the table we're using this to adapt.
Review Comment:
```suggestion
/// The entire table schema for the table we're using this to adapt.
///
/// This is used to evaluate any filters pushed down into the scan
/// which may refer to columns that are not referred to anywhere
/// else in the plan.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]