This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch headers_encryption in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 7e8fa259d7e26d26cd509bd64b2fa48a1a3dd3e8 Author: spetz <[email protected]> AuthorDate: Fri Mar 27 08:39:49 2026 +0100 feat(security): encrypt user headers alongside message payload --- Cargo.lock | 30 +++--- Cargo.toml | 8 +- DEPENDENCIES.md | 30 +++--- core/ai/mcp/Cargo.toml | 2 +- core/binary_protocol/Cargo.toml | 2 +- core/common/Cargo.toml | 2 +- core/common/src/utils/crypto.rs | 70 +++++++++++++ core/connectors/runtime/Cargo.toml | 2 +- core/connectors/sdk/Cargo.toml | 2 +- .../connectors/sinks/elasticsearch_sink/Cargo.toml | 2 +- core/connectors/sinks/iceberg_sink/Cargo.toml | 2 +- core/connectors/sinks/postgres_sink/Cargo.toml | 2 +- core/connectors/sinks/quickwit_sink/Cargo.toml | 2 +- core/connectors/sinks/stdout_sink/Cargo.toml | 2 +- .../sources/elasticsearch_source/Cargo.toml | 2 +- core/connectors/sources/postgres_source/Cargo.toml | 2 +- core/connectors/sources/random_source/Cargo.toml | 2 +- .../tests/server/scenarios/encryption_scenario.rs | 103 +++++++++++++++----- core/sdk/Cargo.toml | 2 +- core/sdk/src/clients/binary_message.rs | 12 +++ core/sdk/src/clients/consumer.rs | 19 +++- core/sdk/src/clients/producer.rs | 6 ++ core/server/Cargo.toml | 2 +- core/server/src/shard/system/messages.rs | 59 +++++++++-- foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs | 12 ++- .../csharp/Iggy_SDK/Contracts/MessageResponse.cs | 9 +- .../csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs | 4 +- foreign/csharp/Iggy_SDK/Iggy_SDK.csproj | 2 +- foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs | 108 +++++++++++++++++---- foreign/csharp/Iggy_SDK/Messages/Message.cs | 6 ++ .../csharp/Iggy_SDK/Publishers/IggyPublisher.cs | 12 ++- .../Iggy_SDK/Utils/TcpMessageStreamHelpers.cs | 6 ++ .../MapperTests/HeaderEncryptionTests.cs | 88 +++++++++++++++++ foreign/python/Cargo.toml | 4 +- foreign/python/pyproject.toml | 2 +- 35 files changed, 507 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d43b9a2db..d72954b02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5183,7 +5183,7 @@ checksum = "cd62e6b5e86ea8eeeb8db1de02880a6abc01a397b2ebb64b5d74ac255318f5cb" [[package]] name = "iggy" -version = "0.9.4-edge.1" +version = "0.9.5-edge.1" dependencies = [ "async-broadcast", "async-dropper", @@ -5300,7 +5300,7 @@ dependencies = [ [[package]] name = "iggy-connectors" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" dependencies = [ "async-trait", "axum", @@ -5352,7 +5352,7 @@ dependencies = [ [[package]] name = "iggy-mcp" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" dependencies = [ "axum", "axum-server", @@ -5384,7 +5384,7 @@ dependencies = [ [[package]] name = "iggy_binary_protocol" -version = "0.9.4-edge.1" +version = "0.9.5-edge.1" dependencies = [ "bytemuck", "bytes", @@ -5394,7 +5394,7 @@ dependencies = [ [[package]] name = "iggy_common" -version = "0.9.4-edge.1" +version = "0.9.5-edge.1" dependencies = [ "aes-gcm", "ahash 0.8.12", @@ -5440,7 +5440,7 @@ dependencies = [ [[package]] name = "iggy_connector_elasticsearch_sink" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" dependencies = [ "async-trait", "base64 0.22.1", @@ -5459,7 +5459,7 @@ dependencies = [ [[package]] name = "iggy_connector_elasticsearch_source" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" dependencies = [ "async-trait", "dashmap", @@ -5478,7 +5478,7 @@ dependencies = [ [[package]] name = "iggy_connector_iceberg_sink" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" dependencies = [ "arrow-json", "async-trait", @@ -5514,7 +5514,7 @@ dependencies = [ [[package]] name = "iggy_connector_postgres_sink" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" dependencies = [ "async-trait", "dashmap", @@ -5534,7 +5534,7 @@ dependencies = [ [[package]] name = "iggy_connector_postgres_source" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" dependencies = [ "async-trait", "base64 0.22.1", @@ -5556,7 +5556,7 @@ dependencies = [ [[package]] name = "iggy_connector_quickwit_sink" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" dependencies = [ "async-trait", "dashmap", @@ -5571,7 +5571,7 @@ dependencies = [ [[package]] name = "iggy_connector_random_source" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" dependencies = [ "async-trait", "dashmap", @@ -5589,7 +5589,7 @@ dependencies = [ [[package]] name = "iggy_connector_sdk" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" dependencies = [ "async-trait", "base64 0.22.1", @@ -5619,7 +5619,7 @@ dependencies = [ [[package]] name = "iggy_connector_stdout_sink" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" dependencies = [ "async-trait", "dashmap", @@ -9722,7 +9722,7 @@ dependencies = [ [[package]] name = "server" -version = "0.7.3-edge.1" +version = "0.7.4-edge.1" dependencies = [ "ahash 0.8.12", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 5918468b0..efa79e799 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -160,11 +160,11 @@ hwlocality = "1.0.0-alpha.11" iceberg = "0.9.0" iceberg-catalog-rest = "0.9.0" iceberg-storage-opendal = "0.9.0" -iggy = { path = "core/sdk", version = "0.9.4-edge.1" } +iggy = { path = "core/sdk", version = "0.9.5-edge.1" } iggy-cli = { path = "core/cli", version = "0.11.3-edge.1" } -iggy_binary_protocol = { path = "core/binary_protocol", version = "0.9.4-edge.1" } -iggy_common = { path = "core/common", version = "0.9.4-edge.1" } -iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.2.1-edge.1" } +iggy_binary_protocol = { path = "core/binary_protocol", version = "0.9.5-edge.1" } +iggy_common = { path = "core/common", version = "0.9.5-edge.1" } +iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.2.2-edge.1" } integration = { path = "core/integration" } journal = { path = "core/journal" } js-sys = "0.3" diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index b6100cdfb..73414fba9 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -451,24 +451,24 @@ ident_case: 1.0.1, "Apache-2.0 OR MIT", idna: 1.1.0, "Apache-2.0 OR MIT", idna_adapter: 1.2.1, "Apache-2.0 OR MIT", if_chain: 1.0.3, "Apache-2.0 OR MIT", -iggy: 0.9.4-edge.1, "Apache-2.0", +iggy: 0.9.5-edge.1, "Apache-2.0", iggy-bench: 0.4.1-edge.1, "Apache-2.0", iggy-bench-dashboard-server: 0.6.3-edge.1, "Apache-2.0", iggy-cli: 0.11.3-edge.1, "Apache-2.0", -iggy-connectors: 0.3.2-edge.1, "Apache-2.0", -iggy-mcp: 0.3.2-edge.1, "Apache-2.0", -iggy_binary_protocol: 0.9.4-edge.1, "Apache-2.0", -iggy_common: 0.9.4-edge.1, "Apache-2.0", -iggy_connector_elasticsearch_sink: 0.3.2-edge.1, "Apache-2.0", -iggy_connector_elasticsearch_source: 0.3.2-edge.1, "Apache-2.0", -iggy_connector_iceberg_sink: 0.3.2-edge.1, "Apache-2.0", +iggy-connectors: 0.3.3-edge.1, "Apache-2.0", +iggy-mcp: 0.3.3-edge.1, "Apache-2.0", +iggy_binary_protocol: 0.9.5-edge.1, "Apache-2.0", +iggy_common: 0.9.5-edge.1, "Apache-2.0", +iggy_connector_elasticsearch_sink: 0.3.3-edge.1, "Apache-2.0", +iggy_connector_elasticsearch_source: 0.3.3-edge.1, "Apache-2.0", +iggy_connector_iceberg_sink: 0.3.3-edge.1, "Apache-2.0", iggy_connector_mongodb_sink: 0.3.0, "Apache-2.0", -iggy_connector_postgres_sink: 0.3.2-edge.1, "Apache-2.0", -iggy_connector_postgres_source: 0.3.2-edge.1, "Apache-2.0", -iggy_connector_quickwit_sink: 0.3.2-edge.1, "Apache-2.0", -iggy_connector_random_source: 0.3.2-edge.1, "Apache-2.0", -iggy_connector_sdk: 0.2.1-edge.1, "Apache-2.0", -iggy_connector_stdout_sink: 0.3.2-edge.1, "Apache-2.0", +iggy_connector_postgres_sink: 0.3.3-edge.1, "Apache-2.0", +iggy_connector_postgres_source: 0.3.3-edge.1, "Apache-2.0", +iggy_connector_quickwit_sink: 0.3.3-edge.1, "Apache-2.0", +iggy_connector_random_source: 0.3.3-edge.1, "Apache-2.0", +iggy_connector_sdk: 0.2.2-edge.1, "Apache-2.0", +iggy_connector_stdout_sink: 0.3.3-edge.1, "Apache-2.0", iggy_examples: 0.0.6, "Apache-2.0", ignore: 0.4.25, "MIT OR Unlicense", image: 0.25.10, "Apache-2.0 OR MIT", @@ -843,7 +843,7 @@ serde_with_macros: 3.18.0, "Apache-2.0 OR MIT", serde_yaml_ng: 0.10.0, "MIT", serial_test: 3.4.0, "MIT", serial_test_derive: 3.4.0, "MIT", -server: 0.7.3-edge.1, "Apache-2.0", +server: 0.7.4-edge.1, "Apache-2.0", sha1: 0.10.6, "Apache-2.0 OR MIT", sha2: 0.10.9, "Apache-2.0 OR MIT", sha3: 0.10.8, "Apache-2.0 OR MIT", diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml index 071e8ae36..e013d59a8 100644 --- a/core/ai/mcp/Cargo.toml +++ b/core/ai/mcp/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy-mcp" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" description = "MCP Server for Iggy message streaming platform" edition = "2024" license = "Apache-2.0" diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml index b479ecf09..51f310908 100644 --- a/core/binary_protocol/Cargo.toml +++ b/core/binary_protocol/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_binary_protocol" -version = "0.9.4-edge.1" +version = "0.9.5-edge.1" description = "Wire protocol types and codec for the Iggy binary protocol. Shared between server and SDK." edition = "2024" license = "Apache-2.0" diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index 56e99742f..a2fd8ae21 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -16,7 +16,7 @@ # under the License. [package] name = "iggy_common" -version = "0.9.4-edge.1" +version = "0.9.5-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs index fca62dde1..24608cb62 100644 --- a/core/common/src/utils/crypto.rs +++ b/core/common/src/utils/crypto.rs @@ -95,6 +95,7 @@ impl Encryptor for Aes256GcmEncryptor { #[cfg(test)] mod tests { use super::*; + use crate::{HeaderKey, HeaderValue, IggyMessage}; #[test] fn given_the_same_key_data_should_be_encrypted_and_decrypted_correctly() { @@ -125,4 +126,73 @@ mod tests { let error = decrypted_data.err().unwrap(); assert_eq!(error.as_code(), IggyError::CannotDecryptData.as_code()); } + + #[test] + fn message_payload_and_headers_should_encrypt_and_decrypt_correctly() { + use bytes::Bytes; + use std::collections::HashMap; + + let key = [1; 32]; + let encryptor = Aes256GcmEncryptor::new(&key).unwrap(); + + let mut headers = HashMap::new(); + headers.insert( + HeaderKey::try_from("batch").unwrap(), + HeaderValue::from(1u64), + ); + headers.insert( + HeaderKey::try_from("type").unwrap(), + HeaderValue::try_from("test-message").unwrap(), + ); + + let mut message = IggyMessage::builder() + .payload(Bytes::from("test payload data")) + .user_headers(headers) + .build() + .unwrap(); + + let original_payload = message.payload.clone(); + let original_headers = message.user_headers.clone().unwrap(); + + message.payload = Bytes::from(encryptor.encrypt(&message.payload).unwrap()); + message.header.payload_length = message.payload.len() as u32; + + let encrypted_headers = encryptor.encrypt(&original_headers).unwrap(); + message.header.user_headers_length = encrypted_headers.len() as u32; + message.user_headers = Some(Bytes::from(encrypted_headers)); + + assert_ne!(message.payload, original_payload); + assert_ne!(message.user_headers.as_ref().unwrap(), &original_headers); + + let decrypted_payload = encryptor.decrypt(&message.payload).unwrap(); + message.payload = Bytes::from(decrypted_payload); + message.header.payload_length = message.payload.len() as u32; + + let decrypted_headers = encryptor + .decrypt(message.user_headers.as_ref().unwrap()) + .unwrap(); + message.header.user_headers_length = decrypted_headers.len() as u32; + message.user_headers = Some(Bytes::from(decrypted_headers)); + + assert_eq!(message.payload, original_payload); + assert_eq!(message.user_headers.as_ref().unwrap(), &original_headers); + + let parsed = message.user_headers_map().unwrap().unwrap(); + assert_eq!( + parsed + .get(&HeaderKey::try_from("batch").unwrap()) + .unwrap() + .as_uint64() + .unwrap(), + 1 + ); + assert_eq!( + parsed + .get(&HeaderKey::try_from("type").unwrap()) + .unwrap() + .as_str() + .unwrap(), + "test-message" + ); + } } diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml index 20d42deec..8e78c32b5 100644 --- a/core/connectors/runtime/Cargo.toml +++ b/core/connectors/runtime/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy-connectors" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" description = "Connectors runtime for Iggy message streaming platform" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml index 34d242577..97a508b15 100644 --- a/core/connectors/sdk/Cargo.toml +++ b/core/connectors/sdk/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_sdk" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/elasticsearch_sink/Cargo.toml b/core/connectors/sinks/elasticsearch_sink/Cargo.toml index 40923ea9d..cb9b888a9 100644 --- a/core/connectors/sinks/elasticsearch_sink/Cargo.toml +++ b/core/connectors/sinks/elasticsearch_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_elasticsearch_sink" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" description = "Iggy Elasticsearch sink connector" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/iceberg_sink/Cargo.toml b/core/connectors/sinks/iceberg_sink/Cargo.toml index 527a27d44..e3ac0ae8c 100644 --- a/core/connectors/sinks/iceberg_sink/Cargo.toml +++ b/core/connectors/sinks/iceberg_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_iceberg_sink" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" edition = "2024" license = "Apache-2.0" keywords = ["iggy", "messaging", "streaming"] diff --git a/core/connectors/sinks/postgres_sink/Cargo.toml b/core/connectors/sinks/postgres_sink/Cargo.toml index 56ab685a3..04cc19986 100644 --- a/core/connectors/sinks/postgres_sink/Cargo.toml +++ b/core/connectors/sinks/postgres_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_postgres_sink" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" description = "Iggy PostgreSQL sink connector for storing stream messages into PostgreSQL database" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/quickwit_sink/Cargo.toml b/core/connectors/sinks/quickwit_sink/Cargo.toml index 3aea64bda..cc54878ee 100644 --- a/core/connectors/sinks/quickwit_sink/Cargo.toml +++ b/core/connectors/sinks/quickwit_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_quickwit_sink" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml b/core/connectors/sinks/stdout_sink/Cargo.toml index 270e52558..df123b5bf 100644 --- a/core/connectors/sinks/stdout_sink/Cargo.toml +++ b/core/connectors/sinks/stdout_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_stdout_sink" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sources/elasticsearch_source/Cargo.toml b/core/connectors/sources/elasticsearch_source/Cargo.toml index d1bb9c654..ed7365ea1 100644 --- a/core/connectors/sources/elasticsearch_source/Cargo.toml +++ b/core/connectors/sources/elasticsearch_source/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_elasticsearch_source" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" description = "Iggy Elasticsearch source connector" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sources/postgres_source/Cargo.toml b/core/connectors/sources/postgres_source/Cargo.toml index bbc82bed2..6281670f7 100644 --- a/core/connectors/sources/postgres_source/Cargo.toml +++ b/core/connectors/sources/postgres_source/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_postgres_source" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" description = "Iggy PostgreSQL source connector supporting CDC and table polling for message streaming platform" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sources/random_source/Cargo.toml b/core/connectors/sources/random_source/Cargo.toml index 7a0eed4d4..b3e3d666b 100644 --- a/core/connectors/sources/random_source/Cargo.toml +++ b/core/connectors/sources/random_source/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_random_source" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/integration/tests/server/scenarios/encryption_scenario.rs b/core/integration/tests/server/scenarios/encryption_scenario.rs index f03741c44..98d37349e 100644 --- a/core/integration/tests/server/scenarios/encryption_scenario.rs +++ b/core/integration/tests/server/scenarios/encryption_scenario.rs @@ -21,33 +21,9 @@ use iggy::prelude::*; use integration::harness::{TestHarness, TestServerConfig}; use serial_test::parallel; use std::collections::HashMap; +use std::path::{Path, PathBuf}; use test_case::test_matrix; -fn encryption_enabled() -> bool { - true -} - -fn encryption_disabled() -> bool { - false -} - -fn build_server_config(encryption: bool) -> TestServerConfig { - let mut extra_envs = HashMap::new(); - - if encryption { - extra_envs.insert( - "IGGY_SYSTEM_ENCRYPTION_ENABLED".to_string(), - "true".to_string(), - ); - extra_envs.insert( - "IGGY_SYSTEM_ENCRYPTION_KEY".to_string(), - "/rvT1xP4V8u1EAhk4xDdqzqM2UOPXyy9XYkl4uRShgE=".to_string(), - ); - } - - TestServerConfig::builder().extra_envs(extra_envs).build() -} - #[test_matrix( [encryption_enabled(), encryption_disabled()] )] @@ -128,6 +104,43 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + // Verify on-disk encryption of headers and payload + let data_path = harness.server().data_path(); + let log_files = find_log_files(&data_path); + assert!( + !log_files.is_empty(), + "Expected at least one .log segment file on disk" + ); + let all_raw_bytes: Vec<u8> = log_files + .iter() + .flat_map(|p| std::fs::read(p).unwrap()) + .collect(); + let contains_plaintext_header = all_raw_bytes + .windows(b"test-message".len()) + .any(|w| w == b"test-message"); + let contains_plaintext_payload = all_raw_bytes + .windows(b"Message batch 1".len()) + .any(|w| w == b"Message batch 1"); + if encryption { + assert!( + !contains_plaintext_header, + "When encryption is enabled, header values must NOT appear as plaintext on disk" + ); + assert!( + !contains_plaintext_payload, + "When encryption is enabled, payload must NOT appear as plaintext on disk" + ); + } else { + assert!( + contains_plaintext_header, + "When encryption is disabled, header values should appear as plaintext on disk" + ); + assert!( + contains_plaintext_payload, + "When encryption is disabled, payload should appear as plaintext on disk" + ); + } + let initial_stats = client.get_stats().await.unwrap(); let initial_messages_count = initial_stats.messages_count; let initial_messages_size = initial_stats.messages_size_bytes; @@ -317,3 +330,43 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp initial_messages_size.as_bytes_u64() ); } + +fn encryption_enabled() -> bool { + true +} + +fn encryption_disabled() -> bool { + false +} + +fn build_server_config(encryption: bool) -> TestServerConfig { + let mut extra_envs = HashMap::new(); + + if encryption { + extra_envs.insert( + "IGGY_SYSTEM_ENCRYPTION_ENABLED".to_string(), + "true".to_string(), + ); + extra_envs.insert( + "IGGY_SYSTEM_ENCRYPTION_KEY".to_string(), + "/rvT1xP4V8u1EAhk4xDdqzqM2UOPXyy9XYkl4uRShgE=".to_string(), + ); + } + + TestServerConfig::builder().extra_envs(extra_envs).build() +} + +fn find_log_files(dir: &Path) -> Vec<PathBuf> { + let mut result = Vec::new(); + if let Ok(entries) = std::fs::read_dir(dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + result.extend(find_log_files(&path)); + } else if path.extension().and_then(|e| e.to_str()) == Some("log") { + result.push(path); + } + } + } + result +} diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index cbd405065..3627c87db 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy" -version = "0.9.4-edge.1" +version = "0.9.5-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/sdk/src/clients/binary_message.rs b/core/sdk/src/clients/binary_message.rs index 63f064d8a..462c60ed8 100644 --- a/core/sdk/src/clients/binary_message.rs +++ b/core/sdk/src/clients/binary_message.rs @@ -61,6 +61,12 @@ impl MessageClient for IggyClient { let payload = encryptor.decrypt(&message.payload)?; message.payload = Bytes::from(payload); message.header.payload_length = message.payload.len() as u32; + + if let Some(ref user_headers) = message.user_headers { + let decrypted_headers = encryptor.decrypt(user_headers)?; + message.header.user_headers_length = decrypted_headers.len() as u32; + message.user_headers = Some(Bytes::from(decrypted_headers)); + } } } @@ -82,6 +88,12 @@ impl MessageClient for IggyClient { for message in &mut *messages { message.payload = Bytes::from(encryptor.encrypt(&message.payload)?); message.header.payload_length = message.payload.len() as u32; + + if let Some(ref user_headers) = message.user_headers { + let encrypted_headers = encryptor.encrypt(user_headers)?; + message.header.user_headers_length = encrypted_headers.len() as u32; + message.user_headers = Some(Bytes::from(encrypted_headers)); + } } } diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index 37b203286..17d2c425b 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -981,12 +981,12 @@ impl Stream for IggyConsumer { } else { if let Some(ref encryptor) = self.encryptor { for message in &mut polled_messages.messages { + let offset = message.header.offset; let payload = encryptor.decrypt(&message.payload); if let Err(error) = payload { self.poll_future = None; error!( - "Failed to decrypt the message payload at offset: {}, partition ID: {}", - message.header.offset, partition_id + "Failed to decrypt the message payload at offset: {offset}, partition ID: {partition_id}", ); return Poll::Ready(Some(Err(error))); } @@ -994,6 +994,21 @@ impl Stream for IggyConsumer { let payload = payload.unwrap(); message.payload = Bytes::from(payload); message.header.payload_length = message.payload.len() as u32; + + if let Some(ref user_headers) = message.user_headers { + let decrypted_headers = encryptor.decrypt(user_headers); + if let Err(error) = decrypted_headers { + self.poll_future = None; + error!( + "Failed to decrypt the message user headers at offset: {offset}, partition ID: {partition_id}", + ); + return Poll::Ready(Some(Err(error))); + } + let decrypted_headers = decrypted_headers.unwrap(); + message.header.user_headers_length = + decrypted_headers.len() as u32; + message.user_headers = Some(Bytes::from(decrypted_headers)); + } } } diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index badd909f0..e17fafc80 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -293,6 +293,12 @@ impl ProducerCore { for message in messages { message.payload = Bytes::from(encryptor.encrypt(&message.payload)?); message.header.payload_length = message.payload.len() as u32; + + if let Some(ref user_headers) = message.user_headers { + let encrypted_headers = encryptor.encrypt(user_headers)?; + message.header.user_headers_length = encrypted_headers.len() as u32; + message.user_headers = Some(Bytes::from(encrypted_headers)); + } } } Ok(()) diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 62027ca6c..d26d3fed7 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "server" -version = "0.7.3-edge.1" +version = "0.7.4-edge.1" edition = "2024" license = "Apache-2.0" diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index a59f68d6d..21e342003 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -559,25 +559,46 @@ impl IggyShard { let mut position = 0; for message in batch.iter() { + let mut header = message.header().to_header(); + let offset = header.offset; let payload = encryptor.decrypt(message.payload()); match payload { Ok(payload) => { // Update the header with the decrypted payload length - let mut header = message.header().to_header(); header.payload_length = payload.len() as u32; + // Decrypt user headers if present + let decrypted_user_headers = if let Some(user_headers) = + message.user_headers() + { + match encryptor.decrypt(user_headers) { + Ok(decrypted) => { + header.user_headers_length = decrypted.len() as u32; + Some(decrypted) + } + Err(error) => { + error!( + "Cannot decrypt the message user headers at offset: {offset}. Error: {error}" + ); + continue; + } + } + } else { + None + }; + decrypted_messages.extend_from_slice(&header.to_bytes()); decrypted_messages.extend_from_slice(&payload); - if let Some(user_headers) = message.user_headers() { + if let Some(ref user_headers) = decrypted_user_headers { decrypted_messages.extend_from_slice(user_headers); } position += IGGY_MESSAGE_HEADER_SIZE + payload.len() - + message.header().user_headers_length(); + + header.user_headers_length as usize; indexes.insert(0, position as u32, 0); } Err(error) => { - error!("Cannot decrypt the message. Error: {}", error); + error!("Cannot decrypt the message at offset: {offset}. Error: {error}",); continue; } } @@ -604,7 +625,7 @@ impl IggyShard { for message in batch.iter() { let header = message.header().to_header(); - let user_headers_length = header.user_headers_length; + let offset = header.offset; let payload_bytes = message.payload(); let user_headers_bytes = message.user_headers(); @@ -614,18 +635,38 @@ impl IggyShard { let mut updated_header = header; updated_header.payload_length = encrypted_payload.len() as u32; + // Encrypt user headers if present + let encrypted_user_headers = if let Some(user_headers_bytes) = + user_headers_bytes + { + match encryptor.encrypt(user_headers_bytes) { + Ok(encrypted) => { + updated_header.user_headers_length = encrypted.len() as u32; + Some(encrypted) + } + Err(error) => { + error!( + "Cannot encrypt the message user headers at offset: {offset}. Error: {error}" + ); + continue; + } + } + } else { + None + }; + encrypted_messages.extend_from_slice(&updated_header.to_bytes()); encrypted_messages.extend_from_slice(&encrypted_payload); - if let Some(user_headers_bytes) = user_headers_bytes { - encrypted_messages.extend_from_slice(user_headers_bytes); + if let Some(ref encrypted_user_headers) = encrypted_user_headers { + encrypted_messages.extend_from_slice(encrypted_user_headers); } position += IGGY_MESSAGE_HEADER_SIZE + encrypted_payload.len() - + user_headers_length as usize; + + updated_header.user_headers_length as usize; indexes.insert(0, position as u32, 0); } Err(error) => { - error!("Cannot encrypt the message. Error: {}", error); + error!("Cannot encrypt the message at offset: {offset}. Error: {error}",); continue; } } diff --git a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs index 2c5a1b80d..74713deab 100644 --- a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs +++ b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs @@ -21,8 +21,10 @@ using System.Threading.Channels; using Apache.Iggy.Contracts; using Apache.Iggy.Enums; using Apache.Iggy.Exceptions; +using Apache.Iggy.Headers; using Apache.Iggy.IggyClient; using Apache.Iggy.Kinds; +using Apache.Iggy.Mappers; using Apache.Iggy.Utils; using Microsoft.Extensions.Logging; @@ -380,11 +382,19 @@ public partial class IggyConsumer : IAsyncDisposable try { var decryptedPayload = _config.MessageEncryptor.Decrypt(message.Payload); + + Dictionary<HeaderKey, HeaderValue>? decryptedHeaders = message.UserHeaders; + if (decryptedHeaders is null && message.RawUserHeaders is { Length: > 0 }) + { + var decryptedHeaderBytes = _config.MessageEncryptor.Decrypt(message.RawUserHeaders); + decryptedHeaders = BinaryMapper.MapHeaders(decryptedHeaderBytes); + } + processedMessage = new MessageResponse { Header = message.Header, Payload = decryptedPayload, - UserHeaders = message.UserHeaders + UserHeaders = decryptedHeaders }; } catch (Exception ex) diff --git a/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs b/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs index f39ddd864..4d1de109f 100644 --- a/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs +++ b/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs @@ -42,5 +42,12 @@ public sealed class MessageResponse /// </summary> [JsonPropertyName("user_headers")] [JsonConverter(typeof(UserHeadersConverter))] - public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; init; } + public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; set; } + + /// <summary> + /// Raw user header bytes before deserialization. + /// Used internally for decrypting encrypted headers. + /// </summary> + [JsonIgnore] + internal byte[]? RawUserHeaders { get; set; } } diff --git a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs index 3501117cc..2e2c93943 100644 --- a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs +++ b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs @@ -410,7 +410,7 @@ internal static class TcpContracts var msgSize = 0; foreach (var message in messages) { - var headersBytes = GetHeadersBytes(message.UserHeaders); + var headersBytes = message.RawUserHeaders ?? GetHeadersBytes(message.UserHeaders); BinaryPrimitives.WriteUInt64LittleEndian(bytes[position..(position + 8)], message.Header.Checksum); BinaryPrimitives.WriteUInt128LittleEndian(bytes[(position + 8)..(position + 24)], message.Header.Id); BinaryPrimitives.WriteUInt64LittleEndian(bytes[(position + 24)..(position + 32)], message.Header.Offset); @@ -548,7 +548,7 @@ internal static class TcpContracts // return bytes; // } - private static byte[] GetHeadersBytes(Dictionary<HeaderKey, HeaderValue>? headers) + internal static byte[] GetHeadersBytes(Dictionary<HeaderKey, HeaderValue>? headers) { if (headers == null) { diff --git a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj index fb31d48de..923242206 100644 --- a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj +++ b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj @@ -7,7 +7,7 @@ <TargetFrameworks>net8.0;net10.0</TargetFrameworks> <AssemblyName>Apache.Iggy</AssemblyName> <RootNamespace>Apache.Iggy</RootNamespace> - <Version>0.7.1-edge.3</Version> + <Version>0.7.2-edge.1</Version> <GenerateDocumentationFile>true</GenerateDocumentationFile> </PropertyGroup> diff --git a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs index 6c3435d2a..cb530a54e 100644 --- a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs +++ b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs @@ -336,8 +336,7 @@ internal static class BinaryMapper }; } - internal static PolledMessages MapMessages(ReadOnlySpan<byte> payload, - Func<byte[], byte[]>? decryptor = null) + internal static PolledMessages MapMessages(ReadOnlySpan<byte> payload) { var length = payload.Length; var partitionId = BinaryPrimitives.ReadInt32LittleEndian(payload[..4]); @@ -362,13 +361,22 @@ internal static class BinaryMapper var payloadLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 52)..(position + 56)]); var reserved = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 56)..(position + 64)]); - Dictionary<HeaderKey, HeaderValue>? headers = headersLength switch + var wireHeadersLength = headersLength; + byte[]? rawUserHeaders = null; + Dictionary<HeaderKey, HeaderValue>? headers; + if (headersLength == 0) { - 0 => null, - > 0 => MapHeaders( - payload[(position + 64 + payloadLength)..(position + 64 + payloadLength + headersLength)]), - < 0 => throw new ArgumentOutOfRangeException() - }; + headers = null; + } + else if (headersLength < 0) + { + throw new ArgumentOutOfRangeException(); + } + else + { + rawUserHeaders = payload[(position + 64 + payloadLength)..(position + 64 + payloadLength + headersLength)].ToArray(); + headers = TryMapHeaders(rawUserHeaders); + } var payloadRangeStart = position + 64; var payloadRangeEnd = position + 64 + payloadLength; @@ -399,9 +407,8 @@ internal static class BinaryMapper Reserved = reserved }, UserHeaders = headers, - Payload = decryptor is not null - ? decryptor(messagePayload[..payloadSliceLen]) - : messagePayload[..payloadSliceLen] + RawUserHeaders = rawUserHeaders, + Payload = messagePayload[..payloadSliceLen] }); } finally @@ -409,7 +416,7 @@ internal static class BinaryMapper ArrayPool<byte>.Shared.Return(messagePayload); } - position += 64 + payloadLength + headersLength; + position += 64 + payloadLength + wireHeadersLength; if (position + PropertiesSize >= length) { break; @@ -424,14 +431,14 @@ internal static class BinaryMapper }; } - private static Dictionary<HeaderKey, HeaderValue> MapHeaders(ReadOnlySpan<byte> payload) + internal static Dictionary<HeaderKey, HeaderValue> MapHeaders(ReadOnlySpan<byte> payload) { var headers = new Dictionary<HeaderKey, HeaderValue>(); var position = 0; while (position < payload.Length) { - var keyKind = MapHeaderKind(payload, position); + var keyKind = MapHeaderKind(payload[position]); position++; var keyLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]); @@ -444,7 +451,7 @@ internal static class BinaryMapper var keyValue = payload[position..(position + keyLength)].ToArray(); position += keyLength; - var valueKind = MapHeaderKind(payload, position); + var valueKind = MapHeaderKind(payload[position]); position++; var valueLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]); @@ -464,9 +471,62 @@ internal static class BinaryMapper return headers; } - private static HeaderKind MapHeaderKind(ReadOnlySpan<byte> payload, int position) + internal static Dictionary<HeaderKey, HeaderValue>? TryMapHeaders(ReadOnlySpan<byte> payload) + { + if (payload.Length == 0 || payload[0] is 0 or > 15) + { + return null; + } + + var headers = new Dictionary<HeaderKey, HeaderValue>(); + var position = 0; + + while (position < payload.Length) + { + if (!TryMapHeaderKind(payload[position], out var keyKind)) + return null; + position++; + + if (position + 4 > payload.Length) + return null; + var keyLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]); + if (keyLength is <= 0 or > 255) + return null; + + position += 4; + if (position + keyLength > payload.Length) + return null; + var keyValue = payload[position..(position + keyLength)].ToArray(); + position += keyLength; + + if (position >= payload.Length) + return null; + if (!TryMapHeaderKind(payload[position], out var valueKind)) + return null; + position++; + + if (position + 4 > payload.Length) + return null; + var valueLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]); + if (valueLength is <= 0 or > 255) + return null; + + position += 4; + if (position + valueLength > payload.Length) + return null; + ReadOnlySpan<byte> value = payload[position..(position + valueLength)]; + position += valueLength; + + headers[new HeaderKey { Kind = keyKind, Value = keyValue }] = + new HeaderValue { Kind = valueKind, Value = value.ToArray() }; + } + + return headers; + } + + private static HeaderKind MapHeaderKind(byte value) { - var headerKind = payload[position] switch + return value switch { 1 => HeaderKind.Raw, 2 => HeaderKind.String, @@ -483,9 +543,19 @@ internal static class BinaryMapper 13 => HeaderKind.Uint128, 14 => HeaderKind.Float, 15 => HeaderKind.Double, - _ => throw new ArgumentOutOfRangeException() + _ => throw new ArgumentOutOfRangeException(nameof(value), value, null) }; - return headerKind; + } + + private static bool TryMapHeaderKind(byte value, out HeaderKind kind) + { + if (value is >= 1 and <= 15) + { + kind = MapHeaderKind(value); + return true; + } + kind = default; + return false; } internal static IReadOnlyList<StreamResponse> MapStreams(ReadOnlySpan<byte> payload) diff --git a/foreign/csharp/Iggy_SDK/Messages/Message.cs b/foreign/csharp/Iggy_SDK/Messages/Message.cs index 1a0e7e697..dc3c7dd03 100644 --- a/foreign/csharp/Iggy_SDK/Messages/Message.cs +++ b/foreign/csharp/Iggy_SDK/Messages/Message.cs @@ -45,6 +45,12 @@ public class Message /// </summary> public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; set; } + /// <summary> + /// Pre-serialized (possibly encrypted) user headers bytes. + /// When set, this takes precedence over <see cref="UserHeaders"/> during serialization. + /// </summary> + internal byte[]? RawUserHeaders { get; set; } + /// <summary> /// Default constructor. /// </summary> diff --git a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs index 1b95c9da8..68ea63e2e 100644 --- a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs +++ b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +using Apache.Iggy.Contracts.Tcp; using Apache.Iggy.Enums; using Apache.Iggy.Exceptions; using Apache.Iggy.IggyClient; @@ -310,7 +311,7 @@ public partial class IggyPublisher : IAsyncDisposable /// <summary> /// Encrypts all messages in the list using the configured message encryptor, if available. - /// Updates the payload length in the message header after encryption. + /// Updates the payload and user headers lengths in the message header after encryption. /// </summary> /// <param name="messages">The messages to encrypt.</param> private void EncryptMessages(IList<Message> messages) @@ -324,6 +325,15 @@ public partial class IggyPublisher : IAsyncDisposable { message.Payload = _config.MessageEncryptor.Encrypt(message.Payload); message.Header.PayloadLength = message.Payload.Length; + + if (message.UserHeaders is { Count: > 0 }) + { + var headerBytes = TcpContracts.GetHeadersBytes(message.UserHeaders); + var encryptedHeaderBytes = _config.MessageEncryptor.Encrypt(headerBytes); + message.RawUserHeaders = encryptedHeaderBytes; + message.UserHeaders = null; + message.Header.UserHeadersLength = encryptedHeaderBytes.Length; + } } } } diff --git a/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs b/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs index c99337b4d..626442a11 100644 --- a/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs +++ b/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs @@ -49,6 +49,12 @@ internal static class TcpMessageStreamHelpers foreach (var message in messages) { bytesCount += 16 + 64 + message.Payload.Length; + if (message.RawUserHeaders is not null) + { + bytesCount += message.RawUserHeaders.Length; + continue; + } + if (message.UserHeaders is null) { continue; diff --git a/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs b/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs new file mode 100644 index 000000000..fb278eaaf --- /dev/null +++ b/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Apache.Iggy.Contracts.Tcp; +using Apache.Iggy.Encryption; +using Apache.Iggy.Headers; +namespace Apache.Iggy.Tests.MapperTests; + +public class HeaderEncryptionTests +{ + [Fact] + public void Headers_should_survive_encrypt_decrypt_roundtrip() + { + var key = Encryption.AesMessageEncryptor.GenerateKey(); + var encryptor = new Encryption.AesMessageEncryptor(key); + + var originalHeaders = new Dictionary<Headers.HeaderKey, Headers.HeaderValue> + { + { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "batch"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Uint64, Value = BitConverter.GetBytes(1UL) } }, + { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "type"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.String, Value = "test-message"u8.ToArray() } }, + { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "encrypted"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Bool, Value = [1] } }, + }; + + var headerBytes = Contracts.Tcp.TcpContracts.GetHeadersBytes(originalHeaders); + Assert.NotEmpty(headerBytes); + + var encrypted = encryptor.Encrypt(headerBytes); + Assert.NotEqual(headerBytes, encrypted); + Assert.True(encrypted.Length > headerBytes.Length); + + // Encrypted bytes must not parse as valid headers + var parsed = Mappers.BinaryMapper.TryMapHeaders(encrypted); + Assert.Null(parsed); + + var decrypted = encryptor.Decrypt(encrypted); + Assert.Equal(headerBytes, decrypted); + + var roundTripped = Mappers.BinaryMapper.MapHeaders(decrypted); + Assert.Equal(originalHeaders.Count, roundTripped.Count); + + foreach (var (key2, value) in originalHeaders) + { + Assert.True(roundTripped.ContainsKey(key2)); + Assert.Equal(value.Kind, roundTripped[key2].Kind); + Assert.Equal(value.Value, roundTripped[key2].Value); + } + } + + [Fact] + public void TryMapHeaders_returns_null_on_random_bytes() + { + var random = new byte[64]; + Random.Shared.NextBytes(random); + random[0] = 0; + + var result = Mappers.BinaryMapper.TryMapHeaders(random); + Assert.Null(result); + } + + [Fact] + public void TryMapHeaders_returns_valid_headers_on_plaintext() + { + var headers = new Dictionary<Headers.HeaderKey, Headers.HeaderValue> + { + { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "key"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.String, Value = "value"u8.ToArray() } }, + }; + + var bytes = Contracts.Tcp.TcpContracts.GetHeadersBytes(headers); + var result = Mappers.BinaryMapper.TryMapHeaders(bytes); + + Assert.NotNull(result); + Assert.Single(result); + } +} diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml index 9ce0d5265..2e6e38516 100644 --- a/foreign/python/Cargo.toml +++ b/foreign/python/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "apache-iggy" -version = "0.7.4-dev1" +version = "0.7.5-dev1" edition = "2024" authors = ["Iggy Committers <[email protected]>"] license = "Apache-2.0" @@ -28,7 +28,7 @@ repository = "https://github.com/apache/iggy" [dependencies] bytes = "1.11.1" futures = "0.3.32" -iggy = { path = "../../core/sdk", version = "0.9.4-edge.1" } +iggy = { path = "../../core/sdk", version = "0.9.5-edge.1" } pyo3 = "0.28.2" pyo3-async-runtimes = { version = "0.28.0", features = [ "attributes", diff --git a/foreign/python/pyproject.toml b/foreign/python/pyproject.toml index ba57ad43c..012296101 100644 --- a/foreign/python/pyproject.toml +++ b/foreign/python/pyproject.toml @@ -22,7 +22,7 @@ build-backend = "maturin" [project] name = "apache-iggy" requires-python = ">=3.10" -version = "0.7.4.dev1" +version = "0.7.5.dev1" description = "Apache Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." readme = "README.md" license = { file = "LICENSE" }
