This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch headers_encryption
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 7e8fa259d7e26d26cd509bd64b2fa48a1a3dd3e8
Author: spetz <[email protected]>
AuthorDate: Fri Mar 27 08:39:49 2026 +0100

    feat(security): encrypt user headers alongside message payload
---
 Cargo.lock                                         |  30 +++---
 Cargo.toml                                         |   8 +-
 DEPENDENCIES.md                                    |  30 +++---
 core/ai/mcp/Cargo.toml                             |   2 +-
 core/binary_protocol/Cargo.toml                    |   2 +-
 core/common/Cargo.toml                             |   2 +-
 core/common/src/utils/crypto.rs                    |  70 +++++++++++++
 core/connectors/runtime/Cargo.toml                 |   2 +-
 core/connectors/sdk/Cargo.toml                     |   2 +-
 .../connectors/sinks/elasticsearch_sink/Cargo.toml |   2 +-
 core/connectors/sinks/iceberg_sink/Cargo.toml      |   2 +-
 core/connectors/sinks/postgres_sink/Cargo.toml     |   2 +-
 core/connectors/sinks/quickwit_sink/Cargo.toml     |   2 +-
 core/connectors/sinks/stdout_sink/Cargo.toml       |   2 +-
 .../sources/elasticsearch_source/Cargo.toml        |   2 +-
 core/connectors/sources/postgres_source/Cargo.toml |   2 +-
 core/connectors/sources/random_source/Cargo.toml   |   2 +-
 .../tests/server/scenarios/encryption_scenario.rs  | 103 +++++++++++++++-----
 core/sdk/Cargo.toml                                |   2 +-
 core/sdk/src/clients/binary_message.rs             |  12 +++
 core/sdk/src/clients/consumer.rs                   |  19 +++-
 core/sdk/src/clients/producer.rs                   |   6 ++
 core/server/Cargo.toml                             |   2 +-
 core/server/src/shard/system/messages.rs           |  59 +++++++++--
 foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs  |  12 ++-
 .../csharp/Iggy_SDK/Contracts/MessageResponse.cs   |   9 +-
 .../csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs  |   4 +-
 foreign/csharp/Iggy_SDK/Iggy_SDK.csproj            |   2 +-
 foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs    | 108 +++++++++++++++++----
 foreign/csharp/Iggy_SDK/Messages/Message.cs        |   6 ++
 .../csharp/Iggy_SDK/Publishers/IggyPublisher.cs    |  12 ++-
 .../Iggy_SDK/Utils/TcpMessageStreamHelpers.cs      |   6 ++
 .../MapperTests/HeaderEncryptionTests.cs           |  88 +++++++++++++++++
 foreign/python/Cargo.toml                          |   4 +-
 foreign/python/pyproject.toml                      |   2 +-
 35 files changed, 507 insertions(+), 113 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d43b9a2db..d72954b02 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5183,7 +5183,7 @@ checksum = "cd62e6b5e86ea8eeeb8db1de02880a6abc01a397b2ebb64b5d74ac255318f5cb"
 
 [[package]]
 name = "iggy"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 dependencies = [
  "async-broadcast",
  "async-dropper",
@@ -5300,7 +5300,7 @@ dependencies = [
 
 [[package]]
 name = "iggy-connectors"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "axum",
@@ -5352,7 +5352,7 @@ dependencies = [
 
 [[package]]
 name = "iggy-mcp"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "axum",
  "axum-server",
@@ -5384,7 +5384,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_binary_protocol"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 dependencies = [
  "bytemuck",
  "bytes",
@@ -5394,7 +5394,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_common"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 dependencies = [
  "aes-gcm",
  "ahash 0.8.12",
@@ -5440,7 +5440,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_connector_elasticsearch_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "base64 0.22.1",
@@ -5459,7 +5459,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_connector_elasticsearch_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "dashmap",
@@ -5478,7 +5478,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_connector_iceberg_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "arrow-json",
  "async-trait",
@@ -5514,7 +5514,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_connector_postgres_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "dashmap",
@@ -5534,7 +5534,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_connector_postgres_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "base64 0.22.1",
@@ -5556,7 +5556,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_connector_quickwit_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "dashmap",
@@ -5571,7 +5571,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_connector_random_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "dashmap",
@@ -5589,7 +5589,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_connector_sdk"
-version = "0.2.1-edge.1"
+version = "0.2.2-edge.1"
 dependencies = [
  "async-trait",
  "base64 0.22.1",
@@ -5619,7 +5619,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_connector_stdout_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "dashmap",
@@ -9722,7 +9722,7 @@ dependencies = [
 
 [[package]]
 name = "server"
-version = "0.7.3-edge.1"
+version = "0.7.4-edge.1"
 dependencies = [
  "ahash 0.8.12",
  "anyhow",
diff --git a/Cargo.toml b/Cargo.toml
index 5918468b0..efa79e799 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -160,11 +160,11 @@ hwlocality = "1.0.0-alpha.11"
 iceberg = "0.9.0"
 iceberg-catalog-rest = "0.9.0"
 iceberg-storage-opendal = "0.9.0"
-iggy = { path = "core/sdk", version = "0.9.4-edge.1" }
+iggy = { path = "core/sdk", version = "0.9.5-edge.1" }
 iggy-cli = { path = "core/cli", version = "0.11.3-edge.1" }
-iggy_binary_protocol = { path = "core/binary_protocol", version = "0.9.4-edge.1" }
-iggy_common = { path = "core/common", version = "0.9.4-edge.1" }
-iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.2.1-edge.1" }
+iggy_binary_protocol = { path = "core/binary_protocol", version = "0.9.5-edge.1" }
+iggy_common = { path = "core/common", version = "0.9.5-edge.1" }
+iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.2.2-edge.1" }
 integration = { path = "core/integration" }
 journal = { path = "core/journal" }
 js-sys = "0.3"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index b6100cdfb..73414fba9 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -451,24 +451,24 @@ ident_case: 1.0.1, "Apache-2.0 OR MIT",
 idna: 1.1.0, "Apache-2.0 OR MIT",
 idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
 if_chain: 1.0.3, "Apache-2.0 OR MIT",
-iggy: 0.9.4-edge.1, "Apache-2.0",
+iggy: 0.9.5-edge.1, "Apache-2.0",
 iggy-bench: 0.4.1-edge.1, "Apache-2.0",
 iggy-bench-dashboard-server: 0.6.3-edge.1, "Apache-2.0",
 iggy-cli: 0.11.3-edge.1, "Apache-2.0",
-iggy-connectors: 0.3.2-edge.1, "Apache-2.0",
-iggy-mcp: 0.3.2-edge.1, "Apache-2.0",
-iggy_binary_protocol: 0.9.4-edge.1, "Apache-2.0",
-iggy_common: 0.9.4-edge.1, "Apache-2.0",
-iggy_connector_elasticsearch_sink: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_elasticsearch_source: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_iceberg_sink: 0.3.2-edge.1, "Apache-2.0",
+iggy-connectors: 0.3.3-edge.1, "Apache-2.0",
+iggy-mcp: 0.3.3-edge.1, "Apache-2.0",
+iggy_binary_protocol: 0.9.5-edge.1, "Apache-2.0",
+iggy_common: 0.9.5-edge.1, "Apache-2.0",
+iggy_connector_elasticsearch_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_elasticsearch_source: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_iceberg_sink: 0.3.3-edge.1, "Apache-2.0",
 iggy_connector_mongodb_sink: 0.3.0, "Apache-2.0",
-iggy_connector_postgres_sink: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_postgres_source: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_quickwit_sink: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_random_source: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_sdk: 0.2.1-edge.1, "Apache-2.0",
-iggy_connector_stdout_sink: 0.3.2-edge.1, "Apache-2.0",
+iggy_connector_postgres_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_postgres_source: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_quickwit_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_random_source: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_sdk: 0.2.2-edge.1, "Apache-2.0",
+iggy_connector_stdout_sink: 0.3.3-edge.1, "Apache-2.0",
 iggy_examples: 0.0.6, "Apache-2.0",
 ignore: 0.4.25, "MIT OR Unlicense",
 image: 0.25.10, "Apache-2.0 OR MIT",
@@ -843,7 +843,7 @@ serde_with_macros: 3.18.0, "Apache-2.0 OR MIT",
 serde_yaml_ng: 0.10.0, "MIT",
 serial_test: 3.4.0, "MIT",
 serial_test_derive: 3.4.0, "MIT",
-server: 0.7.3-edge.1, "Apache-2.0",
+server: 0.7.4-edge.1, "Apache-2.0",
 sha1: 0.10.6, "Apache-2.0 OR MIT",
 sha2: 0.10.9, "Apache-2.0 OR MIT",
 sha3: 0.10.8, "Apache-2.0 OR MIT",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index 071e8ae36..e013d59a8 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy-mcp"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "MCP Server for Iggy message streaming platform"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml
index b479ecf09..51f310908 100644
--- a/core/binary_protocol/Cargo.toml
+++ b/core/binary_protocol/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_binary_protocol"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 description = "Wire protocol types and codec for the Iggy binary protocol. Shared between server and SDK."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index 56e99742f..a2fd8ae21 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -16,7 +16,7 @@
 # under the License.
 [package]
 name = "iggy_common"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs
index fca62dde1..24608cb62 100644
--- a/core/common/src/utils/crypto.rs
+++ b/core/common/src/utils/crypto.rs
@@ -95,6 +95,7 @@ impl Encryptor for Aes256GcmEncryptor {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::{HeaderKey, HeaderValue, IggyMessage};
 
     #[test]
     fn given_the_same_key_data_should_be_encrypted_and_decrypted_correctly() {
@@ -125,4 +126,73 @@ mod tests {
         let error = decrypted_data.err().unwrap();
         assert_eq!(error.as_code(), IggyError::CannotDecryptData.as_code());
     }
+
+    #[test]
+    fn message_payload_and_headers_should_encrypt_and_decrypt_correctly() {
+        use bytes::Bytes;
+        use std::collections::HashMap;
+
+        let key = [1; 32];
+        let encryptor = Aes256GcmEncryptor::new(&key).unwrap();
+
+        let mut headers = HashMap::new();
+        headers.insert(
+            HeaderKey::try_from("batch").unwrap(),
+            HeaderValue::from(1u64),
+        );
+        headers.insert(
+            HeaderKey::try_from("type").unwrap(),
+            HeaderValue::try_from("test-message").unwrap(),
+        );
+
+        let mut message = IggyMessage::builder()
+            .payload(Bytes::from("test payload data"))
+            .user_headers(headers)
+            .build()
+            .unwrap();
+
+        let original_payload = message.payload.clone();
+        let original_headers = message.user_headers.clone().unwrap();
+
+        message.payload = Bytes::from(encryptor.encrypt(&message.payload).unwrap());
+        message.header.payload_length = message.payload.len() as u32;
+
+        let encrypted_headers = encryptor.encrypt(&original_headers).unwrap();
+        message.header.user_headers_length = encrypted_headers.len() as u32;
+        message.user_headers = Some(Bytes::from(encrypted_headers));
+
+        assert_ne!(message.payload, original_payload);
+        assert_ne!(message.user_headers.as_ref().unwrap(), &original_headers);
+
+        let decrypted_payload = encryptor.decrypt(&message.payload).unwrap();
+        message.payload = Bytes::from(decrypted_payload);
+        message.header.payload_length = message.payload.len() as u32;
+
+        let decrypted_headers = encryptor
+            .decrypt(message.user_headers.as_ref().unwrap())
+            .unwrap();
+        message.header.user_headers_length = decrypted_headers.len() as u32;
+        message.user_headers = Some(Bytes::from(decrypted_headers));
+
+        assert_eq!(message.payload, original_payload);
+        assert_eq!(message.user_headers.as_ref().unwrap(), &original_headers);
+
+        let parsed = message.user_headers_map().unwrap().unwrap();
+        assert_eq!(
+            parsed
+                .get(&HeaderKey::try_from("batch").unwrap())
+                .unwrap()
+                .as_uint64()
+                .unwrap(),
+            1
+        );
+        assert_eq!(
+            parsed
+                .get(&HeaderKey::try_from("type").unwrap())
+                .unwrap()
+                .as_str()
+                .unwrap(),
+            "test-message"
+        );
+    }
 }
diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml
index 20d42deec..8e78c32b5 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy-connectors"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Connectors runtime for Iggy message streaming platform"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
index 34d242577..97a508b15 100644
--- a/core/connectors/sdk/Cargo.toml
+++ b/core/connectors/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_sdk"
-version = "0.2.1-edge.1"
+version = "0.2.2-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sinks/elasticsearch_sink/Cargo.toml b/core/connectors/sinks/elasticsearch_sink/Cargo.toml
index 40923ea9d..cb9b888a9 100644
--- a/core/connectors/sinks/elasticsearch_sink/Cargo.toml
+++ b/core/connectors/sinks/elasticsearch_sink/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_elasticsearch_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy Elasticsearch sink connector"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sinks/iceberg_sink/Cargo.toml b/core/connectors/sinks/iceberg_sink/Cargo.toml
index 527a27d44..e3ac0ae8c 100644
--- a/core/connectors/sinks/iceberg_sink/Cargo.toml
+++ b/core/connectors/sinks/iceberg_sink/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_iceberg_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 edition = "2024"
 license = "Apache-2.0"
 keywords = ["iggy", "messaging", "streaming"]
diff --git a/core/connectors/sinks/postgres_sink/Cargo.toml b/core/connectors/sinks/postgres_sink/Cargo.toml
index 56ab685a3..04cc19986 100644
--- a/core/connectors/sinks/postgres_sink/Cargo.toml
+++ b/core/connectors/sinks/postgres_sink/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_postgres_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy PostgreSQL sink connector for storing stream messages into PostgreSQL database"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sinks/quickwit_sink/Cargo.toml b/core/connectors/sinks/quickwit_sink/Cargo.toml
index 3aea64bda..cc54878ee 100644
--- a/core/connectors/sinks/quickwit_sink/Cargo.toml
+++ b/core/connectors/sinks/quickwit_sink/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_quickwit_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml b/core/connectors/sinks/stdout_sink/Cargo.toml
index 270e52558..df123b5bf 100644
--- a/core/connectors/sinks/stdout_sink/Cargo.toml
+++ b/core/connectors/sinks/stdout_sink/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_stdout_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sources/elasticsearch_source/Cargo.toml b/core/connectors/sources/elasticsearch_source/Cargo.toml
index d1bb9c654..ed7365ea1 100644
--- a/core/connectors/sources/elasticsearch_source/Cargo.toml
+++ b/core/connectors/sources/elasticsearch_source/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_elasticsearch_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy Elasticsearch source connector"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sources/postgres_source/Cargo.toml b/core/connectors/sources/postgres_source/Cargo.toml
index bbc82bed2..6281670f7 100644
--- a/core/connectors/sources/postgres_source/Cargo.toml
+++ b/core/connectors/sources/postgres_source/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_postgres_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy PostgreSQL source connector supporting CDC and table polling for message streaming platform"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sources/random_source/Cargo.toml b/core/connectors/sources/random_source/Cargo.toml
index 7a0eed4d4..b3e3d666b 100644
--- a/core/connectors/sources/random_source/Cargo.toml
+++ b/core/connectors/sources/random_source/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_random_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/integration/tests/server/scenarios/encryption_scenario.rs b/core/integration/tests/server/scenarios/encryption_scenario.rs
index f03741c44..98d37349e 100644
--- a/core/integration/tests/server/scenarios/encryption_scenario.rs
+++ b/core/integration/tests/server/scenarios/encryption_scenario.rs
@@ -21,33 +21,9 @@ use iggy::prelude::*;
 use integration::harness::{TestHarness, TestServerConfig};
 use serial_test::parallel;
 use std::collections::HashMap;
+use std::path::{Path, PathBuf};
 use test_case::test_matrix;
 
-fn encryption_enabled() -> bool {
-    true
-}
-
-fn encryption_disabled() -> bool {
-    false
-}
-
-fn build_server_config(encryption: bool) -> TestServerConfig {
-    let mut extra_envs = HashMap::new();
-
-    if encryption {
-        extra_envs.insert(
-            "IGGY_SYSTEM_ENCRYPTION_ENABLED".to_string(),
-            "true".to_string(),
-        );
-        extra_envs.insert(
-            "IGGY_SYSTEM_ENCRYPTION_KEY".to_string(),
-            "/rvT1xP4V8u1EAhk4xDdqzqM2UOPXyy9XYkl4uRShgE=".to_string(),
-        );
-    }
-
-    TestServerConfig::builder().extra_envs(extra_envs).build()
-}
-
 #[test_matrix(
     [encryption_enabled(), encryption_disabled()]
 )]
@@ -128,6 +104,43 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
 
     tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
 
+    // Verify on-disk encryption of headers and payload
+    let data_path = harness.server().data_path();
+    let log_files = find_log_files(&data_path);
+    assert!(
+        !log_files.is_empty(),
+        "Expected at least one .log segment file on disk"
+    );
+    let all_raw_bytes: Vec<u8> = log_files
+        .iter()
+        .flat_map(|p| std::fs::read(p).unwrap())
+        .collect();
+    let contains_plaintext_header = all_raw_bytes
+        .windows(b"test-message".len())
+        .any(|w| w == b"test-message");
+    let contains_plaintext_payload = all_raw_bytes
+        .windows(b"Message batch 1".len())
+        .any(|w| w == b"Message batch 1");
+    if encryption {
+        assert!(
+            !contains_plaintext_header,
+            "When encryption is enabled, header values must NOT appear as plaintext on disk"
+        );
+        assert!(
+            !contains_plaintext_payload,
+            "When encryption is enabled, payload must NOT appear as plaintext on disk"
+        );
+    } else {
+        assert!(
+            contains_plaintext_header,
+            "When encryption is disabled, header values should appear as plaintext on disk"
+        );
+        assert!(
+            contains_plaintext_payload,
+            "When encryption is disabled, payload should appear as plaintext on disk"
+        );
+    }
+
     let initial_stats = client.get_stats().await.unwrap();
     let initial_messages_count = initial_stats.messages_count;
     let initial_messages_size = initial_stats.messages_size_bytes;
@@ -317,3 +330,43 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
         initial_messages_size.as_bytes_u64()
     );
 }
+
+fn encryption_enabled() -> bool {
+    true
+}
+
+fn encryption_disabled() -> bool {
+    false
+}
+
+fn build_server_config(encryption: bool) -> TestServerConfig {
+    let mut extra_envs = HashMap::new();
+
+    if encryption {
+        extra_envs.insert(
+            "IGGY_SYSTEM_ENCRYPTION_ENABLED".to_string(),
+            "true".to_string(),
+        );
+        extra_envs.insert(
+            "IGGY_SYSTEM_ENCRYPTION_KEY".to_string(),
+            "/rvT1xP4V8u1EAhk4xDdqzqM2UOPXyy9XYkl4uRShgE=".to_string(),
+        );
+    }
+
+    TestServerConfig::builder().extra_envs(extra_envs).build()
+}
+
+fn find_log_files(dir: &Path) -> Vec<PathBuf> {
+    let mut result = Vec::new();
+    if let Ok(entries) = std::fs::read_dir(dir) {
+        for entry in entries.flatten() {
+            let path = entry.path();
+            if path.is_dir() {
+                result.extend(find_log_files(&path));
+            } else if path.extension().and_then(|e| e.to_str()) == Some("log") {
+                result.push(path);
+            }
+        }
+    }
+    result
+}
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index cbd405065..3627c87db 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/sdk/src/clients/binary_message.rs b/core/sdk/src/clients/binary_message.rs
index 63f064d8a..462c60ed8 100644
--- a/core/sdk/src/clients/binary_message.rs
+++ b/core/sdk/src/clients/binary_message.rs
@@ -61,6 +61,12 @@ impl MessageClient for IggyClient {
                 let payload = encryptor.decrypt(&message.payload)?;
                 message.payload = Bytes::from(payload);
                 message.header.payload_length = message.payload.len() as u32;
+
+                if let Some(ref user_headers) = message.user_headers {
+                    let decrypted_headers = encryptor.decrypt(user_headers)?;
+                    message.header.user_headers_length = decrypted_headers.len() as u32;
+                    message.user_headers = Some(Bytes::from(decrypted_headers));
+                }
             }
         }
 
@@ -82,6 +88,12 @@ impl MessageClient for IggyClient {
             for message in &mut *messages {
                 message.payload = Bytes::from(encryptor.encrypt(&message.payload)?);
                 message.header.payload_length = message.payload.len() as u32;
+
+                if let Some(ref user_headers) = message.user_headers {
+                    let encrypted_headers = encryptor.encrypt(user_headers)?;
+                    message.header.user_headers_length = encrypted_headers.len() as u32;
+                    message.user_headers = Some(Bytes::from(encrypted_headers));
+                }
             }
         }
 
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 37b203286..17d2c425b 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -981,12 +981,12 @@ impl Stream for IggyConsumer {
                     } else {
                         if let Some(ref encryptor) = self.encryptor {
                             for message in &mut polled_messages.messages {
+                                let offset = message.header.offset;
                                 let payload = encryptor.decrypt(&message.payload);
                                 if let Err(error) = payload {
                                     self.poll_future = None;
                                     error!(
-                                        "Failed to decrypt the message payload at offset: {}, partition ID: {}",
-                                        message.header.offset, partition_id
+                                        "Failed to decrypt the message payload at offset: {offset}, partition ID: {partition_id}",
                                     );
                                     return Poll::Ready(Some(Err(error)));
                                 }
@@ -994,6 +994,21 @@ impl Stream for IggyConsumer {
                                 let payload = payload.unwrap();
                                 message.payload = Bytes::from(payload);
                                 message.header.payload_length = message.payload.len() as u32;
+
+                                if let Some(ref user_headers) = message.user_headers {
+                                    let decrypted_headers = encryptor.decrypt(user_headers);
+                                    if let Err(error) = decrypted_headers {
+                                        self.poll_future = None;
+                                        error!(
+                                            "Failed to decrypt the message user headers at offset: {offset}, partition ID: {partition_id}",
+                                        );
+                                        return Poll::Ready(Some(Err(error)));
+                                    }
+                                    let decrypted_headers = decrypted_headers.unwrap();
+                                    message.header.user_headers_length =
+                                        decrypted_headers.len() as u32;
+                                    message.user_headers = Some(Bytes::from(decrypted_headers));
+                                }
                             }
                         }
 
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index badd909f0..e17fafc80 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -293,6 +293,12 @@ impl ProducerCore {
             for message in messages {
                 message.payload = Bytes::from(encryptor.encrypt(&message.payload)?);
                 message.header.payload_length = message.payload.len() as u32;
+
+                if let Some(ref user_headers) = message.user_headers {
+                    let encrypted_headers = encryptor.encrypt(user_headers)?;
+                    message.header.user_headers_length = encrypted_headers.len() as u32;
+                    message.user_headers = Some(Bytes::from(encrypted_headers));
+                }
             }
         }
         Ok(())
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 62027ca6c..d26d3fed7 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "server"
-version = "0.7.3-edge.1"
+version = "0.7.4-edge.1"
 edition = "2024"
 license = "Apache-2.0"
 
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index a59f68d6d..21e342003 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -559,25 +559,46 @@ impl IggyShard {
             let mut position = 0;
 
             for message in batch.iter() {
+                let mut header = message.header().to_header();
+                let offset = header.offset;
                 let payload = encryptor.decrypt(message.payload());
                 match payload {
                     Ok(payload) => {
                         // Update the header with the decrypted payload length
-                        let mut header = message.header().to_header();
                         header.payload_length = payload.len() as u32;
 
+                        // Decrypt user headers if present
+                        let decrypted_user_headers = if let Some(user_headers) =
+                            message.user_headers()
+                        {
+                            match encryptor.decrypt(user_headers) {
+                                Ok(decrypted) => {
+                                    header.user_headers_length = decrypted.len() as u32;
+                                    Some(decrypted)
+                                }
+                                Err(error) => {
+                                    error!(
+                                        "Cannot decrypt the message user headers at offset: {offset}. Error: {error}"
+                                    );
+                                    continue;
+                                }
+                            }
+                        } else {
+                            None
+                        };
+
                         
decrypted_messages.extend_from_slice(&header.to_bytes());
                         decrypted_messages.extend_from_slice(&payload);
-                        if let Some(user_headers) = message.user_headers() {
+                        if let Some(ref user_headers) = decrypted_user_headers {
                             decrypted_messages.extend_from_slice(user_headers);
                         }
                         position += IGGY_MESSAGE_HEADER_SIZE
                             + payload.len()
-                            + message.header().user_headers_length();
+                            + header.user_headers_length as usize;
                         indexes.insert(0, position as u32, 0);
                     }
                     Err(error) => {
-                        error!("Cannot decrypt the message. Error: {}", error);
+                        error!("Cannot decrypt the message at offset: {offset}. Error: {error}",);
                         continue;
                     }
                 }
@@ -604,7 +625,7 @@ impl IggyShard {
 
         for message in batch.iter() {
             let header = message.header().to_header();
-            let user_headers_length = header.user_headers_length;
+            let offset = header.offset;
             let payload_bytes = message.payload();
             let user_headers_bytes = message.user_headers();
 
@@ -614,18 +635,38 @@ impl IggyShard {
                     let mut updated_header = header;
                     updated_header.payload_length = encrypted_payload.len() as u32;
 
+                    // Encrypt user headers if present
+                    let encrypted_user_headers = if let Some(user_headers_bytes) =
+                        user_headers_bytes
+                    {
+                        match encryptor.encrypt(user_headers_bytes) {
+                            Ok(encrypted) => {
+                                updated_header.user_headers_length = encrypted.len() as u32;
+                                Some(encrypted)
+                            }
+                            Err(error) => {
+                                error!(
+                                    "Cannot encrypt the message user headers at offset: {offset}. Error: {error}"
+                                );
+                                continue;
+                            }
+                        }
+                    } else {
+                        None
+                    };
+
                     encrypted_messages.extend_from_slice(&updated_header.to_bytes());
                     encrypted_messages.extend_from_slice(&encrypted_payload);
-                    if let Some(user_headers_bytes) = user_headers_bytes {
-                        encrypted_messages.extend_from_slice(user_headers_bytes);
+                    if let Some(ref encrypted_user_headers) = encrypted_user_headers {
+                        encrypted_messages.extend_from_slice(encrypted_user_headers);
                     }
                     position += IGGY_MESSAGE_HEADER_SIZE
                         + encrypted_payload.len()
-                        + user_headers_length as usize;
+                        + updated_header.user_headers_length as usize;
                     indexes.insert(0, position as u32, 0);
                 }
                 Err(error) => {
-                    error!("Cannot encrypt the message. Error: {}", error);
+                    error!("Cannot encrypt the message at offset: {offset}. Error: {error}",);
                     continue;
                 }
             }
diff --git a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs
index 2c5a1b80d..74713deab 100644
--- a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs
+++ b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs
@@ -21,8 +21,10 @@ using System.Threading.Channels;
 using Apache.Iggy.Contracts;
 using Apache.Iggy.Enums;
 using Apache.Iggy.Exceptions;
+using Apache.Iggy.Headers;
 using Apache.Iggy.IggyClient;
 using Apache.Iggy.Kinds;
+using Apache.Iggy.Mappers;
 using Apache.Iggy.Utils;
 using Microsoft.Extensions.Logging;
 
@@ -380,11 +382,19 @@ public partial class IggyConsumer : IAsyncDisposable
                     try
                     {
                         var decryptedPayload = _config.MessageEncryptor.Decrypt(message.Payload);
+
+                        Dictionary<HeaderKey, HeaderValue>? decryptedHeaders = message.UserHeaders;
+                        if (decryptedHeaders is null && message.RawUserHeaders is { Length: > 0 })
+                        {
+                            var decryptedHeaderBytes = _config.MessageEncryptor.Decrypt(message.RawUserHeaders);
+                            decryptedHeaders = BinaryMapper.MapHeaders(decryptedHeaderBytes);
+                        }
+
                         processedMessage = new MessageResponse
                         {
                             Header = message.Header,
                             Payload = decryptedPayload,
-                            UserHeaders = message.UserHeaders
+                            UserHeaders = decryptedHeaders
                         };
                     }
                     catch (Exception ex)
diff --git a/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs b/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs
index f39ddd864..4d1de109f 100644
--- a/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs
+++ b/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs
@@ -42,5 +42,12 @@ public sealed class MessageResponse
     /// </summary>
     [JsonPropertyName("user_headers")]
     [JsonConverter(typeof(UserHeadersConverter))]
-    public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; init; }
+    public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; set; }
+
+    /// <summary>
+    ///     Raw user header bytes before deserialization.
+    ///     Used internally for decrypting encrypted headers.
+    /// </summary>
+    [JsonIgnore]
+    internal byte[]? RawUserHeaders { get; set; }
 }
diff --git a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
index 3501117cc..2e2c93943 100644
--- a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
+++ b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
@@ -410,7 +410,7 @@ internal static class TcpContracts
         var msgSize = 0;
         foreach (var message in messages)
         {
-            var headersBytes = GetHeadersBytes(message.UserHeaders);
+            var headersBytes = message.RawUserHeaders ?? GetHeadersBytes(message.UserHeaders);
             BinaryPrimitives.WriteUInt64LittleEndian(bytes[position..(position + 8)], message.Header.Checksum);
             BinaryPrimitives.WriteUInt128LittleEndian(bytes[(position + 8)..(position + 24)], message.Header.Id);
             BinaryPrimitives.WriteUInt64LittleEndian(bytes[(position + 24)..(position + 32)], message.Header.Offset);
@@ -548,7 +548,7 @@ internal static class TcpContracts
     //     return bytes;
     // }
 
-    private static byte[] GetHeadersBytes(Dictionary<HeaderKey, HeaderValue>? headers)
+    internal static byte[] GetHeadersBytes(Dictionary<HeaderKey, HeaderValue>? headers)
     {
         if (headers == null)
         {
diff --git a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
index fb31d48de..923242206 100644
--- a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
+++ b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
@@ -7,7 +7,7 @@
         <TargetFrameworks>net8.0;net10.0</TargetFrameworks>
         <AssemblyName>Apache.Iggy</AssemblyName>
         <RootNamespace>Apache.Iggy</RootNamespace>
-        <Version>0.7.1-edge.3</Version>
+        <Version>0.7.2-edge.1</Version>
         <GenerateDocumentationFile>true</GenerateDocumentationFile>
     </PropertyGroup>
 
diff --git a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
index 6c3435d2a..cb530a54e 100644
--- a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
+++ b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
@@ -336,8 +336,7 @@ internal static class BinaryMapper
         };
     }
 
-    internal static PolledMessages MapMessages(ReadOnlySpan<byte> payload,
-        Func<byte[], byte[]>? decryptor = null)
+    internal static PolledMessages MapMessages(ReadOnlySpan<byte> payload)
     {
         var length = payload.Length;
         var partitionId = BinaryPrimitives.ReadInt32LittleEndian(payload[..4]);
@@ -362,13 +361,22 @@ internal static class BinaryMapper
             var payloadLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 52)..(position + 56)]);
             var reserved = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 56)..(position + 64)]);
 
-            Dictionary<HeaderKey, HeaderValue>? headers = headersLength switch
+            var wireHeadersLength = headersLength;
+            byte[]? rawUserHeaders = null;
+            Dictionary<HeaderKey, HeaderValue>? headers;
+            if (headersLength == 0)
             {
-                0 => null,
-                > 0 => MapHeaders(
-                    payload[(position + 64 + payloadLength)..(position + 64 + payloadLength + headersLength)]),
-                < 0 => throw new ArgumentOutOfRangeException()
-            };
+                headers = null;
+            }
+            else if (headersLength < 0)
+            {
+                throw new ArgumentOutOfRangeException();
+            }
+            else
+            {
+                rawUserHeaders = payload[(position + 64 + payloadLength)..(position + 64 + payloadLength + headersLength)].ToArray();
+                headers = TryMapHeaders(rawUserHeaders);
+            }
 
             var payloadRangeStart = position + 64;
             var payloadRangeEnd = position + 64 + payloadLength;
@@ -399,9 +407,8 @@ internal static class BinaryMapper
                         Reserved = reserved
                     },
                     UserHeaders = headers,
