jecsand838 commented on code in PR #17861:
URL: https://github.com/apache/datafusion/pull/17861#discussion_r2938075944


##########
datafusion/datasource-avro/src/source.rs:
##########
@@ -56,21 +54,16 @@ impl AvroSource {
         }
     }
 
-    fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> {
-        let file_schema = self.table_schema.file_schema();
-        let projection = Some(
-            self.projection
-                .file_indices
-                .iter()
-                .map(|&idx| file_schema.field(idx).name().clone())
-                .collect::<Vec<_>>(),
-        );
-        AvroReader::try_new(
-            reader,
-            &Arc::clone(self.table_schema.file_schema()),
-            self.batch_size.expect("Batch size must set before open"),
-            projection.as_ref(),
-        )
+    fn open<R: std::io::BufRead>(&self, reader: R) -> Result<Reader<R>> {
+        let mut builder = ReaderBuilder::new()
+            .with_batch_size(self.batch_size.expect("Batch size must set before open"));
+
+        // Avoid pushing an empty projection into arrow-avro.
+        if !self.projection.file_indices.is_empty() {
+            builder = builder.with_projection(self.projection.file_indices.clone());
+        }
+
+        builder.build(reader).map_err(Into::into)

Review Comment:
   ```suggestion
   use arrow_avro::schema::AvroSchema;
   
   fn open<R: std::io::BufRead>(&self, reader: R) -> Result<Reader<R>> {
       let logical_file_schema = self.table_schema.file_schema();
       // NOTE: We may need to add a DataFusion centric Arrow to Avro conversion
       // method if the `avro.schema` metadata isn't preserved.
       // `AvroSchema::try_from` may not output the exact schema required without it.
       let reader_schema = AvroSchema::try_from(logical_file_schema.as_ref())?; 
   
       let mut builder = ReaderBuilder::new()
           .with_batch_size(self.batch_size.expect("Batch size must set before open"))
           .with_reader_schema(reader_schema);
   
       if !self.projection.file_indices.is_empty() {
           builder = builder.with_projection(self.projection.file_indices.clone());
       }
   
       builder.build(reader).map_err(Into::into)
   }
   ```
   
   I think this still needs one more change before Avro projection pushdown is 
safe.
   
   `arrow_avro::ReaderBuilder::with_projection` prunes the effective reader
   schema, and because we never call `with_reader_schema(...)` here, that schema
   is derived from each file’s writer schema. As a result, the logical ordinals
   in `file_indices` are applied as positions in each file’s writer schema.
   
   Concrete failure case:
   
   - file A writer schema: `[id, name]`
   - file B writer schema: `[name, id]`
   - merged logical file schema: `[id, name]`
   
   For `SELECT id`, DataFusion computes `file_indices = [0]`. With the current 
code, file A reads `id`, but file B reads `name`, because 
`with_projection([0])` is applied to file B’s writer order. `ProjectionOpener` 
still treats slot 0 as logical `id`, so this looks like a potential silent 
wrong-results bug rather than a reliable error.
   
   Also I’d consider adding a regression test that scans two files with the 
same fields in different orders and verifies `SELECT id` returns `id` for both.
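   
   To make the mismatch concrete, here is a minimal, self-contained sketch that
   models schemas as plain field-name lists. `project_by_ordinal` and
   `project_by_name` are hypothetical helpers for illustration only, not
   arrow-avro or DataFusion APIs: the first mimics applying `file_indices`
   directly to a file's writer order, the second resolves each ordinal to a name
   through the logical schema before looking it up in the writer schema.
   
   ```rust
   /// Hypothetical helper: apply logical ordinals directly to a file's writer
   /// field order, which is what happens when no reader schema is supplied.
   fn project_by_ordinal<'a>(writer_fields: &[&'a str], indices: &[usize]) -> Vec<&'a str> {
       indices.iter().map(|&i| writer_fields[i]).collect()
   }
   
   /// Hypothetical helper: resolve each logical ordinal to a field name first,
   /// then find that name in the writer schema. Returns `None` if an ordinal is
   /// out of range or the writer schema lacks the field.
   fn project_by_name<'a>(
       logical_fields: &[&str],
       writer_fields: &[&'a str],
       indices: &[usize],
   ) -> Option<Vec<&'a str>> {
       indices
           .iter()
           .map(|&i| {
               let name = logical_fields.get(i)?;
               writer_fields.iter().copied().find(|f| f == name)
           })
           .collect()
   }
   ```
   
   With logical `[id, name]` and file B written as `[name, id]`,
   `project_by_ordinal(&file_b, &[0])` selects `name`, while
   `project_by_name(&logical, &file_b, &[0])` selects `id`. A regression test
   along the lines suggested above would assert the second behavior for both
   files.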



##########
datafusion/sqllogictest/test_files/avro.slt:
##########
@@ -99,15 +99,15 @@ CREATE EXTERNAL TABLE alltypes_plain_zstandard (
   float_col FLOAT NOT NULL,
   double_col DOUBLE NOT NULL,
   date_string_col BYTEA NOT NULL,
-  string_col VARCHAR NOT NULL,
+  string_col BYTEA NOT NULL,
   timestamp_col TIMESTAMP NOT NULL,
 )
 STORED AS AVRO
 LOCATION '../../testing/data/avro/alltypes_plain.zstandard.avro';
 
 statement ok
 CREATE EXTERNAL TABLE single_nan (
-  mycol FLOAT
+  mycol DOUBLE

Review Comment:
   I think these sqllogictest edits are codifying the same reader-schema issue
   I called out in my other comment rather than just reflecting a low-level type
   mapping change.
   
   Before this PR, an external table declaration like `string_col VARCHAR` was 
part of the decode contract. After this change, the test now declares the 
physical column as `BYTEA` and casts at query time. That means the 
writer-schema output is becoming authoritative and the logical table schema 
(reader schema) is not participating in decode.
   
   If the reader path is fixed to align decoded batches back to the logical 
file schema, I’d keep the original DDL/query expectations here.
   
   If this semantic change is intentional, I’d still avoid encoding it only in 
`.slt` rewrites and instead call it out explicitly in the upgrade guide, 
because it is a larger contract change.



##########
datafusion/datasource-avro/src/mod.rs:
##########
@@ -27,9 +27,113 @@
 
 //! An [Avro](https://avro.apache.org/) based [`FileSource`](datafusion_datasource::file::FileSource) implementation and related functionality.
 
-pub mod avro_to_arrow;
 pub mod file_format;
 pub mod source;
 
-pub use apache_avro;
+use arrow::datatypes::{DataType, Field, Fields, Schema};
+pub use arrow_avro;
+use arrow_avro::reader::ReaderBuilder;
 pub use file_format::*;
+use std::io::{BufReader, Read};
+use std::sync::Arc;
+
+/// Read Avro schema given a reader
+pub fn read_avro_schema_from_reader<R: Read>(
+    reader: &mut R,
+) -> datafusion_common::Result<Schema> {
+    let avro_reader = ReaderBuilder::new().build(BufReader::new(reader))?;
+    // Avro readers perform strict schema resolution rules (e.g. record identity checks)
+    // that are stricter than DataFusion's table schema handling needs for inferred schemas.
+    // Drop metadata from inferred schemas so runtime batches and inferred table schemas
+    // compare consistently without requiring strict Avro metadata identity.
+    Ok(strip_metadata_from_schema(avro_reader.schema().as_ref()))
+}
+
+fn strip_metadata_from_schema(schema: &Schema) -> Schema {
+    let fields = schema
+        .fields
+        .into_iter()
+        .map(|f| Arc::new(strip_metadata_from_field(f.as_ref())))
+        .collect::<Fields>();
+    // Intentionally drop schema-level metadata
+    Schema::new(fields)
+}
+
+fn strip_metadata_from_field(field: &Field) -> Field {
+    // Intentionally drop field-level metadata
+    Field::new(
+        field.name(),
+        strip_metadata_from_data_type(field.data_type()),
+        field.is_nullable(),
+    )
+}
+
+fn strip_metadata_from_data_type(data_type: &DataType) -> DataType {
+    match data_type {
+        DataType::Struct(fields) => DataType::Struct(
+            fields
+                .iter()
+                .map(|f| Arc::new(strip_metadata_from_field(f.as_ref())))
+                .collect(),
+        ),
+        DataType::List(field) => {
+            DataType::List(Arc::new(strip_metadata_from_field(field.as_ref())))
+        }
+        DataType::LargeList(field) => {
+            DataType::LargeList(Arc::new(strip_metadata_from_field(field.as_ref())))
+        }
+        DataType::FixedSizeList(field, size) => DataType::FixedSizeList(
+            Arc::new(strip_metadata_from_field(field.as_ref())),
+            *size,
+        ),
+        DataType::Map(field, sorted) => {
+            DataType::Map(Arc::new(strip_metadata_from_field(field.as_ref())), *sorted)
+        }
+        _ => data_type.clone(),

Review Comment:
   ```suggestion
           DataType::Map(field, sorted) => {
               DataType::Map(Arc::new(strip_metadata_from_field(field.as_ref())), *sorted)
           }
           DataType::Union(fields, mode) => {
               let (type_ids, children): (Vec<_>, Vec<_>) = fields
                   .iter()
                   .map(|(type_id, field)| {
                       (*type_id, Arc::new(strip_metadata_from_field(field.as_ref())))
                   })
                   .unzip();
   
               DataType::Union(
                   UnionFields::try_new(type_ids, children)
                       .expect("existing union fields should remain valid"),
                   *mode,
               )
           }
           _ => data_type.clone(),
   ```
   
   I think the recursion should cover `DataType::Union` too. The suggestion
   also needs `UnionFields` added to the `arrow::datatypes` import.
   
   Arrow `DataType` explicitly includes `Union`, so skipping it means 
union-heavy / nested schemas can retain Avro metadata in some branches while 
sibling nested containers are normalized away. That undermines the stable 
runtime matching goal of this helper.
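   
   As a rough illustration of why the catch-all arm matters, here is a toy model
   in plain Rust. `ToyType`, `ToyField`, the `recurse_into_union` flag, and the
   `example.key` metadata entry are all made up for this sketch and are not
   Arrow types or real Avro metadata keys; the point is only that a recursive
   strip which falls through to `clone()` for unions keeps child metadata alive.
   
   ```rust
   use std::collections::BTreeMap;
   
   /// Toy stand-in for a nested data type (not Arrow's `DataType`).
   #[derive(Clone, Debug, PartialEq)]
   enum ToyType {
       Int,
       List(Box<ToyField>),
       Union(Vec<(i8, ToyField)>),
   }
   
   /// Toy stand-in for a field carrying metadata (not Arrow's `Field`).
   #[derive(Clone, Debug, PartialEq)]
   struct ToyField {
       name: String,
       data_type: ToyType,
       metadata: BTreeMap<String, String>,
   }
   
   /// Rebuild a field without metadata, recursing into its data type.
   fn strip_field(field: &ToyField, recurse_into_union: bool) -> ToyField {
       ToyField {
           name: field.name.clone(),
           data_type: strip_type(&field.data_type, recurse_into_union),
           metadata: BTreeMap::new(),
       }
   }
   
   /// Strip metadata from nested children. When `recurse_into_union` is false,
   /// unions hit the catch-all arm and are cloned with child metadata intact.
   fn strip_type(data_type: &ToyType, recurse_into_union: bool) -> ToyType {
       match data_type {
           ToyType::List(child) => {
               ToyType::List(Box::new(strip_field(child, recurse_into_union)))
           }
           ToyType::Union(children) if recurse_into_union => ToyType::Union(
               children
                   .iter()
                   .map(|(id, f)| (*id, strip_field(f, recurse_into_union)))
                   .collect(),
           ),
           other => other.clone(),
       }
   }
   
   /// True if this field or any nested child still carries metadata.
   fn has_metadata(field: &ToyField) -> bool {
       if !field.metadata.is_empty() {
           return true;
       }
       match &field.data_type {
           ToyType::Int => false,
           ToyType::List(child) => has_metadata(child),
           ToyType::Union(children) => children.iter().any(|(_, f)| has_metadata(f)),
       }
   }
   
   /// A union field whose only child carries one (made-up) metadata entry.
   fn sample_union_field() -> ToyField {
       let mut metadata = BTreeMap::new();
       metadata.insert("example.key".to_string(), "value".to_string());
       let child = ToyField { name: "a".to_string(), data_type: ToyType::Int, metadata };
       ToyField {
           name: "u".to_string(),
           data_type: ToyType::Union(vec![(0, child)]),
           metadata: BTreeMap::new(),
       }
   }
   ```
   
   In this model, `has_metadata(&strip_field(&sample_union_field(), false))` is
   still true, while the same call with `true` is false, which is the
   inconsistency between union children and sibling containers described above.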



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]