This is an automated email from the ASF dual-hosted git repository.

spetz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f55e9563 feat(connectors): implement Avro payload support with 
separate encoder, decoder and transform crates (#3141)
4f55e9563 is described below

commit 4f55e95638a721c4c992129379ee4b9b0a1002a8
Author: Tyooughtul <[email protected]>
AuthorDate: Fri May 1 20:09:38 2026 +0800

    feat(connectors): implement Avro payload support with separate encoder, 
decoder and transform crates (#3141)
    
    Closes #1846
---
 Cargo.lock                                         |   1 +
 Cargo.toml                                         |   1 +
 core/connectors/runtime/src/configs/connectors.rs  |  21 +-
 core/connectors/runtime/src/sink.rs                |  28 +-
 core/connectors/runtime/src/source.rs              |  21 +-
 core/connectors/sdk/Cargo.toml                     |   1 +
 core/connectors/sdk/src/decoders/avro.rs           | 442 +++++++++++++++
 core/connectors/sdk/src/decoders/mod.rs            |   1 +
 core/connectors/sdk/src/encoders/avro.rs           | 604 +++++++++++++++++++++
 core/connectors/sdk/src/encoders/flatbuffer.rs     |   1 +
 core/connectors/sdk/src/encoders/mod.rs            |   1 +
 core/connectors/sdk/src/encoders/proto.rs          |  12 +
 core/connectors/sdk/src/lib.rs                     |  17 +-
 core/connectors/sdk/src/transforms/avro_convert.rs | 398 ++++++++++++++
 core/connectors/sdk/src/transforms/mod.rs          |   8 +
 .../connectors/sdk/src/transforms/proto_convert.rs |  17 +
 core/connectors/sinks/http_sink/src/lib.rs         |   8 +
 17 files changed, 1565 insertions(+), 17 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c213ee651..84c1cfac4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6796,6 +6796,7 @@ name = "iggy_connector_sdk"
 version = "0.3.0"
 dependencies = [
  "anyhow",
+ "apache-avro",
  "async-trait",
  "base64 0.22.1",
  "dashmap",
diff --git a/Cargo.toml b/Cargo.toml
index 6589c4b2e..1e8afafbb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -72,6 +72,7 @@ aes-gcm = "0.10.3"
 ahash = { version = "0.8.12", features = ["serde"] }
 aligned-vec = "0.6.4"
 anyhow = "1.0.102"
+apache-avro = "0.21.0"
 argon2 = "0.5.3"
 arrow = "57.3.0"
 arrow-array = "57.3.0"
diff --git a/core/connectors/runtime/src/configs/connectors.rs 
b/core/connectors/runtime/src/configs/connectors.rs
index 8d8f33097..70e4f18a9 100644
--- a/core/connectors/runtime/src/configs/connectors.rs
+++ b/core/connectors/runtime/src/configs/connectors.rs
@@ -32,6 +32,7 @@ use iggy_connector_sdk::transforms::TransformType;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::fmt::Formatter;
+use std::path::PathBuf;
 use strum::Display;
 
 #[derive(
@@ -180,6 +181,9 @@ pub struct StreamConsumerConfig {
     pub topics: Vec<String>,
     #[config_env(leaf)]
     pub schema: Schema,
+    pub avro_schema_json: Option<String>,
+    #[config_env(leaf)]
+    pub avro_schema_path: Option<PathBuf>,
     pub batch_length: Option<u32>,
     pub poll_interval: Option<String>,
     pub consumer_group: Option<String>,
@@ -191,6 +195,9 @@ pub struct StreamProducerConfig {
     pub topic: String,
     #[config_env(leaf)]
     pub schema: Schema,
+    pub avro_schema_json: Option<String>,
+    #[config_env(leaf)]
+    pub avro_schema_path: Option<PathBuf>,
     pub batch_length: Option<u32>,
     pub linger_time: Option<String>,
 }
@@ -346,7 +353,7 @@ impl std::fmt::Display for StreamConsumerConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ stream: {}, topics: {}, schema: {:?}, batch_length: {:?}, 
poll_interval: {:?}, consumer_group: {:?} }}",
+            "{{ stream: {}, topics: {}, schema: {:?}, avro_schema_json: {:?}, 
avro_schema_path: {:?}, batch_length: {:?}, poll_interval: {:?}, 
consumer_group: {:?} }}",
             self.stream,
             self.topics
                 .iter()
@@ -354,6 +361,8 @@ impl std::fmt::Display for StreamConsumerConfig {
                 .collect::<Vec<&str>>()
                 .join(", "),
             self.schema,
+            self.avro_schema_json,
+            self.avro_schema_path,
             self.batch_length,
             self.poll_interval,
             self.consumer_group
@@ -365,8 +374,14 @@ impl std::fmt::Display for StreamProducerConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ stream: {}, topic: {}, schema: {:?}, batch_length: {:?}, 
linger_time: {:?} }}",
-            self.stream, self.topic, self.schema, self.batch_length, 
self.linger_time
+            "{{ stream: {}, topic: {}, schema: {:?}, avro_schema_json: {:?}, 
avro_schema_path: {:?}, batch_length: {:?}, linger_time: {:?} }}",
+            self.stream,
+            self.topic,
+            self.schema,
+            self.avro_schema_json,
+            self.avro_schema_path,
+            self.batch_length,
+            self.linger_time
         )
     }
 }
diff --git a/core/connectors/runtime/src/sink.rs 
b/core/connectors/runtime/src/sink.rs
index b6c8965d3..a0ab25b1a 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -31,9 +31,10 @@ use iggy::prelude::{
     AutoCommit, AutoCommitWhen, IggyClient, IggyConsumer, IggyDuration, 
IggyMessage,
     PollingStrategy,
 };
+use iggy_connector_sdk::decoders::avro::{AvroConfig, AvroStreamDecoder};
 use iggy_connector_sdk::{
-    DecodedMessage, MessagesMetadata, RawMessage, RawMessages, 
ReceivedMessage, StreamDecoder,
-    TopicMetadata, sink::ConsumeCallback, transforms::Transform,
+    DecodedMessage, MessagesMetadata, RawMessage, RawMessages, 
ReceivedMessage, Schema,
+    StreamDecoder, TopicMetadata, sink::ConsumeCallback, transforms::Transform,
 };
 use std::{
     collections::HashMap,
@@ -426,12 +427,23 @@ pub(crate) async fn setup_sink_consumers(
                 .batch_length(batch_length)
                 .build();
             consumer.init().await?;
-            consumers.push((
-                consumer,
-                stream.schema.decoder(),
-                batch_length,
-                transforms.clone(),
-            ));
+            let decoder: Arc<dyn StreamDecoder> = match stream.schema {
+                Schema::Avro => {
+                    let config = AvroConfig {
+                        schema_json: stream.avro_schema_json.clone(),
+                        schema_path: stream.avro_schema_path.clone(),
+                        ..AvroConfig::default()
+                    };
+                    
Arc::new(AvroStreamDecoder::try_new(config).map_err(|error| {
+                        RuntimeError::InvalidConfiguration(format!(
+                            "Failed to create Avro decoder for stream '{}': 
{error}",
+                            stream.stream
+                        ))
+                    })?)
+                }
+                other => other.decoder(),
+            };
+            consumers.push((consumer, decoder, batch_length, 
transforms.clone()));
         }
     }
     Ok(consumers)
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
index c4af20450..2c20eb1b0 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -24,8 +24,9 @@ use iggy::prelude::{
     DirectConfig, HeaderKey, HeaderValue, IggyClient, IggyDuration, IggyError, 
IggyMessage,
     IggyProducer,
 };
+use iggy_connector_sdk::encoders::avro::{AvroEncoderConfig, AvroStreamEncoder};
 use iggy_connector_sdk::{
-    ConnectorState, DecodedMessage, Error, ProducedMessages, StreamEncoder, 
TopicMetadata,
+    ConnectorState, DecodedMessage, Error, ProducedMessages, Schema, 
StreamEncoder, TopicMetadata,
     source::HandleCallback, transforms::Transform,
 };
 use once_cell::sync::Lazy;
@@ -259,7 +260,23 @@ pub(crate) async fn setup_source_producer(
             )
             .build();
         producer.init().await?;