-                    Payload = decryptor is not null
-                        ? decryptor(messagePayload[..payloadSliceLen])
-                        : messagePayload[..payloadSliceLen]
+                    RawUserHeaders = rawUserHeaders,
+                    Payload = messagePayload[..payloadSliceLen]
                 });
             }
             finally
@@ -409,7 +416,7 @@ internal static class BinaryMapper
                 ArrayPool<byte>.Shared.Return(messagePayload);
             }
 
-            position += 64 + payloadLength + headersLength;
+            position += 64 + payloadLength + wireHeadersLength;
             if (position + PropertiesSize >= length)
             {
                 break;
@@ -424,14 +431,14 @@ internal static class BinaryMapper
         };
     }
 
-    private static Dictionary<HeaderKey, HeaderValue> MapHeaders(ReadOnlySpan<byte> payload)
+    internal static Dictionary<HeaderKey, HeaderValue> MapHeaders(ReadOnlySpan<byte> payload)
     {
         var headers = new Dictionary<HeaderKey, HeaderValue>();
         var position = 0;
 
         while (position < payload.Length)
         {
-            var keyKind = MapHeaderKind(payload, position);
+            var keyKind = MapHeaderKind(payload[position]);
             position++;
 
             var keyLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
@@ -444,7 +451,7 @@ internal static class BinaryMapper
             var keyValue = payload[position..(position + keyLength)].ToArray();
             position += keyLength;
 
-            var valueKind = MapHeaderKind(payload, position);
+            var valueKind = MapHeaderKind(payload[position]);
             position++;
 
             var valueLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
@@ -464,9 +471,62 @@ internal static class BinaryMapper
         return headers;
     }
 
-    private static HeaderKind MapHeaderKind(ReadOnlySpan<byte> payload, int position)
+    internal static Dictionary<HeaderKey, HeaderValue>? TryMapHeaders(ReadOnlySpan<byte> payload)
+    {
+        if (payload.Length == 0 || payload[0] is 0 or > 15)
+        {
+            return null;
+        }
+
+        var headers = new Dictionary<HeaderKey, HeaderValue>();
+        var position = 0;
+
+        while (position < payload.Length)
+        {
+            if (!TryMapHeaderKind(payload[position], out var keyKind))
+                return null;
+            position++;
+
+            if (position + 4 > payload.Length)
+                return null;
+            var keyLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
+            if (keyLength is <= 0 or > 255)
+                return null;
+
+            position += 4;
+            if (position + keyLength > payload.Length)
+                return null;
+            var keyValue = payload[position..(position + keyLength)].ToArray();
+            position += keyLength;
+
+            if (position >= payload.Length)
+                return null;
+            if (!TryMapHeaderKind(payload[position], out var valueKind))
+                return null;
+            position++;
+
+            if (position + 4 > payload.Length)
+                return null;
+            var valueLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
+            if (valueLength is <= 0 or > 255)
+                return null;
+
+            position += 4;
+            if (position + valueLength > payload.Length)
+                return null;
+            ReadOnlySpan<byte> value = payload[position..(position + valueLength)];
+            position += valueLength;
+
+            headers[new HeaderKey { Kind = keyKind, Value = keyValue }] =
+                new HeaderValue { Kind = valueKind, Value = value.ToArray() };
+        }
+
+        return headers;
+    }
+
+    private static HeaderKind MapHeaderKind(byte value)
     {
-        var headerKind = payload[position] switch
+        return value switch
         {
             1 => HeaderKind.Raw,
             2 => HeaderKind.String,
@@ -483,9 +543,19 @@ internal static class BinaryMapper
             13 => HeaderKind.Uint128,
             14 => HeaderKind.Float,
             15 => HeaderKind.Double,
-            _ => throw new ArgumentOutOfRangeException()
+            _ => throw new ArgumentOutOfRangeException(nameof(value), value, null)
         };
-        return headerKind;
+    }
+
+    private static bool TryMapHeaderKind(byte value, out HeaderKind kind)
+    {
+        if (value is >= 1 and <= 15)
+        {
+            kind = MapHeaderKind(value);
+            return true;
+        }
+        kind = default;
+        return false;
     }
 
     internal static IReadOnlyList<StreamResponse> MapStreams(ReadOnlySpan<byte> payload)
diff --git a/foreign/csharp/Iggy_SDK/Messages/Message.cs b/foreign/csharp/Iggy_SDK/Messages/Message.cs
index 1a0e7e697..dc3c7dd03 100644
--- a/foreign/csharp/Iggy_SDK/Messages/Message.cs
+++ b/foreign/csharp/Iggy_SDK/Messages/Message.cs
@@ -45,6 +45,12 @@ public class Message
     /// </summary>
     public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; set; }
 
