This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c89814ef1 refactor: simplify dynamic state for Avro record projection 
(#9419)
8c89814ef1 is described below

commit 8c89814ef12be9603eee6aa6edeacedef0a6c5a3
Author: Mikhail Zabaluev <[email protected]>
AuthorDate: Thu Mar 5 01:56:08 2026 +0200

    refactor: simplify dynamic state for Avro record projection (#9419)
    
    # Rationale for this change
    
    The inner loop in `Projector::project_record` gives the optimizer
    somewhat complicated dynamic data to branch through.
    The sparse arrays in `Projector` are redundantly coded: `None` in the
    index positions of `writer_to_reader` must match `Some` in
    `skip_decoders` and vice versa.
    
    # What changes are included in this PR?
    
    Refactor record projection state with a single array of directive-like
    enums corresponding to each writer schema field.
    
    # Are these changes tested?
    
    Added a benchmark for record projection (the benchmark code is partially
    shared with #9397).
    Somewhat counterintuitively for me, it does not show improvement on a
    more complex case with a mix of projected fields, but does improve the
    simpler one-field projection cases.
    
    Passes the existing tests.
---
 arrow-avro/benches/project_record.rs |  65 ++++++++++++++++++---
 arrow-avro/src/codec.rs              |  74 +++++++++++++-----------
 arrow-avro/src/reader/record.rs      | 108 ++++++++++++++++-------------------
 3 files changed, 149 insertions(+), 98 deletions(-)

diff --git a/arrow-avro/benches/project_record.rs 
b/arrow-avro/benches/project_record.rs
index 9bddfea93b..91bece6d7e 100644
--- a/arrow-avro/benches/project_record.rs
+++ b/arrow-avro/benches/project_record.rs
@@ -121,7 +121,22 @@ fn gen_double(mut rng: impl Rng, sc: &ApacheSchema, n: 
usize, prefix: &[u8]) ->
     )
 }
 
-const READER_SCHEMA: &str = r#"
+fn gen_mixed(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> 
Vec<u8> {
+    encode_records_with_prefix(
+        sc,
+        prefix,
+        (0..n).map(|i| {
+            Value::Record(vec![
+                ("f1".into(), Value::Int(rng.random())),
+                ("f2".into(), Value::Long(rng.random())),
+                ("f3".into(), Value::String(format!("name-{i}"))),
+                ("f4".into(), Value::Double(rng.random())),
+            ])
+        }),
+    )
+}
+
+const SKIP_READER_SCHEMA: &str = r#"
     {
         "type":"record",
         "name":"table",
@@ -175,11 +190,42 @@ const DOUBLE_SCHEMA: &str = r#"
     }
     "#;
 
-fn new_decoder(schema_json: &'static str, batch_size: usize) -> Decoder {
+const MIX_SCHEMA: &str = r#"
+    {
+        "type":"record",
+        "name":"Mix",
+        "fields": [
+            { "name": "f1", "type": "int" },
+            { "name": "f2", "type": "long" },
+            { "name": "f3", "type": "string" },
+            { "name": "f4", "type": "double" }
+        ]
+    }
+    "#;
+
+// Project the record type written to MIX_SCHEMA:
+// skip "f2" and "f4", add "f5" with a default
+const PROJECT_READER_SCHEMA: &str = r#"
+    {
+        "type":"record",
+        "name":"Mix",
+        "fields": [
+            { "name": "f1", "type": "int" },
+            { "name": "f3", "type": "string" },
+            { "name": "f5", "type": "long", "default": 0 }
+        ]
+    }
+    "#;
+
+fn new_decoder(
+    schema_json: &'static str,
+    reader_schema_json: &'static str,
+    batch_size: usize,
+) -> Decoder {
     let schema = AvroSchema::new(schema_json.to_owned());
     let mut store = SchemaStore::new();
     store.register(schema).unwrap();
-    let reader_schema = AvroSchema::new(READER_SCHEMA.to_owned());
+    let reader_schema = AvroSchema::new(reader_schema_json.to_owned());
     ReaderBuilder::new()
         .with_writer_schema_store(store)
         .with_batch_size(batch_size)
@@ -215,19 +261,24 @@ fn bench_with_decoder<F>(
 fn criterion_benches(c: &mut Criterion) {
     let data = gen_avro_data_with(INT_SCHEMA, NUM_ROWS, gen_int);
     bench_with_decoder(c, "skip_int", &data, NUM_ROWS, || {
-        new_decoder(INT_SCHEMA, BATCH_SIZE)
+        new_decoder(INT_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
     });
     let data = gen_avro_data_with(LONG_SCHEMA, NUM_ROWS, gen_long);
     bench_with_decoder(c, "skip_long", &data, NUM_ROWS, || {
-        new_decoder(LONG_SCHEMA, BATCH_SIZE)
+        new_decoder(LONG_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
     });
     let data = gen_avro_data_with(FLOAT_SCHEMA, NUM_ROWS, gen_float);
     bench_with_decoder(c, "skip_float", &data, NUM_ROWS, || {
-        new_decoder(FLOAT_SCHEMA, BATCH_SIZE)
+        new_decoder(FLOAT_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
     });
     let data = gen_avro_data_with(DOUBLE_SCHEMA, NUM_ROWS, gen_double);
     bench_with_decoder(c, "skip_double", &data, NUM_ROWS, || {
-        new_decoder(DOUBLE_SCHEMA, BATCH_SIZE)
+        new_decoder(DOUBLE_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
+    });
+
+    let data = gen_avro_data_with(MIX_SCHEMA, NUM_ROWS, gen_mixed);
+    bench_with_decoder(c, "project_primitives", &data, NUM_ROWS, || {
+        new_decoder(MIX_SCHEMA, PROJECT_READER_SCHEMA, BATCH_SIZE)
     });
 }
 
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index d20a71425d..fc2a914d35 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -84,14 +84,20 @@ pub(crate) enum AvroLiteral {
 /// Contains the necessary information to resolve a writer's record against a 
reader's record schema.
 #[derive(Debug, Clone, PartialEq)]
 pub(crate) struct ResolvedRecord {
-    /// Maps a writer's field index to the corresponding reader's field index.
-    /// `None` if the writer's field is not present in the reader's schema.
-    pub(crate) writer_to_reader: Arc<[Option<usize>]>,
+    /// Maps a writer's field index to the field's resolution against the 
reader's schema.
+    pub(crate) writer_fields: Arc<[ResolvedField]>,
     /// A list of indices in the reader's schema for fields that have a 
default value.
     pub(crate) default_fields: Arc<[usize]>,
+}
+
+/// Resolution information for record fields in the writer schema.
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) enum ResolvedField {
+    /// Resolves to a field indexed in the reader schema.
+    ToReader(usize),
     /// For fields present in the writer's schema but not the reader's, this 
stores their data type.
     /// This is needed to correctly skip over these fields during 
deserialization.
-    pub(crate) skip_fields: Arc<[Option<AvroDataType>]>,
+    Skip(AvroDataType),
 }
 
 /// Defines the type of promotion to be applied during schema resolution.
@@ -2281,24 +2287,27 @@ impl<'a> Maker<'a> {
                 data_type: dt,
             });
         }
-        // Build skip_fields in writer order; pre-size and push.
-        let mut skip_fields: Vec<Option<AvroDataType>> =
-            Vec::with_capacity(writer_record.fields.len());
-        for (writer_index, writer_field) in 
writer_record.fields.iter().enumerate() {
-            if writer_to_reader[writer_index].is_some() {
-                skip_fields.push(None);
-            } else {
-                skip_fields.push(Some(self.parse_type(&writer_field.r#type, 
writer_ns)?));
-            }
-        }
+        // Build writer field map.
+        let writer_fields = writer_record
+            .fields
+            .iter()
+            .enumerate()
+            .map(|(writer_index, writer_field)| {
+                if let Some(reader_index) = writer_to_reader[writer_index] {
+                    Ok(ResolvedField::ToReader(reader_index))
+                } else {
+                    let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
+                    Ok(ResolvedField::Skip(dt))
+                }
+            })
+            .collect::<Result<_, ArrowError>>()?;
         let resolved = AvroDataType::new_with_resolution(
             Codec::Struct(Arc::from(reader_fields)),
             reader_md,
             None,
             Some(ResolutionInfo::Record(ResolvedRecord {
-                writer_to_reader: Arc::from(writer_to_reader),
+                writer_fields,
                 default_fields: Arc::from(default_fields),
-                skip_fields: Arc::from(skip_fields),
             })),
         );
         // Register a resolved record by reader name+namespace for potential 
named type refs.
@@ -2792,16 +2801,13 @@ mod tests {
         };
         match resolution {
             ResolutionInfo::Record(ResolvedRecord {
-                writer_to_reader,
+                writer_fields,
                 default_fields,
-                skip_fields,
             }) => {
-                assert_eq!(writer_to_reader.len(), 1);
-                assert_eq!(writer_to_reader[0], Some(0));
+                assert_eq!(writer_fields.len(), 1);
+                assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
                 assert_eq!(default_fields.len(), 1);
                 assert_eq!(default_fields[0], 1);
-                assert_eq!(skip_fields.len(), 1);
-                assert_eq!(skip_fields[0], None);
             }
             other => panic!("unexpected resolution {other:?}"),
         }
@@ -2888,16 +2894,13 @@ mod tests {
         };
         match resolution {
             ResolutionInfo::Record(ResolvedRecord {
-                writer_to_reader,
+                writer_fields,
                 default_fields,
-                skip_fields,
             }) => {
-                assert_eq!(writer_to_reader.len(), 1);
-                assert_eq!(writer_to_reader[0], Some(0));
+                assert_eq!(writer_fields.len(), 1);
+                assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
                 assert_eq!(default_fields.len(), 1);
                 assert_eq!(default_fields[0], 1);
-                assert_eq!(skip_fields.len(), 1);
-                assert_eq!(skip_fields[0], None);
             }
             other => panic!("unexpected resolution {other:?}"),
         }
@@ -3714,11 +3717,18 @@ mod tests {
             Some(ResolutionInfo::Record(ref r)) => r.clone(),
             other => panic!("expected record resolution, got {other:?}"),
         };
-        assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]);
+        assert!(matches!(
+            &rec.writer_fields[..],
+            &[
+                ResolvedField::ToReader(1),
+                ResolvedField::Skip(_),
+                ResolvedField::ToReader(0),
+            ]
+        ));
         assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
-        assert!(rec.skip_fields[0].is_none());
-        assert!(rec.skip_fields[2].is_none());
-        let skip1 = rec.skip_fields[1].as_ref().expect("skip field present");
+        let ResolvedField::Skip(skip1) = &rec.writer_fields[1] else {
+            panic!("should skip field 1")
+        };
         assert!(matches!(skip1.codec(), Codec::Utf8));
         let name_md = &fields[2].data_type().metadata;
         assert_eq!(
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 5e281d1fc6..605d296973 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -18,8 +18,8 @@
 //! Avro Decoder for Arrow types.
 
 use crate::codec::{
-    AvroDataType, AvroLiteral, Codec, EnumMapping, Promotion, ResolutionInfo, 
ResolvedRecord,
-    ResolvedUnion,
+    AvroDataType, AvroLiteral, Codec, EnumMapping, Promotion, ResolutionInfo, 
ResolvedField,
+    ResolvedRecord, ResolvedUnion,
 };
 use crate::errors::AvroError;
 use crate::reader::cursor::AvroCursor;
@@ -2416,11 +2416,16 @@ fn values_equal_at(arr: &dyn Array, i: usize, j: usize) 
-> bool {
 
 #[derive(Debug)]
 struct Projector {
-    writer_to_reader: Arc<[Option<usize>]>,
-    skip_decoders: Vec<Option<Skipper>>,
+    writer_projections: Vec<FieldProjection>,
     default_injections: Arc<[(usize, AvroLiteral)]>,
 }
 
+#[derive(Debug)]
+enum FieldProjection {
+    ToReader(usize),
+    Skip(Skipper),
+}
+
 #[derive(Debug)]
 struct ProjectorBuilder<'a> {
     rec: &'a ResolvedRecord,
@@ -2448,18 +2453,20 @@ impl<'a> ProjectorBuilder<'a> {
                 .unwrap_or(AvroLiteral::Null);
             default_injections.push((idx, lit));
         }
-        let mut skip_decoders: Vec<Option<Skipper>> =
-            Vec::with_capacity(self.rec.skip_fields.len());
-        for datatype in self.rec.skip_fields.as_ref() {
-            let skipper = match datatype {
-                Some(datatype) => Some(Skipper::from_avro(datatype)?),
-                None => None,
-            };
-            skip_decoders.push(skipper);
-        }
+        let writer_projections = self
+            .rec
+            .writer_fields
+            .iter()
+            .map(|field| match field {
+                ResolvedField::ToReader(index) => 
Ok(FieldProjection::ToReader(*index)),
+                ResolvedField::Skip(datatype) => {
+                    let skipper = Skipper::from_avro(datatype)?;
+                    Ok(FieldProjection::Skip(skipper))
+                }
+            })
+            .collect::<Result<_, AvroError>>()?;
         Ok(Projector {
-            writer_to_reader: self.rec.writer_to_reader.clone(),
-            skip_decoders,
+            writer_projections,
             default_injections: default_injections.into(),
         })
     }
@@ -2472,25 +2479,10 @@ impl Projector {
         buf: &mut AvroCursor<'_>,
         encodings: &mut [Decoder],
     ) -> Result<(), AvroError> {
-        debug_assert_eq!(
-            self.writer_to_reader.len(),
-            self.skip_decoders.len(),
-            "internal invariant: mapping and skipper lists must have equal 
length"
-        );
-        for (i, (mapping, skipper_opt)) in self
-            .writer_to_reader
-            .iter()
-            .zip(self.skip_decoders.iter())
-            .enumerate()
-        {
-            match (mapping, skipper_opt.as_ref()) {
-                (Some(reader_index), _) => 
encodings[*reader_index].decode(buf)?,
-                (None, Some(skipper)) => skipper.skip(buf)?,
-                (None, None) => {
-                    return Err(AvroError::SchemaError(format!(
-                        "No skipper available for writer-only field at index 
{i}",
-                    )));
-                }
+        for field_proj in self.writer_projections.iter() {
+            match field_proj {
+                FieldProjection::ToReader(index) => 
encodings[*index].decode(buf)?,
+                FieldProjection::Skip(skipper) => skipper.skip(buf)?,
             }
         }
         for (reader_index, lit) in self.default_injections.as_ref() {
@@ -4128,8 +4120,7 @@ mod tests {
 
     fn make_record_resolved_decoder(
         reader_fields: &[(&str, DataType, bool)],
-        writer_to_reader: Vec<Option<usize>>,
-        skip_decoders: Vec<Option<Skipper>>,
+        writer_projections: Vec<FieldProjection>,
     ) -> Decoder {
         let mut field_refs: Vec<FieldRef> = 
Vec::with_capacity(reader_fields.len());
         let mut encodings: Vec<Decoder> = 
Vec::with_capacity(reader_fields.len());
@@ -4151,8 +4142,7 @@ mod tests {
             encodings,
             vec![None; reader_fields.len()],
             Some(Projector {
-                writer_to_reader: Arc::from(writer_to_reader),
-                skip_decoders,
+                writer_projections,
                 default_injections: Arc::from(Vec::<(usize, 
AvroLiteral)>::new()),
             }),
         )
@@ -4162,8 +4152,10 @@ mod tests {
     fn test_skip_writer_trailing_field_int32() {
         let mut dec = make_record_resolved_decoder(
             &[("id", arrow_schema::DataType::Int32, false)],
-            vec![Some(0), None],
-            vec![None, Some(super::Skipper::Int32)],
+            vec![
+                FieldProjection::ToReader(0),
+                FieldProjection::Skip(super::Skipper::Int32),
+            ],
         );
         let mut data = Vec::new();
         data.extend_from_slice(&encode_avro_int(7));
@@ -4190,8 +4182,11 @@ mod tests {
                 ("id", DataType::Int32, false),
                 ("score", DataType::Int64, false),
             ],
-            vec![Some(0), None, Some(1)],
-            vec![None, Some(Skipper::String), None],
+            vec![
+                FieldProjection::ToReader(0),
+                FieldProjection::Skip(Skipper::String),
+                FieldProjection::ToReader(1),
+            ],
         );
         let mut data = Vec::new();
         data.extend_from_slice(&encode_avro_int(42));
@@ -4222,8 +4217,10 @@ mod tests {
     fn test_skip_writer_array_with_negative_block_count_fast() {
         let mut dec = make_record_resolved_decoder(
             &[("id", DataType::Int32, false)],
-            vec![None, Some(0)],
-            vec![Some(super::Skipper::List(Box::new(Skipper::Int32))), None],
+            vec![
+                
FieldProjection::Skip(super::Skipper::List(Box::new(Skipper::Int32))),
+                FieldProjection::ToReader(0),
+            ],
         );
         let mut array_payload = Vec::new();
         array_payload.extend_from_slice(&encode_avro_int(1));
@@ -4254,8 +4251,10 @@ mod tests {
     fn test_skip_writer_map_with_negative_block_count_fast() {
         let mut dec = make_record_resolved_decoder(
             &[("id", DataType::Int32, false)],
-            vec![None, Some(0)],
-            vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
+            vec![
+                FieldProjection::Skip(Skipper::Map(Box::new(Skipper::Int32))),
+                FieldProjection::ToReader(0),
+            ],
         );
         let mut entries = Vec::new();
         entries.extend_from_slice(&encode_avro_bytes(b"k1"));
@@ -4287,13 +4286,12 @@ mod tests {
     fn test_skip_writer_nullable_field_union_nullfirst() {
         let mut dec = make_record_resolved_decoder(
             &[("id", DataType::Int32, false)],
-            vec![None, Some(0)],
             vec![
-                Some(super::Skipper::Nullable(
+                FieldProjection::Skip(super::Skipper::Nullable(
                     Nullability::NullFirst,
                     Box::new(super::Skipper::Int32),
                 )),
-                None,
+                FieldProjection::ToReader(0),
             ],
         );
         let mut row1 = Vec::new();
@@ -4503,7 +4501,6 @@ mod tests {
         reader_fields: &[(&str, DataType, bool)],
         field_defaults: Vec<Option<AvroLiteral>>,
         default_injections: Vec<(usize, AvroLiteral)>,
-        writer_to_reader_len: usize,
     ) -> Decoder {
         assert_eq!(
             field_defaults.len(),
@@ -4526,11 +4523,8 @@ mod tests {
             encodings.push(enc);
         }
         let fields: Fields = field_refs.into();
-        let skip_decoders: Vec<Option<Skipper>> =
-            (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
         let projector = Projector {
-            writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
-            skip_decoders,
+            writer_projections: vec![],
             default_injections: Arc::from(default_injections),
         };
         Decoder::Record(fields, encodings, field_defaults, Some(projector))
@@ -4979,7 +4973,6 @@ mod tests {
             &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
             field_defaults,
             vec![],
-            0,
         );
         let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
         map.insert("a".to_string(), AvroLiteral::Int(7));
@@ -5012,7 +5005,6 @@ mod tests {
             &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
             field_defaults,
             vec![],
-            0,
         );
         rec.append_default(&AvroLiteral::Null).unwrap();
         let arr = rec.flush(None).unwrap();
@@ -5065,8 +5057,7 @@ mod tests {
         encoders.push(enc_b);
         let field_defaults = vec![None, None]; // no defaults -> append_null
         let projector = Projector {
-            writer_to_reader: Arc::from(vec![]),
-            skip_decoders: vec![],
+            writer_projections: vec![],
             default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
         };
         let mut rec = Decoder::Record(field_refs.into(), encoders, 
field_defaults, Some(projector));
@@ -5106,7 +5097,6 @@ mod tests {
             ],
             defaults,
             injections,
-            0,
         );
         rec.decode(&mut AvroCursor::new(&[])).unwrap();
         let arr = rec.flush(None).unwrap();

Reply via email to