-        last_encoder = Some(stream.schema.encoder());
+        let encoder: Arc<dyn StreamEncoder> = match stream.schema {
+            Schema::Avro => {
+                let config = AvroEncoderConfig {
+                    schema_json: stream.avro_schema_json.clone(),
+                    schema_path: stream.avro_schema_path.clone(),
+                    ..AvroEncoderConfig::default()
+                };
+                Arc::new(AvroStreamEncoder::try_new(config).map_err(|error| {
+                    RuntimeError::InvalidConfiguration(format!(
+                        "Failed to create Avro encoder for stream '{}': 
{error}",
+                        stream.stream
+                    ))
+                })?)
+            }
+            other => other.encoder(),
+        };
+        last_encoder = Some(encoder);
         last_producer = Some(producer);
     }
 
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
index 364b7e6be..0927bdf60 100644
--- a/core/connectors/sdk/Cargo.toml
+++ b/core/connectors/sdk/Cargo.toml
@@ -38,6 +38,7 @@ api = ["strum"]
 
 [dependencies]
 anyhow = { workspace = true }
+apache-avro = { workspace = true }
 async-trait = { workspace = true }
 base64 = { workspace = true }
 dashmap = { workspace = true }
diff --git a/core/connectors/sdk/src/decoders/avro.rs 
b/core/connectors/sdk/src/decoders/avro.rs
new file mode 100644
index 000000000..67050f716
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/avro.rs
@@ -0,0 +1,442 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamDecoder};
+use apache_avro::Schema as AvroSchema;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::path::PathBuf;
+use tracing::{error, info};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct AvroConfig {
+    pub schema_path: Option<PathBuf>,
+    pub schema_json: Option<String>,
+    pub field_mappings: Option<HashMap<String, String>>,
+    pub extract_as_json: bool,
+}
+
+impl Default for AvroConfig {
+    fn default() -> Self {
+        Self {
+            schema_path: None,
+            schema_json: None,
+            field_mappings: None,
+            extract_as_json: true,
+        }
+    }
+}
+
+pub struct AvroStreamDecoder {
+    config: AvroConfig,
+    schema: Option<AvroSchema>,
+}
+
+impl AvroStreamDecoder {
+    /// Use [`Self::try_new`] for fail-fast behaviour.
+    pub fn new(config: AvroConfig) -> Self {
+        let mut decoder = Self {
+            config,
+            schema: None,
+        };
+        if let Err(e) = decoder.load_schema() {
+            error!("Failed to load Avro schema during decoder creation: {}", 
e);
+        }
+        decoder
+    }
+
+    pub fn try_new(config: AvroConfig) -> Result<Self, Error> {
+        let mut decoder = Self {
+            config,
+            schema: None,
+        };
+        decoder.load_schema()?;
+        Ok(decoder)
+    }
+
+    pub fn new_default() -> Self {
+        Self::new(AvroConfig::default())
+    }
+
+    pub fn update_config(&mut self, config: AvroConfig) -> Result<(), Error> {
+        let old_config = std::mem::replace(&mut self.config, config);
+        if let Err(e) = self.load_schema() {
+            // Rollback on failure to keep config/schema consistent
+            self.config = old_config;
+            return Err(e);
+        }
+        Ok(())
+    }
+
+    fn load_schema(&mut self) -> Result<(), Error> {
+        if let Some(schema_json) = &self.config.schema_json {
+            match AvroSchema::parse_str(schema_json) {
+                Ok(schema) => {
+                    info!("Loaded Avro schema from inline JSON");
+                    self.schema = Some(schema);
+                    return Ok(());
+                }
+                Err(e) => {
+                    error!("Failed to parse inline Avro schema: {}", e);
+                    return Err(Error::InvalidConfigValue(format!(
+                        "Invalid Avro schema JSON: {e}"
+                    )));
+                }
+            }
+        }
+
+        if let Some(schema_path) = &self.config.schema_path {
+            match std::fs::read_to_string(schema_path) {
+                Ok(schema_content) => match 
AvroSchema::parse_str(&schema_content) {
+                    Ok(schema) => {
+                        info!("Loaded Avro schema from file: {:?}", 
schema_path);
+                        self.schema = Some(schema);
+                        return Ok(());
+                    }
+                    Err(e) => {
+                        error!("Failed to parse Avro schema file: {}", e);
+                        return Err(Error::InvalidConfigValue(format!(
+                            "Invalid Avro schema file: {e}"
+                        )));
+                    }
+                },
+                Err(e) => {
+                    error!("Failed to read Avro schema file: {}", e);
+                    return Err(Error::InvalidConfigValue(format!(
+                        "Cannot read schema file: {e}"
+                    )));
+                }
+            }
+        }
+
+        self.schema = None;
+        Ok(())
+    }
+
+    fn decode_as_json(&self, payload: &[u8]) -> Result<Payload, Error> {
+        let schema = self.schema.as_ref().ok_or_else(|| {
+            error!("Cannot decode Avro to JSON without a schema");
+            Error::InvalidConfigValue("Avro schema is required for JSON 
extraction".to_string())
+        })?;
+
+        let mut reader = payload;
+        let avro_value = apache_avro::from_avro_datum(schema, &mut reader, 
None).map_err(|e| {
+            error!("Failed to decode Avro datum: {}", e);
+            Error::CannotDecode(Schema::Avro)
+        })?;
+
+        if !reader.is_empty() {
+            error!(
+                "Avro payload contains trailing bytes after datum: {} bytes 
remaining",
+                reader.len()
+            );
+            return Err(Error::CannotDecode(Schema::Avro));
+        }
+
+        let serde_value = serde_json::Value::try_from(avro_value).map_err(|e| {
+            error!("Failed to convert Avro value to JSON: {}", e);
+            Error::CannotDecode(Schema::Avro)
+        })?;
+
+        let mut json_bytes = serde_json::to_vec(&serde_value).map_err(|e| {
+            error!("Failed to serialize JSON value: {}", e);
+            Error::CannotDecode(Schema::Avro)
+        })?;
+
+        let json_value = simd_json::to_owned_value(&mut 
json_bytes).map_err(|e| {
+            error!("Failed to parse JSON into simd_json: {}", e);
+            Error::CannotDecode(Schema::Avro)
+        })?;
+
+        let transformed = 
self.apply_field_transformations(Payload::Json(json_value))?;
+        Ok(transformed)
+    }
+
+    fn decode_as_raw(&self, payload: Vec<u8>) -> Result<Payload, Error> {
+        Ok(Payload::Avro(payload))
+    }
+
+    fn apply_field_transformations(&self, payload: Payload) -> Result<Payload, 
Error> {
+        if let Some(mappings) = &self.config.field_mappings {
+            match payload {
+                Payload::Json(json_value) => {
+                    if let simd_json::OwnedValue::Object(mut map) = json_value 
{
+                        let mut new_entries = Vec::new();
+
+                        for (key, value) in map.iter() {
+                            if let Some(new_key) = mappings.get(key) {
+                                new_entries.push((new_key.clone(), 
value.clone()));
+                            } else {
+                                new_entries.push((key.clone(), value.clone()));
+                            }
+                        }
+
+                        map.clear();
+                        for (key, value) in new_entries {
+                            map.insert(key, value);
+                        }
+
+                        Ok(Payload::Json(simd_json::OwnedValue::Object(map)))
+                    } else {
+                        Ok(Payload::Json(json_value))
+                    }
+                }
+                other => Ok(other),
+            }
+        } else {
+            Ok(payload)
+        }
+    }
+}
+
+impl StreamDecoder for AvroStreamDecoder {
+    fn schema(&self) -> Schema {
+        Schema::Avro
+    }
+
+    fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error> {
+        if payload.is_empty() {
+            return Err(Error::InvalidPayloadType);
+        }
+
+        if self.config.extract_as_json {
+            self.decode_as_json(&payload)
+        } else {
+            self.decode_as_raw(payload)
+        }
+    }
+}
+
+impl Default for AvroStreamDecoder {
+    fn default() -> Self {
+        Self::new_default()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn create_test_schema_json() -> String {
+        r#"{
+            "type": "record",
+            "name": "User",
+            "fields": [
+                {"name": "name", "type": "string"},
+                {"name": "age", "type": "int"}
+            ]
+        }"#
+        .to_string()
+    }
+
+    fn encode_test_avro_data(schema_json: &str) -> Vec<u8> {
+        use apache_avro::types::Value as AvroValue;
+        let schema = AvroSchema::parse_str(schema_json).unwrap();
+        let record = AvroValue::Record(vec![
+            ("name".to_string(), AvroValue::String("Alice".to_string())),
+            ("age".to_string(), AvroValue::Int(30)),
+        ]);
+        apache_avro::to_avro_datum(&schema, record).unwrap()
+    }
+
+    #[test]
+    fn decode_should_fail_given_empty_payload() {
+        let decoder = AvroStreamDecoder::default();
+        let result = decoder.decode(vec![]);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn decode_should_return_avro_payload_when_extract_as_json_is_disabled() {
+        let config = AvroConfig {
+            extract_as_json: false,
+            ..AvroConfig::default()
+        };
+        let decoder = AvroStreamDecoder::new(config);
+
+        let test_data = b"fake_avro_data".to_vec();
+        let result = decoder.decode(test_data.clone());
+
+        assert!(result.is_ok());
+        if let Ok(Payload::Avro(data)) = result {
+            assert_eq!(data, test_data);
+        } else {
+            panic!("Expected Avro payload");
+        }
+    }
+
+    #[test]
+    fn decode_should_decode_avro_to_json_with_schema() {
+        let schema_json = create_test_schema_json();
+        let avro_data = encode_test_avro_data(&schema_json);
+
+        let config = AvroConfig {
+            schema_json: Some(schema_json),
+            extract_as_json: true,
+            ..AvroConfig::default()
+        };
+        let decoder = AvroStreamDecoder::new(config);
+
+        let result = decoder.decode(avro_data);
+
+        assert!(result.is_ok());
+        if let Ok(Payload::Json(json_value)) = result {
+            if let simd_json::OwnedValue::Object(map) = &json_value {
+                assert!(map.contains_key("name"));
+                assert!(map.contains_key("age"));
+            } else {
+                panic!("Expected JSON object");
+            }
+        } else {
+            panic!("Expected JSON payload");
+        }
+    }
+
+    #[test]
+    fn decode_should_apply_field_mappings_when_configured() {
+        let schema_json = create_test_schema_json();
+        let avro_data = encode_test_avro_data(&schema_json);
+
+        let mut field_mappings = HashMap::new();
+        field_mappings.insert("name".to_string(), "full_name".to_string());
+
+        let config = AvroConfig {
+            schema_json: Some(schema_json),
+            extract_as_json: true,
+            field_mappings: Some(field_mappings),
+            ..AvroConfig::default()
+        };
+        let decoder = AvroStreamDecoder::new(config);
+
+        let result = decoder.decode(avro_data);
+
+        assert!(result.is_ok());
+        if let Ok(Payload::Json(json_value)) = result {
+            if let simd_json::OwnedValue::Object(map) = &json_value {
+                assert!(map.contains_key("full_name"));
+                assert!(!map.contains_key("name"));
+            } else {
+                panic!("Expected JSON object");
+            }
+        } else {
+            panic!("Expected JSON payload");
+        }
+    }
+
+    #[test]
+    fn config_should_have_sensible_defaults() {
+        let config = AvroConfig::default();
+
+        assert!(config.schema_path.is_none());
+        assert!(config.schema_json.is_none());
+        assert!(config.field_mappings.is_none());
+        assert!(config.extract_as_json);
+    }
+
+    #[test]
+    fn decoder_should_be_creatable_with_custom_config() {
+        let config = AvroConfig {
+            schema_path: Some(PathBuf::from("/path/to/schema.avsc")),
+            schema_json: Some(r#"{"type": "string"}"#.to_string()),
+            extract_as_json: false,
+            ..AvroConfig::default()
+        };
+
+        let decoder = AvroStreamDecoder::new(config.clone());
+
+        assert_eq!(decoder.config.schema_path, config.schema_path);
+        assert_eq!(decoder.config.schema_json, config.schema_json);
+        assert_eq!(decoder.config.extract_as_json, config.extract_as_json);
+    }
+
+    #[test]
+    fn decode_should_fail_without_schema_when_extract_as_json_is_true() {
+        let config = AvroConfig {
+            schema_json: None,
+            schema_path: None,
+            extract_as_json: true,
+            ..AvroConfig::default()
+        };
+        let decoder = AvroStreamDecoder::new(config);
+
+        let result = decoder.decode(b"{\"name\": \"test\"}".to_vec());
+        assert!(
+            result.is_err(),
+            "Expected error when schema is missing and extract_as_json=true"
+        );
+    }
+
+    #[test]
+    fn try_new_should_fail_on_invalid_schema() {
+        let config = AvroConfig {
+            schema_json: Some("not a valid schema".to_string()),
+            ..AvroConfig::default()
+        };
+        let result = AvroStreamDecoder::try_new(config);
+        assert!(
+            result.is_err(),
+            "Expected try_new to fail with invalid schema"
+        );
+    }
+
+    #[test]
+    fn update_config_should_rollback_on_invalid_schema() {
+        let valid_schema = create_test_schema_json();
+        let mut decoder = AvroStreamDecoder::new(AvroConfig {
+            schema_json: Some(valid_schema.clone()),
+            extract_as_json: true,
+            ..AvroConfig::default()
+        });
+
+        assert!(decoder.schema.is_some());
+
+        let old_config = decoder.config.clone();
+        let invalid_config = AvroConfig {
+            schema_json: Some("invalid schema".to_string()),
+            extract_as_json: true,
+            ..AvroConfig::default()
+        };
+
+        let result = decoder.update_config(invalid_config);
+        assert!(result.is_err(), "Expected update_config to fail");
+
+        // After rollback, config and schema should remain unchanged
+        assert_eq!(decoder.config.schema_json, old_config.schema_json);
+        assert!(decoder.schema.is_some());
+    }
+
+    #[test]
+    fn decode_should_reject_trailing_bytes() {
+        let schema_json = create_test_schema_json();
+        let mut avro_data = encode_test_avro_data(&schema_json);
+        avro_data.extend_from_slice(b"trailing garbage");
+
+        let config = AvroConfig {
+            schema_json: Some(schema_json),
+            extract_as_json: true,
+            ..AvroConfig::default()
+        };
+        let decoder = AvroStreamDecoder::new(config);
+
+        let result = decoder.decode(avro_data);
+        assert!(
+            result.is_err(),
+            "Expected decode to fail when payload has trailing bytes"
+        );
+    }
+}
diff --git a/core/connectors/sdk/src/decoders/mod.rs 
b/core/connectors/sdk/src/decoders/mod.rs
index c7c4cfd80..bb167c3cb 100644
--- a/core/connectors/sdk/src/decoders/mod.rs
+++ b/core/connectors/sdk/src/decoders/mod.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+pub mod avro;
 pub mod flatbuffer;
 pub mod json;
 pub mod proto;
diff --git a/core/connectors/sdk/src/encoders/avro.rs 
b/core/connectors/sdk/src/encoders/avro.rs
new file mode 100644
index 000000000..ece592c53
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/avro.rs
@@ -0,0 +1,604 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamEncoder, 
convert::owned_value_to_serde_json};
+use apache_avro::Schema as AvroSchema;
+use base64::Engine;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::path::PathBuf;
+use tracing::{error, info};
+
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+pub struct AvroEncoderConfig {
+    pub schema_path: Option<PathBuf>,
+    pub schema_json: Option<String>,
+    pub field_mappings: Option<HashMap<String, String>>,
+}
+
+pub struct AvroStreamEncoder {
+    config: AvroEncoderConfig,
+    schema: Option<AvroSchema>,
+}
+
+impl AvroStreamEncoder {
+    /// If you need fail-fast behaviour,
+    /// use [`try_new`](Self::try_new) instead.
+    pub fn new(config: AvroEncoderConfig) -> Self {
+        let mut encoder = Self {
+            config,
+            schema: None,
+        };
+        if let Err(e) = encoder.load_schema() {
+            error!("Failed to load Avro schema during encoder creation: {}", 
e);
+        }
+        encoder
+    }
+
+    pub fn try_new(config: AvroEncoderConfig) -> Result<Self, Error> {
+        let mut encoder = Self {
+            config,
+            schema: None,
+        };
+        encoder.load_schema()?;
+        Ok(encoder)
+    }
+
+    pub fn new_default() -> Self {
+        Self::new(AvroEncoderConfig::default())
+    }
+
+    pub fn update_config(&mut self, config: AvroEncoderConfig) -> Result<(), 
Error> {
+        let old_config = std::mem::replace(&mut self.config, config);
+        if let Err(e) = self.load_schema() {
+            // Rollback on failure to keep config/schema consistent
+            self.config = old_config;
+            return Err(e);
+        }
+        Ok(())
+    }
+
+    fn load_schema(&mut self) -> Result<(), Error> {
+        if let Some(schema_json) = &self.config.schema_json {
+            match AvroSchema::parse_str(schema_json) {
+                Ok(schema) => {
+                    info!("Loaded Avro schema from inline JSON");
+                    self.schema = Some(schema);
+                    return Ok(());
+                }
+                Err(e) => {
+                    error!("Failed to parse inline Avro schema: {}", e);
+                    return Err(Error::InvalidConfigValue(format!(
+                        "Invalid Avro schema JSON: {e}"
+                    )));
+                }
+            }
+        }
+
+        if let Some(schema_path) = &self.config.schema_path {
+            match std::fs::read_to_string(schema_path) {
+                Ok(schema_content) => match 
AvroSchema::parse_str(&schema_content) {
+                    Ok(schema) => {
+                        info!("Loaded Avro schema from file: {:?}", 
schema_path);
+                        self.schema = Some(schema);
+                        return Ok(());
+                    }
+                    Err(e) => {
+                        error!("Failed to parse Avro schema file: {}", e);
+                        return Err(Error::InvalidConfigValue(format!(
+                            "Invalid Avro schema file: {e}"
+                        )));
+                    }
+                },
+                Err(e) => {
+                    error!("Failed to read Avro schema file: {}", e);
+                    return Err(Error::InvalidConfigValue(format!(
+                        "Cannot read schema file: {e}"
+                    )));
+                }
+            }
+        }
+
+        self.schema = None;
+        Ok(())
+    }
+
+    fn apply_field_transformations(&self, payload: Payload) -> Result<Payload, 
Error> {
+        if let Some(mappings) = &self.config.field_mappings {
+            match payload {
+                Payload::Json(json_value) => {
+                    if let simd_json::OwnedValue::Object(mut map) = json_value 
{
+                        let mut new_entries = Vec::new();
+
+                        for (key, value) in map.iter() {
+                            if let Some(new_key) = mappings.get(key) {
+                                new_entries.push((new_key.clone(), 
value.clone()));
+                            } else {
+                                new_entries.push((key.clone(), value.clone()));
+                            }
+                        }
+
+                        map.clear();
+                        for (key, value) in new_entries {
+                            map.insert(key, value);
+                        }
+
+                        Ok(Payload::Json(simd_json::OwnedValue::Object(map)))
+                    } else {
+                        Ok(Payload::Json(json_value))
+                    }
+                }
+                other => Ok(other),
+            }
+        } else {
+            Ok(payload)
+        }
+    }
+
+    fn encode_json_to_avro(&self, json_value: simd_json::OwnedValue) -> 
Result<Vec<u8>, Error> {
+        let schema = self.schema.as_ref().ok_or_else(|| {
+            error!("Cannot encode JSON to Avro without a schema");
+            Error::InvalidConfigValue("Avro schema is required for 
encoding".to_string())
+        })?;
+
+        let serde_value = owned_value_to_serde_json(&json_value);
+        let avro_value = Self::serde_json_to_avro_value(serde_value, schema)?;
+
+        apache_avro::to_avro_datum(schema, avro_value).map_err(|e| {
+            error!("Failed to encode Avro datum: {}", e);
+            Error::Serialization(format!("Avro encoding failed: {e}"))
+        })
+    }
+
+    fn serde_json_to_avro_value(
+        value: serde_json::Value,
+        schema: &apache_avro::Schema,
+    ) -> Result<apache_avro::types::Value, Error> {
+        use apache_avro::Schema as AvroSchema;
+        use apache_avro::types::Value as AvroValue;
+
+        match (value, schema) {
+            (serde_json::Value::Object(map), 
AvroSchema::Record(record_schema)) => {
+                let mut record = Vec::new();
+                for field in &record_schema.fields {
+                    let field_value = map
+                        .get(&field.name)
+                        .cloned()
+                        .unwrap_or(serde_json::Value::Null);
+                    record.push((
+                        field.name.clone(),
+                        Self::serde_json_to_avro_value(field_value, 
&field.schema)?,
+                    ));
+                }
+                Ok(AvroValue::Record(record))
+            }
+            (serde_json::Value::Array(arr), AvroSchema::Array(array_schema)) 
=> {
+                Ok(AvroValue::Array(
+                    arr.into_iter()
+                        .map(|v| Self::serde_json_to_avro_value(v, 
&array_schema.items))
+                        .collect::<Result<Vec<_>, _>>()?,
+                ))
+            }
+            (serde_json::Value::Object(map), AvroSchema::Map(map_schema)) => 
Ok(AvroValue::Map(
+                map.into_iter()
+                    .map(|(k, v)| {
+                        Self::serde_json_to_avro_value(v, 
&map_schema.types).map(|av| (k, av))
+                    })
+                    .collect::<Result<_, _>>()?,
+            )),
+            (value, AvroSchema::Union(union_schema)) => {
+                let schemas = union_schema.variants();
+                for (idx, variant_schema) in schemas.iter().enumerate() {
+                    match Self::serde_json_to_avro_value(value.clone(), 
variant_schema) {
+                        Ok(avro_val) if avro_val.validate(variant_schema) => {
+                            return Ok(AvroValue::Union(idx as u32, 
Box::new(avro_val)));
+                        }
+                        _ => continue,
+                    }
+                }
+                Err(Error::InvalidPayloadType)
+            }
+            (serde_json::Value::Null, AvroSchema::Null) => Ok(AvroValue::Null),
+            (serde_json::Value::Bool(b), AvroSchema::Boolean) => 
Ok(AvroValue::Boolean(b)),
+            (serde_json::Value::Number(n), AvroSchema::Int) => {
+                let v = n.as_i64().ok_or_else(|| {
+                    Error::Serialization(format!("Cannot convert JSON number 
to Avro Int: {n}"))
+                })?;
+                let v_i32 = i32::try_from(v).map_err(|_| {
+                    Error::Serialization(format!("JSON number {v} out of range 
for Avro Int (i32)"))
+                })?;
+                Ok(AvroValue::Int(v_i32))
+            }
+            (serde_json::Value::Number(n), AvroSchema::Long) => {
+                let v = n.as_i64().ok_or_else(|| {
+                    Error::Serialization(format!("Cannot convert JSON number 
to Avro Long: {n}"))
+                })?;
+                Ok(AvroValue::Long(v))
+            }
+            (serde_json::Value::Number(n), AvroSchema::Float) => {
+                let v = n.as_f64().ok_or_else(|| {
+                    Error::Serialization(format!("Cannot convert JSON number 
to Avro Float: {n}"))
+                })?;
+                Ok(AvroValue::Float(v as f32))
+            }
+            (serde_json::Value::Number(n), AvroSchema::Double) => {
+                let v = n.as_f64().ok_or_else(|| {
+                    Error::Serialization(format!("Cannot convert JSON number 
to Avro Double: {n}"))
+                })?;
+                Ok(AvroValue::Double(v))
+            }
+            (serde_json::Value::String(s), AvroSchema::String) => 
Ok(AvroValue::String(s)),
+            (serde_json::Value::String(s), AvroSchema::Bytes) => {
+                Ok(AvroValue::Bytes(s.into_bytes()))
+            }
+            (serde_json::Value::Array(arr), AvroSchema::Bytes) => {
+                let mut bytes = Vec::with_capacity(arr.len());
+                for (i, v) in arr.into_iter().enumerate() {
+                    let n = v.as_u64().ok_or_else(|| {
+                        Error::Serialization(format!(
+                            "Bytes array element at index {i} is not a valid 
u8: {v}"
+                        ))
+                    })?;
+                    let b = u8::try_from(n).map_err(|_| {
+                        Error::Serialization(format!(
+                            "Bytes array element at index {i} out of u8 range: 
{n}"
+                        ))
+                    })?;
+                    bytes.push(b);
+                }
+                Ok(AvroValue::Bytes(bytes))
+            }
+
+            (value, schema) => {
+                let avro_val: AvroValue = value.into();
+                if avro_val.validate(schema) {
+                    Ok(avro_val)
+                } else {
+                    Err(Error::Serialization(format!(
+                        "JSON value does not match Avro schema: {schema:?}"
+                    )))
+                }
+            }
+        }
+    }
+
+    fn encode_text_to_avro(&self, text: String) -> Result<Vec<u8>, Error> {
+        let mut text_bytes = text.into_bytes();
+        if let Ok(json_value) = simd_json::to_owned_value(&mut text_bytes) {
+            return self.encode_json_to_avro(json_value);
+        }
+
+        Err(Error::InvalidJsonPayload)
+    }
+
+    fn encode_raw_to_avro(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> {
+        Ok(data)
+    }
+
+    pub fn convert_format(
+        &self,
+        payload: Payload,
+        target_format: Schema,
+    ) -> Result<Payload, Error> {
+        match (payload, target_format) {
+            (Payload::Avro(data), Schema::Json) => {
+                let mut data_copy = data.clone();
+                let json_value = simd_json::to_owned_value(&mut data_copy)
+                    .unwrap_or_else(|_| simd_json::json!({ "avro_data_base64": 
base64::engine::general_purpose::STANDARD.encode(&data) }));
+                Ok(Payload::Json(json_value))
+            }
+            (Payload::Avro(data), Schema::Text) => {
+                let base64_data = 
base64::engine::general_purpose::STANDARD.encode(&data);
+                Ok(Payload::Text(base64_data))
+            }
+            (Payload::Avro(data), Schema::Raw) => Ok(Payload::Raw(data)),
+
+            (Payload::Json(json), Schema::Text) => {
+                let text =
+                    simd_json::to_string_pretty(&json).map_err(|_| 
Error::InvalidJsonPayload)?;
+                Ok(Payload::Text(text))
+            }
+            (Payload::Json(json), Schema::Raw) => {
+                let bytes = simd_json::to_vec(&json).map_err(|_| 
Error::InvalidJsonPayload)?;
+                Ok(Payload::Raw(bytes))
+            }
+
+            (Payload::Text(text), Schema::Json) => {
+                let mut text_bytes = text.into_bytes();
+                let json_value = simd_json::to_owned_value(&mut text_bytes)
+                    .map_err(|_| Error::InvalidJsonPayload)?;
+                Ok(Payload::Json(json_value))
+            }
+            (Payload::Text(text), Schema::Raw) => 
Ok(Payload::Raw(text.into_bytes())),
+
+            (Payload::Raw(data), Schema::Text) => {
+                let text = String::from_utf8(data).map_err(|_| 
Error::InvalidTextPayload)?;
+                Ok(Payload::Text(text))
+            }
+            (Payload::Raw(mut data), Schema::Json) => {
+                let json_value =
+                    simd_json::to_owned_value(&mut data).map_err(|_| 
Error::InvalidJsonPayload)?;
+                Ok(Payload::Json(json_value))
+            }
+
+            (payload, _) => Ok(payload),
+        }
+    }
+}
+
+impl StreamEncoder for AvroStreamEncoder {
+    fn schema(&self) -> Schema {
+        Schema::Avro
+    }
+
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error> {
+        let transformed_payload = self.apply_field_transformations(payload)?;
+
+        match transformed_payload {
+            Payload::Json(json_value) => self.encode_json_to_avro(json_value),
+            Payload::Text(text) => self.encode_text_to_avro(text),
+            Payload::Raw(data) => self.encode_raw_to_avro(data),
+            Payload::Avro(data) => Ok(data),
+            Payload::Proto(text) => self.encode_text_to_avro(text),
+            Payload::FlatBuffer(data) => self.encode_raw_to_avro(data),
+        }
+    }
+}
+
+impl Default for AvroStreamEncoder {
+    fn default() -> Self {
+        Self::new_default()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn create_test_schema_json() -> String {
+        r#"{
+            "type": "record",
+            "name": "User",
+            "fields": [
+                {"name": "name", "type": "string"},
+                {"name": "age", "type": "int"}
+            ]
+        }"#
+        .to_string()
+    }
+
+    #[test]
+    fn encode_should_handle_json_payload_with_schema() {
+        let schema_json = create_test_schema_json();
+        let config = AvroEncoderConfig {
+            schema_json: Some(schema_json),
+            ..AvroEncoderConfig::default()
+        };
+        let encoder = AvroStreamEncoder::new(config);
+
+        let json_value = simd_json::json!({
+            "name": "Alice",
+            "age": 30
+        });
+
+        let result = encoder.encode(Payload::Json(json_value));
+
+        assert!(result.is_ok());
+        let encoded_data = result.unwrap();
+        assert!(!encoded_data.is_empty());
+    }
+
+    #[test]
+    fn encode_should_pass_through_avro_payload() {
+        let encoder = AvroStreamEncoder::default();
+
+        let avro_data = vec![1, 2, 3, 4, 5];
+        let avro_payload = Payload::Avro(avro_data.clone());
+        let result = encoder.encode(avro_payload);
+
+        assert!(result.is_ok());
+        let encoded_data = result.unwrap();
+        assert_eq!(encoded_data, avro_data);
+    }
+
+    #[test]
+    fn encode_should_apply_field_mappings_when_configured() {
+        let schema_json = create_test_schema_json();
+        let mut field_mappings = HashMap::new();
+        field_mappings.insert("full_name".to_string(), "name".to_string());
+
+        let config = AvroEncoderConfig {
+            schema_json: Some(schema_json),
+            field_mappings: Some(field_mappings),
+            ..AvroEncoderConfig::default()
+        };
+        let encoder = AvroStreamEncoder::new(config);
+
+        let json_value = simd_json::json!({
+            "full_name": "Alice",
+            "age": 30
+        });
+
+        let result = encoder.encode(Payload::Json(json_value));
+
+        assert!(result.is_ok());
+        let encoded_data = result.unwrap();
+        assert!(!encoded_data.is_empty());
+    }
+
+    #[test]
+    fn encode_should_fail_without_schema_for_json_payload() {
+        let encoder = AvroStreamEncoder::default();
+
+        let json_value = simd_json::json!({
+            "name": "Alice",
+            "age": 30
+        });
+
+        let result = encoder.encode(Payload::Json(json_value));
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn encode_should_handle_union_null_string_schema() {
+        let schema_json = r#"["null", "string"]"#.to_string();
+        let config = AvroEncoderConfig {
+            schema_json: Some(schema_json),
+            ..AvroEncoderConfig::default()
+        };
+        let encoder = AvroStreamEncoder::new(config);
+
+        let result = encoder.encode(Payload::Json(simd_json::json!("hello")));
+        assert!(result.is_ok(), "Expected string union variant to encode");
+        assert!(!result.unwrap().is_empty());
+    }
+
+    #[test]
+    fn encode_should_handle_union_null_int_schema() {
+        let schema_json = r#"["null", "int"]"#.to_string();
+        let config = AvroEncoderConfig {
+            schema_json: Some(schema_json),
+            ..AvroEncoderConfig::default()
+        };
+        let encoder = AvroStreamEncoder::new(config);
+
+        let result = encoder.encode(Payload::Json(simd_json::json!(42)));
+        assert!(result.is_ok(), "Expected int union variant to encode");
+        assert!(!result.unwrap().is_empty());
+    }
+
+    #[test]
+    fn encode_should_fail_on_int_overflow() {
+        let schema_json = r#"{"type": "int"}"#.to_string();
+        let config = AvroEncoderConfig {
+            schema_json: Some(schema_json),
+            ..AvroEncoderConfig::default()
+        };
+        let encoder = AvroStreamEncoder::new(config);
+
+        let result = encoder.encode(Payload::Json(simd_json::json!(i64::MAX)));
+        assert!(result.is_err(), "Expected overflow to fail");
+    }
+
+    #[test]
+    fn encode_should_fail_on_invalid_bytes_array_element() {
+        let schema_json = r#"{"type": "bytes"}"#.to_string();
+        let config = AvroEncoderConfig {
+            schema_json: Some(schema_json),
+            ..AvroEncoderConfig::default()
+        };
+        let encoder = AvroStreamEncoder::new(config);
+
+        let result = encoder.encode(Payload::Json(simd_json::json!([1, 2, 
300])));
+        assert!(result.is_err(), "Expected out-of-range byte to fail");
+    }
+
+    #[test]
+    fn convert_format_should_transform_avro_to_json() {
+        let encoder = AvroStreamEncoder::default();
+
+        let avro_data = vec![1, 2, 3, 4, 5];
+        let result = encoder.convert_format(Payload::Avro(avro_data.clone()), 
Schema::Json);
+
+        assert!(result.is_ok());
+        if let Ok(Payload::Json(json_value)) = result {
+            if let simd_json::OwnedValue::Object(map) = json_value {
+                assert!(map.contains_key("avro_data_base64"));
+            } else {
+                panic!("Expected JSON object");
+            }
+        } else {
+            panic!("Expected JSON payload");
+        }
+    }
+
+    #[test]
+    fn convert_format_should_transform_avro_to_text() {
+        let encoder = AvroStreamEncoder::default();
+
+        let avro_data = vec![1, 2, 3, 4, 5];
+        let result = encoder.convert_format(Payload::Avro(avro_data), 
Schema::Text);
+
+        assert!(result.is_ok());
+        if let Ok(Payload::Text(text)) = result {
+            assert!(!text.is_empty());
+        } else {
+            panic!("Expected Text payload");
+        }
+    }
+
+    #[test]
+    fn config_should_have_sensible_defaults() {
+        let config = AvroEncoderConfig::default();
+
+        assert!(config.schema_path.is_none());
+        assert!(config.schema_json.is_none());
+        assert!(config.field_mappings.is_none());
+    }
+
+    #[test]
+    fn encoder_should_be_creatable_with_custom_config() {
+        let config = AvroEncoderConfig {
+            schema_path: Some(PathBuf::from("/path/to/schema.avsc")),
+            schema_json: Some(r#"{"type": "string"}"#.to_string()),
+            ..AvroEncoderConfig::default()
+        };
+
+        let encoder = AvroStreamEncoder::new(config.clone());
+
+        assert_eq!(encoder.config.schema_path, config.schema_path);
+        assert_eq!(encoder.config.schema_json, config.schema_json);
+    }
+
+    #[test]
+    fn try_new_should_fail_on_invalid_schema() {
+        let config = AvroEncoderConfig {
+            schema_json: Some("not a valid schema".to_string()),
+            ..AvroEncoderConfig::default()
+        };
+        let result = AvroStreamEncoder::try_new(config);
+        assert!(
+            result.is_err(),
+            "Expected try_new to fail with invalid schema"
+        );
+    }
+
+    #[test]
+    fn update_config_should_rollback_on_invalid_schema() {
+        let valid_schema = r#"{"type": "string"}"#.to_string();
+        let mut encoder = AvroStreamEncoder::new(AvroEncoderConfig {
+            schema_json: Some(valid_schema.clone()),
+            ..AvroEncoderConfig::default()
+        });
+
+        assert!(encoder.schema.is_some());
+
+        let old_config = encoder.config.clone();
+        let invalid_config = AvroEncoderConfig {
+            schema_json: Some("invalid schema".to_string()),
+            ..AvroEncoderConfig::default()
+        };
+
+        let result = encoder.update_config(invalid_config);
+        assert!(result.is_err(), "Expected update_config to fail");
+
+        // After rollback, config and schema should remain unchanged
+        assert_eq!(encoder.config.schema_json, old_config.schema_json);
+        assert!(encoder.schema.is_some());
+    }
+}
diff --git a/core/connectors/sdk/src/encoders/flatbuffer.rs 
b/core/connectors/sdk/src/encoders/flatbuffer.rs
index 2b2812aef..3c617ecbf 100644
--- a/core/connectors/sdk/src/encoders/flatbuffer.rs
+++ b/core/connectors/sdk/src/encoders/flatbuffer.rs
@@ -238,6 +238,7 @@ impl StreamEncoder for FlatBufferStreamEncoder {
             Payload::Raw(data) => self.encode_raw_to_flatbuffer(data),
             Payload::FlatBuffer(data) => Ok(data),
             Payload::Proto(text) => self.encode_text_to_flatbuffer(text),
+            Payload::Avro(data) => self.encode_raw_to_flatbuffer(data),
         }
     }
 }
diff --git a/core/connectors/sdk/src/encoders/mod.rs 
b/core/connectors/sdk/src/encoders/mod.rs
index c7c4cfd80..bb167c3cb 100644
--- a/core/connectors/sdk/src/encoders/mod.rs
+++ b/core/connectors/sdk/src/encoders/mod.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+pub mod avro;
 pub mod flatbuffer;
 pub mod json;
 pub mod proto;
diff --git a/core/connectors/sdk/src/encoders/proto.rs 
b/core/connectors/sdk/src/encoders/proto.rs
index 52daa2829..09ace6cc0 100644
--- a/core/connectors/sdk/src/encoders/proto.rs
+++ b/core/connectors/sdk/src/encoders/proto.rs
@@ -371,6 +371,10 @@ impl ProtoStreamEncoder {
                 "flatbuffer_size": data.len(),
                 "data": general_purpose::STANDARD.encode(&data)
             }),
+            Payload::Avro(data) => simd_json::json!({
+                "avro_size": data.len(),
+                "data": general_purpose::STANDARD.encode(&data)
+            }),
         };
 
         if let simd_json::OwnedValue::Object(json_map) = json_value {
@@ -625,6 +629,13 @@ impl ProtoStreamEncoder {
                 ),
                 data,
             ),
+            Payload::Avro(data) => (
+                format!(
+                    "{}/google.protobuf.BytesValue",
+                    self.config.format_options.type_url_prefix
+                ),
+                data,
+            ),
         };
 
         let any = Any {
@@ -644,6 +655,7 @@ impl ProtoStreamEncoder {
             Payload::Raw(data) => Ok(data),
             Payload::Proto(text) => Ok(text.into_bytes()),
             Payload::FlatBuffer(data) => Ok(data),
+            Payload::Avro(data) => Ok(data),
         }
     }
 
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
index a864407be..2289aa950 100644
--- a/core/connectors/sdk/src/lib.rs
+++ b/core/connectors/sdk/src/lib.rs
@@ -20,12 +20,12 @@
 use async_trait::async_trait;
 use base64::{self, Engine};
 use decoders::{
-    flatbuffer::FlatBufferStreamDecoder, json::JsonStreamDecoder, 
proto::ProtoStreamDecoder,
-    raw::RawStreamDecoder, text::TextStreamDecoder,
+    avro::AvroStreamDecoder, flatbuffer::FlatBufferStreamDecoder, 
json::JsonStreamDecoder,
+    proto::ProtoStreamDecoder, raw::RawStreamDecoder, text::TextStreamDecoder,
 };
 use encoders::{
-    flatbuffer::FlatBufferStreamEncoder, json::JsonStreamEncoder, 
proto::ProtoStreamEncoder,
-    raw::RawStreamEncoder, text::TextStreamEncoder,
+    avro::AvroStreamEncoder, flatbuffer::FlatBufferStreamEncoder, 
json::JsonStreamEncoder,
+    proto::ProtoStreamEncoder, raw::RawStreamEncoder, text::TextStreamEncoder,
 };
 use iggy::prelude::{HeaderKey, HeaderValue};
 use once_cell::sync::OnceCell;
@@ -144,6 +144,7 @@ pub enum Payload {
     Text(String),
     Proto(String),
     FlatBuffer(Vec<u8>),
+    Avro(Vec<u8>),
 }
 
 impl Payload {
@@ -157,6 +158,7 @@ impl Payload {
             Payload::Text(text) => Ok(text.into_bytes()),
             Payload::Proto(text) => Ok(text.into_bytes()),
             Payload::FlatBuffer(value) => Ok(value),
+            Payload::Avro(value) => Ok(value),
         }
     }
 
@@ -185,6 +187,7 @@ impl Payload {
             Payload::Text(text) => Ok(text.as_bytes().to_vec()),
             Payload::Proto(text) => Ok(text.as_bytes().to_vec()),
             Payload::FlatBuffer(value) => Ok(value.clone()),
+            Payload::Avro(value) => Ok(value.clone()),
         }
     }
 }
@@ -201,6 +204,7 @@ impl std::fmt::Display for Payload {
             Payload::Text(text) => write!(f, "Text({text})"),
             Payload::Proto(text) => write!(f, "Proto({text})"),
             Payload::FlatBuffer(value) => write!(f, "FlatBuffer({} bytes)", 
value.len()),
+            Payload::Avro(value) => write!(f, "Avro({} bytes)", value.len()),
         }
     }
 }
@@ -222,6 +226,8 @@ pub enum Schema {
     Proto,
     #[strum(to_string = "flatbuffer")]
     FlatBuffer,
+    #[strum(to_string = "avro")]
+    Avro,
 }
 
 impl Schema {
@@ -245,6 +251,7 @@ impl Schema {
                 Err(_) => Ok(Payload::Raw(value)),
             },
             Schema::FlatBuffer => Ok(Payload::FlatBuffer(value)),
+            Schema::Avro => Ok(Payload::Avro(value)),
         }
     }
 
@@ -255,6 +262,7 @@ impl Schema {
             Schema::Text => Arc::new(TextStreamDecoder),
             Schema::Proto => Arc::new(ProtoStreamDecoder::default()),
             Schema::FlatBuffer => Arc::new(FlatBufferStreamDecoder::default()),
+            Schema::Avro => Arc::new(AvroStreamDecoder::default()),
         }
     }
 
@@ -265,6 +273,7 @@ impl Schema {
             Schema::Text => Arc::new(TextStreamEncoder),
             Schema::Proto => Arc::new(ProtoStreamEncoder::default()),
             Schema::FlatBuffer => Arc::new(FlatBufferStreamEncoder::default()),
+            Schema::Avro => Arc::new(AvroStreamEncoder::default()),
         }
     }
 }
diff --git a/core/connectors/sdk/src/transforms/avro_convert.rs 
b/core/connectors/sdk/src/transforms/avro_convert.rs
new file mode 100644
index 000000000..f0242e9d9
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/avro_convert.rs
@@ -0,0 +1,398 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::path::PathBuf;
+
+use super::{Transform, TransformType};
+use crate::decoders::avro::{AvroConfig, AvroStreamDecoder};
+use crate::encoders::avro::{AvroEncoderConfig, AvroStreamEncoder};
+use crate::{DecodedMessage, Error, Payload, Schema, TopicMetadata};
+use crate::{StreamDecoder, StreamEncoder};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct AvroConvertConfig {
+    pub source_format: Schema,
+    pub target_format: Schema,
+    pub schema_path: Option<PathBuf>,
+    pub schema_json: Option<String>,
+    pub field_mappings: Option<HashMap<String, String>>,
+    pub conversion_options: AvroConversionOptions,
+}
+
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+pub struct AvroConversionOptions {
+    pub pretty_json: bool,
+    pub include_metadata: bool,
+    pub strict_mode: bool,
+}
+
+impl Default for AvroConvertConfig {
+    fn default() -> Self {
+        Self {
+            source_format: Schema::Avro,
+            target_format: Schema::Json,
+            schema_path: None,
+            schema_json: None,
+            field_mappings: None,
+            conversion_options: AvroConversionOptions::default(),
+        }
+    }
+}
+
+pub struct AvroConvert {
+    config: AvroConvertConfig,
+}
+
+impl AvroConvert {
+    pub fn new(config: AvroConvertConfig) -> Self {
+        Self { config }
+    }
+
+    pub fn new_default() -> Self {
+        Self::new(AvroConvertConfig::default())
+    }
+
+    fn apply_field_mappings(
+        &self,
+        payload: Payload,
+        field_mappings: &HashMap<String, String>,
+    ) -> Result<Payload, Error> {
+        match payload {
+            Payload::Json(json_value) => {
+                if let simd_json::OwnedValue::Object(mut map) = json_value {
+                    let mut new_entries = Vec::new();
+
+                    for (key, value) in map.iter() {
+                        if let Some(new_key) = field_mappings.get(key) {
+                            new_entries.push((new_key.clone(), value.clone()));
+                        } else {
+                            new_entries.push((key.clone(), value.clone()));
+                        }
+                    }
+
+                    map.clear();
+                    for (key, value) in new_entries {
+                        map.insert(key, value);
+                    }
+
+                    Ok(Payload::Json(simd_json::OwnedValue::Object(map)))
+                } else {
+                    Ok(Payload::Json(json_value))
+                }
+            }
+            other => Ok(other),
+        }
+    }
+}
+
+impl Transform for AvroConvert {
+    fn r#type(&self) -> TransformType {
+        TransformType::AvroConvert
+    }
+
+    fn transform(
+        &self,
+        _metadata: &TopicMetadata,
+        mut message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        if let Some(field_mappings) = &self.config.field_mappings {
+            message.payload = self.apply_field_mappings(message.payload, 
field_mappings)?;
+        }
+
+        message.payload = match (&self.config.source_format, 
&self.config.target_format) {
+            (Schema::Json, Schema::Avro) => {
+                let encoder_config = AvroEncoderConfig {
+                    schema_path: self.config.schema_path.clone(),
+                    schema_json: self.config.schema_json.clone(),
+                    field_mappings: self.config.field_mappings.clone(),
+                };
+                let encoder = AvroStreamEncoder::new(encoder_config);
+                let encoded_bytes = encoder.encode(message.payload)?;
+                Payload::Avro(encoded_bytes)
+            }
+            (Schema::Avro, Schema::Json) => {
+                let decoder_config = AvroConfig {
+                    schema_path: self.config.schema_path.clone(),
+                    schema_json: self.config.schema_json.clone(),
+                    field_mappings: self.config.field_mappings.clone(),
+                    extract_as_json: true,
+                };
+                let decoder = AvroStreamDecoder::new(decoder_config);
+                if let Payload::Avro(bytes) = message.payload {
+                    decoder.decode(bytes)?
+                } else {
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+            (Schema::Avro, Schema::Text) => {
+                let encoder = AvroStreamEncoder::default();
+                encoder.convert_format(message.payload, Schema::Text)?
+            }
+            (Schema::Avro, Schema::Raw) => {
+                let encoder = AvroStreamEncoder::default();
+                encoder.convert_format(message.payload, Schema::Raw)?
+            }
+            (Schema::Text, Schema::Avro) => {
+                let encoder_config = AvroEncoderConfig {
+                    schema_path: self.config.schema_path.clone(),
+                    schema_json: self.config.schema_json.clone(),
+                    field_mappings: self.config.field_mappings.clone(),
+                };
+                let encoder = AvroStreamEncoder::new(encoder_config);
+                let encoded_bytes = encoder.encode(message.payload)?;
+                Payload::Avro(encoded_bytes)
+            }
+            (Schema::Raw, Schema::Avro) => {
+                if let Payload::Raw(data) = message.payload {
+                    Payload::Avro(data)
+                } else {
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+            (source, target) if source == target => message.payload,
+            _ => {
+                // For unsupported conversions, pass through unchanged
+                message.payload
+            }
+        };
+
+        Ok(Some(message))
+    }
+}
+
+impl Default for AvroConvert {
+    fn default() -> Self {
+        Self::new_default()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::TopicMetadata;
+
+    fn create_test_message(payload: Payload) -> DecodedMessage {
+        DecodedMessage {
+            id: Some(123),
+            offset: Some(456),
+            checksum: Some(789),
+            timestamp: Some(1234567890),
+            origin_timestamp: Some(1234567890),
+            headers: None,
+            payload,
+        }
+    }
+
+    fn create_test_metadata() -> TopicMetadata {
+        TopicMetadata {
+            stream: "test_stream".to_string(),
+            topic: "test_topic".to_string(),
+        }
+    }
+
+    fn create_test_schema_json() -> String {
+        r#"{
+            "type": "record",
+            "name": "User",
+            "fields": [
+                {"name": "name", "type": "string"},
+                {"name": "age", "type": "int"}
+            ]
+        }"#
+        .to_string()
+    }
+
+    fn encode_test_avro_data(schema_json: &str) -> Vec<u8> {
+        use apache_avro::Schema as AvroSchema;
+        use apache_avro::types::Value as AvroValue;
+        let schema = AvroSchema::parse_str(schema_json).unwrap();
+        let record = AvroValue::Record(vec![
+            ("name".to_string(), AvroValue::String("Alice".to_string())),
+            ("age".to_string(), AvroValue::Int(30)),
+        ]);
+        apache_avro::to_avro_datum(&schema, record).unwrap()
+    }
+
+    #[test]
+    fn transform_should_convert_avro_to_json_successfully() {
+        let schema_json = create_test_schema_json();
+        let avro_data = encode_test_avro_data(&schema_json);
+
+        let config = AvroConvertConfig {
+            source_format: Schema::Avro,
+            target_format: Schema::Json,
+            schema_json: Some(schema_json),
+            ..AvroConvertConfig::default()
+        };
+        let converter = AvroConvert::new(config);
+        let metadata = create_test_metadata();
+
+        let avro_payload = Payload::Avro(avro_data);
+        let message = create_test_message(avro_payload);
+
+        let result = converter.transform(&metadata, message);
+
+        assert!(result.is_ok());
+        if let Ok(Some(transformed_message)) = result {
+            if let Payload::Json(json_value) = transformed_message.payload {
+                if let simd_json::OwnedValue::Object(map) = json_value {
+                    assert!(map.contains_key("name"));
+                    assert!(map.contains_key("age"));
+                } else {
+                    panic!("Expected JSON object");
+                }
+            } else {
+                panic!("Expected JSON payload");
+            }
+        } else {
+            panic!("Expected transformed message");
+        }
+    }
+
+    #[test]
+    fn transform_should_convert_json_to_avro_successfully() {
+        // JSON -> Avro with an explicit schema; the encoded datum must be
+        // non-empty.
+        let schema_json = create_test_schema_json();
+
+        let config = AvroConvertConfig {
+            source_format: Schema::Json,
+            target_format: Schema::Avro,
+            schema_json: Some(schema_json),
+            ..AvroConvertConfig::default()
+        };
+        let converter = AvroConvert::new(config);
+        let metadata = create_test_metadata();
+
+        let message = create_test_message(Payload::Json(simd_json::json!({
+            "name": "Bob",
+            "age": 25
+        })));
+
+        // Unwrap both the Result and the Option with descriptive messages
+        // rather than assert!(is_ok()) followed by nested if-lets.
+        let transformed = converter
+            .transform(&metadata, message)
+            .expect("transform should succeed")
+            .expect("transform should produce a message");
+
+        match transformed.payload {
+            Payload::Avro(data) => assert!(!data.is_empty()),
+            _ => panic!("Expected Avro payload"),
+        }
+    }
+
+    #[test]
+    fn transform_should_convert_avro_to_text_successfully() {
+        // No schema is configured here, so the converter renders the raw
+        // Avro bytes as text; we only assert the output is non-empty.
+        let config = AvroConvertConfig {
+            source_format: Schema::Avro,
+            target_format: Schema::Text,
+            ..AvroConvertConfig::default()
+        };
+        let converter = AvroConvert::new(config);
+        let metadata = create_test_metadata();
+        let message = create_test_message(Payload::Avro(vec![1, 2, 3, 4, 5]));
+
+        // Unwrap both the Result and the Option with descriptive messages
+        // rather than assert!(is_ok()) followed by nested if-lets.
+        let transformed = converter
+            .transform(&metadata, message)
+            .expect("transform should succeed")
+            .expect("transform should produce a message");
+
+        match transformed.payload {
+            Payload::Text(text) => assert!(!text.is_empty()),
+            _ => panic!("Expected Text payload"),
+        }
+    }
+
+    #[test]
+    fn transform_should_apply_field_mappings_during_conversion() {
+        // Rename "name" -> "full_name" while converting Avro to JSON; the
+        // mapped key must replace the original one in the output object.
+        let field_mappings =
+            HashMap::from([("name".to_string(), "full_name".to_string())]);
+
+        let schema_json = create_test_schema_json();
+        let avro_data = encode_test_avro_data(&schema_json);
+
+        let config = AvroConvertConfig {
+            source_format: Schema::Avro,
+            target_format: Schema::Json,
+            schema_json: Some(schema_json),
+            field_mappings: Some(field_mappings),
+            ..AvroConvertConfig::default()
+        };
+        let converter = AvroConvert::new(config);
+        let metadata = create_test_metadata();
+        let message = create_test_message(Payload::Avro(avro_data));
+
+        // Unwrap both the Result and the Option with descriptive messages
+        // rather than assert!(is_ok()) followed by nested if-lets.
+        let transformed = converter
+            .transform(&metadata, message)
+            .expect("transform should succeed")
+            .expect("transform should produce a message");
+
+        match transformed.payload {
+            Payload::Json(simd_json::OwnedValue::Object(map)) => {
+                assert!(map.contains_key("full_name"));
+                assert!(!map.contains_key("name"));
+            }
+            _ => panic!("Expected JSON object payload"),
+        }
+    }
+
+    #[test]
+    fn transform_should_pass_through_same_format() {
+        // When source and target formats are identical, the payload must
+        // pass through byte-for-byte unchanged.
+        let config = AvroConvertConfig {
+            source_format: Schema::Avro,
+            target_format: Schema::Avro,
+            ..AvroConvertConfig::default()
+        };
+        let converter = AvroConvert::new(config);
+        let metadata = create_test_metadata();
+
+        let avro_data = vec![1, 2, 3, 4, 5];
+        let message = create_test_message(Payload::Avro(avro_data.clone()));
+
+        // Unwrap both the Result and the Option with descriptive messages
+        // rather than assert!(is_ok()) followed by nested if-lets.
+        let transformed = converter
+            .transform(&metadata, message)
+            .expect("transform should succeed")
+            .expect("transform should produce a message");
+
+        match transformed.payload {
+            Payload::Avro(data) => assert_eq!(data, avro_data),
+            _ => panic!("Expected Avro payload"),
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/mod.rs b/core/connectors/sdk/src/transforms/mod.rs
index d3feef125..56acc009b 100644
--- a/core/connectors/sdk/src/transforms/mod.rs
+++ b/core/connectors/sdk/src/transforms/mod.rs
@@ -17,6 +17,7 @@
  */
 
 mod add_fields;
+pub mod avro_convert;
 mod delete_fields;
 mod filter_fields;
 pub mod flatbuffer_convert;
@@ -25,6 +26,7 @@ pub mod proto_convert;
 mod update_fields;
 use crate::{DecodedMessage, Error, TopicMetadata};
 pub use add_fields::{AddFields, AddFieldsConfig, Field as AddField};
+pub use avro_convert::{AvroConvert, AvroConvertConfig};
 pub use delete_fields::{DeleteFields, DeleteFieldsConfig};
 pub use filter_fields::{
     FilterFields, FilterFieldsConfig, FilterPattern, KeyPattern as FilterKeyPattern,
@@ -86,6 +88,7 @@ pub enum TransformType {
     UpdateFields,
     ProtoConvert,
     FlatBufferConvert,
+    AvroConvert,
 }
 
 pub fn from_config(
@@ -123,5 +126,10 @@ pub fn from_config(
                serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?;
             Ok(Arc::new(FlatBufferConvert::new(cfg)))
         }
+        TransformType::AvroConvert => {
+            let cfg: AvroConvertConfig =
+                serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?;
+            Ok(Arc::new(AvroConvert::new(cfg)))
+        }
     }
 }
diff --git a/core/connectors/sdk/src/transforms/proto_convert.rs b/core/connectors/sdk/src/transforms/proto_convert.rs
index b8e558298..f1debc758 100644
--- a/core/connectors/sdk/src/transforms/proto_convert.rs
+++ b/core/connectors/sdk/src/transforms/proto_convert.rs
@@ -471,6 +471,15 @@ impl ProtoConvert {
                     Err(Error::InvalidPayloadType)
                 }
             }
+            Schema::Avro => {
+                // Convert protobuf to raw bytes first, then wrap as Avro
+                let raw_payload = self.protobuf_to_raw(payload)?;
+                if let Payload::Raw(data) = raw_payload {
+                    Ok(Payload::Avro(data))
+                } else {
+                    Err(Error::InvalidPayloadType)
+                }
+            }
         }
     }
 
@@ -488,6 +497,14 @@ impl ProtoConvert {
                     Err(Error::InvalidPayloadType)
                 }
             }
+            Schema::Avro => {
+                // Convert Avro to raw bytes first, then to protobuf
+                if let Payload::Avro(data) = payload {
+                    self.raw_to_protobuf(Payload::Raw(data))
+                } else {
+                    Err(Error::InvalidPayloadType)
+                }
+            }
         }
     }
 
diff --git a/core/connectors/sinks/http_sink/src/lib.rs b/core/connectors/sinks/http_sink/src/lib.rs
index 6f86885b5..db98f634f 100644
--- a/core/connectors/sinks/http_sink/src/lib.rs
+++ b/core/connectors/sinks/http_sink/src/lib.rs
@@ -401,6 +401,14 @@ impl HttpSink {
                 serde_json::to_value(encoded)
                     .map_err(|e| Error::Serialization(format!("EncodedPayload: 
{}", e)))
             }
+            Payload::Avro(bytes) => {
+                let encoded = EncodedPayload {
+                    data: general_purpose::STANDARD.encode(&bytes),
+                    iggy_payload_encoding: ENCODING_BASE64,
+                };
+                serde_json::to_value(encoded)
+                    .map_err(|e| Error::Serialization(format!("EncodedPayload: 
{}", e)))
+            }
         }
     }
 

Reply via email to