+    /// <summary>
+    ///     Pre-serialized (possibly encrypted) user headers bytes.
+    ///     When set, this takes precedence over <see cref="UserHeaders"/> during serialization.
+    /// </summary>
+    internal byte[]? RawUserHeaders { get; set; }
+
     /// <summary>
     ///     Default constructor.
     /// </summary>
diff --git a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs
index 1b95c9da8..68ea63e2e 100644
--- a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs
+++ b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using Apache.Iggy.Contracts.Tcp;
 using Apache.Iggy.Enums;
 using Apache.Iggy.Exceptions;
 using Apache.Iggy.IggyClient;
@@ -310,7 +311,7 @@ public partial class IggyPublisher : IAsyncDisposable
 
     /// <summary>
     ///     Encrypts all messages in the list using the configured message encryptor, if available.
-    ///     Updates the payload length in the message header after encryption.
+    ///     Updates the payload and user headers lengths in the message header after encryption.
     /// </summary>
     /// <param name="messages">The messages to encrypt.</param>
     private void EncryptMessages(IList<Message> messages)
@@ -324,6 +325,15 @@ public partial class IggyPublisher : IAsyncDisposable
         {
             message.Payload = _config.MessageEncryptor.Encrypt(message.Payload);
             message.Header.PayloadLength = message.Payload.Length;
+
+            if (message.UserHeaders is { Count: > 0 })
+            {
+                var headerBytes = TcpContracts.GetHeadersBytes(message.UserHeaders);
+                var encryptedHeaderBytes = _config.MessageEncryptor.Encrypt(headerBytes);
+                message.RawUserHeaders = encryptedHeaderBytes;
+                message.UserHeaders = null;
+                message.Header.UserHeadersLength = encryptedHeaderBytes.Length;
+            }
         }
     }
 }
