alamb commented on code in PR #18998:
URL: https://github.com/apache/datafusion/pull/18998#discussion_r2590672147


##########
datafusion/pruning/src/pruning_predicate.rs:
##########
@@ -1105,6 +1106,20 @@ fn rewrite_expr_to_prunable(
             None,
         ));
         Ok((left, op, right))
+    } else if let Some(cast_col) = 
column_expr_any.downcast_ref::<CastColumnExpr>() {

Review Comment:
   This is a good example where the code for `CastExpr` and `CastColumnExpr` 
is almost identical, and the duplication seems unfortunate.
   
   I took an initial shot at removing the redundancy here
   - https://github.com/apache/datafusion/pull/19097
   
   



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1257,7 +1257,7 @@ mod tests {
             ("c3", c3.clone()),
         ]);
 
-        // batch2: c3(int8), c2(int64), c1(string), c4(string)
+        // batch2: c3(date64), c2(int64), c1(string), c4(date64)

Review Comment:
   🤔 batch2 only has 3 columns (c3(date64), c2(int64) and c1(string))
   
   ```suggestion
           // batch2: c3(date64), c2(int64), c1(string)
   ```
   
   



##########
datafusion/core/tests/parquet/schema_adapter.rs:
##########
@@ -293,255 +194,128 @@ async fn 
test_custom_schema_adapter_and_custom_expression_adapter() {
     ];
     assert_batches_eq!(expected, &batches);
 
-    // Test using a custom schema adapter and no explicit physical expr adapter
-    // This should use the custom schema adapter both for projections and 
predicate pushdown
+    // Test with a custom physical expr adapter
+    // PhysicalExprAdapterFactory now handles both predicates AND projections
+    // CustomPhysicalExprAdapterFactory fills missing columns with 'b' for Utf8
     let listing_table_config =
         ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
             .infer_options(&ctx.state())
             .await
             .unwrap()
             .with_schema(table_schema.clone())
-            .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory));
+            
.with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
     let table = ListingTable::try_new(listing_table_config).unwrap();
     ctx.deregister_table("t").unwrap();
     ctx.register_table("t", Arc::new(table)).unwrap();
     let batches = ctx
-        .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'a'")

Review Comment:
   it took me a while to understand why this test changed -- it is because 
`CustomSchemaMapper` used to fill in the value `a` but 
`CustomPhysicalExprAdapterFactory` fills in `b`.
   
   Therefore this change makes sense to me 👍 



##########
datafusion/core/tests/parquet/schema_adapter.rs:
##########
@@ -59,97 +51,6 @@ async fn write_parquet(batch: RecordBatch, store: Arc<dyn 
ObjectStore>, path: &s
     store.put(&Path::from(path), data.into()).await.unwrap();
 }
 
-#[derive(Debug)]
-struct CustomSchemaAdapterFactory;
-
-impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
-    fn create(
-        &self,
-        projected_table_schema: SchemaRef,
-        _table_schema: SchemaRef,
-    ) -> Box<dyn SchemaAdapter> {
-        Box::new(CustomSchemaAdapter {
-            logical_file_schema: projected_table_schema,
-        })
-    }
-}
-
-#[derive(Debug)]
-struct CustomSchemaAdapter {
-    logical_file_schema: SchemaRef,
-}
-
-impl SchemaAdapter for CustomSchemaAdapter {
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
-        for (idx, field) in file_schema.fields().iter().enumerate() {
-            if field.name() == self.logical_file_schema.field(index).name() {
-                return Some(idx);
-            }
-        }
-        None
-    }
-
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let projection = (0..file_schema.fields().len()).collect_vec();
-        Ok((
-            Arc::new(CustomSchemaMapper {
-                logical_file_schema: Arc::clone(&self.logical_file_schema),
-            }),
-            projection,
-        ))
-    }
-}
-
-#[derive(Debug)]
-struct CustomSchemaMapper {
-    logical_file_schema: SchemaRef,
-}
-
-impl SchemaMapper for CustomSchemaMapper {
-    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        let mut output_columns =
-            Vec::with_capacity(self.logical_file_schema.fields().len());
-        for field in self.logical_file_schema.fields() {
-            if let Some(array) = batch.column_by_name(field.name()) {
-                output_columns.push(cast_with_options(
-                    array,
-                    field.data_type(),
-                    &CastOptions::default(),
-                )?);
-            } else {
-                // Create a new array with the default value for the field type
-                let default_value = match field.data_type() {
-                    DataType::Int64 => ScalarValue::Int64(Some(0)),
-                    DataType::Utf8 => ScalarValue::Utf8(Some("a".to_string())),
-                    _ => unimplemented!("Unsupported data type: {}", 
field.data_type()),
-                };
-                output_columns
-                    
.push(default_value.to_array_of_size(batch.num_rows()).unwrap());
-            }
-        }
-        let batch = RecordBatch::try_new_with_options(
-            Arc::clone(&self.logical_file_schema),
-            output_columns,
-            &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
-        )
-        .unwrap();
-        Ok(batch)
-    }
-
-    fn map_column_statistics(
-        &self,
-        _file_col_statistics: &[ColumnStatistics],
-    ) -> Result<Vec<ColumnStatistics>> {
-        Ok(vec![
-            ColumnStatistics::new_unknown();
-            self.logical_file_schema.fields().len()
-        ])
-    }
-}
-
 // Implement a custom PhysicalExprAdapterFactory that fills in missing columns 
