This is an automated email from the ASF dual-hosted git repository.
spetz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 4f55e9563 feat(connectors): implement Avro payload support with
separate encoder, decoder and transform crates (#3141)
4f55e9563 is described below
commit 4f55e95638a721c4c992129379ee4b9b0a1002a8
Author: Tyooughtul <[email protected]>
AuthorDate: Fri May 1 20:09:38 2026 +0800
feat(connectors): implement Avro payload support with separate encoder,
decoder and transform crates (#3141)
Closes #1846
---
Cargo.lock | 1 +
Cargo.toml | 1 +
core/connectors/runtime/src/configs/connectors.rs | 21 +-
core/connectors/runtime/src/sink.rs | 28 +-
core/connectors/runtime/src/source.rs | 21 +-
core/connectors/sdk/Cargo.toml | 1 +
core/connectors/sdk/src/decoders/avro.rs | 442 +++++++++++++++
core/connectors/sdk/src/decoders/mod.rs | 1 +
core/connectors/sdk/src/encoders/avro.rs | 604 +++++++++++++++++++++
core/connectors/sdk/src/encoders/flatbuffer.rs | 1 +
core/connectors/sdk/src/encoders/mod.rs | 1 +
core/connectors/sdk/src/encoders/proto.rs | 12 +
core/connectors/sdk/src/lib.rs | 17 +-
core/connectors/sdk/src/transforms/avro_convert.rs | 398 ++++++++++++++
core/connectors/sdk/src/transforms/mod.rs | 8 +
.../connectors/sdk/src/transforms/proto_convert.rs | 17 +
core/connectors/sinks/http_sink/src/lib.rs | 8 +
17 files changed, 1565 insertions(+), 17 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index c213ee651..84c1cfac4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6796,6 +6796,7 @@ name = "iggy_connector_sdk"
version = "0.3.0"
dependencies = [
"anyhow",
+ "apache-avro",
"async-trait",
"base64 0.22.1",
"dashmap",
diff --git a/Cargo.toml b/Cargo.toml
index 6589c4b2e..1e8afafbb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -72,6 +72,7 @@ aes-gcm = "0.10.3"
ahash = { version = "0.8.12", features = ["serde"] }
aligned-vec = "0.6.4"
anyhow = "1.0.102"
+apache-avro = "0.21.0"
argon2 = "0.5.3"
arrow = "57.3.0"
arrow-array = "57.3.0"
diff --git a/core/connectors/runtime/src/configs/connectors.rs b/core/connectors/runtime/src/configs/connectors.rs
index 8d8f33097..70e4f18a9 100644
--- a/core/connectors/runtime/src/configs/connectors.rs
+++ b/core/connectors/runtime/src/configs/connectors.rs
@@ -32,6 +32,7 @@ use iggy_connector_sdk::transforms::TransformType;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Formatter;
+use std::path::PathBuf;
use strum::Display;
#[derive(
@@ -180,6 +181,9 @@ pub struct StreamConsumerConfig {
pub topics: Vec<String>,
#[config_env(leaf)]
pub schema: Schema,
+ pub avro_schema_json: Option<String>,
+ #[config_env(leaf)]
+ pub avro_schema_path: Option<PathBuf>,
pub batch_length: Option<u32>,
pub poll_interval: Option<String>,
pub consumer_group: Option<String>,
@@ -191,6 +195,9 @@ pub struct StreamProducerConfig {
pub topic: String,
#[config_env(leaf)]
pub schema: Schema,
+ pub avro_schema_json: Option<String>,
+ #[config_env(leaf)]
+ pub avro_schema_path: Option<PathBuf>,
pub batch_length: Option<u32>,
pub linger_time: Option<String>,
}
@@ -346,7 +353,7 @@ impl std::fmt::Display for StreamConsumerConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ stream: {}, topics: {}, schema: {:?}, batch_length: {:?},
poll_interval: {:?}, consumer_group: {:?} }}",
+ "{{ stream: {}, topics: {}, schema: {:?}, avro_schema_json: {:?},
avro_schema_path: {:?}, batch_length: {:?}, poll_interval: {:?},
consumer_group: {:?} }}",
self.stream,
self.topics
.iter()
@@ -354,6 +361,8 @@ impl std::fmt::Display for StreamConsumerConfig {
.collect::<Vec<&str>>()
.join(", "),
self.schema,
+ self.avro_schema_json,
+ self.avro_schema_path,
self.batch_length,
self.poll_interval,
self.consumer_group
@@ -365,8 +374,14 @@ impl std::fmt::Display for StreamProducerConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ stream: {}, topic: {}, schema: {:?}, batch_length: {:?},
linger_time: {:?} }}",
- self.stream, self.topic, self.schema, self.batch_length,
self.linger_time
+ "{{ stream: {}, topic: {}, schema: {:?}, avro_schema_json: {:?},
avro_schema_path: {:?}, batch_length: {:?}, linger_time: {:?} }}",
+ self.stream,
+ self.topic,
+ self.schema,
+ self.avro_schema_json,
+ self.avro_schema_path,
+ self.batch_length,
+ self.linger_time
)
}
}
diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs
index b6c8965d3..a0ab25b1a 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -31,9 +31,10 @@ use iggy::prelude::{
AutoCommit, AutoCommitWhen, IggyClient, IggyConsumer, IggyDuration,
IggyMessage,
PollingStrategy,
};
+use iggy_connector_sdk::decoders::avro::{AvroConfig, AvroStreamDecoder};
use iggy_connector_sdk::{
- DecodedMessage, MessagesMetadata, RawMessage, RawMessages,
ReceivedMessage, StreamDecoder,
- TopicMetadata, sink::ConsumeCallback, transforms::Transform,
+ DecodedMessage, MessagesMetadata, RawMessage, RawMessages,
ReceivedMessage, Schema,
+ StreamDecoder, TopicMetadata, sink::ConsumeCallback, transforms::Transform,
};
use std::{
collections::HashMap,
@@ -426,12 +427,23 @@ pub(crate) async fn setup_sink_consumers(
.batch_length(batch_length)
.build();
consumer.init().await?;
- consumers.push((
- consumer,
- stream.schema.decoder(),
- batch_length,
- transforms.clone(),
- ));
+ let decoder: Arc<dyn StreamDecoder> = match stream.schema {
+ Schema::Avro => {
+ let config = AvroConfig {
+ schema_json: stream.avro_schema_json.clone(),
+ schema_path: stream.avro_schema_path.clone(),
+ ..AvroConfig::default()
+ };
+
Arc::new(AvroStreamDecoder::try_new(config).map_err(|error| {
+ RuntimeError::InvalidConfiguration(format!(
+ "Failed to create Avro decoder for stream '{}':
{error}",
+ stream.stream
+ ))
+ })?)
+ }
+ other => other.decoder(),
+ };
+ consumers.push((consumer, decoder, batch_length,
transforms.clone()));
}
}
Ok(consumers)
diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs
index c4af20450..2c20eb1b0 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -24,8 +24,9 @@ use iggy::prelude::{
DirectConfig, HeaderKey, HeaderValue, IggyClient, IggyDuration, IggyError,
IggyMessage,
IggyProducer,
};
+use iggy_connector_sdk::encoders::avro::{AvroEncoderConfig, AvroStreamEncoder};
use iggy_connector_sdk::{
- ConnectorState, DecodedMessage, Error, ProducedMessages, StreamEncoder,
TopicMetadata,
+ ConnectorState, DecodedMessage, Error, ProducedMessages, Schema,
StreamEncoder, TopicMetadata,
source::HandleCallback, transforms::Transform,
};
use once_cell::sync::Lazy;
@@ -259,7 +260,23 @@ pub(crate) async fn setup_source_producer(
)
.build();
producer.init().await?;
- last_encoder = Some(stream.schema.encoder());
+ let encoder: Arc<dyn StreamEncoder> = match stream.schema {
+ Schema::Avro => {
+ let config = AvroEncoderConfig {
+ schema_json: stream.avro_schema_json.clone(),
+ schema_path: stream.avro_schema_path.clone(),
+ ..AvroEncoderConfig::default()
+ };
+ Arc::new(AvroStreamEncoder::try_new(config).map_err(|error| {
+ RuntimeError::InvalidConfiguration(format!(
+ "Failed to create Avro encoder for stream '{}':
{error}",
+ stream.stream
+ ))
+ })?)
+ }
+ other => other.encoder(),
+ };
+ last_encoder = Some(encoder);
last_producer = Some(producer);
}
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
index 364b7e6be..0927bdf60 100644
--- a/core/connectors/sdk/Cargo.toml
+++ b/core/connectors/sdk/Cargo.toml
@@ -38,6 +38,7 @@ api = ["strum"]
[dependencies]
anyhow = { workspace = true }
+apache-avro = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
dashmap = { workspace = true }
diff --git a/core/connectors/sdk/src/decoders/avro.rs b/core/connectors/sdk/src/decoders/avro.rs
new file mode 100644
index 000000000..67050f716
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/avro.rs
@@ -0,0 +1,442 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamDecoder};
+use apache_avro::Schema as AvroSchema;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::path::PathBuf;
+use tracing::{error, info};
+
+/// Configuration for [`AvroStreamDecoder`].
+///
+/// Missing fields fall back to [`AvroConfig::default`] when deserialized, so a config
+/// that only sets e.g. `schema_json` is accepted (with `extract_as_json = true`).
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(default)]
+pub struct AvroConfig {
+    /// Path to a file containing the Avro schema JSON; only used when `schema_json` is unset.
+    pub schema_path: Option<PathBuf>,
+    /// Inline Avro schema JSON; takes precedence over `schema_path`.
+    pub schema_json: Option<String>,
+    /// Renames applied to top-level keys of the decoded JSON object (`from -> to`).
+    pub field_mappings: Option<HashMap<String, String>>,
+    /// Decode datums into [`Payload::Json`] (requires a schema) instead of passing the
+    /// bytes through as [`Payload::Avro`].
+    pub extract_as_json: bool,
+}
+
+impl Default for AvroConfig {
+    /// No schema, no field mappings, JSON extraction enabled.
+    fn default() -> Self {
+        Self {
+            schema_path: None,
+            schema_json: None,
+            field_mappings: None,
+            extract_as_json: true,
+        }
+    }
+}
+
+/// Decodes Avro binary datums carried in Iggy messages.
+///
+/// With `extract_as_json` enabled each datum is decoded against the configured schema and
+/// returned as [`Payload::Json`]; otherwise the bytes are passed through unchanged as
+/// [`Payload::Avro`].
+pub struct AvroStreamDecoder {
+    config: AvroConfig,
+    /// Parsed schema; `None` when no schema source is configured, or when loading
+    /// failed in [`AvroStreamDecoder::new`].
+    schema: Option<AvroSchema>,
+}
+
+impl AvroStreamDecoder {
+    /// Creates a decoder, logging (but otherwise ignoring) schema-loading errors.
+    ///
+    /// Use [`Self::try_new`] for fail-fast behaviour.
+    pub fn new(config: AvroConfig) -> Self {
+        let mut decoder = Self {
+            config,
+            schema: None,
+        };
+        if let Err(e) = decoder.load_schema() {
+            error!("Failed to load Avro schema during decoder creation: {}", e);
+        }
+        decoder
+    }
+
+    /// Creates a decoder, failing if a configured schema cannot be read or parsed.
+    ///
+    /// # Errors
+    /// Returns [`Error::InvalidConfigValue`] when the inline schema or schema file is invalid.
+    pub fn try_new(config: AvroConfig) -> Result<Self, Error> {
+        let mut decoder = Self {
+            config,
+            schema: None,
+        };
+        decoder.load_schema()?;
+        Ok(decoder)
+    }
+
+    /// Creates a decoder from [`AvroConfig::default`] (no schema, JSON extraction enabled).
+    pub fn new_default() -> Self {
+        Self::new(AvroConfig::default())
+    }
+
+    /// Replaces the configuration and reloads the schema.
+    ///
+    /// On failure the previous configuration (and its already-loaded schema) is kept.
+    pub fn update_config(&mut self, config: AvroConfig) -> Result<(), Error> {
+        let old_config = std::mem::replace(&mut self.config, config);
+        if let Err(e) = self.load_schema() {
+            // `load_schema` only assigns `self.schema` on success, so restoring the
+            // config is enough to keep config and schema consistent.
+            self.config = old_config;
+            return Err(e);
+        }
+        Ok(())
+    }
+
+    /// Loads the schema from `schema_json` or, if that is unset, from `schema_path`.
+    ///
+    /// Inline JSON takes precedence; when it is set but invalid the file is not tried.
+    /// With neither source configured the schema is cleared. On error `self.schema`
+    /// is left untouched.
+    fn load_schema(&mut self) -> Result<(), Error> {
+        let schema = if let Some(schema_json) = &self.config.schema_json {
+            let schema = AvroSchema::parse_str(schema_json).map_err(|e| {
+                error!("Failed to parse inline Avro schema: {}", e);
+                Error::InvalidConfigValue(format!("Invalid Avro schema JSON: {e}"))
+            })?;
+            info!("Loaded Avro schema from inline JSON");
+            Some(schema)
+        } else if let Some(schema_path) = &self.config.schema_path {
+            let schema_content = std::fs::read_to_string(schema_path).map_err(|e| {
+                error!("Failed to read Avro schema file {:?}: {}", schema_path, e);
+                Error::InvalidConfigValue(format!(
+                    "Cannot read schema file {}: {e}",
+                    schema_path.display()
+                ))
+            })?;
+            let schema = AvroSchema::parse_str(&schema_content).map_err(|e| {
+                error!("Failed to parse Avro schema file {:?}: {}", schema_path, e);
+                Error::InvalidConfigValue(format!(
+                    "Invalid Avro schema file {}: {e}",
+                    schema_path.display()
+                ))
+            })?;
+            info!("Loaded Avro schema from file: {:?}", schema_path);
+            Some(schema)
+        } else {
+            None
+        };
+        self.schema = schema;
+        Ok(())
+    }
+
+    /// Decodes a single Avro datum with the configured schema and converts it to JSON.
+    ///
+    /// The datum must consume the whole payload; trailing bytes are rejected.
+    fn decode_as_json(&self, payload: &[u8]) -> Result<Payload, Error> {
+        let schema = self.schema.as_ref().ok_or_else(|| {
+            error!("Cannot decode Avro to JSON without a schema");
+            Error::InvalidConfigValue("Avro schema is required for JSON extraction".to_string())
+        })?;
+
+        // Reading through `&mut reader` advances the slice past the consumed bytes,
+        // which is what the trailing-bytes check below relies on.
+        let mut reader = payload;
+        let avro_value = apache_avro::from_avro_datum(schema, &mut reader, None).map_err(|e| {
+            error!("Failed to decode Avro datum: {}", e);
+            Error::CannotDecode(Schema::Avro)
+        })?;
+
+        if !reader.is_empty() {
+            error!(
+                "Avro payload contains trailing bytes after datum: {} bytes remaining",
+                reader.len()
+            );
+            return Err(Error::CannotDecode(Schema::Avro));
+        }
+
+        // Avro value -> serde_json value -> JSON bytes -> simd_json owned value.
+        let serde_value = serde_json::Value::try_from(avro_value).map_err(|e| {
+            error!("Failed to convert Avro value to JSON: {}", e);
+            Error::CannotDecode(Schema::Avro)
+        })?;
+
+        let mut json_bytes = serde_json::to_vec(&serde_value).map_err(|e| {
+            error!("Failed to serialize JSON value: {}", e);
+            Error::CannotDecode(Schema::Avro)
+        })?;
+
+        let json_value = simd_json::to_owned_value(&mut json_bytes).map_err(|e| {
+            error!("Failed to parse JSON into simd_json: {}", e);
+            Error::CannotDecode(Schema::Avro)
+        })?;
+
+        self.apply_field_transformations(Payload::Json(json_value))
+    }
+
+    /// Passes the undecoded bytes through as an Avro payload.
+    fn decode_as_raw(&self, payload: Vec<u8>) -> Result<Payload, Error> {
+        Ok(Payload::Avro(payload))
+    }
+
+    /// Renames top-level keys of a JSON object payload according to `field_mappings`.
+    ///
+    /// All mapped source keys are removed first and then re-inserted under their target
+    /// names. As a result, chained or swapped mappings (`a -> b`, `b -> a`) apply
+    /// simultaneously, and a renamed field overwrites an unmapped field of the same name.
+    /// If two source keys map to the same target, which one wins is unspecified.
+    /// Values are moved, not cloned. Non-object payloads are returned unchanged.
+    fn apply_field_transformations(&self, payload: Payload) -> Result<Payload, Error> {
+        let Some(mappings) = &self.config.field_mappings else {
+            return Ok(payload);
+        };
+        match payload {
+            Payload::Json(simd_json::OwnedValue::Object(mut map)) => {
+                let renamed: Vec<(String, simd_json::OwnedValue)> = mappings
+                    .iter()
+                    .filter_map(|(from, to)| map.remove(from).map(|value| (to.clone(), value)))
+                    .collect();
+                for (key, value) in renamed {
+                    map.insert(key, value);
+                }
+                Ok(Payload::Json(simd_json::OwnedValue::Object(map)))
+            }
+            other => Ok(other),
+        }
+    }
+}
+
+impl StreamDecoder for AvroStreamDecoder {
+    fn schema(&self) -> Schema {
+        Schema::Avro
+    }
+
+    /// Decodes one message payload; an empty payload is rejected before any decoding.
+    fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error> {
+        match (payload.is_empty(), self.config.extract_as_json) {
+            (true, _) => Err(Error::InvalidPayloadType),
+            (false, true) => self.decode_as_json(&payload),
+            (false, false) => self.decode_as_raw(payload),
+        }
+    }
+}
+
+impl Default for AvroStreamDecoder {
+    /// Same as [`AvroStreamDecoder::new_default`].
+    fn default() -> Self {
+        Self::new_default()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Schema used throughout: a `User` record with a string `name` and an int `age`.
+    fn create_test_schema_json() -> String {
+        r#"{
+            "type": "record",
+            "name": "User",
+            "fields": [
+                {"name": "name", "type": "string"},
+                {"name": "age", "type": "int"}
+            ]
+        }"#
+        .to_string()
+    }
+
+    /// Encodes the record `{name: "Alice", age: 30}` as a single Avro datum.
+    fn encode_test_avro_data(schema_json: &str) -> Vec<u8> {
+        use apache_avro::types::Value as AvroValue;
+        let schema = AvroSchema::parse_str(schema_json).unwrap();
+        let record = AvroValue::Record(vec![
+            ("name".to_string(), AvroValue::String("Alice".to_string())),
+            ("age".to_string(), AvroValue::Int(30)),
+        ]);
+        apache_avro::to_avro_datum(&schema, record).unwrap()
+    }
+
+    #[test]
+    fn decode_should_fail_given_empty_payload() {
+        let decoder = AvroStreamDecoder::default();
+        let result = decoder.decode(vec![]);
+        assert!(matches!(result, Err(Error::InvalidPayloadType)));
+    }
+
+    #[test]
+    fn decode_should_return_avro_payload_when_extract_as_json_is_disabled() {
+        let decoder = AvroStreamDecoder::new(AvroConfig {
+            extract_as_json: false,
+            ..AvroConfig::default()
+        });
+
+        let test_data = b"fake_avro_data".to_vec();
+        let Ok(Payload::Avro(data)) = decoder.decode(test_data.clone()) else {
+            panic!("Expected Avro payload");
+        };
+        assert_eq!(data, test_data);
+    }
+
+    #[test]
+    fn decode_should_decode_avro_to_json_with_schema() {
+        let schema_json = create_test_schema_json();
+        let avro_data = encode_test_avro_data(&schema_json);
+        let decoder = AvroStreamDecoder::new(AvroConfig {
+            schema_json: Some(schema_json),
+            extract_as_json: true,
+            ..AvroConfig::default()
+        });
+
+        let Ok(Payload::Json(json_value)) = decoder.decode(avro_data) else {
+            panic!("Expected JSON payload");
+        };
+        // Compare decoded values, not just key presence.
+        assert_eq!(json_value, simd_json::json!({"name": "Alice", "age": 30}));
+    }
+
+    #[test]
+    fn decode_should_apply_field_mappings_when_configured() {
+        let schema_json = create_test_schema_json();
+        let avro_data = encode_test_avro_data(&schema_json);
+        let field_mappings = HashMap::from([("name".to_string(), "full_name".to_string())]);
+        let decoder = AvroStreamDecoder::new(AvroConfig {
+            schema_json: Some(schema_json),
+            extract_as_json: true,
+            field_mappings: Some(field_mappings),
+            ..AvroConfig::default()
+        });
+
+        let Ok(Payload::Json(json_value)) = decoder.decode(avro_data) else {
+            panic!("Expected JSON payload");
+        };
+        assert_eq!(
+            json_value,
+            simd_json::json!({"full_name": "Alice", "age": 30})
+        );
+    }
+
+    #[test]
+    fn config_should_have_sensible_defaults() {
+        let config = AvroConfig::default();
+
+        assert!(config.schema_path.is_none());
+        assert!(config.schema_json.is_none());
+        assert!(config.field_mappings.is_none());
+        assert!(config.extract_as_json);
+    }
+
+    #[test]
+    fn decoder_should_be_creatable_with_custom_config() {
+        let config = AvroConfig {
+            schema_path: Some(PathBuf::from("/path/to/schema.avsc")),
+            schema_json: Some(r#"{"type": "string"}"#.to_string()),
+            extract_as_json: false,
+            ..AvroConfig::default()
+        };
+
+        let decoder = AvroStreamDecoder::new(config.clone());
+
+        assert_eq!(decoder.config.schema_path, config.schema_path);
+        assert_eq!(decoder.config.schema_json, config.schema_json);
+        assert_eq!(decoder.config.extract_as_json, config.extract_as_json);
+        // The inline schema takes precedence, so the bogus path is never read.
+        assert!(decoder.schema.is_some());
+    }
+
+    #[test]
+    fn decode_should_fail_without_schema_when_extract_as_json_is_true() {
+        let decoder = AvroStreamDecoder::new(AvroConfig {
+            schema_json: None,
+            schema_path: None,
+            extract_as_json: true,
+            ..AvroConfig::default()
+        });
+
+        let result = decoder.decode(b"{\"name\": \"test\"}".to_vec());
+        assert!(
+            matches!(result, Err(Error::InvalidConfigValue(_))),
+            "Expected error when schema is missing and extract_as_json=true"
+        );
+    }
+
+    #[test]
+    fn try_new_should_fail_on_invalid_schema() {
+        let result = AvroStreamDecoder::try_new(AvroConfig {
+            schema_json: Some("not a valid schema".to_string()),
+            ..AvroConfig::default()
+        });
+        assert!(
+            matches!(result, Err(Error::InvalidConfigValue(_))),
+            "Expected try_new to fail with invalid schema"
+        );
+    }
+
+    #[test]
+    fn try_new_should_fail_on_missing_schema_file() {
+        let path = std::env::temp_dir().join(format!(
+            "iggy_avro_decoder_missing_{}.avsc",
+            std::process::id()
+        ));
+        let result = AvroStreamDecoder::try_new(AvroConfig {
+            schema_path: Some(path),
+            ..AvroConfig::default()
+        });
+        assert!(matches!(result, Err(Error::InvalidConfigValue(_))));
+    }
+
+    #[test]
+    fn try_new_should_load_schema_from_file() {
+        let schema_json = create_test_schema_json();
+        let path = std::env::temp_dir().join(format!(
+            "iggy_avro_decoder_schema_{}.avsc",
+            std::process::id()
+        ));
+        std::fs::write(&path, &schema_json).unwrap();
+        let result = AvroStreamDecoder::try_new(AvroConfig {
+            schema_path: Some(path.clone()),
+            ..AvroConfig::default()
+        });
+        std::fs::remove_file(&path).ok();
+
+        let Ok(decoder) = result else {
+            panic!("Expected the schema file to load");
+        };
+        let Ok(Payload::Json(json_value)) = decoder.decode(encode_test_avro_data(&schema_json))
+        else {
+            panic!("Expected JSON payload");
+        };
+        assert_eq!(json_value, simd_json::json!({"name": "Alice", "age": 30}));
+    }
+
+    #[test]
+    fn update_config_should_rollback_on_invalid_schema() {
+        let valid_schema = create_test_schema_json();
+        let mut decoder = AvroStreamDecoder::new(AvroConfig {
+            schema_json: Some(valid_schema.clone()),
+            extract_as_json: true,
+            ..AvroConfig::default()
+        });
+
+        assert!(decoder.schema.is_some());
+
+        let old_config = decoder.config.clone();
+        let invalid_config = AvroConfig {
+            schema_json: Some("invalid schema".to_string()),
+            extract_as_json: true,
+            ..AvroConfig::default()
+        };
+
+        let result = decoder.update_config(invalid_config);
+        assert!(result.is_err(), "Expected update_config to fail");
+
+        // After rollback, config and schema should remain unchanged
+        assert_eq!(decoder.config.schema_json, old_config.schema_json);
+        assert!(decoder.schema.is_some());
+    }
+
+    #[test]
+    fn decode_should_reject_trailing_bytes() {
+        let schema_json = create_test_schema_json();
+        let mut avro_data = encode_test_avro_data(&schema_json);
+        avro_data.extend_from_slice(b"trailing garbage");
+
+        let decoder = AvroStreamDecoder::new(AvroConfig {
+            schema_json: Some(schema_json),
+            extract_as_json: true,
+            ..AvroConfig::default()
+        });
+
+        let result = decoder.decode(avro_data);
+        assert!(
+            matches!(result, Err(Error::CannotDecode(Schema::Avro))),
+            "Expected decode to fail when payload has trailing bytes"
+        );
+    }
+}
diff --git a/core/connectors/sdk/src/decoders/mod.rs b/core/connectors/sdk/src/decoders/mod.rs
index c7c4cfd80..bb167c3cb 100644
--- a/core/connectors/sdk/src/decoders/mod.rs
+++ b/core/connectors/sdk/src/decoders/mod.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+pub mod avro;
pub mod flatbuffer;
pub mod json;
pub mod proto;
diff --git a/core/connectors/sdk/src/encoders/avro.rs b/core/connectors/sdk/src/encoders/avro.rs
new file mode 100644
index 000000000..ece592c53
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/avro.rs
@@ -0,0 +1,604 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamEncoder,
convert::owned_value_to_serde_json};
+use apache_avro::Schema as AvroSchema;
+use base64::Engine;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::path::PathBuf;
+use tracing::{error, info};
+
+/// Configuration for [`AvroStreamEncoder`].
+///
+/// Every field is optional. Without a schema, only payloads that are already bytes
+/// (`Avro`, `Raw`, `FlatBuffer`) can be encoded; they are passed through unchanged.
+#[derive(Debug, Clone, Default)]
+#[derive(Serialize, Deserialize)]
+pub struct AvroEncoderConfig {
+    /// Path to a file containing the Avro schema JSON; only used when `schema_json` is unset.
+    pub schema_path: Option<PathBuf>,
+    /// Inline Avro schema JSON; takes precedence over `schema_path`.
+    pub schema_json: Option<String>,
+    /// Renames applied to top-level keys of a JSON object payload before encoding
+    /// (`from -> to`).
+    pub field_mappings: Option<HashMap<String, String>>,
+}
+
+/// Encodes [`Payload`]s into Avro binary datums for Iggy messages.
+///
+/// `Json`, `Text` and `Proto` payloads are interpreted as JSON and encoded against the
+/// configured schema with `apache_avro::to_avro_datum`. `Avro`, `Raw` and `FlatBuffer`
+/// bytes are passed through unchanged.
+pub struct AvroStreamEncoder {
+    config: AvroEncoderConfig,
+    /// Parsed schema; `None` when no schema source is configured, or when loading
+    /// failed in [`AvroStreamEncoder::new`].
+    schema: Option<AvroSchema>,
+}
+
+impl AvroStreamEncoder {
+    /// Creates an encoder, logging (but otherwise ignoring) schema-loading errors.
+    ///
+    /// If you need fail-fast behaviour, use [`try_new`](Self::try_new) instead.
+    pub fn new(config: AvroEncoderConfig) -> Self {
+        let mut encoder = Self {
+            config,
+            schema: None,
+        };
+        if let Err(e) = encoder.load_schema() {
+            error!("Failed to load Avro schema during encoder creation: {}", e);
+        }
+        encoder
+    }
+
+    /// Creates an encoder, failing if a configured schema cannot be read or parsed.
+    ///
+    /// # Errors
+    /// Returns [`Error::InvalidConfigValue`] when the inline schema or schema file is invalid.
+    pub fn try_new(config: AvroEncoderConfig) -> Result<Self, Error> {
+        let mut encoder = Self {
+            config,
+            schema: None,
+        };
+        encoder.load_schema()?;
+        Ok(encoder)
+    }
+
+    /// Creates an encoder from [`AvroEncoderConfig::default`] (no schema, no mappings).
+    pub fn new_default() -> Self {
+        Self::new(AvroEncoderConfig::default())
+    }
+
+    /// Replaces the configuration and reloads the schema.
+    ///
+    /// On failure the previous configuration (and its already-loaded schema) is kept.
+    pub fn update_config(&mut self, config: AvroEncoderConfig) -> Result<(), Error> {
+        let old_config = std::mem::replace(&mut self.config, config);
+        if let Err(e) = self.load_schema() {
+            // `load_schema` only assigns `self.schema` on success, so restoring the
+            // config is enough to keep config and schema consistent.
+            self.config = old_config;
+            return Err(e);
+        }
+        Ok(())
+    }
+
+    /// Loads the schema from `schema_json` or, if that is unset, from `schema_path`.
+    ///
+    /// Inline JSON takes precedence; when it is set but invalid the file is not tried.
+    /// With neither source configured the schema is cleared. On error `self.schema`
+    /// is left untouched.
+    fn load_schema(&mut self) -> Result<(), Error> {
+        let schema = if let Some(schema_json) = &self.config.schema_json {
+            let schema = AvroSchema::parse_str(schema_json).map_err(|e| {
+                error!("Failed to parse inline Avro schema: {}", e);
+                Error::InvalidConfigValue(format!("Invalid Avro schema JSON: {e}"))
+            })?;
+            info!("Loaded Avro schema from inline JSON");
+            Some(schema)
+        } else if let Some(schema_path) = &self.config.schema_path {
+            let schema_content = std::fs::read_to_string(schema_path).map_err(|e| {
+                error!("Failed to read Avro schema file {:?}: {}", schema_path, e);
+                Error::InvalidConfigValue(format!(
+                    "Cannot read schema file {}: {e}",
+                    schema_path.display()
+                ))
+            })?;
+            let schema = AvroSchema::parse_str(&schema_content).map_err(|e| {
+                error!("Failed to parse Avro schema file {:?}: {}", schema_path, e);
+                Error::InvalidConfigValue(format!(
+                    "Invalid Avro schema file {}: {e}",
+                    schema_path.display()
+                ))
+            })?;
+            info!("Loaded Avro schema from file: {:?}", schema_path);
+            Some(schema)
+        } else {
+            None
+        };
+        self.schema = schema;
+        Ok(())
+    }
+
+    /// Renames top-level keys of a JSON object payload according to `field_mappings`.
+    ///
+    /// All mapped source keys are removed first and then re-inserted under their target
+    /// names. As a result, chained or swapped mappings (`a -> b`, `b -> a`) apply
+    /// simultaneously, and a renamed field overwrites an unmapped field of the same name.
+    /// If two source keys map to the same target, which one wins is unspecified.
+    /// Values are moved, not cloned. Non-object payloads are returned unchanged.
+    fn apply_field_transformations(&self, payload: Payload) -> Result<Payload, Error> {
+        let Some(mappings) = &self.config.field_mappings else {
+            return Ok(payload);
+        };
+        match payload {
+            Payload::Json(simd_json::OwnedValue::Object(mut map)) => {
+                let renamed: Vec<(String, simd_json::OwnedValue)> = mappings
+                    .iter()
+                    .filter_map(|(from, to)| map.remove(from).map(|value| (to.clone(), value)))
+                    .collect();
+                for (key, value) in renamed {
+                    map.insert(key, value);
+                }
+                Ok(Payload::Json(simd_json::OwnedValue::Object(map)))
+            }
+            other => Ok(other),
+        }
+    }
+
+    /// Encodes a JSON value as a single Avro datum using the configured schema.
+    ///
+    /// # Errors
+    /// [`Error::InvalidConfigValue`] without a schema; conversion or encoding errors otherwise.
+    fn encode_json_to_avro(&self, json_value: simd_json::OwnedValue) -> Result<Vec<u8>, Error> {
+        let schema = self.schema.as_ref().ok_or_else(|| {
+            error!("Cannot encode JSON to Avro without a schema");
+            Error::InvalidConfigValue("Avro schema is required for encoding".to_string())
+        })?;
+
+        let serde_value = owned_value_to_serde_json(&json_value);
+        let avro_value = Self::serde_json_to_avro_value(serde_value, schema)?;
+
+        apache_avro::to_avro_datum(schema, avro_value).map_err(|e| {
+            error!("Failed to encode Avro datum: {}", e);
+            Error::Serialization(format!("Avro encoding failed: {e}"))
+        })
+    }
+
+    /// Converts a JSON value into an Avro value shaped by `schema`.
+    ///
+    /// Records take their fields from the JSON object. Per the Avro spec, a missing key
+    /// falls back to the field's schema `default`, and then to `null` (which only
+    /// validates for nullable fields). A union resolves to the first variant the value
+    /// converts to and validates against. Other combinations go through apache-avro's
+    /// generic `From<serde_json::Value>` conversion and must validate against `schema`.
+    ///
+    /// NOTE(review): named-type references (`Schema::Ref`) are not matched explicitly and
+    /// fall through to the generic conversion, which may fail to validate them — confirm
+    /// whether recursive/named schemas need support.
+    fn serde_json_to_avro_value(
+        value: serde_json::Value,
+        schema: &apache_avro::Schema,
+    ) -> Result<apache_avro::types::Value, Error> {
+        use apache_avro::types::Value as AvroValue;
+
+        match (value, schema) {
+            (serde_json::Value::Object(mut map), AvroSchema::Record(record_schema)) => {
+                let mut record = Vec::with_capacity(record_schema.fields.len());
+                for field in &record_schema.fields {
+                    // Move the value out of the object instead of cloning it; absent keys
+                    // use the default declared in the schema.
+                    let field_value = map
+                        .remove(&field.name)
+                        .or_else(|| field.default.clone())
+                        .unwrap_or(serde_json::Value::Null);
+                    record.push((
+                        field.name.clone(),
+                        Self::serde_json_to_avro_value(field_value, &field.schema)?,
+                    ));
+                }
+                Ok(AvroValue::Record(record))
+            }
+            (serde_json::Value::Array(arr), AvroSchema::Array(array_schema)) => {
+                Ok(AvroValue::Array(
+                    arr.into_iter()
+                        .map(|v| Self::serde_json_to_avro_value(v, &array_schema.items))
+                        .collect::<Result<Vec<_>, _>>()?,
+                ))
+            }
+            (serde_json::Value::Object(map), AvroSchema::Map(map_schema)) => Ok(AvroValue::Map(
+                map.into_iter()
+                    .map(|(k, v)| {
+                        Self::serde_json_to_avro_value(v, &map_schema.types).map(|av| (k, av))
+                    })
+                    .collect::<Result<_, _>>()?,
+            )),
+            (value, AvroSchema::Union(union_schema)) => {
+                let schemas = union_schema.variants();
+                for (idx, variant_schema) in schemas.iter().enumerate() {
+                    match Self::serde_json_to_avro_value(value.clone(), variant_schema) {
+                        Ok(avro_val) if avro_val.validate(variant_schema) => {
+                            return Ok(AvroValue::Union(idx as u32, Box::new(avro_val)));
+                        }
+                        _ => continue,
+                    }
+                }
+                Err(Error::InvalidPayloadType)
+            }
+            (serde_json::Value::Null, AvroSchema::Null) => Ok(AvroValue::Null),
+            (serde_json::Value::Bool(b), AvroSchema::Boolean) => Ok(AvroValue::Boolean(b)),
+            (serde_json::Value::Number(n), AvroSchema::Int) => {
+                let v = n.as_i64().ok_or_else(|| {
+                    Error::Serialization(format!("Cannot convert JSON number to Avro Int: {n}"))
+                })?;
+                let v_i32 = i32::try_from(v).map_err(|_| {
+                    Error::Serialization(format!("JSON number {v} out of range for Avro Int (i32)"))
+                })?;
+                Ok(AvroValue::Int(v_i32))
+            }
+            (serde_json::Value::Number(n), AvroSchema::Long) => {
+                let v = n.as_i64().ok_or_else(|| {
+                    Error::Serialization(format!("Cannot convert JSON number to Avro Long: {n}"))
+                })?;
+                Ok(AvroValue::Long(v))
+            }
+            (serde_json::Value::Number(n), AvroSchema::Float) => {
+                let v = n.as_f64().ok_or_else(|| {
+                    Error::Serialization(format!("Cannot convert JSON number to Avro Float: {n}"))
+                })?;
+                // Narrowing to f32 intentionally loses precision, as Avro `float` requires.
+                Ok(AvroValue::Float(v as f32))
+            }
+            (serde_json::Value::Number(n), AvroSchema::Double) => {
+                let v = n.as_f64().ok_or_else(|| {
+                    Error::Serialization(format!("Cannot convert JSON number to Avro Double: {n}"))
+                })?;
+                Ok(AvroValue::Double(v))
+            }
+            (serde_json::Value::String(s), AvroSchema::String) => Ok(AvroValue::String(s)),
+            (serde_json::Value::String(s), AvroSchema::Bytes) => {
+                Ok(AvroValue::Bytes(s.into_bytes()))
+            }
+            (serde_json::Value::Array(arr), AvroSchema::Bytes) => {
+                let mut bytes = Vec::with_capacity(arr.len());
+                for (i, v) in arr.into_iter().enumerate() {
+                    let n = v.as_u64().ok_or_else(|| {
+                        Error::Serialization(format!(
+                            "Bytes array element at index {i} is not a valid u8: {v}"
+                        ))
+                    })?;
+                    let b = u8::try_from(n).map_err(|_| {
+                        Error::Serialization(format!(
+                            "Bytes array element at index {i} out of u8 range: {n}"
+                        ))
+                    })?;
+                    bytes.push(b);
+                }
+                Ok(AvroValue::Bytes(bytes))
+            }
+
+            (value, schema) => {
+                let avro_val: AvroValue = value.into();
+                if avro_val.validate(schema) {
+                    Ok(avro_val)
+                } else {
+                    Err(Error::Serialization(format!(
+                        "JSON value does not match Avro schema: {schema:?}"
+                    )))
+                }
+            }
+        }
+    }
+
+    /// Parses `text` as JSON and encodes it; non-JSON text is [`Error::InvalidJsonPayload`].
+    fn encode_text_to_avro(&self, text: String) -> Result<Vec<u8>, Error> {
+        let mut text_bytes = text.into_bytes();
+        let json_value =
+            simd_json::to_owned_value(&mut text_bytes).map_err(|_| Error::InvalidJsonPayload)?;
+        self.encode_json_to_avro(json_value)
+    }
+
+    /// Passes raw bytes through unchanged (they are assumed to be Avro already).
+    fn encode_raw_to_avro(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> {
+        Ok(data)
+    }
+
+    /// Converts `payload` towards `target_format` without using the Avro schema.
+    ///
+    /// `Avro` bytes become JSON only if they happen to parse as JSON; otherwise they are
+    /// wrapped as `{"avro_data_base64": "<base64>"}`. `Avro` -> `Text` is base64.
+    /// Combinations that are not listed are returned unchanged.
+    pub fn convert_format(
+        &self,
+        payload: Payload,
+        target_format: Schema,
+    ) -> Result<Payload, Error> {
+        match (payload, target_format) {
+            (Payload::Avro(data), Schema::Json) => {
+                // simd_json parses in place, so parse a copy and keep `data` for the fallback.
+                let mut data_copy = data.clone();
+                let json_value = simd_json::to_owned_value(&mut data_copy).unwrap_or_else(|_| {
+                    simd_json::json!({
+                        "avro_data_base64": base64::engine::general_purpose::STANDARD.encode(&data)
+                    })
+                });
+                Ok(Payload::Json(json_value))
+            }
+            (Payload::Avro(data), Schema::Text) => {
+                let base64_data = base64::engine::general_purpose::STANDARD.encode(&data);
+                Ok(Payload::Text(base64_data))
+            }
+            (Payload::Avro(data), Schema::Raw) => Ok(Payload::Raw(data)),
+
+            (Payload::Json(json), Schema::Text) => {
+                let text =
+                    simd_json::to_string_pretty(&json).map_err(|_| Error::InvalidJsonPayload)?;
+                Ok(Payload::Text(text))
+            }
+            (Payload::Json(json), Schema::Raw) => {
+                let bytes = simd_json::to_vec(&json).map_err(|_| Error::InvalidJsonPayload)?;
+                Ok(Payload::Raw(bytes))
+            }
+
+            (Payload::Text(text), Schema::Json) => {
+                let mut text_bytes = text.into_bytes();
+                let json_value = simd_json::to_owned_value(&mut text_bytes)
+                    .map_err(|_| Error::InvalidJsonPayload)?;
+                Ok(Payload::Json(json_value))
+            }
+            (Payload::Text(text), Schema::Raw) => Ok(Payload::Raw(text.into_bytes())),
+
+            (Payload::Raw(data), Schema::Text) => {
+                let text = String::from_utf8(data).map_err(|_| Error::InvalidTextPayload)?;
+                Ok(Payload::Text(text))
+            }
+            (Payload::Raw(mut data), Schema::Json) => {
+                let json_value =
+                    simd_json::to_owned_value(&mut data).map_err(|_| Error::InvalidJsonPayload)?;
+                Ok(Payload::Json(json_value))
+            }
+
+            (payload, _) => Ok(payload),
+        }
+    }
+}
+
+impl StreamEncoder for AvroStreamEncoder {
+    fn schema(&self) -> Schema {
+        Schema::Avro
+    }
+
+    /// Applies field mappings, then encodes text-like payloads as JSON and passes
+    /// byte payloads through unchanged.
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error> {
+        match self.apply_field_transformations(payload)? {
+            Payload::Json(json_value) => self.encode_json_to_avro(json_value),
+            Payload::Text(text) | Payload::Proto(text) => self.encode_text_to_avro(text),
+            Payload::Raw(data) | Payload::FlatBuffer(data) => self.encode_raw_to_avro(data),
+            Payload::Avro(data) => Ok(data),
+        }
+    }
+}
+
+impl Default for AvroStreamEncoder {
+    /// Same as [`AvroStreamEncoder::new_default`].
+    fn default() -> Self {
+        Self::new_default()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Avro record schema with a `name` string field and an `age` int field.
+    fn create_test_schema_json() -> String {
+        r#"{
+            "type": "record",
+            "name": "User",
+            "fields": [
+                {"name": "name", "type": "string"},
+                {"name": "age", "type": "int"}
+            ]
+        }"#
+        .to_string()
+    }
+
+    /// Builds an encoder whose only non-default setting is the inline schema.
+    fn encoder_with_schema(schema_json: &str) -> AvroStreamEncoder {
+        AvroStreamEncoder::new(AvroEncoderConfig {
+            schema_json: Some(schema_json.to_string()),
+            ..AvroEncoderConfig::default()
+        })
+    }
+
+    #[test]
+    fn encode_should_handle_json_payload_with_schema() {
+        let encoder = encoder_with_schema(&create_test_schema_json());
+        let user = simd_json::json!({ "name": "Alice", "age": 30 });
+
+        let outcome = encoder.encode(Payload::Json(user));
+
+        assert!(outcome.is_ok());
+        assert!(!outcome.unwrap().is_empty());
+    }
+
+    #[test]
+    fn encode_should_pass_through_avro_payload() {
+        let encoder = AvroStreamEncoder::default();
+        let raw_avro: Vec<u8> = vec![1, 2, 3, 4, 5];
+
+        let outcome = encoder.encode(Payload::Avro(raw_avro.clone()));
+
+        assert!(outcome.is_ok());
+        assert_eq!(outcome.unwrap(), raw_avro);
+    }
+
+    #[test]
+    fn encode_should_apply_field_mappings_when_configured() {
+        let mut renames = HashMap::new();
+        renames.insert("full_name".to_string(), "name".to_string());
+        let encoder = AvroStreamEncoder::new(AvroEncoderConfig {
+            schema_json: Some(create_test_schema_json()),
+            field_mappings: Some(renames),
+            ..AvroEncoderConfig::default()
+        });
+        let user = simd_json::json!({ "full_name": "Alice", "age": 30 });
+
+        let outcome = encoder.encode(Payload::Json(user));
+
+        assert!(outcome.is_ok());
+        assert!(!outcome.unwrap().is_empty());
+    }
+
+    #[test]
+    fn encode_should_fail_without_schema_for_json_payload() {
+        let encoder = AvroStreamEncoder::default();
+        let user = simd_json::json!({ "name": "Alice", "age": 30 });
+
+        assert!(encoder.encode(Payload::Json(user)).is_err());
+    }
+
+    #[test]
+    fn encode_should_handle_union_null_string_schema() {
+        let encoder = encoder_with_schema(r#"["null", "string"]"#);
+
+        let outcome = encoder.encode(Payload::Json(simd_json::json!("hello")));
+
+        assert!(outcome.is_ok(), "Expected string union variant to encode");
+        assert!(!outcome.unwrap().is_empty());
+    }
+
+    #[test]
+    fn encode_should_handle_union_null_int_schema() {
+        let encoder = encoder_with_schema(r#"["null", "int"]"#);
+
+        let outcome = encoder.encode(Payload::Json(simd_json::json!(42)));
+
+        assert!(outcome.is_ok(), "Expected int union variant to encode");
+        assert!(!outcome.unwrap().is_empty());
+    }
+
+    #[test]
+    fn encode_should_fail_on_int_overflow() {
+        let encoder = encoder_with_schema(r#"{"type": "int"}"#);
+
+        let outcome = encoder.encode(Payload::Json(simd_json::json!(i64::MAX)));
+
+        assert!(outcome.is_err(), "Expected overflow to fail");
+    }
+
+    #[test]
+    fn encode_should_fail_on_invalid_bytes_array_element() {
+        let encoder = encoder_with_schema(r#"{"type": "bytes"}"#);
+
+        let outcome = encoder.encode(Payload::Json(simd_json::json!([1, 2, 300])));
+
+        assert!(outcome.is_err(), "Expected out-of-range byte to fail");
+    }
+
+    #[test]
+    fn convert_format_should_transform_avro_to_json() {
+        let encoder = AvroStreamEncoder::default();
+        let raw_avro: Vec<u8> = vec![1, 2, 3, 4, 5];
+
+        let outcome = encoder.convert_format(Payload::Avro(raw_avro), Schema::Json);
+
+        assert!(outcome.is_ok());
+        match outcome {
+            Ok(Payload::Json(simd_json::OwnedValue::Object(map))) => {
+                assert!(map.contains_key("avro_data_base64"));
+            }
+            Ok(Payload::Json(_)) => panic!("Expected JSON object"),
+            _ => panic!("Expected JSON payload"),
+        }
+    }
+
+    #[test]
+    fn convert_format_should_transform_avro_to_text() {
+        let encoder = AvroStreamEncoder::default();
+
+        let outcome = encoder.convert_format(Payload::Avro(vec![1, 2, 3, 4, 5]), Schema::Text);
+
+        assert!(outcome.is_ok());
+        match outcome {
+            Ok(Payload::Text(text)) => assert!(!text.is_empty()),
+            _ => panic!("Expected Text payload"),
+        }
+    }
+
+    #[test]
+    fn config_should_have_sensible_defaults() {
+        let defaults = AvroEncoderConfig::default();
+
+        assert!(defaults.schema_path.is_none());
+        assert!(defaults.schema_json.is_none());
+        assert!(defaults.field_mappings.is_none());
+    }
+
+    #[test]
+    fn encoder_should_be_creatable_with_custom_config() {
+        let custom = AvroEncoderConfig {
+            schema_path: Some(PathBuf::from("/path/to/schema.avsc")),
+            schema_json: Some(r#"{"type": "string"}"#.to_string()),
+            ..AvroEncoderConfig::default()
+        };
+
+        let encoder = AvroStreamEncoder::new(custom.clone());
+
+        assert_eq!(encoder.config.schema_path, custom.schema_path);
+        assert_eq!(encoder.config.schema_json, custom.schema_json);
+    }
+
+    #[test]
+    fn try_new_should_fail_on_invalid_schema() {
+        let outcome = AvroStreamEncoder::try_new(AvroEncoderConfig {
+            schema_json: Some("not a valid schema".to_string()),
+            ..AvroEncoderConfig::default()
+        });
+
+        assert!(
+            outcome.is_err(),
+            "Expected try_new to fail with invalid schema"
+        );
+    }
+
+    #[test]
+    fn update_config_should_rollback_on_invalid_schema() {
+        let mut encoder = encoder_with_schema(r#"{"type": "string"}"#);
+        assert!(encoder.schema.is_some());
+
+        let schema_before = encoder.config.schema_json.clone();
+        let rejected = encoder.update_config(AvroEncoderConfig {
+            schema_json: Some("invalid schema".to_string()),
+            ..AvroEncoderConfig::default()
+        });
+        assert!(rejected.is_err(), "Expected update_config to fail");
+
+        // A failed update must leave both the stored config and the parsed schema intact.
+        assert_eq!(encoder.config.schema_json, schema_before);
+        assert!(encoder.schema.is_some());
+    }
+}
diff --git a/core/connectors/sdk/src/encoders/flatbuffer.rs
b/core/connectors/sdk/src/encoders/flatbuffer.rs
index 2b2812aef..3c617ecbf 100644
--- a/core/connectors/sdk/src/encoders/flatbuffer.rs
+++ b/core/connectors/sdk/src/encoders/flatbuffer.rs
@@ -238,6 +238,7 @@ impl StreamEncoder for FlatBufferStreamEncoder {
Payload::Raw(data) => self.encode_raw_to_flatbuffer(data),
Payload::FlatBuffer(data) => Ok(data),
Payload::Proto(text) => self.encode_text_to_flatbuffer(text),
+ Payload::Avro(data) => self.encode_raw_to_flatbuffer(data),
}
}
}
diff --git a/core/connectors/sdk/src/encoders/mod.rs
b/core/connectors/sdk/src/encoders/mod.rs
index c7c4cfd80..bb167c3cb 100644
--- a/core/connectors/sdk/src/encoders/mod.rs
+++ b/core/connectors/sdk/src/encoders/mod.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+pub mod avro;
pub mod flatbuffer;
pub mod json;
pub mod proto;
diff --git a/core/connectors/sdk/src/encoders/proto.rs
b/core/connectors/sdk/src/encoders/proto.rs
index 52daa2829..09ace6cc0 100644
--- a/core/connectors/sdk/src/encoders/proto.rs
+++ b/core/connectors/sdk/src/encoders/proto.rs
@@ -371,6 +371,10 @@ impl ProtoStreamEncoder {
"flatbuffer_size": data.len(),
"data": general_purpose::STANDARD.encode(&data)
}),
+ Payload::Avro(data) => simd_json::json!({
+ "avro_size": data.len(),
+ "data": general_purpose::STANDARD.encode(&data)
+ }),
};
if let simd_json::OwnedValue::Object(json_map) = json_value {
@@ -625,6 +629,13 @@ impl ProtoStreamEncoder {
),
data,
),
+ Payload::Avro(data) => (
+ format!(
+ "{}/google.protobuf.BytesValue",
+ self.config.format_options.type_url_prefix
+ ),
+ data,
+ ),
};
let any = Any {
@@ -644,6 +655,7 @@ impl ProtoStreamEncoder {
Payload::Raw(data) => Ok(data),
Payload::Proto(text) => Ok(text.into_bytes()),
Payload::FlatBuffer(data) => Ok(data),
+ Payload::Avro(data) => Ok(data),
}
}
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
index a864407be..2289aa950 100644
--- a/core/connectors/sdk/src/lib.rs
+++ b/core/connectors/sdk/src/lib.rs
@@ -20,12 +20,12 @@
use async_trait::async_trait;
use base64::{self, Engine};
use decoders::{
- flatbuffer::FlatBufferStreamDecoder, json::JsonStreamDecoder,
proto::ProtoStreamDecoder,
- raw::RawStreamDecoder, text::TextStreamDecoder,
+ avro::AvroStreamDecoder, flatbuffer::FlatBufferStreamDecoder,
json::JsonStreamDecoder,
+ proto::ProtoStreamDecoder, raw::RawStreamDecoder, text::TextStreamDecoder,
};
use encoders::{
- flatbuffer::FlatBufferStreamEncoder, json::JsonStreamEncoder,
proto::ProtoStreamEncoder,
- raw::RawStreamEncoder, text::TextStreamEncoder,
+ avro::AvroStreamEncoder, flatbuffer::FlatBufferStreamEncoder,
json::JsonStreamEncoder,
+ proto::ProtoStreamEncoder, raw::RawStreamEncoder, text::TextStreamEncoder,
};
use iggy::prelude::{HeaderKey, HeaderValue};
use once_cell::sync::OnceCell;
@@ -144,6 +144,7 @@ pub enum Payload {
Text(String),
Proto(String),
FlatBuffer(Vec<u8>),
+ Avro(Vec<u8>),
}
impl Payload {
@@ -157,6 +158,7 @@ impl Payload {
Payload::Text(text) => Ok(text.into_bytes()),
Payload::Proto(text) => Ok(text.into_bytes()),
Payload::FlatBuffer(value) => Ok(value),
+ Payload::Avro(value) => Ok(value),
}
}
@@ -185,6 +187,7 @@ impl Payload {
Payload::Text(text) => Ok(text.as_bytes().to_vec()),
Payload::Proto(text) => Ok(text.as_bytes().to_vec()),
Payload::FlatBuffer(value) => Ok(value.clone()),
+ Payload::Avro(value) => Ok(value.clone()),
}
}
}
@@ -201,6 +204,7 @@ impl std::fmt::Display for Payload {
Payload::Text(text) => write!(f, "Text({text})"),
Payload::Proto(text) => write!(f, "Proto({text})"),
Payload::FlatBuffer(value) => write!(f, "FlatBuffer({} bytes)",
value.len()),
+ Payload::Avro(value) => write!(f, "Avro({} bytes)", value.len()),
}
}
}
@@ -222,6 +226,8 @@ pub enum Schema {
Proto,
#[strum(to_string = "flatbuffer")]
FlatBuffer,
+ #[strum(to_string = "avro")]
+ Avro,
}
impl Schema {
@@ -245,6 +251,7 @@ impl Schema {
Err(_) => Ok(Payload::Raw(value)),
},
Schema::FlatBuffer => Ok(Payload::FlatBuffer(value)),
+ Schema::Avro => Ok(Payload::Avro(value)),
}
}
@@ -255,6 +262,7 @@ impl Schema {
Schema::Text => Arc::new(TextStreamDecoder),
Schema::Proto => Arc::new(ProtoStreamDecoder::default()),
Schema::FlatBuffer => Arc::new(FlatBufferStreamDecoder::default()),
+ Schema::Avro => Arc::new(AvroStreamDecoder::default()),
}
}
@@ -265,6 +273,7 @@ impl Schema {
Schema::Text => Arc::new(TextStreamEncoder),
Schema::Proto => Arc::new(ProtoStreamEncoder::default()),
Schema::FlatBuffer => Arc::new(FlatBufferStreamEncoder::default()),
+ Schema::Avro => Arc::new(AvroStreamEncoder::default()),
}
}
}
diff --git a/core/connectors/sdk/src/transforms/avro_convert.rs
b/core/connectors/sdk/src/transforms/avro_convert.rs
new file mode 100644
index 000000000..f0242e9d9
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/avro_convert.rs
@@ -0,0 +1,398 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::path::PathBuf;
+
+use super::{Transform, TransformType};
+use crate::decoders::avro::{AvroConfig, AvroStreamDecoder};
+use crate::encoders::avro::{AvroEncoderConfig, AvroStreamEncoder};
+use crate::{DecodedMessage, Error, Payload, Schema, TopicMetadata};
+use crate::{StreamDecoder, StreamEncoder};
+
+/// Configuration for the [`AvroConvert`] transform.
+///
+/// `#[serde(default)]` fills any field missing from a connector config with the
+/// value from this type's `Default` impl, so e.g. `conversion_options` or the
+/// format pair may be omitted instead of failing deserialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(default)]
+pub struct AvroConvertConfig {
+    /// Format the incoming payload is expected to be in (default: `Avro`).
+    pub source_format: Schema,
+    /// Format the payload is converted to (default: `Json`).
+    pub target_format: Schema,
+    /// Path to an Avro schema file, forwarded to the Avro encoder/decoder.
+    pub schema_path: Option<PathBuf>,
+    /// Inline Avro schema JSON, forwarded to the Avro encoder/decoder.
+    pub schema_json: Option<String>,
+    /// Top-level field renames, keyed by the original name (`old -> new`).
+    pub field_mappings: Option<HashMap<String, String>>,
+    /// Extra conversion flags; see [`AvroConversionOptions`].
+    pub conversion_options: AvroConversionOptions,
+}
+
+/// Optional flags for [`AvroConvert`]; every flag defaults to `false` and may be
+/// omitted from the config.
+///
+/// NOTE(review): `AvroConvert::transform` does not read any of these flags yet;
+/// they are currently only parsed and stored.
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+#[serde(default)]
+pub struct AvroConversionOptions {
+    pub pretty_json: bool,
+    pub include_metadata: bool,
+    pub strict_mode: bool,
+}
+
+impl Default for AvroConvertConfig {
+    /// Avro -> JSON conversion with no schema, no field renames and default options.
+    fn default() -> Self {
+        AvroConvertConfig {
+            source_format: Schema::Avro,
+            target_format: Schema::Json,
+            schema_json: None,
+            schema_path: None,
+            field_mappings: None,
+            conversion_options: Default::default(),
+        }
+    }
+}
+
+/// Transform that converts message payloads between Avro and other formats, as
+/// selected by `source_format` / `target_format` in its [`AvroConvertConfig`].
+pub struct AvroConvert {
+    config: AvroConvertConfig,
+}
+
+impl AvroConvert {
+    /// Creates the transform from an explicit configuration.
+    pub fn new(config: AvroConvertConfig) -> Self {
+        Self { config }
+    }
+
+    /// Creates the transform with [`AvroConvertConfig::default`] (Avro -> JSON).
+    pub fn new_default() -> Self {
+        Self::new(AvroConvertConfig::default())
+    }
+
+    /// Renames the top-level keys of a JSON object payload using `field_mappings`
+    /// (`old name -> new name`); keys without a mapping keep their name.
+    ///
+    /// Non-object JSON values and non-JSON payloads are returned unchanged. If a
+    /// renamed key collides with another key, the entry inserted last wins.
+    fn apply_field_mappings(
+        &self,
+        payload: Payload,
+        field_mappings: &HashMap<String, String>,
+    ) -> Result<Payload, Error> {
+        match payload {
+            Payload::Json(simd_json::OwnedValue::Object(mut map)) => {
+                // Move the entries out rather than deep-cloning every key and value,
+                // then re-insert each one under its (possibly renamed) key.
+                let entries = std::mem::take(&mut *map);
+                for (key, value) in entries {
+                    let key = field_mappings.get(&key).cloned().unwrap_or(key);
+                    map.insert(key, value);
+                }
+                Ok(Payload::Json(simd_json::OwnedValue::Object(map)))
+            }
+            other => Ok(other),
+        }
+    }
+}
+
+impl Transform for AvroConvert {
+    fn r#type(&self) -> TransformType {
+        TransformType::AvroConvert
+    }
+
+    /// Converts `message.payload` from `source_format` to `target_format`.
+    ///
+    /// Field mappings are applied exactly once: JSON payloads are renamed up front,
+    /// Avro bytes are renamed by the decoder after decoding, and any other payload
+    /// handed to the encoder is renamed by the encoder.
+    ///
+    /// # Errors
+    /// Returns [`Error::InvalidPayloadType`] when an `Avro -> Json` or `Raw -> Avro`
+    /// conversion receives a payload of the wrong variant, and propagates any
+    /// encoder/decoder error. Identical or unsupported format pairs pass through.
+    fn transform(
+        &self,
+        _metadata: &TopicMetadata,
+        mut message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        // Remember whether the up-front pass already renamed the payload, so the
+        // encoder below does not rename it a second time (chained or swapped
+        // mappings would otherwise produce the wrong field names).
+        let mappings_applied = match &self.config.field_mappings {
+            Some(field_mappings) => {
+                let was_json = matches!(message.payload, Payload::Json(_));
+                message.payload = self.apply_field_mappings(message.payload, field_mappings)?;
+                was_json
+            }
+            None => false,
+        };
+
+        message.payload = match (&self.config.source_format, &self.config.target_format) {
+            (Schema::Json, Schema::Avro) | (Schema::Text, Schema::Avro) => {
+                let encoder = AvroStreamEncoder::new(AvroEncoderConfig {
+                    schema_path: self.config.schema_path.clone(),
+                    schema_json: self.config.schema_json.clone(),
+                    field_mappings: if mappings_applied {
+                        None
+                    } else {
+                        self.config.field_mappings.clone()
+                    },
+                });
+                Payload::Avro(encoder.encode(message.payload)?)
+            }
+            (Schema::Avro, Schema::Json) => {
+                let Payload::Avro(bytes) = message.payload else {
+                    return Err(Error::InvalidPayloadType);
+                };
+                let decoder = AvroStreamDecoder::new(AvroConfig {
+                    schema_path: self.config.schema_path.clone(),
+                    schema_json: self.config.schema_json.clone(),
+                    // Avro bytes cannot be renamed before decoding, so the decoder
+                    // applies the mappings to the JSON it produces.
+                    field_mappings: self.config.field_mappings.clone(),
+                    extract_as_json: true,
+                });
+                decoder.decode(bytes)?
+            }
+            (Schema::Avro, Schema::Text) => {
+                AvroStreamEncoder::default().convert_format(message.payload, Schema::Text)?
+            }
+            (Schema::Avro, Schema::Raw) => {
+                AvroStreamEncoder::default().convert_format(message.payload, Schema::Raw)?
+            }
+            (Schema::Raw, Schema::Avro) => match message.payload {
+                // Raw bytes are relabelled as Avro as-is; their encoding is not validated.
+                Payload::Raw(data) => Payload::Avro(data),
+                _ => return Err(Error::InvalidPayloadType),
+            },
+            // Identical formats and unsupported pairs both pass the payload through.
+            _ => message.payload,
+        };
+
+        Ok(Some(message))
+    }
+}
+
+impl Default for AvroConvert {
+    /// Equivalent to [`AvroConvert::new_default`]: an Avro -> JSON converter.
+    fn default() -> Self {
+        AvroConvert::new(AvroConvertConfig::default())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::TopicMetadata;
+
+    fn create_test_message(payload: Payload) -> DecodedMessage {
+        DecodedMessage {
+            id: Some(123),
+            offset: Some(456),
+            checksum: Some(789),
+            timestamp: Some(1234567890),
+            origin_timestamp: Some(1234567890),
+            headers: None,
+            payload,
+        }
+    }
+
+    fn create_test_metadata() -> TopicMetadata {
+        TopicMetadata {
+            stream: "test_stream".to_string(),
+            topic: "test_topic".to_string(),
+        }
+    }
+
+    /// Avro record schema with a `name` string field and an `age` int field.
+    fn create_test_schema_json() -> String {
+        r#"{
+            "type": "record",
+            "name": "User",
+            "fields": [
+                {"name": "name", "type": "string"},
+                {"name": "age", "type": "int"}
+            ]
+        }"#
+        .to_string()
+    }
+
+    /// Encodes the record `{name: "Alice", age: 30}` with `schema_json`.
+    fn encode_test_avro_data(schema_json: &str) -> Vec<u8> {
+        use apache_avro::Schema as AvroSchema;
+        use apache_avro::types::Value as AvroValue;
+
+        let parsed = AvroSchema::parse_str(schema_json).unwrap();
+        let alice = AvroValue::Record(vec![
+            ("name".to_string(), AvroValue::String("Alice".to_string())),
+            ("age".to_string(), AvroValue::Int(30)),
+        ]);
+        apache_avro::to_avro_datum(&parsed, alice).unwrap()
+    }
+
+    /// Runs an `AvroConvert` built from `config` over a test message carrying `payload`.
+    fn run_transform(
+        config: AvroConvertConfig,
+        payload: Payload,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        AvroConvert::new(config).transform(&create_test_metadata(), create_test_message(payload))
+    }
+
+    #[test]
+    fn transform_should_convert_avro_to_json_successfully() {
+        let schema_json = create_test_schema_json();
+        let avro_data = encode_test_avro_data(&schema_json);
+        let config = AvroConvertConfig {
+            source_format: Schema::Avro,
+            target_format: Schema::Json,
+            schema_json: Some(schema_json),
+            ..AvroConvertConfig::default()
+        };
+
+        let outcome = run_transform(config, Payload::Avro(avro_data));
+
+        assert!(outcome.is_ok());
+        match outcome {
+            Ok(Some(converted)) => match converted.payload {
+                Payload::Json(simd_json::OwnedValue::Object(map)) => {
+                    assert!(map.contains_key("name"));
+                    assert!(map.contains_key("age"));
+                }
+                Payload::Json(_) => panic!("Expected JSON object"),
+                _ => panic!("Expected JSON payload"),
+            },
+            _ => panic!("Expected transformed message"),
+        }
+    }
+
+    #[test]
+    fn transform_should_convert_json_to_avro_successfully() {
+        let config = AvroConvertConfig {
+            source_format: Schema::Json,
+            target_format: Schema::Avro,
+            schema_json: Some(create_test_schema_json()),
+            ..AvroConvertConfig::default()
+        };
+        let bob = Payload::Json(simd_json::json!({ "name": "Bob", "age": 25 }));
+
+        let outcome = run_transform(config, bob);
+
+        assert!(outcome.is_ok());
+        match outcome {
+            Ok(Some(converted)) => match converted.payload {
+                Payload::Avro(data) => assert!(!data.is_empty()),
+                _ => panic!("Expected Avro payload"),
+            },
+            _ => panic!("Expected transformed message"),
+        }
+    }
+
+    #[test]
+    fn transform_should_convert_avro_to_text_successfully() {
+        let config = AvroConvertConfig {
+            source_format: Schema::Avro,
+            target_format: Schema::Text,
+            ..AvroConvertConfig::default()
+        };
+
+        let outcome = run_transform(config, Payload::Avro(vec![1, 2, 3, 4, 5]));
+
+        assert!(outcome.is_ok());
+        match outcome {
+            Ok(Some(converted)) => match converted.payload {
+                Payload::Text(text) => assert!(!text.is_empty()),
+                _ => panic!("Expected Text payload"),
+            },
+            _ => panic!("Expected transformed message"),
+        }
+    }
+
+    #[test]
+    fn transform_should_apply_field_mappings_during_conversion() {
+        let mut renames = HashMap::new();
+        renames.insert("name".to_string(), "full_name".to_string());
+        let schema_json = create_test_schema_json();
+        let avro_data = encode_test_avro_data(&schema_json);
+        let config = AvroConvertConfig {
+            source_format: Schema::Avro,
+            target_format: Schema::Json,
+            schema_json: Some(schema_json),
+            field_mappings: Some(renames),
+            ..AvroConvertConfig::default()
+        };
+
+        let outcome = run_transform(config, Payload::Avro(avro_data));
+
+        assert!(outcome.is_ok());
+        match outcome {
+            Ok(Some(converted)) => match converted.payload {
+                Payload::Json(simd_json::OwnedValue::Object(map)) => {
+                    assert!(map.contains_key("full_name"));
+                    assert!(!map.contains_key("name"));
+                }
+                Payload::Json(_) => panic!("Expected JSON object"),
+                _ => panic!("Expected JSON payload"),
+            },
+            _ => panic!("Expected transformed message"),
+        }
+    }
+
+    #[test]
+    fn transform_should_pass_through_same_format() {
+        let config = AvroConvertConfig {
+            source_format: Schema::Avro,
+            target_format: Schema::Avro,
+            ..AvroConvertConfig::default()
+        };
+        let avro_data: Vec<u8> = vec![1, 2, 3, 4, 5];
+
+        let outcome = run_transform(config, Payload::Avro(avro_data.clone()));
+
+        assert!(outcome.is_ok());
+        match outcome {
+            Ok(Some(converted)) => match converted.payload {
+                Payload::Avro(data) => assert_eq!(data, avro_data),
+                _ => panic!("Expected Avro payload"),
+            },
+            _ => panic!("Expected transformed message"),
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/mod.rs
b/core/connectors/sdk/src/transforms/mod.rs
index d3feef125..56acc009b 100644
--- a/core/connectors/sdk/src/transforms/mod.rs
+++ b/core/connectors/sdk/src/transforms/mod.rs
@@ -17,6 +17,7 @@
*/
mod add_fields;
+pub mod avro_convert;
mod delete_fields;
mod filter_fields;
pub mod flatbuffer_convert;
@@ -25,6 +26,7 @@ pub mod proto_convert;
mod update_fields;
use crate::{DecodedMessage, Error, TopicMetadata};
pub use add_fields::{AddFields, AddFieldsConfig, Field as AddField};
+pub use avro_convert::{AvroConvert, AvroConvertConfig};
pub use delete_fields::{DeleteFields, DeleteFieldsConfig};
pub use filter_fields::{
FilterFields, FilterFieldsConfig, FilterPattern, KeyPattern as
FilterKeyPattern,
@@ -86,6 +88,7 @@ pub enum TransformType {
UpdateFields,
ProtoConvert,
FlatBufferConvert,
+ AvroConvert,
}
pub fn from_config(
@@ -123,5 +126,10 @@ pub fn from_config(
serde_json::from_value(raw.clone()).map_err(|_|
Error::InvalidConfig)?;
Ok(Arc::new(FlatBufferConvert::new(cfg)))
}
+ TransformType::AvroConvert => {
+ let cfg: AvroConvertConfig =
+ serde_json::from_value(raw.clone()).map_err(|_|
Error::InvalidConfig)?;
+ Ok(Arc::new(AvroConvert::new(cfg)))
+ }
}
}
diff --git a/core/connectors/sdk/src/transforms/proto_convert.rs
b/core/connectors/sdk/src/transforms/proto_convert.rs
index b8e558298..f1debc758 100644
--- a/core/connectors/sdk/src/transforms/proto_convert.rs
+++ b/core/connectors/sdk/src/transforms/proto_convert.rs
@@ -471,6 +471,15 @@ impl ProtoConvert {
Err(Error::InvalidPayloadType)
}
}
+ Schema::Avro => {
+ // Convert protobuf to raw bytes first, then wrap as Avro
+ let raw_payload = self.protobuf_to_raw(payload)?;
+ if let Payload::Raw(data) = raw_payload {
+ Ok(Payload::Avro(data))
+ } else {
+ Err(Error::InvalidPayloadType)
+ }
+ }
}
}
@@ -488,6 +497,14 @@ impl ProtoConvert {
Err(Error::InvalidPayloadType)
}
}
+ Schema::Avro => {
+ // Convert Avro to raw bytes first, then to protobuf
+ if let Payload::Avro(data) = payload {
+ self.raw_to_protobuf(Payload::Raw(data))
+ } else {
+ Err(Error::InvalidPayloadType)
+ }
+ }
}
}
diff --git a/core/connectors/sinks/http_sink/src/lib.rs
b/core/connectors/sinks/http_sink/src/lib.rs
index 6f86885b5..db98f634f 100644
--- a/core/connectors/sinks/http_sink/src/lib.rs
+++ b/core/connectors/sinks/http_sink/src/lib.rs
@@ -401,6 +401,14 @@ impl HttpSink {
serde_json::to_value(encoded)
.map_err(|e| Error::Serialization(format!("EncodedPayload:
{}", e)))
}
+ Payload::Avro(bytes) => {
+ let encoded = EncodedPayload {
+ data: general_purpose::STANDARD.encode(&bytes),
+ iggy_payload_encoding: ENCODING_BASE64,
+ };
+ serde_json::to_value(encoded)
+ .map_err(|e| Error::Serialization(format!("EncodedPayload:
{}", e)))
+ }
}
}