diff --git a/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs b/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs
index c99337b4d..626442a11 100644
--- a/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs
+++ b/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs
@@ -49,6 +49,12 @@ internal static class TcpMessageStreamHelpers
         foreach (var message in messages)
         {
             bytesCount += 16 + 64 + message.Payload.Length;
+            if (message.RawUserHeaders is not null)
+            {
+                bytesCount += message.RawUserHeaders.Length;
+                continue;
+            }
+
             if (message.UserHeaders is null)
             {
                 continue;
diff --git a/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs b/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs
new file mode 100644
index 000000000..fb278eaaf
--- /dev/null
+++ b/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Contracts.Tcp;
+using Apache.Iggy.Encryption;
+using Apache.Iggy.Headers;
+namespace Apache.Iggy.Tests.MapperTests;
+
+public class HeaderEncryptionTests
+{
+    [Fact]
+    public void Headers_should_survive_encrypt_decrypt_roundtrip()
+    {
+        var key = Encryption.AesMessageEncryptor.GenerateKey();
+        var encryptor = new Encryption.AesMessageEncryptor(key);
+
+        var originalHeaders = new Dictionary<Headers.HeaderKey, Headers.HeaderValue>
+        {
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "batch"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Uint64, Value = BitConverter.GetBytes(1UL) } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "type"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.String, Value = "test-message"u8.ToArray() } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "encrypted"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Bool, Value = [1] } },
+        };
+
+        var headerBytes = Contracts.Tcp.TcpContracts.GetHeadersBytes(originalHeaders);
+        Assert.NotEmpty(headerBytes);
+
+        var encrypted = encryptor.Encrypt(headerBytes);
+        Assert.NotEqual(headerBytes, encrypted);
+        Assert.True(encrypted.Length > headerBytes.Length);
+
+        // Encrypted bytes must not parse as valid headers
+        var parsed = Mappers.BinaryMapper.TryMapHeaders(encrypted);
+        Assert.Null(parsed);
+
+        var decrypted = encryptor.Decrypt(encrypted);
+        Assert.Equal(headerBytes, decrypted);
+
+        var roundTripped = Mappers.BinaryMapper.MapHeaders(decrypted);
+        Assert.Equal(originalHeaders.Count, roundTripped.Count);
+
+        foreach (var (key2, value) in originalHeaders)
+        {
+            Assert.True(roundTripped.ContainsKey(key2));
+            Assert.Equal(value.Kind, roundTripped[key2].Kind);
+            Assert.Equal(value.Value, roundTripped[key2].Value);
+        }
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_random_bytes()
+    {
+        var random = new byte[64];
+        Random.Shared.NextBytes(random);
+        random[0] = 0;
+
+        var result = Mappers.BinaryMapper.TryMapHeaders(random);
+        Assert.Null(result);
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_valid_headers_on_plaintext()
+    {
+        var headers = new Dictionary<Headers.HeaderKey, Headers.HeaderValue>
+        {
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "key"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.String, Value = "value"u8.ToArray() } },
+        };
+
+        var bytes = Contracts.Tcp.TcpContracts.GetHeadersBytes(headers);
+        var result = Mappers.BinaryMapper.TryMapHeaders(bytes);
+
+        Assert.NotNull(result);
+        Assert.Single(result);
+    }
+}
diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index 9ce0d5265..2e6e38516 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "apache-iggy"
-version = "0.7.4-dev1"
+version = "0.7.5-dev1"
 edition = "2024"
 authors = ["Iggy Committers <[email protected]>"]
 license = "Apache-2.0"
@@ -28,7 +28,7 @@ repository = "https://github.com/apache/iggy"
 [dependencies]
 bytes = "1.11.1"
 futures = "0.3.32"
-iggy = { path = "../../core/sdk", version = "0.9.4-edge.1" }
+iggy = { path = "../../core/sdk", version = "0.9.5-edge.1" }
 pyo3 = "0.28.2"
 pyo3-async-runtimes = { version = "0.28.0", features = [
     "attributes",
diff --git a/foreign/python/pyproject.toml b/foreign/python/pyproject.toml
index ba57ad43c..012296101 100644
--- a/foreign/python/pyproject.toml
+++ b/foreign/python/pyproject.toml
@@ -22,7 +22,7 @@ build-backend = "maturin"
 [project]
 name = "apache-iggy"
 requires-python = ">=3.10"
-version = "0.7.4.dev1"
+version = "0.7.5.dev1"
 description = "Apache Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 readme = "README.md"
 license = { file = "LICENSE" }

Reply via email to