This is an automated email from the ASF dual-hosted git repository. kriskras99 pushed a commit to branch feat/enums in repository https://gitbox.apache.org/repos/asf/avro-rs.git
commit 0bac7649c1304d34203493a2e0b9137df6f4eb98 Author: Martin Grigorov <[email protected]> AuthorDate: Fri Feb 20 10:54:53 2026 +0200 feat!: Use bon builder style instead of several methods (#472) * feat!: Use bon builder style instead of several methods Instead of having several methods like: Schema::array(Schema) and Schema::array_with_attributes(Schema, BTreeMap) we could use Bon's function builders with start_fn: ``` Schema::array(Schema).call() Schema::array(Schema).attributes(BTreeMap).call() ``` The only annoying part is the `.call()` * Fix doc tests. * Use `.build()` as a finish_fn for the bon style function builders * Use bon function builder for Schema::map() too * Use Bon function style builder for Reader constructors * Re-add the docstring for the convenience Reader::new() ctor Simplify the resolution of `schemata` * Rename Reader's schema to reader_schema for more clarity * Add arguments for the new `default` fields in Schema::array() and Schema::map() Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * Add a Schema::r#enum() builder method Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * Add Schema::fixed(name, size) builder method Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * Add Schema::record() builder Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * Extract the builder methods and their tests into a module. Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * Fix the issue number in the unit tests' names * Move Schema::union() to schema/builders.rs It does not use Bon builders as the other methods because it has just a single argument (Vec<String>) and we don't expect that more will be ever added. --------- Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> --- avro/benches/serde.rs | 2 +- avro/examples/benchmark.rs | 4 +- avro/src/decode.rs | 8 +- avro/src/documentation/dynamic.rs | 4 +- avro/src/encode.rs | 14 +- avro/src/lib.rs | 10 +- avro/src/reader.rs | 65 +++--- avro/src/schema/builders.rs | 318 ++++++++++++++++++++++++++++++ avro/src/schema/mod.rs | 67 ++----- avro/src/schema_compatibility.rs | 16 +- avro/src/schema_equality.rs | 48 +++-- avro/src/serde/derive.rs | 12 +- avro/src/serde/ser_schema.rs | 4 +- avro/src/types.rs | 4 +- avro/src/writer.rs | 10 +- avro/tests/schema.rs | 68 +++++-- avro/tests/shared.rs | 12 +- avro/tests/to_from_avro_datum_schemata.rs | 5 +- avro/tests/union_schema.rs | 5 +- avro_derive/tests/derive.rs | 5 +- avro_derive/tests/serde.rs | 4 +- 21 files changed, 516 insertions(+), 169 deletions(-) diff --git a/avro/benches/serde.rs b/avro/benches/serde.rs index fad858b..c966fa5 100644 --- a/avro/benches/serde.rs +++ b/avro/benches/serde.rs @@ -251,7 +251,7 @@ fn write_ser<T: Serialize>(schema: &Schema, records: &[T]) -> AvroResult<Vec<u8> } fn read(schema: &Schema, bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> { - let reader = Reader::with_schema(schema, bytes)?; + let reader = Reader::builder(bytes).reader_schema(schema).build()?; for record in reader { let _ = record?; diff --git a/avro/examples/benchmark.rs b/avro/examples/benchmark.rs index 7d5922b..8d94f54 100644 --- a/avro/examples/benchmark.rs +++ b/avro/examples/benchmark.rs @@ -78,7 +78,9 @@ fn benchmark( for _ in 0..runs { let start = Instant::now(); - let reader = Reader::with_schema(schema, BufReader::new(&bytes[..]))?; + let reader = Reader::builder(BufReader::new(&bytes[..])) + .reader_schema(schema) + .build()?; let mut read_records = Vec::with_capacity(count); for record in reader { diff --git a/avro/src/decode.rs b/avro/src/decode.rs index d88ab5f..dfa4bd3 100644 --- a/avro/src/decode.rs +++ b/avro/src/decode.rs @@ -376,7 +376,7 @@ mod tests { #[test] fn test_decode_array_without_size() -> TestResult { let mut input: &[u8] = &[6, 2, 4, 6, 0]; - let result = decode(&Schema::array(Schema::Int), &mut input); + let result = decode(&Schema::array(Schema::Int).build(), &mut input); assert_eq!(Array(vec!(Int(1), Int(2), Int(3))), result?); Ok(()) @@ -385,7 +385,7 @@ mod tests { #[test] fn test_decode_array_with_size() -> TestResult { let mut input: &[u8] = &[5, 6, 2, 4, 6, 0]; - let result = decode(&Schema::array(Schema::Int), &mut input); + let result = decode(&Schema::array(Schema::Int).build(), &mut input); assert_eq!(Array(vec!(Int(1), Int(2), Int(3))), result?); Ok(()) @@ -394,7 +394,7 @@ mod tests { #[test] fn test_decode_map_without_size() -> TestResult { let mut input: &[u8] = &[0x02, 0x08, 0x74, 0x65, 0x73, 0x74, 0x02, 0x00]; - let result = decode(&Schema::map(Schema::Int), &mut input); + let result = decode(&Schema::map(Schema::Int).build(), &mut input); let mut expected = HashMap::new(); expected.insert(String::from("test"), Int(1)); assert_eq!(Map(expected), result?); @@ -405,7 +405,7 @@ mod tests { #[test] fn test_decode_map_with_size() -> TestResult { let mut input: &[u8] = &[0x01, 0x0C, 0x08, 0x74, 0x65, 0x73, 0x74, 0x02, 0x00]; - let result = decode(&Schema::map(Schema::Int), &mut input); + let result = decode(&Schema::map(Schema::Int).build(), &mut input); let mut expected = HashMap::new(); expected.insert(String::from("test"), Int(1)); assert_eq!(Map(expected), result?); diff --git a/avro/src/documentation/dynamic.rs b/avro/src/documentation/dynamic.rs index 4a426fb..a6e3088 100644 --- a/avro/src/documentation/dynamic.rs +++ b/avro/src/documentation/dynamic.rs @@ -208,7 +208,7 @@ //! let reader_schema = Schema::parse_str(reader_raw_schema).unwrap(); //! //! // reader creation can fail in case the input to read from is not Avro-compatible or malformed -//! let reader = Reader::with_schema(&reader_schema, &input[..]).unwrap(); +//! let reader = Reader::builder(&input[..]).reader_schema(&reader_schema).build().unwrap(); //! //! // value is a Result of an Avro Value in case the read operation fails //! for value in reader { @@ -268,7 +268,7 @@ //! writer.append_ser(test)?; //! //! let input = writer.into_inner()?; -//! let reader = Reader::with_schema(&schema, &input[..])?; +//! let reader = Reader::builder(&input[..]).reader_schema(&schema).build()?; //! //! for record in reader { //! println!("{:?}", from_value::<Test>(&record?)); diff --git a/avro/src/encode.rs b/avro/src/encode.rs index 7efe4a6..610e1b8 100644 --- a/avro/src/encode.rs +++ b/avro/src/encode.rs @@ -392,10 +392,13 @@ pub(crate) mod tests { let empty: Vec<Value> = Vec::new(); encode( &Value::Array(empty.clone()), - &Schema::array(Schema::Int), + &Schema::array(Schema::Int).build(), &mut buf, ) - .expect(&success(&Value::Array(empty), &Schema::array(Schema::Int))); + .expect(&success( + &Value::Array(empty), + &Schema::array(Schema::Int).build(), + )); assert_eq!(vec![0u8], buf); } @@ -405,10 +408,13 @@ pub(crate) mod tests { let empty: HashMap<String, Value> = HashMap::new(); encode( &Value::Map(empty.clone()), - &Schema::map(Schema::Int), + &Schema::map(Schema::Int).build(), &mut buf, ) - .expect(&success(&Value::Map(empty), &Schema::map(Schema::Int))); + .expect(&success( + &Value::Map(empty), + &Schema::map(Schema::Int).build(), + )); assert_eq!(vec![0u8], buf); } diff --git a/avro/src/lib.rs b/avro/src/lib.rs index d166c51..4800bfa 100644 --- a/avro/src/lib.rs +++ b/avro/src/lib.rs @@ -193,7 +193,10 @@ mod tests { record.put("b", "foo"); writer.append_value(record).unwrap(); let input = writer.into_inner().unwrap(); - let mut reader = Reader::with_schema(&reader_schema, &input[..]).unwrap(); + let mut reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build() + .unwrap(); assert_eq!( reader.next().unwrap().unwrap(), Value::Record(vec![ @@ -235,7 +238,10 @@ mod tests { record.put("c", "clubs"); writer.append_value(record).unwrap(); let input = writer.into_inner().unwrap(); - let mut reader = Reader::with_schema(&schema, &input[..]).unwrap(); + let mut reader = Reader::builder(&input[..]) + .reader_schema(&schema) + .build() + .unwrap(); assert_eq!( reader.next().unwrap().unwrap(), Value::Record(vec![ diff --git a/avro/src/reader.rs b/avro/src/reader.rs index 8af522c..c9e355f 100644 --- a/avro/src/reader.rs +++ b/avro/src/reader.rs @@ -30,6 +30,7 @@ use crate::{ types::Value, util, }; +use bon::bon; use log::warn; use serde::de::DeserializeOwned; use serde_json::from_slice; @@ -88,7 +89,7 @@ impl<'r, R: Read> Block<'r, R> { return Err(Details::HeaderMagic.into()); } - let meta_schema = Schema::map(Schema::Bytes); + let meta_schema = Schema::map(Schema::Bytes).build(); match decode(&meta_schema, &mut self.reader)? { Value::Map(metadata) => { self.read_writer_schema(&metadata)?; @@ -337,57 +338,40 @@ pub struct Reader<'a, R> { should_resolve_schema: bool, } +#[bon] impl<'a, R: Read> Reader<'a, R> { /// Creates a `Reader` given something implementing the `io::Read` trait to read from. /// No reader `Schema` will be set. /// /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`. pub fn new(reader: R) -> AvroResult<Reader<'a, R>> { - let block = Block::new(reader, vec![])?; - let reader = Reader { - block, - reader_schema: None, - errored: false, - should_resolve_schema: false, - }; - Ok(reader) + Reader::builder(reader).build() } - /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait - /// to read from. - /// - /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`. - pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> { - let block = Block::new(reader, vec![schema])?; - let mut reader = Reader { - block, - reader_schema: Some(schema), - errored: false, - should_resolve_schema: false, - }; - // Check if the reader and writer schemas disagree. - reader.should_resolve_schema = reader.writer_schema() != schema; - Ok(reader) - } - - /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait - /// to read from. + /// Creates a `Reader` given something implementing the `io::Read` trait to read from. + /// With an optional reader `Schema` and optional schemata to use for resolving schema + /// references. /// /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`. - pub fn with_schemata( - schema: &'a Schema, - schemata: Vec<&'a Schema>, - reader: R, + #[builder(finish_fn = build)] + pub fn builder( + #[builder(start_fn)] reader: R, + reader_schema: Option<&'a Schema>, + schemata: Option<Vec<&'a Schema>>, ) -> AvroResult<Reader<'a, R>> { + let schemata = + schemata.unwrap_or_else(|| reader_schema.map(|rs| vec![rs]).unwrap_or_default()); + let block = Block::new(reader, schemata)?; let mut reader = Reader { block, - reader_schema: Some(schema), + reader_schema, errored: false, should_resolve_schema: false, }; // Check if the reader and writer schemas disagree. - reader.should_resolve_schema = reader.writer_schema() != schema; + reader.should_resolve_schema = + reader_schema.is_some_and(|reader_schema| reader.writer_schema() != reader_schema); Ok(reader) } @@ -744,7 +728,7 @@ mod tests { #[test] fn test_reader_iterator() -> TestResult { let schema = Schema::parse_str(SCHEMA)?; - let reader = Reader::with_schema(&schema, ENCODED)?; + let reader = Reader::builder(ENCODED).reader_schema(&schema).build()?; let mut record1 = Record::new(&schema).unwrap(); record1.put("a", 27i64); @@ -767,7 +751,12 @@ mod tests { fn test_reader_invalid_header() -> TestResult { let schema = Schema::parse_str(SCHEMA)?; let mut invalid = &ENCODED[1..]; - assert!(Reader::with_schema(&schema, &mut invalid).is_err()); + assert!( + Reader::builder(&mut invalid) + .reader_schema(&schema) + .build() + .is_err() + ); Ok(()) } @@ -776,7 +765,9 @@ mod tests { fn test_reader_invalid_block() -> TestResult { let schema = Schema::parse_str(SCHEMA)?; let mut invalid = &ENCODED[0..ENCODED.len() - 19]; - let reader = Reader::with_schema(&schema, &mut invalid)?; + let reader = Reader::builder(&mut invalid) + .reader_schema(&schema) + .build()?; for value in reader { assert!(value.is_err()); } diff --git a/avro/src/schema/builders.rs b/avro/src/schema/builders.rs new file mode 100644 index 0000000..dd16b3e --- /dev/null +++ b/avro/src/schema/builders.rs @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::schema::{ + Alias, ArraySchema, EnumSchema, FixedSchema, MapSchema, Name, RecordField, RecordSchema, + UnionSchema, +}; +use crate::types::Value; +use crate::{AvroResult, Schema}; +use bon::bon; +use serde_json::Value as JsonValue; +use std::collections::{BTreeMap, HashMap}; + +#[bon] +impl Schema { + /// Returns a `Schema::Map` with the given types and optional default + /// and custom attributes. + #[builder(finish_fn = build)] + pub fn map( + #[builder(start_fn)] types: Schema, + default: Option<HashMap<String, Value>>, + attributes: Option<BTreeMap<String, JsonValue>>, + ) -> Self { + let attributes = attributes.unwrap_or_default(); + Schema::Map(MapSchema { + types: Box::new(types), + default, + attributes, + }) + } + + /// Returns a `Schema::Array` with the given items and optional default + /// and custom attributes. + #[builder(finish_fn = build)] + pub fn array( + #[builder(start_fn)] items: Schema, + default: Option<Vec<Value>>, + attributes: Option<BTreeMap<String, JsonValue>>, + ) -> Self { + let attributes = attributes.unwrap_or_default(); + Schema::Array(ArraySchema { + items: Box::new(items), + default, + attributes, + }) + } + + /// Returns a `Schema::Enum` with the given name, symbols and optional + /// aliases, doc, default and custom attributes. + #[builder(finish_fn = build)] + pub fn r#enum( + #[builder(start_fn)] name: Name, + #[builder(start_fn)] symbols: Vec<impl Into<String>>, + aliases: Option<Vec<Alias>>, + doc: Option<String>, + default: Option<String>, + attributes: Option<BTreeMap<String, JsonValue>>, + ) -> Self { + let attributes = attributes.unwrap_or_default(); + let symbols = symbols.into_iter().map(Into::into).collect(); + Schema::Enum(EnumSchema { + name, + symbols, + aliases, + doc, + default, + attributes, + }) + } + + /// Returns a `Schema::Fixed` with the given name, size and optional + /// aliases, doc and custom attributes. + #[builder(finish_fn = build)] + pub fn fixed( + #[builder(start_fn)] name: Name, + #[builder(start_fn)] size: usize, + aliases: Option<Vec<Alias>>, + doc: Option<String>, + attributes: Option<BTreeMap<String, JsonValue>>, + ) -> Self { + let attributes = attributes.unwrap_or_default(); + Schema::Fixed(FixedSchema { + name, + size, + aliases, + doc, + attributes, + }) + } + + /// Returns a `Schema::Record` with the given name, size and optional + /// aliases, doc and custom attributes. + #[builder(finish_fn = build)] + pub fn record( + #[builder(start_fn)] name: Name, + fields: Option<Vec<RecordField>>, + aliases: Option<Vec<Alias>>, + doc: Option<String>, + attributes: Option<BTreeMap<String, JsonValue>>, + ) -> Self { + let fields = fields.unwrap_or_default(); + let attributes = attributes.unwrap_or_default(); + let record_schema = RecordSchema::builder() + .name(name) + .fields(fields) + .aliases(aliases) + .doc(doc) + .attributes(attributes) + .build(); + Schema::Record(record_schema) + } + + /// Returns a [`Schema::Union`] with the given variants. + /// + /// # Errors + /// Will return an error if `schemas` has duplicate unnamed schemas or if `schemas` + /// contains a union. + pub fn union(schemas: Vec<Schema>) -> AvroResult<Schema> { + UnionSchema::new(schemas).map(Schema::Union) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use apache_avro_test_helper::TestResult; + + #[test] + fn avro_rs_472_enum_builder_only_mandatory() -> TestResult { + let name = Name::new("enum_builder")?; + let symbols = vec!["A", "B", "C", "D", "E"]; + + let schema = Schema::r#enum(name.clone(), symbols.clone()).build(); + + if let Schema::Enum(enum_schema) = schema { + assert_eq!(enum_schema.name, name); + assert_eq!(enum_schema.symbols, symbols); + assert_eq!(enum_schema.aliases, None); + assert_eq!(enum_schema.doc, None); + assert_eq!(enum_schema.default, None); + assert_eq!(enum_schema.attributes, Default::default()); + } else { + panic!("Expected a Schema::Enum, got: {schema}"); + } + + Ok(()) + } + + #[test] + fn avro_rs_472_enum_builder_with_optionals() -> TestResult { + let name = Name::new("enum_builder")?; + let symbols = vec!["A", "B", "C", "D", "E"]; + let aliases = vec![Alias::new("alias")?]; + let doc = "docu"; + let default = "default value"; + let attributes = + BTreeMap::from_iter([("key".to_string(), JsonValue::String("value".into()))]); + + let schema = Schema::r#enum(name.clone(), symbols.clone()) + .aliases(aliases.clone()) + .doc(doc.into()) + .default(default.into()) + .attributes(attributes.clone()) + .build(); + + if let Schema::Enum(enum_schema) = schema { + assert_eq!(enum_schema.name, name); + assert_eq!(enum_schema.symbols, symbols); + assert_eq!(enum_schema.aliases, Some(aliases)); + assert_eq!(enum_schema.doc, Some(doc.into())); + assert_eq!(enum_schema.default, Some(default.into())); + assert_eq!(enum_schema.attributes, attributes); + } else { + panic!("Expected a Schema::Enum, got: {schema}"); + } + + Ok(()) + } + + #[test] + fn avro_rs_472_fixed_builder_only_mandatory() -> TestResult { + let name = Name::new("fixed_builder")?; + let size = 123; + + let schema = Schema::fixed(name.clone(), size).build(); + + if let Schema::Fixed(fixed_schema) = schema { + assert_eq!(fixed_schema.name, name); + assert_eq!(fixed_schema.size, size); + assert_eq!(fixed_schema.aliases, None); + assert_eq!(fixed_schema.doc, None); + assert_eq!(fixed_schema.attributes, Default::default()); + } else { + panic!("Expected a Schema::Fixed, got: {schema}"); + } + + Ok(()) + } + + #[test] + fn avro_rs_472_fixed_builder_with_optionals() -> TestResult { + let name = Name::new("fixed_builder")?; + let size = 234; + let aliases = vec![Alias::new("alias")?]; + let doc = "docu"; + let attributes = + BTreeMap::from_iter([("key".to_string(), JsonValue::String("value".into()))]); + + let schema = Schema::fixed(name.clone(), size) + .aliases(aliases.clone()) + .doc(doc.into()) + .attributes(attributes.clone()) + .build(); + + if let Schema::Fixed(fixed_schema) = schema { + assert_eq!(fixed_schema.name, name); + assert_eq!(fixed_schema.size, size); + assert_eq!(fixed_schema.aliases, Some(aliases)); + assert_eq!(fixed_schema.doc, Some(doc.into())); + assert_eq!(fixed_schema.attributes, attributes); + } else { + panic!("Expected a Schema::Fixed, got: {schema}"); + } + + Ok(()) + } + + #[test] + fn avro_rs_472_record_builder_only_mandatory() -> TestResult { + let name = Name::new("record_builder")?; + + let schema = Schema::record(name.clone()).build(); + + if let Schema::Record(record_schema) = schema { + assert_eq!(record_schema.name, name); + assert_eq!(record_schema.fields, vec![]); + assert_eq!(record_schema.aliases, None); + assert_eq!(record_schema.doc, None); + assert_eq!(record_schema.lookup, Default::default()); + assert_eq!(record_schema.attributes, Default::default()); + } else { + panic!("Expected a Schema::Record, got: {schema}"); + } + + Ok(()) + } + + #[test] + fn avro_rs_472_record_builder_with_optionals() -> TestResult { + let name = Name::new("record_builder")?; + let fields = vec![ + RecordField::builder() + .name("f1") + .schema(Schema::Boolean) + .build(), + RecordField::builder() + .name("f2") + .schema(Schema::Int) + .build(), + ]; + let aliases = vec![Alias::new("alias")?]; + let doc = "docu"; + let attributes = + BTreeMap::from_iter([("key".to_string(), JsonValue::String("value".into()))]); + + let schema = Schema::record(name.clone()) + .fields(fields.clone()) + .aliases(aliases.clone()) + .doc(doc.into()) + .attributes(attributes.clone()) + .build(); + + if let Schema::Record(fixed_schema) = schema { + assert_eq!(fixed_schema.name, name); + assert_eq!(fixed_schema.fields, fields); + assert_eq!(fixed_schema.aliases, Some(aliases)); + assert_eq!(fixed_schema.doc, Some(doc.into())); + assert_eq!( + fixed_schema.lookup, + BTreeMap::from_iter([("f1".into(), 0), ("f2".into(), 1)]) + ); + assert_eq!(fixed_schema.attributes, attributes); + } else { + panic!("Expected a Schema::Record, got: {schema}"); + } + + Ok(()) + } + + #[test] + fn avro_rs_472_union_builder() -> TestResult { + let variants = vec![Schema::Null, Schema::Boolean, Schema::Int]; + + let schema = Schema::union(variants.clone())?; + + if let Schema::Union(union_schema) = schema { + assert_eq!(union_schema.variants(), variants); + } else { + panic!("Expected a Schema::Union, got: {schema}"); + } + + Ok(()) + } +} diff --git a/avro/src/schema/mod.rs b/avro/src/schema/mod.rs index 35372ce..50f7db8 100644 --- a/avro/src/schema/mod.rs +++ b/avro/src/schema/mod.rs @@ -17,6 +17,7 @@ //! Logic for parsing and interacting with schemas in Avro format. +mod builders; mod name; mod parser; mod record; @@ -645,51 +646,6 @@ impl Schema { } } - /// Returns a `Schema::Map` with the given types. - pub fn map(types: Schema) -> Self { - Schema::Map(MapSchema { - types: Box::new(types), - default: None, - attributes: Default::default(), - }) - } - - /// Returns a `Schema::Map` with the given types and custom attributes. - pub fn map_with_attributes(types: Schema, attributes: BTreeMap<String, JsonValue>) -> Self { - Schema::Map(MapSchema { - types: Box::new(types), - default: None, - attributes, - }) - } - - /// Returns a `Schema::Array` with the given items. - pub fn array(items: Schema) -> Self { - Schema::Array(ArraySchema { - items: Box::new(items), - default: None, - attributes: Default::default(), - }) - } - - /// Returns a `Schema::Array` with the given items and custom attributes. - pub fn array_with_attributes(items: Schema, attributes: BTreeMap<String, JsonValue>) -> Self { - Schema::Array(ArraySchema { - items: Box::new(items), - default: None, - attributes, - }) - } - - /// Returns a [`Schema::Union`] with the given variants. - /// - /// # Errors - /// Will return an error if `schemas` has duplicate unnamed schemas or if `schemas` - /// contains a union. - pub fn union(schemas: Vec<Schema>) -> AvroResult<Schema> { - UnionSchema::new(schemas).map(Schema::Union) - } - /// Remove all external references from the schema. /// /// `schemata` must contain all externally referenced schemas. @@ -1154,14 +1110,14 @@ mod tests { #[test] fn test_array_schema() -> TestResult { let schema = Schema::parse_str(r#"{"type": "array", "items": "string"}"#)?; - assert_eq!(Schema::array(Schema::String), schema); + assert_eq!(Schema::array(Schema::String).build(), schema); Ok(()) } #[test] fn test_map_schema() -> TestResult { let schema = Schema::parse_str(r#"{"type": "map", "values": "double"}"#)?; - assert_eq!(Schema::map(Schema::Double), schema); + assert_eq!(Schema::map(Schema::Double).build(), schema); Ok(()) } @@ -1607,7 +1563,8 @@ mod tests { aliases: None, schema: Schema::array(Schema::Ref { name: Name::new("Node")?, - }), + }) + .build(), order: RecordFieldOrder::Ascending, position: 1, custom_attributes: Default::default(), @@ -4682,10 +4639,9 @@ mod tests { #[test] fn test_avro_3927_serialize_array_with_custom_attributes() -> TestResult { - let expected = Schema::array_with_attributes( - Schema::Long, - BTreeMap::from([("field-id".to_string(), "1".into())]), - ); + let expected = Schema::array(Schema::Long) + .attributes(BTreeMap::from([("field-id".to_string(), "1".into())])) + .build(); let value = serde_json::to_value(&expected)?; let serialized = serde_json::to_string(&value)?; @@ -4705,10 +4661,9 @@ mod tests { #[test] fn test_avro_3927_serialize_map_with_custom_attributes() -> TestResult { - let expected = Schema::map_with_attributes( - Schema::Long, - BTreeMap::from([("field-id".to_string(), "1".into())]), - ); + let expected = Schema::map(Schema::Long) + .attributes(BTreeMap::from([("field-id".to_string(), "1".into())])) + .build(); let value = serde_json::to_value(&expected)?; let serialized = serde_json::to_string(&value)?; diff --git a/avro/src/schema_compatibility.rs b/avro/src/schema_compatibility.rs index 7d9c611..20bbef2 100644 --- a/avro/src/schema_compatibility.rs +++ b/avro/src/schema_compatibility.rs @@ -29,8 +29,8 @@ //! //! ``` //! # use apache_avro::{Schema, schema_compatibility::{Compatibility, SchemaCompatibility}}; -//! let writers_schema = Schema::array(Schema::Int); -//! let readers_schema = Schema::array(Schema::Long); +//! let writers_schema = Schema::array(Schema::Int).build(); +//! let readers_schema = Schema::array(Schema::Long).build(); //! assert_eq!(SchemaCompatibility::can_read(&writers_schema, &readers_schema), Ok(Compatibility::Full)); //! ``` //! @@ -40,8 +40,8 @@ //! //! ``` //! # use apache_avro::{Schema, schema_compatibility::SchemaCompatibility}; -//! let writers_schema = Schema::array(Schema::Long); -//! let readers_schema = Schema::array(Schema::Int); +//! let writers_schema = Schema::array(Schema::Long).build(); +//! let readers_schema = Schema::array(Schema::Int).build(); //! assert!(SchemaCompatibility::can_read(&writers_schema, &readers_schema).is_err()); //! ``` //! @@ -1333,7 +1333,9 @@ mod tests { record.put("c", "clubs"); writer.append_value(record).unwrap(); let input = writer.into_inner()?; - let mut reader = Reader::with_schema(&reader_schema, &input[..])?; + let mut reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; assert_eq!( reader.next().unwrap().unwrap(), Value::Record(vec![ @@ -1397,7 +1399,9 @@ mod tests { record.put("c", "hearts"); writer.append_value(record).unwrap(); let input = writer.into_inner()?; - let mut reader = Reader::with_schema(&reader_schema, &input[..])?; + let mut reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; assert_eq!( reader.next().unwrap().unwrap(), Value::Record(vec![ diff --git a/avro/src/schema_equality.rs b/avro/src/schema_equality.rs index e65e28c..f0b8ce5 100644 --- a/avro/src/schema_equality.rs +++ b/avro/src/schema_equality.rs @@ -424,14 +424,18 @@ mod tests { #[test] fn test_avro_3939_compare_schemata_not_including_attributes() { - let schema_one = Schema::map_with_attributes( - Schema::Boolean, - BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]), - ); - let schema_two = Schema::map_with_attributes( - Schema::Boolean, - BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]), - ); + let schema_one = Schema::map(Schema::Boolean) + .attributes(BTreeMap::from_iter([( + "key1".to_string(), + Value::Bool(true), + )])) + .build(); + let schema_two = Schema::map(Schema::Boolean) + .attributes(BTreeMap::from_iter([( + "key2".to_string(), + Value::Bool(true), + )])) + .build(); // STRUCT_FIELD_EQ does not include attributes ! assert!(STRUCT_FIELD_EQ.compare(&schema_one, &schema_two)); } @@ -441,24 +445,28 @@ mod tests { let struct_field_eq = StructFieldEq { include_attributes: true, }; - let schema_one = Schema::map_with_attributes( - Schema::Boolean, - BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]), - ); - let schema_two = Schema::map_with_attributes( - Schema::Boolean, - BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]), - ); + let schema_one = Schema::map(Schema::Boolean) + .attributes(BTreeMap::from_iter([( + "key1".to_string(), + Value::Bool(true), + )])) + .build(); + let schema_two = Schema::map(Schema::Boolean) + .attributes(BTreeMap::from_iter([( + "key2".to_string(), + Value::Bool(true), + )])) + .build(); assert!(!struct_field_eq.compare(&schema_one, &schema_two)); } #[test] fn test_avro_3939_compare_map_schemata() { - let schema_one = Schema::map(Schema::Boolean); + let schema_one = Schema::map(Schema::Boolean).build(); assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean)); assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean)); - let schema_two = Schema::map(Schema::Boolean); + let schema_two = Schema::map(Schema::Boolean).build(); let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two); let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two); @@ -475,11 +483,11 @@ mod tests { #[test] fn test_avro_3939_compare_array_schemata() { - let schema_one = Schema::array(Schema::Boolean); + let schema_one = Schema::array(Schema::Boolean).build(); assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean)); assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean)); - let schema_two = Schema::array(Schema::Boolean); + let schema_two = Schema::array(Schema::Boolean).build(); let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two); let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two); diff --git a/avro/src/serde/derive.rs b/avro/src/serde/derive.rs index 24f5c18..0ce846e 100644 --- a/avro/src/serde/derive.rs +++ b/avro/src/serde/derive.rs @@ -585,7 +585,7 @@ macro_rules! impl_array_schema ( ($type:ty where T: AvroSchemaComponent) => ( impl<T: AvroSchemaComponent> AvroSchemaComponent for $type { fn get_schema_in_ctxt(named_schemas: &mut HashSet<Name>, enclosing_namespace: &Namespace) -> Schema { - Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace)) + Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace)).build() } fn get_record_fields_in_ctxt(_: usize, _: &mut HashSet<Name>, _: &Namespace) -> Option<Vec<RecordField>> { @@ -612,7 +612,7 @@ where named_schemas: &mut HashSet<Name>, enclosing_namespace: &Namespace, ) -> Schema { - Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace)) + Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace)).build() } fn get_record_fields_in_ctxt( @@ -637,7 +637,7 @@ where named_schemas: &mut HashSet<Name>, enclosing_namespace: &Namespace, ) -> Schema { - Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace)) + Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace)).build() } fn get_record_fields_in_ctxt( @@ -903,7 +903,7 @@ mod tests { #[test] fn avro_rs_401_slice() -> TestResult { let schema = <[u8]>::get_schema(); - assert_eq!(schema, Schema::array(Schema::Int)); + assert_eq!(schema, Schema::array(Schema::Int).build()); Ok(()) } @@ -911,7 +911,7 @@ mod tests { #[test] fn avro_rs_401_array() -> TestResult { let schema = <[u8; 55]>::get_schema(); - assert_eq!(schema, Schema::array(Schema::Int)); + assert_eq!(schema, Schema::array(Schema::Int).build()); Ok(()) } @@ -923,7 +923,7 @@ mod tests { schema, Schema::union(vec![ Schema::Null, - Schema::array(Schema::array(Schema::Int)) + Schema::array(Schema::array(Schema::Int).build()).build() ])? ); diff --git a/avro/src/serde/ser_schema.rs b/avro/src/serde/ser_schema.rs index fb222a0..7fb52ab 100644 --- a/avro/src/serde/ser_schema.rs +++ b/avro/src/serde/ser_schema.rs @@ -3270,7 +3270,9 @@ mod tests { e: 5, })?; let encoded = writer.into_inner()?; - let mut reader = Reader::with_schema(&schema, &encoded[..])?; + let mut reader = Reader::builder(&encoded[..]) + .reader_schema(&schema) + .build()?; let decoded = from_value::<Foo>(&reader.next().unwrap()?)?; assert_eq!( decoded, diff --git a/avro/src/types.rs b/avro/src/types.rs index f99dcb9..11de590 100644 --- a/avro/src/types.rs +++ b/avro/src/types.rs @@ -1358,13 +1358,13 @@ mod tests { ), ( Value::Array(vec![Value::Long(42i64)]), - Schema::array(Schema::Long), + Schema::array(Schema::Long).build(), true, "", ), ( Value::Array(vec![Value::Boolean(true)]), - Schema::array(Schema::Long), + Schema::array(Schema::Long).build(), false, "Invalid value: Array([Boolean(true)]) for schema: Array(ArraySchema { items: Long, default: None, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(true), schema: Long", ), diff --git a/avro/src/writer.rs b/avro/src/writer.rs index 61c02de..2ccfca3 100644 --- a/avro/src/writer.rs +++ b/avro/src/writer.rs @@ -524,7 +524,11 @@ impl<'a, W: Write> Writer<'a, W> { let mut header = Vec::new(); header.extend_from_slice(AVRO_OBJECT_HEADER); - encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?; + encode( + &metadata.into(), + &Schema::map(Schema::Bytes).build(), + &mut header, + )?; header.extend_from_slice(&self.marker); Ok(header) @@ -1473,7 +1477,9 @@ mod tests { writer.add_user_metadata("a".to_string(), "b")?; let result = writer.into_inner()?; - let reader = Reader::with_schema(&schema, &result[..])?; + let reader = Reader::builder(&result[..]) + .reader_schema(&schema) + .build()?; let mut expected = HashMap::new(); expected.insert("a".to_string(), vec![b'b']); assert_eq!(reader.user_metadata(), &expected); diff --git a/avro/tests/schema.rs b/avro/tests/schema.rs index 667f056..3bcc895 100644 --- a/avro/tests/schema.rs +++ b/avro/tests/schema.rs @@ -1058,7 +1058,9 @@ fn test_avro_3847_union_field_with_default_value_of_ref() -> TestResult { "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = writer.into_inner()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1124,7 +1126,9 @@ fn test_avro_3847_union_field_with_default_value_of_ref() -> TestResult { "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = writer.into_inner()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1184,7 +1188,9 @@ fn test_avro_3847_union_field_with_default_value_of_ref() -> TestResult { "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = writer.into_inner()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1263,7 +1269,9 @@ fn test_avro_3847_union_field_with_default_value_of_ref_with_namespace() -> Test "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = writer.into_inner()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1331,7 +1339,9 @@ fn test_avro_3847_union_field_with_default_value_of_ref_with_namespace() -> Test "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = writer.into_inner()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1393,7 +1403,9 @@ fn test_avro_3847_union_field_with_default_value_of_ref_with_namespace() -> Test "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = writer.into_inner()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1472,7 +1484,9 @@ fn test_avro_3847_union_field_with_default_value_of_ref_with_enclosing_namespace "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = writer.into_inner()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1540,7 +1554,9 @@ fn test_avro_3847_union_field_with_default_value_of_ref_with_enclosing_namespace "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = writer.into_inner()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1602,7 +1618,9 @@ fn test_avro_3847_union_field_with_default_value_of_ref_with_enclosing_namespace "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = writer.into_inner()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1666,7 +1684,9 @@ fn test_avro_3851_read_default_value_for_simple_record_field() -> TestResult { "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = write_schema_for_default_value_test()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1713,7 +1733,9 @@ fn test_avro_3851_read_default_value_for_nested_record_field() -> TestResult { "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = write_schema_for_default_value_test()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1756,7 +1778,9 @@ fn test_avro_3851_read_default_value_for_enum_record_field() -> TestResult { "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = write_schema_for_default_value_test()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1796,7 +1820,9 @@ fn test_avro_3851_read_default_value_for_fixed_record_field() -> TestResult { "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = write_schema_for_default_value_test()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1833,7 +1859,9 @@ fn test_avro_3851_read_default_value_for_array_record_field() -> TestResult { "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = write_schema_for_default_value_test()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1873,7 +1901,9 @@ fn test_avro_3851_read_default_value_for_map_record_field() -> TestResult { "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = write_schema_for_default_value_test()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1951,7 +1981,9 @@ fn test_avro_3851_read_default_value_for_ref_record_field() -> TestResult { "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = writer.into_inner()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); @@ -1997,7 +2029,9 @@ fn test_avro_3851_read_default_value_for_enum() -> TestResult { "#; let reader_schema = Schema::parse_str(reader_schema_str)?; let input = writer.into_inner()?; - let reader = Reader::with_schema(&reader_schema, &input[..])?; + let reader = Reader::builder(&input[..]) + .reader_schema(&reader_schema) + .build()?; let result = reader.collect::<Result<Vec<_>, _>>()?; assert_eq!(1, result.len()); diff --git a/avro/tests/shared.rs b/avro/tests/shared.rs index c0a51b6..37bec84 100644 --- a/avro/tests/shared.rs +++ b/avro/tests/shared.rs @@ -111,8 +111,10 @@ fn test_folder(folder: &Path) -> Result<(), ErrorsDesc> { )); } else { let file: File = File::open(data_path).expect("Can't open data.avro"); - let reader = - Reader::with_schema(&schema, BufReader::new(&file)).expect("Can't read data.avro"); + let reader = Reader::builder(BufReader::new(&file)) + .reader_schema(&schema) + .build() + .expect("Can't read data.avro"); let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null).unwrap(); @@ -128,8 +130,10 @@ fn test_folder(folder: &Path) -> Result<(), ErrorsDesc> { writer.flush().expect("Error on flush"); let bytes: Vec<u8> = writer.into_inner().unwrap(); - let reader_bis = - Reader::with_schema(&schema, &bytes[..]).expect("Can't read flushed vector"); + let reader_bis = Reader::builder(&bytes[..]) + .reader_schema(&schema) + .build() + .expect("Can't read flushed vector"); let mut records_iter: Iter<Value> = records.iter(); for r2 in reader_bis { diff --git a/avro/tests/to_from_avro_datum_schemata.rs b/avro/tests/to_from_avro_datum_schemata.rs index 8617c56..41c0812 100644 --- a/avro/tests/to_from_avro_datum_schemata.rs +++ b/avro/tests/to_from_avro_datum_schemata.rs @@ -112,7 +112,10 @@ fn test_avro_3683_multiple_schemata_writer_reader() -> TestResult { writer.flush()?; drop(writer); //drop the writer so that `output` is no more referenced mutably - let reader = Reader::with_schemata(schema_b, schemata, output.as_slice())?; + let reader = Reader::builder(output.as_slice()) + .reader_schema(schema_b) + .schemata(schemata) + .build()?; let value = reader.into_iter().next().unwrap()?; assert_eq!(value, record); diff --git a/avro/tests/union_schema.rs b/avro/tests/union_schema.rs index 6e48e47..892b69b 100644 --- a/avro/tests/union_schema.rs +++ b/avro/tests/union_schema.rs @@ -76,7 +76,10 @@ where writer.flush()?; drop(writer); //drop the writer so that `encoded` is no more referenced mutably - let mut reader = Reader::with_schemata(schema, schemata.iter().collect(), encoded.as_slice())?; + let mut reader = Reader::builder(encoded.as_slice()) + .reader_schema(schema) + .schemata(schemata.iter().collect()) + .build()?; from_value::<T>(&reader.next().expect("")?) } diff --git a/avro_derive/tests/derive.rs b/avro_derive/tests/derive.rs index 7f536b6..e8916d9 100644 --- a/avro_derive/tests/derive.rs +++ b/avro_derive/tests/derive.rs @@ -64,7 +64,10 @@ where { assert!(!encoded.is_empty()); let schema = T::get_schema(); - let mut reader = Reader::with_schema(&schema, &encoded[..]).unwrap(); + let mut reader = Reader::builder(&encoded[..]) + .reader_schema(&schema) + .build() + .unwrap(); if let Some(res) = reader.next() { match res { Ok(value) => { diff --git a/avro_derive/tests/serde.rs b/avro_derive/tests/serde.rs index e14503d..cf11fa4 100644 --- a/avro_derive/tests/serde.rs +++ b/avro_derive/tests/serde.rs @@ -63,7 +63,9 @@ where { assert!(!encoded.is_empty()); let schema = T::get_schema(); - let mut reader = Reader::with_schema(&schema, &encoded[..])?; + let mut reader = Reader::builder(&encoded[..]) + .reader_schema(&schema) + .build()?; if let Some(res) = reader.next() { return res.and_then(|v| from_value::<T>(&v)); }
