alamb commented on code in PR #18998:
URL: https://github.com/apache/datafusion/pull/18998#discussion_r2590672147
##########
datafusion/pruning/src/pruning_predicate.rs:
##########
@@ -1105,6 +1106,20 @@ fn rewrite_expr_to_prunable(
None,
));
Ok((left, op, right))
+ } else if let Some(cast_col) =
column_expr_any.downcast_ref::<CastColumnExpr>() {
Review Comment:
This is a good example of where the code for `CastExpr` and `CastColumnExpr`
is almost identical, and the duplication seems unfortunate.
I took an initial shot at removing the redundancy here:
- https://github.com/apache/datafusion/pull/19097
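For reference, a minimal sketch of one way the two branches could share code (the helper name is made up, and the accessors on `CastColumnExpr` are assumed to mirror `CastExpr`'s `expr()` / `cast_type()`):
```rust
// Hypothetical helper: expose CastExpr and CastColumnExpr uniformly as
// (inner expression, target type) so rewrite_expr_to_prunable only needs
// a single cast branch
fn as_cast_parts(
    expr: &Arc<dyn PhysicalExpr>,
) -> Option<(&Arc<dyn PhysicalExpr>, &DataType)> {
    let any = expr.as_any();
    if let Some(cast) = any.downcast_ref::<CastExpr>() {
        Some((cast.expr(), cast.cast_type()))
    } else if let Some(cast) = any.downcast_ref::<CastColumnExpr>() {
        // assumed accessors -- adjust to whatever CastColumnExpr actually exposes
        Some((cast.expr(), cast.cast_type()))
    } else {
        None
    }
}
```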
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1257,7 +1257,7 @@ mod tests {
("c3", c3.clone()),
]);
- // batch2: c3(int8), c2(int64), c1(string), c4(string)
+ // batch2: c3(date64), c2(int64), c1(string), c4(date64)
Review Comment:
🤔 batch2 only has 3 columns (c3(date64), c2(int64) and c1(string))
```suggestion
// batch2: c3(date64), c2(int64), c1(string)
```
##########
datafusion/core/tests/parquet/schema_adapter.rs:
##########
@@ -293,255 +194,128 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() {
];
assert_batches_eq!(expected, &batches);
- // Test using a custom schema adapter and no explicit physical expr adapter
- // This should use the custom schema adapter both for projections and predicate pushdown
+ // Test with a custom physical expr adapter
+ // PhysicalExprAdapterFactory now handles both predicates AND projections
+ // CustomPhysicalExprAdapterFactory fills missing columns with 'b' for Utf8
let listing_table_config =
ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
.infer_options(&ctx.state())
.await
.unwrap()
.with_schema(table_schema.clone())
- .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory));
+ .with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
let table = ListingTable::try_new(listing_table_config).unwrap();
ctx.deregister_table("t").unwrap();
ctx.register_table("t", Arc::new(table)).unwrap();
let batches = ctx
- .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'a'")
Review Comment:
It took me a while to understand why this test changed -- it is because
`CustomSchemaMapper` used to fill in the value `a`, but
`CustomPhysicalExprAdapterFactory` fills in `b`.
Therefore this change makes sense to me 👍
##########
datafusion/core/tests/parquet/schema_adapter.rs:
##########
@@ -59,97 +51,6 @@ async fn write_parquet(batch: RecordBatch, store: Arc<dyn ObjectStore>, path: &s
store.put(&Path::from(path), data.into()).await.unwrap();
}
-#[derive(Debug)]
-struct CustomSchemaAdapterFactory;
-
-impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
- fn create(
- &self,
- projected_table_schema: SchemaRef,
- _table_schema: SchemaRef,
- ) -> Box<dyn SchemaAdapter> {
- Box::new(CustomSchemaAdapter {
- logical_file_schema: projected_table_schema,
- })
- }
-}
-
-#[derive(Debug)]
-struct CustomSchemaAdapter {
- logical_file_schema: SchemaRef,
-}
-
-impl SchemaAdapter for CustomSchemaAdapter {
- fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
- for (idx, field) in file_schema.fields().iter().enumerate() {
- if field.name() == self.logical_file_schema.field(index).name() {
- return Some(idx);
- }
- }
- None
- }
-
- fn map_schema(
- &self,
- file_schema: &Schema,
- ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
- let projection = (0..file_schema.fields().len()).collect_vec();
- Ok((
- Arc::new(CustomSchemaMapper {
- logical_file_schema: Arc::clone(&self.logical_file_schema),
- }),
- projection,
- ))
- }
-}
-
-#[derive(Debug)]
-struct CustomSchemaMapper {
- logical_file_schema: SchemaRef,
-}
-
-impl SchemaMapper for CustomSchemaMapper {
- fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
- let mut output_columns =
- Vec::with_capacity(self.logical_file_schema.fields().len());
- for field in self.logical_file_schema.fields() {
- if let Some(array) = batch.column_by_name(field.name()) {
- output_columns.push(cast_with_options(
- array,
- field.data_type(),
- &CastOptions::default(),
- )?);
- } else {
- // Create a new array with the default value for the field type
- let default_value = match field.data_type() {
- DataType::Int64 => ScalarValue::Int64(Some(0)),
- DataType::Utf8 => ScalarValue::Utf8(Some("a".to_string())),
- _ => unimplemented!("Unsupported data type: {}", field.data_type()),
- };
- output_columns
- .push(default_value.to_array_of_size(batch.num_rows()).unwrap());
- }
- }
- let batch = RecordBatch::try_new_with_options(
- Arc::clone(&self.logical_file_schema),
- output_columns,
- &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
- )
- .unwrap();
- Ok(batch)
- }
-
- fn map_column_statistics(
- &self,
- _file_col_statistics: &[ColumnStatistics],
- ) -> Result<Vec<ColumnStatistics>> {
- Ok(vec![
- ColumnStatistics::new_unknown();
- self.logical_file_schema.fields().len()
- ])
- }
-}
-
// Implement a custom PhysicalExprAdapterFactory that fills in missing columns with the default value for the field type
Review Comment:
It might help to note here that "default value for the field type" really means:
* `1` for missing Int64 columns
* `'b'` for missing Utf8 columns
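Roughly, the substitution the custom adapter makes for a missing column looks like this (a sketch only, assuming the rewrite replaces the column reference with a `Literal`; the exact test code differs):
```rust
// Sketch: a column that exists in the table schema but not in the file
// is replaced by a literal default for its type
let default_value = match field.data_type() {
    DataType::Int64 => ScalarValue::Int64(Some(1)),
    DataType::Utf8 => ScalarValue::Utf8(Some("b".to_string())),
    other => unimplemented!("Unsupported data type: {other}"),
};
// both the pushed-down predicate and the projection then see this value
let filled: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(default_value));
```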
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -320,13 +314,42 @@ impl FileOpener for ParquetOpener {
reader_metadata,
);
- let (schema_mapping, adapted_projections) =
- schema_adapter.map_schema(&physical_file_schema)?;
+ let mut projection =
+ ProjectionExprs::from_indices(&projection, &logical_file_schema);
+ if let Some(expr_adapter_factory) = expr_adapter_factory {
+ let adapter = expr_adapter_factory
+ .create(
+ Arc::clone(&logical_file_schema),
+ Arc::clone(&physical_file_schema),
+ )
+ .with_partition_values(
+ partition_fields
+ .iter()
+ .cloned()
+ .zip(partitioned_file.partition_values.clone())
+ .collect_vec(),
+ );
+ let projection_expressions = projection
Review Comment:
I found some of this somewhat hard to follow; maybe as a follow-on, some of it
could move to helpers or methods on `ProjectionExprs` -- for example, maybe a
function like:
```rust
impl ProjectionExprs {
    // Update this ProjectionExprs by applying a closure to all the contained expressions
    fn map<F>(self, f: F) -> Result<Self> { ... }
}
```
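For what it's worth, a rough sketch of what that could look like, reusing the `.expr` field and `ProjectionExprs::new` from the diff above (names and bounds are illustrative only):
```rust
impl ProjectionExprs {
    /// Rebuild this projection by applying `f` to every contained expression
    /// (hypothetical helper, not an existing API)
    fn map<F>(self, f: F) -> Result<Self>
    where
        F: Fn(Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>,
    {
        let exprs = self
            .as_ref()
            .iter()
            .cloned()
            .map(|mut proj| {
                proj.expr = f(Arc::clone(&proj.expr))?;
                Ok(proj)
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(ProjectionExprs::new(exprs))
    }
}

// the adapter rewrite in the opener would then collapse to roughly:
// projection = projection.map(|expr| adapter.rewrite(expr))?;
```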
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -464,14 +486,32 @@ impl FileOpener for ParquetOpener {
file_metrics.predicate_cache_inner_records.clone();
let predicate_cache_records =
file_metrics.predicate_cache_records.clone();
+ let stream_schema = Arc::clone(stream.schema());
+
+ // Rebase column indices to match the narrowed stream schema.
+ // The projection expressions have indices based on physical_file_schema,
+ // but the stream only contains the columns selected by the ProjectionMask.
+ let rebased_exprs = projection
Review Comment:
Here is another example where `ProjectionExprs::map` (or something like it) could be used.
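For example (a rough sketch; the `reassign_columns` helper below is hypothetical and just re-resolves each `Column` against the narrowed stream schema via the `TreeNode` transform API):
```rust
// Hypothetical helper: point every Column at its index in the narrowed
// stream schema instead of physical_file_schema
fn reassign_columns(
    expr: Arc<dyn PhysicalExpr>,
    schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.transform(|e| {
        Ok(match e.as_any().downcast_ref::<Column>() {
            Some(col) => {
                let idx = schema.index_of(col.name())?;
                let rebased: Arc<dyn PhysicalExpr> =
                    Arc::new(Column::new(col.name(), idx));
                Transformed::yes(rebased)
            }
            None => Transformed::no(e),
        })
    })
    .data()
}

// with a ProjectionExprs::map this would then be a single call:
// let rebased_exprs = projection.map(|e| reassign_columns(e, &stream_schema))?;
```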
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -526,43 +499,10 @@ impl FileSource for ParquetSource {
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
let split_projection = self.projection.clone();
- let (expr_adapter_factory, schema_adapter_factory) = match (
Review Comment:
❤️
##########
datafusion/physical-expr/src/expressions/cast.rs:
##########
@@ -740,6 +740,101 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_cast_timestamp_with_timezone_to_timestamp() -> Result<()> {
Review Comment:
These seem like they could just as easily be tested with `.slt` / `arrow_cast` calls 🤔
That would be less verbose and easier to maintain, in my opinion.
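For example, something along these lines in the sqllogictest suite might cover the same cases (a sketch only -- the expected output section is omitted and could be filled in by running the runner with `--complete`):
```
# timestamp with timezone -> timestamp without timezone
query P
SELECT arrow_cast(
  arrow_cast('2021-01-01T00:00:00', 'Timestamp(Nanosecond, Some("+01:00"))'),
  'Timestamp(Nanosecond, None)'
);
```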
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -320,13 +314,42 @@ impl FileOpener for ParquetOpener {
reader_metadata,
);
- let (schema_mapping, adapted_projections) =
- schema_adapter.map_schema(&physical_file_schema)?;
+ let mut projection =
+ ProjectionExprs::from_indices(&projection, &logical_file_schema);
+ if let Some(expr_adapter_factory) = expr_adapter_factory {
+ let adapter = expr_adapter_factory
+ .create(
+ Arc::clone(&logical_file_schema),
+ Arc::clone(&physical_file_schema),
+ )
+ .with_partition_values(
+ partition_fields
+ .iter()
+ .cloned()
+ .zip(partitioned_file.partition_values.clone())
+ .collect_vec(),
+ );
+ let projection_expressions = projection
+ .as_ref()
+ .iter()
+ .cloned()
+ .map(|mut proj| {
+ proj.expr = adapter.rewrite(Arc::clone(&proj.expr))?;
+ Ok(proj)
+ })
+ .collect::<Result<Vec<_>>>()?;
+ projection = ProjectionExprs::new(projection_expressions);
+ }
+ let indices = projection
Review Comment:
This might be a nice function on `ProjectionExprs` (the inverse of
`ProjectionExprs::from_indices`; it could be called `ProjectionExprs::to_indices`
or something similar).
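For reference, a minimal sketch of what that inverse could look like (purely illustrative, assuming `collect_columns` from `datafusion_physical_expr::utils`):
```rust
impl ProjectionExprs {
    /// Hypothetical inverse of `from_indices`: the deduplicated file-column
    /// indices referenced by the projection expressions
    fn to_indices(&self) -> Vec<usize> {
        let mut indices: Vec<usize> = self
            .as_ref()
            .iter()
            .flat_map(|proj| collect_columns(&proj.expr))
            .map(|col| col.index())
            .collect();
        indices.sort_unstable();
        indices.dedup();
        indices
    }
}
```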
##########
datafusion/core/tests/parquet/schema_adapter.rs:
##########
@@ -293,255 +194,128 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() {
];
assert_batches_eq!(expected, &batches);
- // Test using a custom schema adapter and no explicit physical expr adapter
- // This should use the custom schema adapter both for projections and predicate pushdown
+ // Test with a custom physical expr adapter
+ // PhysicalExprAdapterFactory now handles both predicates AND projections
+ // CustomPhysicalExprAdapterFactory fills missing columns with 'b' for Utf8
let listing_table_config =
ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
.infer_options(&ctx.state())
.await
.unwrap()
.with_schema(table_schema.clone())
- .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory));
+ .with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
let table = ListingTable::try_new(listing_table_config).unwrap();
ctx.deregister_table("t").unwrap();
ctx.register_table("t", Arc::new(table)).unwrap();
let batches = ctx
- .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'a'")
+ .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
.await
.unwrap()
.collect()
.await
.unwrap();
+ // With CustomPhysicalExprAdapterFactory, missing column c2 is filled with 'b'
+ // in both the predicate (c2 = 'b' becomes 'b' = 'b' -> true) and the projection
let expected = [
"+----+----+",
"| c2 | c1 |",
"+----+----+",
- "| a | 2 |",
+ "| b | 2 |",
"+----+----+",
];
assert_batches_eq!(expected, &batches);
+}
+
+/// Test demonstrating how to implement a custom PhysicalExprAdapterFactory
+/// that fills missing columns with non-null default values.
+///
+/// This is the recommended migration path for users who previously used
+/// SchemaAdapterFactory to fill missing columns with default values.
+/// Instead of transforming batches after reading (SchemaAdapter::map_batch),
+/// the PhysicalExprAdapterFactory rewrites expressions to use literals for
+/// missing columns, achieving the same result more efficiently.
+#[tokio::test]
+async fn test_physical_expr_adapter_with_non_null_defaults() {
+ // File only has c1 column
+ let batch = record_batch!(("c1", Int32, [10, 20, 30])).unwrap();
+
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+ let store_url = ObjectStoreUrl::parse("memory://").unwrap();
+ write_parquet(batch, store.clone(), "defaults_test.parquet").await;
+
+ // Table schema has additional columns c2 (Utf8) and c3 (Int64) that don't exist in file
+ let table_schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int64, false), // type differs from file
(Int32 vs Int64)
+ Field::new("c2", DataType::Utf8, true), // missing from file
+ Field::new("c3", DataType::Int64, true), // missing from file
+ ]));
+
+ let mut cfg = SessionConfig::new()
+ .with_collect_statistics(false)
+ .with_parquet_pruning(false);
+ cfg.options_mut().execution.parquet.pushdown_filters = true;
+ let ctx = SessionContext::new_with_config(cfg);
+ ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
- // Do the same test but with a custom physical expr adapter
- // Now the default schema adapter will be used for projections, but the custom physical expr adapter will be used for predicate pushdown
+ // CustomPhysicalExprAdapterFactory fills:
+ // - missing Utf8 columns with 'b'
+ // - missing Int64 columns with 1
let listing_table_config =
ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
.infer_options(&ctx.state())
.await
.unwrap()
.with_schema(table_schema.clone())
.with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
+
let table = ListingTable::try_new(listing_table_config).unwrap();
- ctx.deregister_table("t").unwrap();
ctx.register_table("t", Arc::new(table)).unwrap();
+
+ // Query all columns - missing columns should have default values
let batches = ctx
- .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
+ .sql("SELECT c1, c2, c3 FROM t ORDER BY c1")
.await
.unwrap()
.collect()
.await
.unwrap();
+
+ // c1 is cast from Int32 to Int64, c2 defaults to 'b', c3 defaults to 1
let expected = [
- "+----+----+",
- "| c2 | c1 |",
- "+----+----+",
- "| | 2 |",
- "+----+----+",
+ "+----+----+----+",
+ "| c1 | c2 | c3 |",
+ "+----+----+----+",
+ "| 10 | b | 1 |",
+ "| 20 | b | 1 |",
+ "| 30 | b | 1 |",
+ "+----+----+----+",
];
assert_batches_eq!(expected, &batches);
- // If we use both then the custom physical expr adapter will be used for predicate pushdown and the custom schema adapter will be used for projections
- let listing_table_config =
- ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
- .infer_options(&ctx.state())
- .await
- .unwrap()
- .with_schema(table_schema.clone())
- .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory))
- .with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
- let table = ListingTable::try_new(listing_table_config).unwrap();
- ctx.deregister_table("t").unwrap();
- ctx.register_table("t", Arc::new(table)).unwrap();
+ // Verify predicates work with default values
+ // c3 = 1 should match all rows since default is 1
let batches = ctx
- .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
+ .sql("SELECT c1 FROM t WHERE c3 = 1 ORDER BY c1")
.await
.unwrap()
.collect()
.await
.unwrap();
+
let expected = [
Review Comment:
Minor: it would be nice to have this formatted (maybe with a rustfmt ignore)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]