This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch headers_encryption in repository https://gitbox.apache.org/repos/asf/iggy.git
commit b62da5b862e78b9639fba7b8135e0d7503ff963e Merge: 10af6bf1e 169a09bcd Author: Piotr Gankiewicz <[email protected]> AuthorDate: Mon Mar 30 09:12:43 2026 +0200 Merge branch 'master' into headers_encryption core/common/src/commands/messages/send_messages.rs | 16 +- core/common/src/types/message/iggy_message.rs | 8 +- .../tests/server/scenarios/encryption_scenario.rs | 16 +- foreign/go/contracts/users.go | 25 +- foreign/go/contracts/users_test.go | 746 +++++++++++++++++++++ 5 files changed, 788 insertions(+), 23 deletions(-) diff --cc core/common/src/commands/messages/send_messages.rs index 75606a4f2,91a6aa43c..2943340e9 --- a/core/common/src/commands/messages/send_messages.rs +++ b/core/common/src/commands/messages/send_messages.rs @@@ -317,38 -310,29 +317,38 @@@ impl<'de> Deserialize<'de> for SendMess .decode(payload) .map_err(|_| de::Error::custom("Invalid base64 payload"))?; - let headers_map = if let Some(headers) = msg.get("user_headers") { - if headers.is_null() { - None - } else { - let entries: Vec<HeaderEntry> = serde_json::from_value( - headers.clone(), - ) - .map_err(|e| { - de::Error::custom(format!( - "Invalid headers format: {e}" - )) - })?; - let mut map = HashMap::new(); - for entry in entries { - map.insert(entry.key, entry.value); + let (headers_map, raw_headers) = + if let Some(headers) = msg.get("user_headers") { + if headers.is_null() { + (None, None) + } else if let Some(base64_str) = headers.as_str() { + // Raw base64-encoded header bytes (e.g. client-side encrypted) + let raw = BASE64.decode(base64_str).map_err(|e| { + de::Error::custom(format!( + "Invalid base64 headers: {e}" + )) + })?; + (None, Some(Bytes::from(raw))) + } else { - let entries: Vec<HeaderEntry> = - serde_json::from_value(headers.clone()).map_err( - |e| { - de::Error::custom(format!( - "Invalid headers format: {e}" - )) - }, - )?; ++ let entries: Vec<HeaderEntry> = serde_json::from_value( ++ headers.clone(), ++ ) ++ .map_err(|e| { ++ de::Error::custom(format!( ++ "Invalid headers format: {e}" ++ )) ++ })?; + let mut map = HashMap::new(); + for entry in entries { + map.insert(entry.key, entry.value); + } + (Some(map), None) } - Some(map) - } - } else { - None - }; + } else { + (None, None) + }; - let iggy_message = if let Some(headers) = headers_map { + let mut iggy_message = if let Some(headers) = headers_map { IggyMessage::builder() .id(id) .payload(payload_bytes.into()) diff --cc core/common/src/types/message/iggy_message.rs index 08bac4342,12a2c1afc..4fc214d28 --- a/core/common/src/types/message/iggy_message.rs +++ b/core/common/src/types/message/iggy_message.rs @@@ -617,30 -610,12 +617,28 @@@ impl<'de> Deserialize<'de> for IggyMess payload = Some(Bytes::from(decoded)); } "user_headers" => { - let entries: Vec<HeaderEntry> = map.next_value()?; - let mut headers_map = HashMap::new(); - for entry in entries { - headers_map.insert(entry.key, entry.value); + // Try as array of HeaderEntry first, fall back to base64 string + let value: serde_json::Value = map.next_value()?; + if let Some(base64_str) = value.as_str() { + use base64::{Engine as _, engine::general_purpose::STANDARD}; + let decoded = + STANDARD.decode(base64_str.as_bytes()).map_err(|e| { + de::Error::custom(format!( + "Failed to decode base64 headers: {e}" + )) + })?; + raw_user_headers = Some(Bytes::from(decoded)); + } else if value.is_array() { - let entries: Vec<HeaderEntry> = - serde_json::from_value(value).map_err(|e| { - de::Error::custom(format!( - "Invalid headers format: {e}" - )) ++ let entries: Vec<HeaderEntry> = serde_json::from_value(value) ++ .map_err(|e| { ++ de::Error::custom(format!("Invalid headers format: {e}")) + })?; + let mut headers_map = HashMap::new(); + for entry in entries { + headers_map.insert(entry.key, entry.value); + } + user_headers = Some(headers_map); } - user_headers = Some(headers_map); } _ => { let _ = map.next_value::<de::IgnoredAny>()?; diff --cc core/integration/tests/server/scenarios/encryption_scenario.rs index 8eb2ecbf0,f03741c44..6881f5f8c --- a/core/integration/tests/server/scenarios/encryption_scenario.rs +++ b/core/integration/tests/server/scenarios/encryption_scenario.rs @@@ -332,186 -317,3 +332,190 @@@ async fn should_fill_data_with_headers_ initial_messages_size.as_bytes_u64() ); } + +#[test_matrix( + [TransportProtocol::Tcp, TransportProtocol::Http, TransportProtocol::Quic, TransportProtocol::WebSocket] +)] +#[tokio::test] +#[parallel] +async fn should_encrypt_and_decrypt_headers_with_client_side_encryption( + transport: TransportProtocol, +) { + let mut harness = TestHarness::builder() + .server(TestServerConfig::default()) + .build() + .unwrap(); + + harness.start().await.unwrap(); + + let setup_client = harness.tcp_root_client().await.unwrap(); + let stream_name = format!("client-enc-{transport}"); + let topic_name = "enc-topic"; + + setup_client.create_stream(&stream_name).await.unwrap(); + setup_client + .create_topic( + &Identifier::named(&stream_name).unwrap(), + topic_name, + 1, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + + let encryptor = Arc::new(EncryptorKind::Aes256Gcm( + Aes256GcmEncryptor::new(&[42u8; 32]).unwrap(), + )); + + let encrypting_client = harness + .client_builder_for(transport) + .unwrap() + .with_encryptor(encryptor) + .with_root_login() + .connect() + .await + .unwrap(); + + let stream_id = Identifier::named(&stream_name).unwrap(); + let topic_id = Identifier::named(topic_name).unwrap(); + + let mut messages = Vec::new(); + for i in 0..10i64 { + let mut headers = HashMap::new(); + headers.insert(HeaderKey::try_from("index").unwrap(), HeaderValue::from(i)); + headers.insert( + HeaderKey::try_from("transport").unwrap(), + HeaderValue::try_from(transport.to_string().as_str()).unwrap(), + ); + + messages.push( + IggyMessage::builder() + .id((i + 1) as u128) + .payload(Bytes::from(format!("client encrypted msg {i}"))) + .user_headers(headers) + .build() + .unwrap(), + ); + } + + encrypting_client - .send_messages(&stream_id, &topic_id, &Partitioning::partition_id(0), &mut messages) ++ .send_messages( ++ &stream_id, ++ &topic_id, ++ &Partitioning::partition_id(0), ++ &mut messages, ++ ) + .await + .unwrap(); + + let polled = encrypting_client + .poll_messages( + &stream_id, + &topic_id, + Some(0), + &Consumer::default(), + &PollingStrategy::offset(0), + 10, + false, + ) + .await + .unwrap(); + + assert_eq!(polled.messages.len(), 10); + for (i, msg) in polled.messages.iter().enumerate() { + assert_eq!( + std::str::from_utf8(&msg.payload).unwrap(), + format!("client encrypted msg {i}") + ); + + let headers = msg.user_headers_map().unwrap().unwrap(); + assert_eq!( + headers + .get(&HeaderKey::try_from("index").unwrap()) + .unwrap() + .as_int64() + .unwrap(), + i as i64 + ); + assert_eq!( + headers + .get(&HeaderKey::try_from("transport").unwrap()) + .unwrap() + .as_str() + .unwrap(), + transport.to_string().as_str() + ); + } + - // Poll with a plain client (no encryptor) — payload and headers should be unreadable - let plain_client = harness.root_client_for(transport).await.unwrap(); - let polled_plain = plain_client ++ let client_without_encryptor = harness.root_client_for(transport).await.unwrap(); ++ let polled_without_decryption = client_without_encryptor + .poll_messages( + &stream_id, + &topic_id, + Some(0), + &Consumer::default(), + &PollingStrategy::offset(0), + 10, + false, + ) + .await + .unwrap(); + - assert_eq!(polled_plain.messages.len(), 10); - for msg in &polled_plain.messages { ++ assert_eq!(polled_without_decryption.messages.len(), 10); ++ for msg in &polled_without_decryption.messages { + let payload_text = std::str::from_utf8(&msg.payload).unwrap_or(""); + assert!( + !payload_text.starts_with("client encrypted msg"), + "Payload must not be readable without the encryptor" + ); + + let headers = msg.user_headers_map().unwrap(); + assert!( + headers.is_none(), + "Headers must not be parseable without the encryptor" + ); + } +} + +fn encryption_enabled() -> bool { + true +} + +fn encryption_disabled() -> bool { + false +} + +fn build_server_config(encryption: bool) -> TestServerConfig { + let mut extra_envs = HashMap::new(); + + if encryption { + extra_envs.insert( + "IGGY_SYSTEM_ENCRYPTION_ENABLED".to_string(), + "true".to_string(), + ); + extra_envs.insert( + "IGGY_SYSTEM_ENCRYPTION_KEY".to_string(), + "/rvT1xP4V8u1EAhk4xDdqzqM2UOPXyy9XYkl4uRShgE=".to_string(), + ); + } + + TestServerConfig::builder().extra_envs(extra_envs).build() +} + +fn find_log_files(dir: &Path) -> Vec<PathBuf> { + let mut result = Vec::new(); + if let Ok(entries) = std::fs::read_dir(dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + result.extend(find_log_files(&path)); + } else if path.extension().and_then(|e| e.to_str()) == Some("log") { + result.push(path); + } + } + } + result +}