with the default value for the field type

Review Comment:
   It might help to note here that "default value for the field type" really 
means:
   * `1` for missing Int64 columns
   * `'b'` for missing Utf8 columns



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -320,13 +314,42 @@ impl FileOpener for ParquetOpener {
                 reader_metadata,
             );
 
-            let (schema_mapping, adapted_projections) =
-                schema_adapter.map_schema(&physical_file_schema)?;
+            let mut projection =
+                ProjectionExprs::from_indices(&projection, 
&logical_file_schema);
+            if let Some(expr_adapter_factory) = expr_adapter_factory {
+                let adapter = expr_adapter_factory
+                    .create(
+                        Arc::clone(&logical_file_schema),
+                        Arc::clone(&physical_file_schema),
+                    )
+                    .with_partition_values(
+                        partition_fields
+                            .iter()
+                            .cloned()
+                            .zip(partitioned_file.partition_values.clone())
+                            .collect_vec(),
+                    );
+                let projection_expressions = projection

Review Comment:
   I found some of this somewhat hard to follow. Maybe, as a follow-on, some of 
it could move to helpers or methods on `ProjectionExprs` -- for example, maybe a 
function like
   ```rust
   impl ProjectionExprs { 
     // Update this ProjectionExprs by applying a closure to all the contained
     // expressions
     fn map<F>(self, f:F)  -> Result<Self> { ... }
   }
   ```
   
   



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -464,14 +486,32 @@ impl FileOpener for ParquetOpener {
                 file_metrics.predicate_cache_inner_records.clone();
             let predicate_cache_records = 
file_metrics.predicate_cache_records.clone();
 
+            let stream_schema = Arc::clone(stream.schema());
+
+            // Rebase column indices to match the narrowed stream schema.
+            // The projection expressions have indices based on 
physical_file_schema,
+            // but the stream only contains the columns selected by the 
ProjectionMask.
+            let rebased_exprs = projection

Review Comment:
   here is another example where ProjectionExprs::map or something like that 
could be used



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -526,43 +499,10 @@ impl FileSource for ParquetSource {
     ) -> datafusion_common::Result<Arc<dyn FileOpener>> {
         let split_projection = self.projection.clone();
 
-        let (expr_adapter_factory, schema_adapter_factory) = match (

Review Comment:
   ❤️ 



##########
datafusion/physical-expr/src/expressions/cast.rs:
##########
@@ -740,6 +740,101 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_cast_timestamp_with_timezone_to_timestamp() -> Result<()> {

Review Comment:
   These seem like they could be just as easily tested with `.slt` / 
`arrow_cast` calls 🤔 
   
    That would be less verbose and easier to maintain in my opinion



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -320,13 +314,42 @@ impl FileOpener for ParquetOpener {
                 reader_metadata,
             );
 
-            let (schema_mapping, adapted_projections) =
-                schema_adapter.map_schema(&physical_file_schema)?;
+            let mut projection =
+                ProjectionExprs::from_indices(&projection, 
&logical_file_schema);
+            if let Some(expr_adapter_factory) = expr_adapter_factory {
+                let adapter = expr_adapter_factory
+                    .create(
+                        Arc::clone(&logical_file_schema),
+                        Arc::clone(&physical_file_schema),
+                    )
+                    .with_partition_values(
+                        partition_fields
+                            .iter()
+                            .cloned()
+                            .zip(partitioned_file.partition_values.clone())
+                            .collect_vec(),
+                    );
+                let projection_expressions = projection
+                    .as_ref()
+                    .iter()
+                    .cloned()
+                    .map(|mut proj| {
+                        proj.expr = adapter.rewrite(Arc::clone(&proj.expr))?;
+                        Ok(proj)
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                projection = ProjectionExprs::new(projection_expressions);
+            }
+            let indices = projection

Review Comment:
   This might be a nice function on `ProjectionExprs` (the inverse of 
`ProjectionExprs::from_indices`; it could be `ProjectionExprs::to_indices` or 
something).



##########
datafusion/core/tests/parquet/schema_adapter.rs:
##########
@@ -293,255 +194,128 @@ async fn 
test_custom_schema_adapter_and_custom_expression_adapter() {
     ];
     assert_batches_eq!(expected, &batches);
 
-    // Test using a custom schema adapter and no explicit physical expr adapter
-    // This should use the custom schema adapter both for projections and 
predicate pushdown
+    // Test with a custom physical expr adapter
+    // PhysicalExprAdapterFactory now handles both predicates AND projections
+    // CustomPhysicalExprAdapterFactory fills missing columns with 'b' for Utf8
     let listing_table_config =
         ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
             .infer_options(&ctx.state())
             .await
             .unwrap()
             .with_schema(table_schema.clone())
-            .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory));
+            
.with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
     let table = ListingTable::try_new(listing_table_config).unwrap();
     ctx.deregister_table("t").unwrap();
     ctx.register_table("t", Arc::new(table)).unwrap();
     let batches = ctx
-        .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'a'")
+        .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
         .await
         .unwrap()
         .collect()
         .await
         .unwrap();
+    // With CustomPhysicalExprAdapterFactory, missing column c2 is filled with 
'b'
+    // in both the predicate (c2 = 'b' becomes 'b' = 'b' -> true) and the 
projection
     let expected = [
         "+----+----+",
         "| c2 | c1 |",
         "+----+----+",
-        "| a  | 2  |",
+        "| b  | 2  |",
         "+----+----+",
     ];
     assert_batches_eq!(expected, &batches);
+}
+
+/// Test demonstrating how to implement a custom PhysicalExprAdapterFactory
+/// that fills missing columns with non-null default values.
+///
+/// This is the recommended migration path for users who previously used
+/// SchemaAdapterFactory to fill missing columns with default values.
+/// Instead of transforming batches after reading (SchemaAdapter::map_batch),
+/// the PhysicalExprAdapterFactory rewrites expressions to use literals for
+/// missing columns, achieving the same result more efficiently.
+#[tokio::test]
+async fn test_physical_expr_adapter_with_non_null_defaults() {
+    // File only has c1 column
+    let batch = record_batch!(("c1", Int32, [10, 20, 30])).unwrap();
+
+    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    let store_url = ObjectStoreUrl::parse("memory://").unwrap();
+    write_parquet(batch, store.clone(), "defaults_test.parquet").await;
+
+    // Table schema has additional columns c2 (Utf8) and c3 (Int64) that don't 
exist in file
+    let table_schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int64, false), // type differs from file 
(Int32 vs Int64)
+        Field::new("c2", DataType::Utf8, true),   // missing from file
+        Field::new("c3", DataType::Int64, true),  // missing from file
+    ]));
+
+    let mut cfg = SessionConfig::new()
+        .with_collect_statistics(false)
+        .with_parquet_pruning(false);
+    cfg.options_mut().execution.parquet.pushdown_filters = true;
+    let ctx = SessionContext::new_with_config(cfg);
+    ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
 
-    // Do the same test but with a custom physical expr adapter
-    // Now the default schema adapter will be used for projections, but the 
custom physical expr adapter will be used for predicate pushdown
+    // CustomPhysicalExprAdapterFactory fills:
+    // - missing Utf8 columns with 'b'
+    // - missing Int64 columns with 1
     let listing_table_config =
         ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
             .infer_options(&ctx.state())
             .await
             .unwrap()
             .with_schema(table_schema.clone())
             
.with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
+
     let table = ListingTable::try_new(listing_table_config).unwrap();
-    ctx.deregister_table("t").unwrap();
     ctx.register_table("t", Arc::new(table)).unwrap();
+
+    // Query all columns - missing columns should have default values
     let batches = ctx
-        .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
+        .sql("SELECT c1, c2, c3 FROM t ORDER BY c1")
         .await
         .unwrap()
         .collect()
         .await
         .unwrap();
+
+    // c1 is cast from Int32 to Int64, c2 defaults to 'b', c3 defaults to 1
     let expected = [
-        "+----+----+",
-        "| c2 | c1 |",
-        "+----+----+",
-        "|    | 2  |",
-        "+----+----+",
+        "+----+----+----+",
+        "| c1 | c2 | c3 |",
+        "+----+----+----+",
+        "| 10 | b  | 1  |",
+        "| 20 | b  | 1  |",
+        "| 30 | b  | 1  |",
+        "+----+----+----+",
     ];
     assert_batches_eq!(expected, &batches);
 
-    // If we use both then the custom physical expr adapter will be used for 
predicate pushdown and the custom schema adapter will be used for projections
-    let listing_table_config =
-        ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
-            .infer_options(&ctx.state())
-            .await
-            .unwrap()
-            .with_schema(table_schema.clone())
-            .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory))
-            
.with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
-    let table = ListingTable::try_new(listing_table_config).unwrap();
-    ctx.deregister_table("t").unwrap();
-    ctx.register_table("t", Arc::new(table)).unwrap();
+    // Verify predicates work with default values
+    // c3 = 1 should match all rows since default is 1
     let batches = ctx
-        .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
+        .sql("SELECT c1 FROM t WHERE c3 = 1 ORDER BY c1")
         .await
         .unwrap()
         .collect()
         .await
         .unwrap();
+
     let expected = [

Review Comment:
   Minor: it would be nice to have this formatted (maybe by ignoring rustfmt)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to