This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch df52
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/df52 by this push:
new 847c1b9db fix: [df52] handle case-insensitive column matching in
PhysicalExprAdapter (#3495)
847c1b9db is described below
commit 847c1b9db62bb6ed241f712d70580429716cabe1
Author: Andy Grove <[email protected]>
AuthorDate: Thu Feb 12 06:57:54 2026 -0700
fix: [df52] handle case-insensitive column matching in PhysicalExprAdapter
(#3495)
The DefaultPhysicalExprAdapter uses exact case-sensitive name matching
(Arrow's field_with_name/index_of) to resolve columns. When a parquet
file has lowercase "a" but the table schema has uppercase "A", the lookup
fails and columns are filled with nulls.
Fix by remapping physical schema field names to match logical names
(case-insensitively) before passing to the default adapter, then
restoring original physical names in the rewritten expressions so that
downstream reassign_expr_columns can find columns in the actual parquet
stream schema.
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
native/core/src/parquet/schema_adapter.rs | 114 +++++++++++++++++++++++++++++-
1 file changed, 111 insertions(+), 3 deletions(-)
diff --git a/native/core/src/parquet/schema_adapter.rs
b/native/core/src/parquet/schema_adapter.rs
index 2a1082942..f19ec39fc 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -26,7 +26,7 @@
use crate::parquet::cast_column::CometCastColumnExpr;
use crate::parquet::parquet_support::{spark_parquet_convert,
SparkParquetOptions};
use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
-use arrow::datatypes::{DataType, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::common::{ColumnStatistics, Result as DataFusionResult};
use datafusion::datasource::schema_adapter::{SchemaAdapter,
SchemaAdapterFactory, SchemaMapper};
@@ -71,24 +71,102 @@ impl SparkPhysicalExprAdapterFactory {
}
}
+/// Remap physical schema field names to match logical schema field names using
+/// case-insensitive matching. This allows the DefaultPhysicalExprAdapter (which
+/// uses exact name matching) to correctly find columns when the parquet file has
+/// different casing than the table schema (e.g., file has "a" but table has "A").
+///
+/// Only field names change. A renamed field keeps its data type, nullability,
+/// field-level metadata and every other property, because it is produced by
+/// cloning the original field and calling `with_name`. The result has the same
+/// field order and count as `physical_schema` and carries over its schema-level
+/// metadata. Physical fields with no case-insensitive match in `logical_schema`
+/// are returned unchanged.
+///
+/// If `logical_schema` contains several names that differ only by case, the
+/// last one wins, since the lookup map is built by collecting into a `HashMap`.
+fn remap_physical_schema_names(
+    logical_schema: &SchemaRef,
+    physical_schema: &SchemaRef,
+) -> SchemaRef {
+    // lowercase name -> logical (table) spelling of that name
+    let logical_names: HashMap<String, &str> = logical_schema
+        .fields()
+        .iter()
+        .map(|f| (f.name().to_lowercase(), f.name().as_str()))
+        .collect();
+
+    let remapped_fields: Vec<_> = physical_schema
+        .fields()
+        .iter()
+        .map(|field| match logical_names.get(&field.name().to_lowercase()) {
+            // Rename via clone + `with_name` rather than `Field::new` so that
+            // field metadata (e.g. parquet field ids) survives the remap.
+            Some(logical_name) if *logical_name != field.name() => {
+                Arc::new(field.as_ref().clone().with_name(*logical_name))
+            }
+            _ => Arc::clone(field),
+        })
+        .collect();
+
+    // Keep the schema-level metadata of the physical file schema as well.
+    Arc::new(Schema::new_with_metadata(
+        remapped_fields,
+        physical_schema.metadata().clone(),
+    ))
+}
+
impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
     fn create(
         &self,
         logical_file_schema: SchemaRef,
         physical_file_schema: SchemaRef,
     ) -> Arc<dyn PhysicalExprAdapter> {
+        // When case-insensitive, remap physical schema field names to match logical
+        // field names. The DefaultPhysicalExprAdapter uses exact name matching, so
+        // without this remapping, columns like "a" won't match logical "A" and will
+        // be filled with nulls.
+        //
+        // We also build a reverse map (logical name -> physical name) so that after
+        // the default adapter produces expressions, we can remap column names back
+        // to the original physical names. This is necessary because downstream code
+        // (reassign_expr_columns) looks up columns by name in the actual stream
+        // schema, which uses the original physical file column names.
+        //
+        // The reverse map is derived by zipping the original and remapped physical
+        // fields. Remapping preserves field order and count, so this inverts exactly
+        // the renames that were applied, with no separate matching logic that could
+        // disagree with the forward remap. If nothing was renamed, the original
+        // physical schema is used unchanged.
+        //
+        // NOTE(review): a physical schema with names that collide case-insensitively
+        // (e.g. both "a" and "A") is ambiguous. After remapping, both fields may carry
+        // the same logical name. Confirm the expected Spark behaviour for that case.
+        let (adapted_physical_schema, logical_to_physical_names) =
+            if self.parquet_options.case_sensitive {
+                (Arc::clone(&physical_file_schema), None)
+            } else {
+                let remapped =
+                    remap_physical_schema_names(&logical_file_schema, &physical_file_schema);
+                let logical_to_physical: HashMap<String, String> = physical_file_schema
+                    .fields()
+                    .iter()
+                    .zip(remapped.fields().iter())
+                    .filter(|(original, renamed)| original.name() != renamed.name())
+                    .map(|(original, renamed)| {
+                        (renamed.name().clone(), original.name().clone())
+                    })
+                    .collect();
+                if logical_to_physical.is_empty() {
+                    (Arc::clone(&physical_file_schema), None)
+                } else {
+                    (remapped, Some(logical_to_physical))
+                }
+            };
+
         let default_factory = DefaultPhysicalExprAdapterFactory;
         let default_adapter = default_factory.create(
             Arc::clone(&logical_file_schema),
-            Arc::clone(&physical_file_schema),
+            Arc::clone(&adapted_physical_schema),
         );
         Arc::new(SparkPhysicalExprAdapter {
             logical_file_schema,
-            physical_file_schema,
+            physical_file_schema: adapted_physical_schema,
             parquet_options: self.parquet_options.clone(),
             default_values: self.default_values.clone(),
             default_adapter,
+            logical_to_physical_names,
         })
     }
 }
@@ -112,6 +190,13 @@ struct SparkPhysicalExprAdapter {
default_values: Option<HashMap<usize, ScalarValue>>,
/// The default DataFusion adapter to delegate standard handling to
default_adapter: Arc<dyn PhysicalExprAdapter>,
+ /// Mapping from logical column names to original physical column names,
+ /// used for case-insensitive mode where names differ in casing.
+ /// After the default adapter rewrites expressions using the remapped
+ /// physical schema (with logical names), we need to restore the original
+ /// physical names so that downstream reassign_expr_columns can find
+ /// columns in the actual stream schema.
+ logical_to_physical_names: Option<HashMap<String, String>>,
}
impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
@@ -137,6 +222,29 @@ impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
self.wrap_all_type_mismatches(expr)?
}
};
+
+ // For case-insensitive mode: remap column names from logical back to
+ // original physical names. The default adapter was given a remapped
+ // physical schema (with logical names) so it could find columns. But
+ // downstream code (reassign_expr_columns) looks up columns by name in
+ // the actual parquet stream schema, which uses the original physical
names.
+ let expr = if let Some(name_map) = &self.logical_to_physical_names {
+ expr.transform(|e| {
+ if let Some(col) = e.as_any().downcast_ref::<Column>() {
+ if let Some(physical_name) = name_map.get(col.name()) {
+ return Ok(Transformed::yes(Arc::new(Column::new(
+ physical_name,
+ col.index(),
+ ))));
+ }
+ }
+ Ok(Transformed::no(e))
+ })
+ .data()?
+ } else {
+ expr
+ };
+
Ok(expr)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]