This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 8c89814ef1 refactor: simplify dynamic state for Avro record projection
(#9419)
8c89814ef1 is described below
commit 8c89814ef12be9603eee6aa6edeacedef0a6c5a3
Author: Mikhail Zabaluev <[email protected]>
AuthorDate: Thu Mar 5 01:56:08 2026 +0200
refactor: simplify dynamic state for Avro record projection (#9419)
# Rationale for this change
The inner loop in `Projector::project_record` gives the optimizer
somewhat complicated dynamic data to branch through.
The sparse arrays in `Projector` are redundantly coded: `None` in the
index positions of `writer_to_reader` must match `Some` in
`skip_decoders` and vice versa.
# What changes are included in this PR?
Refactor record projection state with a single array of directive-like
enums corresponding to each writer schema field.
# Are these changes tested?
Added a benchmark for record projection (the benchmark code is partially
shared with #9397).
Somewhat counterintuitively for me, it does not show improvement on a
more complex case with a mix of projected fields, but does improve the
simpler one-field projection cases.
Passes the existing tests.
---
arrow-avro/benches/project_record.rs | 65 ++++++++++++++++++---
arrow-avro/src/codec.rs | 74 +++++++++++++-----------
arrow-avro/src/reader/record.rs | 108 ++++++++++++++++-------------------
3 files changed, 149 insertions(+), 98 deletions(-)
diff --git a/arrow-avro/benches/project_record.rs
b/arrow-avro/benches/project_record.rs
index 9bddfea93b..91bece6d7e 100644
--- a/arrow-avro/benches/project_record.rs
+++ b/arrow-avro/benches/project_record.rs
@@ -121,7 +121,22 @@ fn gen_double(mut rng: impl Rng, sc: &ApacheSchema, n:
usize, prefix: &[u8]) ->
)
}
-const READER_SCHEMA: &str = r#"
+fn gen_mixed(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) ->
Vec<u8> {
+ encode_records_with_prefix(
+ sc,
+ prefix,
+ (0..n).map(|i| {
+ Value::Record(vec![
+ ("f1".into(), Value::Int(rng.random())),
+ ("f2".into(), Value::Long(rng.random())),
+ ("f3".into(), Value::String(format!("name-{i}"))),
+ ("f4".into(), Value::Double(rng.random())),
+ ])
+ }),
+ )
+}
+
+const SKIP_READER_SCHEMA: &str = r#"
{
"type":"record",
"name":"table",
@@ -175,11 +190,42 @@ const DOUBLE_SCHEMA: &str = r#"
}
"#;
-fn new_decoder(schema_json: &'static str, batch_size: usize) -> Decoder {
+const MIX_SCHEMA: &str = r#"
+ {
+ "type":"record",
+ "name":"Mix",
+ "fields": [
+ { "name": "f1", "type": "int" },
+ { "name": "f2", "type": "long" },
+ { "name": "f3", "type": "string" },
+ { "name": "f4", "type": "double" }
+ ]
+ }
+ "#;
+
+// Project the record type written to MIX_SCHEMA:
+// skip "f2" and "f4", add "f5" with a default
+const PROJECT_READER_SCHEMA: &str = r#"
+ {
+ "type":"record",
+ "name":"Mix",
+ "fields": [
+ { "name": "f1", "type": "int" },
+ { "name": "f3", "type": "string" },
+ { "name": "f5", "type": "long", "default": 0 }
+ ]
+ }
+ "#;
+
+fn new_decoder(
+ schema_json: &'static str,
+ reader_schema_json: &'static str,
+ batch_size: usize,
+) -> Decoder {
let schema = AvroSchema::new(schema_json.to_owned());
let mut store = SchemaStore::new();
store.register(schema).unwrap();
- let reader_schema = AvroSchema::new(READER_SCHEMA.to_owned());
+ let reader_schema = AvroSchema::new(reader_schema_json.to_owned());
ReaderBuilder::new()
.with_writer_schema_store(store)
.with_batch_size(batch_size)
@@ -215,19 +261,24 @@ fn bench_with_decoder<F>(
fn criterion_benches(c: &mut Criterion) {
let data = gen_avro_data_with(INT_SCHEMA, NUM_ROWS, gen_int);
bench_with_decoder(c, "skip_int", &data, NUM_ROWS, || {
- new_decoder(INT_SCHEMA, BATCH_SIZE)
+ new_decoder(INT_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
});
let data = gen_avro_data_with(LONG_SCHEMA, NUM_ROWS, gen_long);
bench_with_decoder(c, "skip_long", &data, NUM_ROWS, || {
- new_decoder(LONG_SCHEMA, BATCH_SIZE)
+ new_decoder(LONG_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
});
let data = gen_avro_data_with(FLOAT_SCHEMA, NUM_ROWS, gen_float);
bench_with_decoder(c, "skip_float", &data, NUM_ROWS, || {
- new_decoder(FLOAT_SCHEMA, BATCH_SIZE)
+ new_decoder(FLOAT_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
});
let data = gen_avro_data_with(DOUBLE_SCHEMA, NUM_ROWS, gen_double);
bench_with_decoder(c, "skip_double", &data, NUM_ROWS, || {
- new_decoder(DOUBLE_SCHEMA, BATCH_SIZE)
+ new_decoder(DOUBLE_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
+ });
+
+ let data = gen_avro_data_with(MIX_SCHEMA, NUM_ROWS, gen_mixed);
+ bench_with_decoder(c, "project_primitives", &data, NUM_ROWS, || {
+ new_decoder(MIX_SCHEMA, PROJECT_READER_SCHEMA, BATCH_SIZE)
});
}
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index d20a71425d..fc2a914d35 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -84,14 +84,20 @@ pub(crate) enum AvroLiteral {
/// Contains the necessary information to resolve a writer's record against a
reader's record schema.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedRecord {
- /// Maps a writer's field index to the corresponding reader's field index.
- /// `None` if the writer's field is not present in the reader's schema.
- pub(crate) writer_to_reader: Arc<[Option<usize>]>,
+ /// Maps a writer's field index to the field's resolution against the
reader's schema.
+ pub(crate) writer_fields: Arc<[ResolvedField]>,
/// A list of indices in the reader's schema for fields that have a
default value.
pub(crate) default_fields: Arc<[usize]>,
+}
+
+/// Resolution information for record fields in the writer schema.
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) enum ResolvedField {
+ /// Resolves to a field indexed in the reader schema.
+ ToReader(usize),
/// For fields present in the writer's schema but not the reader's, this
stores their data type.
/// This is needed to correctly skip over these fields during
deserialization.
- pub(crate) skip_fields: Arc<[Option<AvroDataType>]>,
+ Skip(AvroDataType),
}
/// Defines the type of promotion to be applied during schema resolution.
@@ -2281,24 +2287,27 @@ impl<'a> Maker<'a> {
data_type: dt,
});
}
- // Build skip_fields in writer order; pre-size and push.
- let mut skip_fields: Vec<Option<AvroDataType>> =
- Vec::with_capacity(writer_record.fields.len());
- for (writer_index, writer_field) in
writer_record.fields.iter().enumerate() {
- if writer_to_reader[writer_index].is_some() {
- skip_fields.push(None);
- } else {
- skip_fields.push(Some(self.parse_type(&writer_field.r#type,
writer_ns)?));
- }
- }
+ // Build writer field map.
+ let writer_fields = writer_record
+ .fields
+ .iter()
+ .enumerate()
+ .map(|(writer_index, writer_field)| {
+ if let Some(reader_index) = writer_to_reader[writer_index] {
+ Ok(ResolvedField::ToReader(reader_index))
+ } else {
+ let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
+ Ok(ResolvedField::Skip(dt))
+ }
+ })
+ .collect::<Result<_, ArrowError>>()?;
let resolved = AvroDataType::new_with_resolution(
Codec::Struct(Arc::from(reader_fields)),
reader_md,
None,
Some(ResolutionInfo::Record(ResolvedRecord {
- writer_to_reader: Arc::from(writer_to_reader),
+ writer_fields,
default_fields: Arc::from(default_fields),
- skip_fields: Arc::from(skip_fields),
})),
);
// Register a resolved record by reader name+namespace for potential
named type refs.
@@ -2792,16 +2801,13 @@ mod tests {
};
match resolution {
ResolutionInfo::Record(ResolvedRecord {
- writer_to_reader,
+ writer_fields,
default_fields,
- skip_fields,
}) => {
- assert_eq!(writer_to_reader.len(), 1);
- assert_eq!(writer_to_reader[0], Some(0));
+ assert_eq!(writer_fields.len(), 1);
+ assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
assert_eq!(default_fields.len(), 1);
assert_eq!(default_fields[0], 1);
- assert_eq!(skip_fields.len(), 1);
- assert_eq!(skip_fields[0], None);
}
other => panic!("unexpected resolution {other:?}"),
}
@@ -2888,16 +2894,13 @@ mod tests {
};
match resolution {
ResolutionInfo::Record(ResolvedRecord {
- writer_to_reader,
+ writer_fields,
default_fields,
- skip_fields,
}) => {
- assert_eq!(writer_to_reader.len(), 1);
- assert_eq!(writer_to_reader[0], Some(0));
+ assert_eq!(writer_fields.len(), 1);
+ assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
assert_eq!(default_fields.len(), 1);
assert_eq!(default_fields[0], 1);
- assert_eq!(skip_fields.len(), 1);
- assert_eq!(skip_fields[0], None);
}
other => panic!("unexpected resolution {other:?}"),
}
@@ -3714,11 +3717,18 @@ mod tests {
Some(ResolutionInfo::Record(ref r)) => r.clone(),
other => panic!("expected record resolution, got {other:?}"),
};
- assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]);
+ assert!(matches!(
+ &rec.writer_fields[..],
+ &[
+ ResolvedField::ToReader(1),
+ ResolvedField::Skip(_),
+ ResolvedField::ToReader(0),
+ ]
+ ));
assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
- assert!(rec.skip_fields[0].is_none());
- assert!(rec.skip_fields[2].is_none());
- let skip1 = rec.skip_fields[1].as_ref().expect("skip field present");
+ let ResolvedField::Skip(skip1) = &rec.writer_fields[1] else {
+ panic!("should skip field 1")
+ };
assert!(matches!(skip1.codec(), Codec::Utf8));
let name_md = &fields[2].data_type().metadata;
assert_eq!(
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 5e281d1fc6..605d296973 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -18,8 +18,8 @@
//! Avro Decoder for Arrow types.
use crate::codec::{
- AvroDataType, AvroLiteral, Codec, EnumMapping, Promotion, ResolutionInfo,
ResolvedRecord,
- ResolvedUnion,
+ AvroDataType, AvroLiteral, Codec, EnumMapping, Promotion, ResolutionInfo,
ResolvedField,
+ ResolvedRecord, ResolvedUnion,
};
use crate::errors::AvroError;
use crate::reader::cursor::AvroCursor;
@@ -2416,11 +2416,16 @@ fn values_equal_at(arr: &dyn Array, i: usize, j: usize)
-> bool {
#[derive(Debug)]
struct Projector {
- writer_to_reader: Arc<[Option<usize>]>,
- skip_decoders: Vec<Option<Skipper>>,
+ writer_projections: Vec<FieldProjection>,
default_injections: Arc<[(usize, AvroLiteral)]>,
}
+#[derive(Debug)]
+enum FieldProjection {
+ ToReader(usize),
+ Skip(Skipper),
+}
+
#[derive(Debug)]
struct ProjectorBuilder<'a> {
rec: &'a ResolvedRecord,
@@ -2448,18 +2453,20 @@ impl<'a> ProjectorBuilder<'a> {
.unwrap_or(AvroLiteral::Null);
default_injections.push((idx, lit));
}
- let mut skip_decoders: Vec<Option<Skipper>> =
- Vec::with_capacity(self.rec.skip_fields.len());
- for datatype in self.rec.skip_fields.as_ref() {
- let skipper = match datatype {
- Some(datatype) => Some(Skipper::from_avro(datatype)?),
- None => None,
- };
- skip_decoders.push(skipper);
- }
+ let writer_projections = self
+ .rec
+ .writer_fields
+ .iter()
+ .map(|field| match field {
+ ResolvedField::ToReader(index) =>
Ok(FieldProjection::ToReader(*index)),
+ ResolvedField::Skip(datatype) => {
+ let skipper = Skipper::from_avro(datatype)?;
+ Ok(FieldProjection::Skip(skipper))
+ }
+ })
+ .collect::<Result<_, AvroError>>()?;
Ok(Projector {
- writer_to_reader: self.rec.writer_to_reader.clone(),
- skip_decoders,
+ writer_projections,
default_injections: default_injections.into(),
})
}
@@ -2472,25 +2479,10 @@ impl Projector {
buf: &mut AvroCursor<'_>,
encodings: &mut [Decoder],
) -> Result<(), AvroError> {
- debug_assert_eq!(
- self.writer_to_reader.len(),
- self.skip_decoders.len(),
- "internal invariant: mapping and skipper lists must have equal
length"
- );
- for (i, (mapping, skipper_opt)) in self
- .writer_to_reader
- .iter()
- .zip(self.skip_decoders.iter())
- .enumerate()
- {
- match (mapping, skipper_opt.as_ref()) {
- (Some(reader_index), _) =>
encodings[*reader_index].decode(buf)?,
- (None, Some(skipper)) => skipper.skip(buf)?,
- (None, None) => {
- return Err(AvroError::SchemaError(format!(
- "No skipper available for writer-only field at index
{i}",
- )));
- }
+ for field_proj in self.writer_projections.iter() {
+ match field_proj {
+ FieldProjection::ToReader(index) =>
encodings[*index].decode(buf)?,
+ FieldProjection::Skip(skipper) => skipper.skip(buf)?,
}
}
for (reader_index, lit) in self.default_injections.as_ref() {
@@ -4128,8 +4120,7 @@ mod tests {
fn make_record_resolved_decoder(
reader_fields: &[(&str, DataType, bool)],
- writer_to_reader: Vec<Option<usize>>,
- skip_decoders: Vec<Option<Skipper>>,
+ writer_projections: Vec<FieldProjection>,
) -> Decoder {
let mut field_refs: Vec<FieldRef> =
Vec::with_capacity(reader_fields.len());
let mut encodings: Vec<Decoder> =
Vec::with_capacity(reader_fields.len());
@@ -4151,8 +4142,7 @@ mod tests {
encodings,
vec![None; reader_fields.len()],
Some(Projector {
- writer_to_reader: Arc::from(writer_to_reader),
- skip_decoders,
+ writer_projections,
default_injections: Arc::from(Vec::<(usize,
AvroLiteral)>::new()),
}),
)
@@ -4162,8 +4152,10 @@ mod tests {
fn test_skip_writer_trailing_field_int32() {
let mut dec = make_record_resolved_decoder(
&[("id", arrow_schema::DataType::Int32, false)],
- vec![Some(0), None],
- vec![None, Some(super::Skipper::Int32)],
+ vec![
+ FieldProjection::ToReader(0),
+ FieldProjection::Skip(super::Skipper::Int32),
+ ],
);
let mut data = Vec::new();
data.extend_from_slice(&encode_avro_int(7));
@@ -4190,8 +4182,11 @@ mod tests {
("id", DataType::Int32, false),
("score", DataType::Int64, false),
],
- vec![Some(0), None, Some(1)],
- vec![None, Some(Skipper::String), None],
+ vec![
+ FieldProjection::ToReader(0),
+ FieldProjection::Skip(Skipper::String),
+ FieldProjection::ToReader(1),
+ ],
);
let mut data = Vec::new();
data.extend_from_slice(&encode_avro_int(42));
@@ -4222,8 +4217,10 @@ mod tests {
fn test_skip_writer_array_with_negative_block_count_fast() {
let mut dec = make_record_resolved_decoder(
&[("id", DataType::Int32, false)],
- vec![None, Some(0)],
- vec![Some(super::Skipper::List(Box::new(Skipper::Int32))), None],
+ vec![
+
FieldProjection::Skip(super::Skipper::List(Box::new(Skipper::Int32))),
+ FieldProjection::ToReader(0),
+ ],
);
let mut array_payload = Vec::new();
array_payload.extend_from_slice(&encode_avro_int(1));
@@ -4254,8 +4251,10 @@ mod tests {
fn test_skip_writer_map_with_negative_block_count_fast() {
let mut dec = make_record_resolved_decoder(
&[("id", DataType::Int32, false)],
- vec![None, Some(0)],
- vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
+ vec![
+ FieldProjection::Skip(Skipper::Map(Box::new(Skipper::Int32))),
+ FieldProjection::ToReader(0),
+ ],
);
let mut entries = Vec::new();
entries.extend_from_slice(&encode_avro_bytes(b"k1"));
@@ -4287,13 +4286,12 @@ mod tests {
fn test_skip_writer_nullable_field_union_nullfirst() {
let mut dec = make_record_resolved_decoder(
&[("id", DataType::Int32, false)],
- vec![None, Some(0)],
vec![
- Some(super::Skipper::Nullable(
+ FieldProjection::Skip(super::Skipper::Nullable(
Nullability::NullFirst,
Box::new(super::Skipper::Int32),
)),
- None,
+ FieldProjection::ToReader(0),
],
);
let mut row1 = Vec::new();
@@ -4503,7 +4501,6 @@ mod tests {
reader_fields: &[(&str, DataType, bool)],
field_defaults: Vec<Option<AvroLiteral>>,
default_injections: Vec<(usize, AvroLiteral)>,
- writer_to_reader_len: usize,
) -> Decoder {
assert_eq!(
field_defaults.len(),
@@ -4526,11 +4523,8 @@ mod tests {
encodings.push(enc);
}
let fields: Fields = field_refs.into();
- let skip_decoders: Vec<Option<Skipper>> =
- (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
let projector = Projector {
- writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
- skip_decoders,
+ writer_projections: vec![],
default_injections: Arc::from(default_injections),
};
Decoder::Record(fields, encodings, field_defaults, Some(projector))
@@ -4979,7 +4973,6 @@ mod tests {
&[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
field_defaults,
vec![],
- 0,
);
let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
map.insert("a".to_string(), AvroLiteral::Int(7));
@@ -5012,7 +5005,6 @@ mod tests {
&[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
field_defaults,
vec![],
- 0,
);
rec.append_default(&AvroLiteral::Null).unwrap();
let arr = rec.flush(None).unwrap();
@@ -5065,8 +5057,7 @@ mod tests {
encoders.push(enc_b);
let field_defaults = vec![None, None]; // no defaults -> append_null
let projector = Projector {
- writer_to_reader: Arc::from(vec![]),
- skip_decoders: vec![],
+ writer_projections: vec![],
default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
};
let mut rec = Decoder::Record(field_refs.into(), encoders,
field_defaults, Some(projector));
@@ -5106,7 +5097,6 @@ mod tests {
],
defaults,
injections,
- 0,
);
rec.decode(&mut AvroCursor::new(&[])).unwrap();
let arr = rec.flush(None).unwrap();