This is an automated email from the ASF dual-hosted git repository. kriskras99 pushed a commit to branch chore/split_writer_module in repository https://gitbox.apache.org/repos/asf/avro-rs.git
commit 4488a745ee4137ec0a0c8a98e5065717ad5291bd Author: default <[email protected]> AuthorDate: Tue Mar 3 15:51:29 2026 +0000 chore: Split `writer` module into several submodules --- avro/src/lib.rs | 5 +- avro/src/writer/datum.rs | 387 +++++++++++++++++++++ avro/src/writer/mod.rs | 706 +-------------------------------------- avro/src/writer/single_object.rs | 397 ++++++++++++++++++++++ 4 files changed, 794 insertions(+), 701 deletions(-) diff --git a/avro/src/lib.rs b/avro/src/lib.rs index a56aaad..3a2981f 100644 --- a/avro/src/lib.rs +++ b/avro/src/lib.rs @@ -99,8 +99,9 @@ pub use schema::Schema; pub use serde::{AvroSchema, AvroSchemaComponent, from_value, to_value}; pub use uuid::Uuid; pub use writer::{ - Clearable, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer, WriterBuilder, - to_avro_datum, to_avro_datum_schemata, write_avro_datum_ref, + Clearable, Writer, WriterBuilder, + datum::{to_avro_datum, to_avro_datum_schemata, write_avro_datum_ref}, + single_object::{GenericSingleObjectWriter, SpecificSingleObjectWriter}, }; #[cfg(feature = "derive")] diff --git a/avro/src/writer/datum.rs b/avro/src/writer/datum.rs new file mode 100644 index 0000000..14a40a2 --- /dev/null +++ b/avro/src/writer/datum.rs @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io::Write; + +use serde::Serialize; + +use crate::{ + AvroResult, Schema, + encode::{encode, encode_internal}, + error::Details, + schema::{NamesRef, ResolvedOwnedSchema, ResolvedSchema}, + serde::ser_schema::SchemaAwareWriteSerializer, + types::Value, +}; + +/// Encode a value into raw Avro data, also performs schema validation. +/// +/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync +/// markers; use [`Writer`](crate::Writer) to be fully Avro-compatible if you don't know what +/// you are doing, instead. +pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> { + let mut buffer = Vec::new(); + write_avro_datum(schema, value, &mut buffer)?; + Ok(buffer) +} + +/// Write the referenced [Serialize]able object to the provided [Write] object. +/// +/// Returns a result with the number of bytes written. +/// +/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync +/// markers; use [`append_ser`](crate::Writer::append_ser) to be fully Avro-compatible +/// if you don't know what you are doing, instead. +pub fn write_avro_datum_ref<T: Serialize, W: Write>( + schema: &Schema, + names: &NamesRef, + data: &T, + writer: &mut W, +) -> AvroResult<usize> { + let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None); + data.serialize(&mut serializer) +} + +/// Encode a value into raw Avro data, also performs schema validation. 
+/// +/// If the provided `schema` is incomplete then its dependencies must be +/// provided in `schemata` +pub fn to_avro_datum_schemata<T: Into<Value>>( + schema: &Schema, + schemata: Vec<&Schema>, + value: T, +) -> AvroResult<Vec<u8>> { + let mut buffer = Vec::new(); + write_avro_datum_schemata(schema, schemata, value, &mut buffer)?; + Ok(buffer) +} + +/// Encode a value into raw Avro data, also performs schema validation. +/// +/// This is an internal function which gets the bytes buffer where to write as parameter instead of +/// creating a new one like `to_avro_datum`. +pub(super) fn write_avro_datum<T: Into<Value>, W: Write>( + schema: &Schema, + value: T, + writer: &mut W, +) -> AvroResult<()> { + let avro = value.into(); + if !avro.validate(schema) { + return Err(Details::Validation.into()); + } + encode(&avro, schema, writer)?; + Ok(()) +} + +pub(super) fn write_avro_datum_schemata<T: Into<Value>>( + schema: &Schema, + schemata: Vec<&Schema>, + value: T, + buffer: &mut Vec<u8>, +) -> AvroResult<usize> { + let avro = value.into(); + let rs = ResolvedSchema::try_from(schemata)?; + let names = rs.get_names(); + let enclosing_namespace = schema.namespace(); + if let Some(_err) = avro.validate_internal(schema, names, enclosing_namespace) { + return Err(Details::Validation.into()); + } + encode_internal(&avro, schema, names, enclosing_namespace, buffer) +} + +pub(super) fn write_value_ref_owned_resolved<W: Write>( + resolved_schema: &ResolvedOwnedSchema, + value: &Value, + writer: &mut W, +) -> AvroResult<usize> { + let root_schema = resolved_schema.get_root_schema(); + if let Some(reason) = value.validate_internal( + root_schema, + resolved_schema.get_names(), + root_schema.namespace(), + ) { + return Err(Details::ValidationWithReason { + value: value.clone(), + schema: root_schema.clone(), + reason, + } + .into()); + } + encode_internal( + value, + root_schema, + resolved_schema.get_names(), + root_schema.namespace(), + writer, + ) +} + +#[cfg(test)] +mod 
tests { + use std::collections::HashMap; + + use apache_avro_test_helper::TestResult; + + use crate::{ + Days, Decimal, Duration, Millis, Months, + schema::{DecimalSchema, FixedSchema, InnerDecimalSchema, Name}, + types::Record, + util::zig_i64, + }; + + use super::*; + + const SCHEMA: &str = r#" + { + "type": "record", + "name": "test", + "fields": [ + { + "name": "a", + "type": "long", + "default": 42 + }, + { + "name": "b", + "type": "string" + } + ] + } + "#; + + const UNION_SCHEMA: &str = r#"["null", "long"]"#; + + #[test] + fn test_to_avro_datum() -> TestResult { + let schema = Schema::parse_str(SCHEMA)?; + let mut record = Record::new(&schema).unwrap(); + record.put("a", 27i64); + record.put("b", "foo"); + + let mut expected = Vec::new(); + zig_i64(27, &mut expected)?; + zig_i64(3, &mut expected)?; + expected.extend([b'f', b'o', b'o']); + + assert_eq!(to_avro_datum(&schema, record)?, expected); + + Ok(()) + } + + #[test] + fn avro_rs_193_write_avro_datum_ref() -> TestResult { + #[derive(Serialize)] + struct TestStruct { + a: i64, + b: String, + } + + let schema = Schema::parse_str(SCHEMA)?; + let mut writer: Vec<u8> = Vec::new(); + let data = TestStruct { + a: 27, + b: "foo".to_string(), + }; + + let mut expected = Vec::new(); + zig_i64(27, &mut expected)?; + zig_i64(3, &mut expected)?; + expected.extend([b'f', b'o', b'o']); + + let bytes = write_avro_datum_ref(&schema, &HashMap::new(), &data, &mut writer)?; + + assert_eq!(bytes, expected.len()); + assert_eq!(writer, expected); + + Ok(()) + } + + #[test] + fn test_union_not_null() -> TestResult { + let schema = Schema::parse_str(UNION_SCHEMA)?; + let union = Value::Union(1, Box::new(Value::Long(3))); + + let mut expected = Vec::new(); + zig_i64(1, &mut expected)?; + zig_i64(3, &mut expected)?; + + assert_eq!(to_avro_datum(&schema, union)?, expected); + + Ok(()) + } + + #[test] + fn test_union_null() -> TestResult { + let schema = Schema::parse_str(UNION_SCHEMA)?; + let union = Value::Union(0, 
Box::new(Value::Null)); + + let mut expected = Vec::new(); + zig_i64(0, &mut expected)?; + + assert_eq!(to_avro_datum(&schema, union)?, expected); + + Ok(()) + } + + fn logical_type_test<T: Into<Value> + Clone>( + schema_str: &'static str, + + expected_schema: &Schema, + value: Value, + + raw_schema: &Schema, + raw_value: T, + ) -> TestResult { + let schema = Schema::parse_str(schema_str)?; + assert_eq!(&schema, expected_schema); + // The serialized format should be the same as the schema. + let ser = to_avro_datum(&schema, value.clone())?; + let raw_ser = to_avro_datum(raw_schema, raw_value)?; + assert_eq!(ser, raw_ser); + + // Should deserialize from the schema into the logical type. + let mut r = ser.as_slice(); + let de = crate::from_avro_datum(&schema, &mut r, None)?; + assert_eq!(de, value); + Ok(()) + } + + #[test] + fn date() -> TestResult { + logical_type_test( + r#"{"type": "int", "logicalType": "date"}"#, + &Schema::Date, + Value::Date(1_i32), + &Schema::Int, + 1_i32, + ) + } + + #[test] + fn time_millis() -> TestResult { + logical_type_test( + r#"{"type": "int", "logicalType": "time-millis"}"#, + &Schema::TimeMillis, + Value::TimeMillis(1_i32), + &Schema::Int, + 1_i32, + ) + } + + #[test] + fn time_micros() -> TestResult { + logical_type_test( + r#"{"type": "long", "logicalType": "time-micros"}"#, + &Schema::TimeMicros, + Value::TimeMicros(1_i64), + &Schema::Long, + 1_i64, + ) + } + + #[test] + fn timestamp_millis() -> TestResult { + logical_type_test( + r#"{"type": "long", "logicalType": "timestamp-millis"}"#, + &Schema::TimestampMillis, + Value::TimestampMillis(1_i64), + &Schema::Long, + 1_i64, + ) + } + + #[test] + fn timestamp_micros() -> TestResult { + logical_type_test( + r#"{"type": "long", "logicalType": "timestamp-micros"}"#, + &Schema::TimestampMicros, + Value::TimestampMicros(1_i64), + &Schema::Long, + 1_i64, + ) + } + + #[test] + fn decimal_fixed() -> TestResult { + let size = 30; + let fixed = FixedSchema { + name: Name::new("decimal")?, + 
aliases: None, + doc: None, + size, + attributes: Default::default(), + }; + let inner = InnerDecimalSchema::Fixed(fixed.clone()); + let value = vec![0u8; size]; + logical_type_test( + r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#, + &Schema::Decimal(DecimalSchema { + precision: 20, + scale: 5, + inner, + }), + Value::Decimal(Decimal::from(value.clone())), + &Schema::Fixed(fixed), + Value::Fixed(size, value), + ) + } + + #[test] + fn decimal_bytes() -> TestResult { + let value = vec![0u8; 10]; + logical_type_test( + r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#, + &Schema::Decimal(DecimalSchema { + precision: 4, + scale: 3, + inner: InnerDecimalSchema::Bytes, + }), + Value::Decimal(Decimal::from(value.clone())), + &Schema::Bytes, + value, + ) + } + + #[test] + fn duration() -> TestResult { + let inner = Schema::Fixed(FixedSchema { + name: Name::new("duration")?, + aliases: None, + doc: None, + size: 12, + attributes: Default::default(), + }); + let value = Value::Duration(Duration::new( + Months::new(256), + Days::new(512), + Millis::new(1024), + )); + logical_type_test( + r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#, + &Schema::Duration(FixedSchema { + name: Name::try_from("duration").expect("Name is valid"), + aliases: None, + doc: None, + size: 12, + attributes: Default::default(), + }), + value, + &inner, + Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]), + ) + } +} diff --git a/avro/src/writer/mod.rs b/avro/src/writer/mod.rs index fe264a1..20f19a3 100644 --- a/avro/src/writer/mod.rs +++ b/avro/src/writer/mod.rs @@ -20,15 +20,15 @@ use crate::{ AvroResult, Codec, Error, encode::{encode, encode_internal, encode_to_vec}, error::Details, - headers::{HeaderBuilder, RabinFingerprintHeader}, - schema::{NamesRef, ResolvedOwnedSchema, ResolvedSchema, Schema}, - serde::{AvroSchema, 
ser_schema::SchemaAwareWriteSerializer}, + schema::{ResolvedSchema, Schema}, + serde::ser_schema::SchemaAwareWriteSerializer, types::Value, }; use serde::Serialize; -use std::{ - collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive, -}; +use std::{collections::HashMap, io::Write, mem::ManuallyDrop}; + +pub mod datum; +pub mod single_object; const DEFAULT_BLOCK_SIZE: usize = 16000; const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01"; @@ -614,263 +614,6 @@ impl<W: Write> Drop for Writer<'_, W> { } } -/// Encode a value into raw Avro data, also performs schema validation. -/// -/// This is an internal function which gets the bytes buffer where to write as parameter instead of -/// creating a new one like `to_avro_datum`. -fn write_avro_datum<T: Into<Value>, W: Write>( - schema: &Schema, - value: T, - writer: &mut W, -) -> Result<(), Error> { - let avro = value.into(); - if !avro.validate(schema) { - return Err(Details::Validation.into()); - } - encode(&avro, schema, writer)?; - Ok(()) -} - -fn write_avro_datum_schemata<T: Into<Value>>( - schema: &Schema, - schemata: Vec<&Schema>, - value: T, - buffer: &mut Vec<u8>, -) -> AvroResult<usize> { - let avro = value.into(); - let rs = ResolvedSchema::try_from(schemata)?; - let names = rs.get_names(); - let enclosing_namespace = schema.namespace(); - if let Some(_err) = avro.validate_internal(schema, names, enclosing_namespace) { - return Err(Details::Validation.into()); - } - encode_internal(&avro, schema, names, enclosing_namespace, buffer) -} - -/// Writer that encodes messages according to the single object encoding v1 spec -/// Uses an API similar to the current File Writer -/// Writes all object bytes at once, and drains internal buffer -pub struct GenericSingleObjectWriter { - buffer: Vec<u8>, - resolved: ResolvedOwnedSchema, -} - -impl GenericSingleObjectWriter { - pub fn new_with_capacity( - schema: &Schema, - initial_buffer_cap: usize, - ) -> AvroResult<GenericSingleObjectWriter> { - 
let header_builder = RabinFingerprintHeader::from_schema(schema); - Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder) - } - - pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>( - schema: &Schema, - initial_buffer_cap: usize, - header_builder: HB, - ) -> AvroResult<GenericSingleObjectWriter> { - let mut buffer = Vec::with_capacity(initial_buffer_cap); - let header = header_builder.build_header(); - buffer.extend_from_slice(&header); - - Ok(GenericSingleObjectWriter { - buffer, - resolved: ResolvedOwnedSchema::try_from(schema.clone())?, - }) - } - - const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize; - - /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header - pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> { - let original_length = self.buffer.len(); - if !Self::HEADER_LENGTH_RANGE.contains(&original_length) { - Err(Details::IllegalSingleObjectWriterState.into()) - } else { - write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?; - writer - .write_all(&self.buffer) - .map_err(Details::WriteBytes)?; - let len = self.buffer.len(); - self.buffer.truncate(original_length); - Ok(len) - } - } - - /// Write the Value to the provided Write object. 
Returns a result with the number of bytes written including the header - pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> { - self.write_value_ref(&v, writer) - } -} - -/// Writer that encodes messages according to the single object encoding v1 spec -pub struct SpecificSingleObjectWriter<T> -where - T: AvroSchema, -{ - resolved: ResolvedOwnedSchema, - header: Vec<u8>, - _model: PhantomData<T>, -} - -impl<T> SpecificSingleObjectWriter<T> -where - T: AvroSchema, -{ - pub fn new() -> AvroResult<Self> { - let schema = T::get_schema(); - let header = RabinFingerprintHeader::from_schema(&schema).build_header(); - let resolved = ResolvedOwnedSchema::new(schema)?; - // We don't use Self::new_with_header_builder as that would mean calling T::get_schema() twice - Ok(Self { - resolved, - header, - _model: PhantomData, - }) - } - - pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> { - let header = header_builder.build_header(); - let resolved = ResolvedOwnedSchema::new(T::get_schema())?; - Ok(Self { - resolved, - header, - _model: PhantomData, - }) - } - - /// Deprecated. Use [`SpecificSingleObjectWriter::new`] instead. - #[deprecated(since = "0.22.0", note = "Use new() instead")] - pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> { - Self::new() - } -} - -impl<T> SpecificSingleObjectWriter<T> -where - T: AvroSchema + Into<Value>, -{ - /// Write the value to the writer - /// - /// Returns the number of bytes written. - /// - /// Each call writes a complete single-object encoded message (header + data), - /// making each message independently decodable. 
- pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> { - writer - .write_all(&self.header) - .map_err(Details::WriteBytes)?; - let value: Value = data.into(); - let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?; - Ok(bytes + self.header.len()) - } -} - -impl<T> SpecificSingleObjectWriter<T> -where - T: AvroSchema + Serialize, -{ - /// Write the object to the writer. - /// - /// Returns the number of bytes written. - /// - /// Each call writes a complete single-object encoded message (header + data), - /// making each message independently decodable. - pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> { - writer - .write_all(&self.header) - .map_err(Details::WriteBytes)?; - - let bytes = write_avro_datum_ref( - self.resolved.get_root_schema(), - self.resolved.get_names(), - data, - writer, - )?; - - Ok(bytes + self.header.len()) - } - - /// Write the object to the writer. - /// - /// Returns the number of bytes written. - /// - /// Each call writes a complete single-object encoded message (header + data), - /// making each message independently decodable. - pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> { - self.write_ref(&data, writer) - } -} - -fn write_value_ref_owned_resolved<W: Write>( - resolved_schema: &ResolvedOwnedSchema, - value: &Value, - writer: &mut W, -) -> AvroResult<usize> { - let root_schema = resolved_schema.get_root_schema(); - if let Some(reason) = value.validate_internal( - root_schema, - resolved_schema.get_names(), - root_schema.namespace(), - ) { - return Err(Details::ValidationWithReason { - value: value.clone(), - schema: root_schema.clone(), - reason, - } - .into()); - } - encode_internal( - value, - root_schema, - resolved_schema.get_names(), - root_schema.namespace(), - writer, - ) -} - -/// Encode a value into raw Avro data, also performs schema validation. 
-/// -/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync -/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what -/// you are doing, instead. -pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> { - let mut buffer = Vec::new(); - write_avro_datum(schema, value, &mut buffer)?; - Ok(buffer) -} - -/// Write the referenced [Serialize]able object to the provided [Write] object. -/// -/// Returns a result with the number of bytes written. -/// -/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync -/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible -/// if you don't know what you are doing, instead. -pub fn write_avro_datum_ref<T: Serialize, W: Write>( - schema: &Schema, - names: &NamesRef, - data: &T, - writer: &mut W, -) -> AvroResult<usize> { - let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None); - data.serialize(&mut serializer) -} - -/// Encode a value into raw Avro data, also performs schema validation. 
-/// -/// If the provided `schema` is incomplete then its dependencies must be -/// provided in `schemata` -pub fn to_avro_datum_schemata<T: Into<Value>>( - schema: &Schema, - schemata: Vec<&Schema>, - value: T, -) -> AvroResult<Vec<u8>> { - let mut buffer = Vec::new(); - write_avro_datum_schemata(schema, schemata, value, &mut buffer)?; - Ok(buffer) -} - #[cfg(not(target_arch = "wasm32"))] fn generate_sync_marker() -> [u8; 16] { rand::random() @@ -892,21 +635,10 @@ mod tests { use std::{cell::RefCell, rc::Rc}; use super::*; - use crate::{ - Reader, - decimal::Decimal, - duration::{Days, Duration, Millis, Months}, - headers::GlueSchemaUuidHeader, - rabin::Rabin, - schema::{DecimalSchema, FixedSchema, Name}, - types::Record, - util::zig_i64, - }; + use crate::{Reader, types::Record, util::zig_i64}; use pretty_assertions::assert_eq; use serde::{Deserialize, Serialize}; - use uuid::Uuid; - use crate::schema::InnerDecimalSchema; use crate::{codec::DeflateSettings, error::Details}; use apache_avro_test_helper::TestResult; @@ -930,53 +662,6 @@ mod tests { } "#; - const UNION_SCHEMA: &str = r#"["null", "long"]"#; - - #[test] - fn test_to_avro_datum() -> TestResult { - let schema = Schema::parse_str(SCHEMA)?; - let mut record = Record::new(&schema).unwrap(); - record.put("a", 27i64); - record.put("b", "foo"); - - let mut expected = Vec::new(); - zig_i64(27, &mut expected)?; - zig_i64(3, &mut expected)?; - expected.extend([b'f', b'o', b'o']); - - assert_eq!(to_avro_datum(&schema, record)?, expected); - - Ok(()) - } - - #[test] - fn avro_rs_193_write_avro_datum_ref() -> TestResult { - #[derive(Serialize)] - struct TestStruct { - a: i64, - b: String, - } - - let schema = Schema::parse_str(SCHEMA)?; - let mut writer: Vec<u8> = Vec::new(); - let data = TestStruct { - a: 27, - b: "foo".to_string(), - }; - - let mut expected = Vec::new(); - zig_i64(27, &mut expected)?; - zig_i64(3, &mut expected)?; - expected.extend([b'f', b'o', b'o']); - - let bytes = 
write_avro_datum_ref(&schema, &HashMap::new(), &data, &mut writer)?; - - assert_eq!(bytes, expected.len()); - assert_eq!(writer, expected); - - Ok(()) - } - #[test] fn avro_rs_220_flush_write_header() -> TestResult { let schema = Schema::parse_str(SCHEMA)?; @@ -1000,181 +685,6 @@ mod tests { Ok(()) } - #[test] - fn test_union_not_null() -> TestResult { - let schema = Schema::parse_str(UNION_SCHEMA)?; - let union = Value::Union(1, Box::new(Value::Long(3))); - - let mut expected = Vec::new(); - zig_i64(1, &mut expected)?; - zig_i64(3, &mut expected)?; - - assert_eq!(to_avro_datum(&schema, union)?, expected); - - Ok(()) - } - - #[test] - fn test_union_null() -> TestResult { - let schema = Schema::parse_str(UNION_SCHEMA)?; - let union = Value::Union(0, Box::new(Value::Null)); - - let mut expected = Vec::new(); - zig_i64(0, &mut expected)?; - - assert_eq!(to_avro_datum(&schema, union)?, expected); - - Ok(()) - } - - fn logical_type_test<T: Into<Value> + Clone>( - schema_str: &'static str, - - expected_schema: &Schema, - value: Value, - - raw_schema: &Schema, - raw_value: T, - ) -> TestResult { - let schema = Schema::parse_str(schema_str)?; - assert_eq!(&schema, expected_schema); - // The serialized format should be the same as the schema. - let ser = to_avro_datum(&schema, value.clone())?; - let raw_ser = to_avro_datum(raw_schema, raw_value)?; - assert_eq!(ser, raw_ser); - - // Should deserialize from the schema into the logical type. 
- let mut r = ser.as_slice(); - let de = crate::from_avro_datum(&schema, &mut r, None)?; - assert_eq!(de, value); - Ok(()) - } - - #[test] - fn date() -> TestResult { - logical_type_test( - r#"{"type": "int", "logicalType": "date"}"#, - &Schema::Date, - Value::Date(1_i32), - &Schema::Int, - 1_i32, - ) - } - - #[test] - fn time_millis() -> TestResult { - logical_type_test( - r#"{"type": "int", "logicalType": "time-millis"}"#, - &Schema::TimeMillis, - Value::TimeMillis(1_i32), - &Schema::Int, - 1_i32, - ) - } - - #[test] - fn time_micros() -> TestResult { - logical_type_test( - r#"{"type": "long", "logicalType": "time-micros"}"#, - &Schema::TimeMicros, - Value::TimeMicros(1_i64), - &Schema::Long, - 1_i64, - ) - } - - #[test] - fn timestamp_millis() -> TestResult { - logical_type_test( - r#"{"type": "long", "logicalType": "timestamp-millis"}"#, - &Schema::TimestampMillis, - Value::TimestampMillis(1_i64), - &Schema::Long, - 1_i64, - ) - } - - #[test] - fn timestamp_micros() -> TestResult { - logical_type_test( - r#"{"type": "long", "logicalType": "timestamp-micros"}"#, - &Schema::TimestampMicros, - Value::TimestampMicros(1_i64), - &Schema::Long, - 1_i64, - ) - } - - #[test] - fn decimal_fixed() -> TestResult { - let size = 30; - let fixed = FixedSchema { - name: Name::new("decimal")?, - aliases: None, - doc: None, - size, - attributes: Default::default(), - }; - let inner = InnerDecimalSchema::Fixed(fixed.clone()); - let value = vec![0u8; size]; - logical_type_test( - r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#, - &Schema::Decimal(DecimalSchema { - precision: 20, - scale: 5, - inner, - }), - Value::Decimal(Decimal::from(value.clone())), - &Schema::Fixed(fixed), - Value::Fixed(size, value), - ) - } - - #[test] - fn decimal_bytes() -> TestResult { - let value = vec![0u8; 10]; - logical_type_test( - r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#, - 
&Schema::Decimal(DecimalSchema { - precision: 4, - scale: 3, - inner: InnerDecimalSchema::Bytes, - }), - Value::Decimal(Decimal::from(value.clone())), - &Schema::Bytes, - value, - ) - } - - #[test] - fn duration() -> TestResult { - let inner = Schema::Fixed(FixedSchema { - name: Name::new("duration")?, - aliases: None, - doc: None, - size: 12, - attributes: Default::default(), - }); - let value = Value::Duration(Duration::new( - Months::new(256), - Days::new(512), - Millis::new(1024), - )); - logical_type_test( - r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#, - &Schema::Duration(FixedSchema { - name: Name::try_from("duration").expect("Name is valid"), - aliases: None, - doc: None, - size: 12, - attributes: Default::default(), - }), - value, - &inner, - Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]), - ) - } - #[test] fn test_writer_append() -> TestResult { let schema = Schema::parse_str(SCHEMA)?; @@ -1564,163 +1074,6 @@ mod tests { Ok(()) } - #[derive(Serialize, Clone)] - struct TestSingleObjectWriter { - a: i64, - b: f64, - c: Vec<String>, - } - - impl AvroSchema for TestSingleObjectWriter { - fn get_schema() -> Schema { - let schema = r#" - { - "type":"record", - "name":"TestSingleObjectWrtierSerialize", - "fields":[ - { - "name":"a", - "type":"long" - }, - { - "name":"b", - "type":"double" - }, - { - "name":"c", - "type":{ - "type":"array", - "items":"string" - } - } - ] - } - "#; - Schema::parse_str(schema).unwrap() - } - } - - impl From<TestSingleObjectWriter> for Value { - fn from(obj: TestSingleObjectWriter) -> Value { - Value::Record(vec![ - ("a".into(), obj.a.into()), - ("b".into(), obj.b.into()), - ( - "c".into(), - Value::Array(obj.c.into_iter().map(|s| s.into()).collect()), - ), - ]) - } - } - - #[test] - fn test_single_object_writer() -> TestResult { - let mut buf: Vec<u8> = Vec::new(); - let obj = TestSingleObjectWriter { - a: 300, - b: 34.555, - c: vec!["cat".into(), "dog".into()], - }; - let 
mut writer = GenericSingleObjectWriter::new_with_capacity( - &TestSingleObjectWriter::get_schema(), - 1024, - ) - .expect("Should resolve schema"); - let value = obj.into(); - let written_bytes = writer - .write_value_ref(&value, &mut buf) - .expect("Error serializing properly"); - - assert!(buf.len() > 10, "no bytes written"); - assert_eq!(buf.len(), written_bytes); - assert_eq!(buf[0], 0xC3); - assert_eq!(buf[1], 0x01); - assert_eq!( - &buf[2..10], - &TestSingleObjectWriter::get_schema() - .fingerprint::<Rabin>() - .bytes[..] - ); - let mut msg_binary = Vec::new(); - encode( - &value, - &TestSingleObjectWriter::get_schema(), - &mut msg_binary, - ) - .expect("encode should have failed by here as a dependency of any writing"); - assert_eq!(&buf[10..], &msg_binary[..]); - - Ok(()) - } - - #[test] - fn test_single_object_writer_with_header_builder() -> TestResult { - let mut buf: Vec<u8> = Vec::new(); - let obj = TestSingleObjectWriter { - a: 300, - b: 34.555, - c: vec!["cat".into(), "dog".into()], - }; - let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?; - let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid); - let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder( - &TestSingleObjectWriter::get_schema(), - 1024, - header_builder, - ) - .expect("Should resolve schema"); - let value = obj.into(); - writer - .write_value_ref(&value, &mut buf) - .expect("Error serializing properly"); - - assert_eq!(buf[0], 0x03); - assert_eq!(buf[1], 0x00); - assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]); - Ok(()) - } - - #[test] - fn test_writer_parity() -> TestResult { - let obj1 = TestSingleObjectWriter { - a: 300, - b: 34.555, - c: vec!["cat".into(), "dog".into()], - }; - - let mut buf1: Vec<u8> = Vec::new(); - let mut buf2: Vec<u8> = Vec::new(); - let mut buf3: Vec<u8> = Vec::new(); - let mut buf4: Vec<u8> = Vec::new(); - - let mut generic_writer = GenericSingleObjectWriter::new_with_capacity( - 
&TestSingleObjectWriter::get_schema(), - 1024, - ) - .expect("Should resolve schema"); - let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new() - .expect("Resolved should pass"); - specific_writer - .write_ref(&obj1, &mut buf1) - .expect("Serialization expected"); - specific_writer - .write_ref(&obj1, &mut buf2) - .expect("Serialization expected"); - specific_writer - .write_value(obj1.clone(), &mut buf3) - .expect("Serialization expected"); - - generic_writer - .write_value(obj1.into(), &mut buf4) - .expect("Serialization expected"); - - assert_eq!(buf1, buf2); - assert_eq!(buf2, buf3); - assert_eq!(buf3, buf4); - - Ok(()) - } - #[test] fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult { const SCHEMA: &str = r#" @@ -1846,51 +1199,6 @@ mod tests { Ok(()) } - #[test] - fn avro_rs_439_specific_single_object_writer_ref() -> TestResult { - #[derive(Serialize)] - struct Recursive { - field: bool, - recurse: Option<Box<Recursive>>, - } - - impl AvroSchema for Recursive { - fn get_schema() -> Schema { - Schema::parse_str( - r#"{ - "name": "Recursive", - "type": "record", - "fields": [ - { "name": "field", "type": "boolean" }, - { "name": "recurse", "type": ["null", "Recursive"] } - ] - }"#, - ) - .unwrap() - } - } - - let mut buffer = Vec::new(); - let writer = SpecificSingleObjectWriter::new()?; - - writer.write( - Recursive { - field: true, - recurse: Some(Box::new(Recursive { - field: false, - recurse: None, - })), - }, - &mut buffer, - )?; - assert_eq!( - buffer, - &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..] 
- ); - - Ok(()) - } - #[test] fn avro_rs_310_append_unvalidated_value() -> TestResult { let schema = Schema::String; diff --git a/avro/src/writer/single_object.rs b/avro/src/writer/single_object.rs new file mode 100644 index 0000000..136986a --- /dev/null +++ b/avro/src/writer/single_object.rs @@ -0,0 +1,397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{io::Write, marker::PhantomData, ops::RangeInclusive}; + +use serde::Serialize; + +use crate::{ + AvroResult, AvroSchema, Schema, + error::Details, + headers::{HeaderBuilder, RabinFingerprintHeader}, + schema::ResolvedOwnedSchema, + types::Value, + write_avro_datum_ref, + writer::datum::write_value_ref_owned_resolved, +}; + +/// Writer that encodes messages according to the single object encoding v1 spec +/// Uses an API similar to the current File Writer +/// Writes all object bytes at once, and drains internal buffer +pub struct GenericSingleObjectWriter { + buffer: Vec<u8>, + resolved: ResolvedOwnedSchema, +} + +impl GenericSingleObjectWriter { + pub fn new_with_capacity( + schema: &Schema, + initial_buffer_cap: usize, + ) -> AvroResult<GenericSingleObjectWriter> { + let header_builder = RabinFingerprintHeader::from_schema(schema); + Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder) + } + + pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>( + schema: &Schema, + initial_buffer_cap: usize, + header_builder: HB, + ) -> AvroResult<GenericSingleObjectWriter> { + let mut buffer = Vec::with_capacity(initial_buffer_cap); + let header = header_builder.build_header(); + buffer.extend_from_slice(&header); + + Ok(GenericSingleObjectWriter { + buffer, + resolved: ResolvedOwnedSchema::try_from(schema.clone())?, + }) + } + + const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize; + + /// Write the referenced Value to the provided Write object. 
Returns a result with the number of bytes written including the header + pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> { + let original_length = self.buffer.len(); + if !Self::HEADER_LENGTH_RANGE.contains(&original_length) { + Err(Details::IllegalSingleObjectWriterState.into()) + } else { + write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?; + writer + .write_all(&self.buffer) + .map_err(Details::WriteBytes)?; + let len = self.buffer.len(); + self.buffer.truncate(original_length); + Ok(len) + } + } + + /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header + pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> { + self.write_value_ref(&v, writer) + } +} + +/// Writer that encodes messages according to the single object encoding v1 spec +pub struct SpecificSingleObjectWriter<T> +where + T: AvroSchema, +{ + resolved: ResolvedOwnedSchema, + header: Vec<u8>, + _model: PhantomData<T>, +} + +impl<T> SpecificSingleObjectWriter<T> +where + T: AvroSchema, +{ + pub fn new() -> AvroResult<Self> { + let schema = T::get_schema(); + let header = RabinFingerprintHeader::from_schema(&schema).build_header(); + let resolved = ResolvedOwnedSchema::new(schema)?; + // We don't use Self::new_with_header_builder as that would mean calling T::get_schema() twice + Ok(Self { + resolved, + header, + _model: PhantomData, + }) + } + + pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> { + let header = header_builder.build_header(); + let resolved = ResolvedOwnedSchema::new(T::get_schema())?; + Ok(Self { + resolved, + header, + _model: PhantomData, + }) + } + + /// Deprecated. Use [`SpecificSingleObjectWriter::new`] instead. 
+ #[deprecated(since = "0.22.0", note = "Use new() instead")] + pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> { + Self::new() + } +} + +impl<T> SpecificSingleObjectWriter<T> +where + T: AvroSchema + Into<Value>, +{ + /// Write the value to the writer + /// + /// Returns the number of bytes written. + /// + /// Each call writes a complete single-object encoded message (header + data), + /// making each message independently decodable. + pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> { + writer + .write_all(&self.header) + .map_err(Details::WriteBytes)?; + let value: Value = data.into(); + let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?; + Ok(bytes + self.header.len()) + } +} + +impl<T> SpecificSingleObjectWriter<T> +where + T: AvroSchema + Serialize, +{ + /// Write the object to the writer. + /// + /// Returns the number of bytes written. + /// + /// Each call writes a complete single-object encoded message (header + data), + /// making each message independently decodable. + pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> { + writer + .write_all(&self.header) + .map_err(Details::WriteBytes)?; + + let bytes = write_avro_datum_ref( + self.resolved.get_root_schema(), + self.resolved.get_names(), + data, + writer, + )?; + + Ok(bytes + self.header.len()) + } + + /// Write the object to the writer. + /// + /// Returns the number of bytes written. + /// + /// Each call writes a complete single-object encoded message (header + data), + /// making each message independently decodable. 
+ pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> { + self.write_ref(&data, writer) + } +} + +#[cfg(test)] +mod tests { + use apache_avro_test_helper::TestResult; + use uuid::Uuid; + + use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin}; + + use super::*; + + #[derive(Serialize, Clone)] + struct TestSingleObjectWriter { + a: i64, + b: f64, + c: Vec<String>, + } + + impl AvroSchema for TestSingleObjectWriter { + fn get_schema() -> Schema { + let schema = r#" + { + "type":"record", + "name":"TestSingleObjectWrtierSerialize", + "fields":[ + { + "name":"a", + "type":"long" + }, + { + "name":"b", + "type":"double" + }, + { + "name":"c", + "type":{ + "type":"array", + "items":"string" + } + } + ] + } + "#; + Schema::parse_str(schema).unwrap() + } + } + + impl From<TestSingleObjectWriter> for Value { + fn from(obj: TestSingleObjectWriter) -> Value { + Value::Record(vec![ + ("a".into(), obj.a.into()), + ("b".into(), obj.b.into()), + ( + "c".into(), + Value::Array(obj.c.into_iter().map(|s| s.into()).collect()), + ), + ]) + } + } + + #[test] + fn test_single_object_writer() -> TestResult { + let mut buf: Vec<u8> = Vec::new(); + let obj = TestSingleObjectWriter { + a: 300, + b: 34.555, + c: vec!["cat".into(), "dog".into()], + }; + let mut writer = GenericSingleObjectWriter::new_with_capacity( + &TestSingleObjectWriter::get_schema(), + 1024, + ) + .expect("Should resolve schema"); + let value = obj.into(); + let written_bytes = writer + .write_value_ref(&value, &mut buf) + .expect("Error serializing properly"); + + assert!(buf.len() > 10, "no bytes written"); + assert_eq!(buf.len(), written_bytes); + assert_eq!(buf[0], 0xC3); + assert_eq!(buf[1], 0x01); + assert_eq!( + &buf[2..10], + &TestSingleObjectWriter::get_schema() + .fingerprint::<Rabin>() + .bytes[..] 
+ ); + let mut msg_binary = Vec::new(); + encode( + &value, + &TestSingleObjectWriter::get_schema(), + &mut msg_binary, + ) + .expect("encode should have failed by here as a dependency of any writing"); + assert_eq!(&buf[10..], &msg_binary[..]); + + Ok(()) + } + + #[test] + fn test_single_object_writer_with_header_builder() -> TestResult { + let mut buf: Vec<u8> = Vec::new(); + let obj = TestSingleObjectWriter { + a: 300, + b: 34.555, + c: vec!["cat".into(), "dog".into()], + }; + let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?; + let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid); + let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder( + &TestSingleObjectWriter::get_schema(), + 1024, + header_builder, + ) + .expect("Should resolve schema"); + let value = obj.into(); + writer + .write_value_ref(&value, &mut buf) + .expect("Error serializing properly"); + + assert_eq!(buf[0], 0x03); + assert_eq!(buf[1], 0x00); + assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]); + Ok(()) + } + + #[test] + fn test_writer_parity() -> TestResult { + let obj1 = TestSingleObjectWriter { + a: 300, + b: 34.555, + c: vec!["cat".into(), "dog".into()], + }; + + let mut buf1: Vec<u8> = Vec::new(); + let mut buf2: Vec<u8> = Vec::new(); + let mut buf3: Vec<u8> = Vec::new(); + let mut buf4: Vec<u8> = Vec::new(); + + let mut generic_writer = GenericSingleObjectWriter::new_with_capacity( + &TestSingleObjectWriter::get_schema(), + 1024, + ) + .expect("Should resolve schema"); + let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new() + .expect("Resolved should pass"); + specific_writer + .write_ref(&obj1, &mut buf1) + .expect("Serialization expected"); + specific_writer + .write_ref(&obj1, &mut buf2) + .expect("Serialization expected"); + specific_writer + .write_value(obj1.clone(), &mut buf3) + .expect("Serialization expected"); + + generic_writer + .write_value(obj1.into(), &mut buf4) + 
.expect("Serialization expected"); + + assert_eq!(buf1, buf2); + assert_eq!(buf2, buf3); + assert_eq!(buf3, buf4); + + Ok(()) + } + + #[test] + fn avro_rs_439_specific_single_object_writer_ref() -> TestResult { + #[derive(Serialize)] + struct Recursive { + field: bool, + recurse: Option<Box<Recursive>>, + } + + impl AvroSchema for Recursive { + fn get_schema() -> Schema { + Schema::parse_str( + r#"{ + "name": "Recursive", + "type": "record", + "fields": [ + { "name": "field", "type": "boolean" }, + { "name": "recurse", "type": ["null", "Recursive"] } + ] + }"#, + ) + .unwrap() + } + } + + let mut buffer = Vec::new(); + let writer = SpecificSingleObjectWriter::new()?; + + writer.write( + Recursive { + field: true, + recurse: Some(Box::new(Recursive { + field: false, + recurse: None, + })), + }, + &mut buffer, + )?; + assert_eq!( + buffer, + &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..] + ); + + Ok(()) + } +}
