This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch headers_encryption
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit b62da5b862e78b9639fba7b8135e0d7503ff963e
Merge: 10af6bf1e 169a09bcd
Author: Piotr Gankiewicz <[email protected]>
AuthorDate: Mon Mar 30 09:12:43 2026 +0200

    Merge branch 'master' into headers_encryption

 core/common/src/commands/messages/send_messages.rs |  16 +-
 core/common/src/types/message/iggy_message.rs      |   8 +-
 .../tests/server/scenarios/encryption_scenario.rs  |  16 +-
 foreign/go/contracts/users.go                      |  25 +-
 foreign/go/contracts/users_test.go                 | 746 +++++++++++++++++++++
 5 files changed, 788 insertions(+), 23 deletions(-)

diff --cc core/common/src/commands/messages/send_messages.rs
index 75606a4f2,91a6aa43c..2943340e9
--- a/core/common/src/commands/messages/send_messages.rs
+++ b/core/common/src/commands/messages/send_messages.rs
@@@ -317,38 -310,29 +317,38 @@@ impl<'de> Deserialize<'de> for SendMess
                                      .decode(payload)
                                      .map_err(|_| de::Error::custom("Invalid 
base64 payload"))?;
  
 -                                let headers_map = if let Some(headers) = 
msg.get("user_headers") {
 -                                    if headers.is_null() {
 -                                        None
 -                                    } else {
 -                                        let entries: Vec<HeaderEntry> = 
serde_json::from_value(
 -                                            headers.clone(),
 -                                        )
 -                                        .map_err(|e| {
 -                                            de::Error::custom(format!(
 -                                                "Invalid headers format: {e}"
 -                                            ))
 -                                        })?;
 -                                        let mut map = HashMap::new();
 -                                        for entry in entries {
 -                                            map.insert(entry.key, 
entry.value);
 +                                let (headers_map, raw_headers) =
 +                                    if let Some(headers) = 
msg.get("user_headers") {
 +                                        if headers.is_null() {
 +                                            (None, None)
 +                                        } else if let Some(base64_str) = 
headers.as_str() {
 +                                            // Raw base64-encoded header 
bytes (e.g. client-side encrypted)
 +                                            let raw = 
BASE64.decode(base64_str).map_err(|e| {
 +                                                de::Error::custom(format!(
 +                                                    "Invalid base64 headers: 
{e}"
 +                                                ))
 +                                            })?;
 +                                            (None, Some(Bytes::from(raw)))
 +                                        } else {
-                                             let entries: Vec<HeaderEntry> =
-                                                 
serde_json::from_value(headers.clone()).map_err(
-                                                     |e| {
-                                                         
de::Error::custom(format!(
-                                                             "Invalid headers 
format: {e}"
-                                                         ))
-                                                     },
-                                                 )?;
++                                            let entries: Vec<HeaderEntry> = 
serde_json::from_value(
++                                                headers.clone(),
++                                            )
++                                            .map_err(|e| {
++                                                de::Error::custom(format!(
++                                                    "Invalid headers format: 
{e}"
++                                                ))
++                                            })?;
 +                                            let mut map = HashMap::new();
 +                                            for entry in entries {
 +                                                map.insert(entry.key, 
entry.value);
 +                                            }
 +                                            (Some(map), None)
                                          }
 -                                        Some(map)
 -                                    }
 -                                } else {
 -                                    None
 -                                };
 +                                    } else {
 +                                        (None, None)
 +                                    };
  
 -                                let iggy_message = if let Some(headers) = 
headers_map {
 +                                let mut iggy_message = if let Some(headers) = 
headers_map {
                                      IggyMessage::builder()
                                          .id(id)
                                          .payload(payload_bytes.into())
diff --cc core/common/src/types/message/iggy_message.rs
index 08bac4342,12a2c1afc..4fc214d28
--- a/core/common/src/types/message/iggy_message.rs
+++ b/core/common/src/types/message/iggy_message.rs
@@@ -617,30 -610,12 +617,28 @@@ impl<'de> Deserialize<'de> for IggyMess
                              payload = Some(Bytes::from(decoded));
                          }
                          "user_headers" => {
 -                            let entries: Vec<HeaderEntry> = map.next_value()?;
 -                            let mut headers_map = HashMap::new();
 -                            for entry in entries {
 -                                headers_map.insert(entry.key, entry.value);
 +                            // Try as array of HeaderEntry first, fall back 
to base64 string
 +                            let value: serde_json::Value = map.next_value()?;
 +                            if let Some(base64_str) = value.as_str() {
 +                                use base64::{Engine as _, 
engine::general_purpose::STANDARD};
 +                                let decoded =
 +                                    
STANDARD.decode(base64_str.as_bytes()).map_err(|e| {
 +                                        de::Error::custom(format!(
 +                                            "Failed to decode base64 headers: 
{e}"
 +                                        ))
 +                                    })?;
 +                                raw_user_headers = Some(Bytes::from(decoded));
 +                            } else if value.is_array() {
-                                 let entries: Vec<HeaderEntry> =
-                                     serde_json::from_value(value).map_err(|e| 
{
-                                         de::Error::custom(format!(
-                                             "Invalid headers format: {e}"
-                                         ))
++                                let entries: Vec<HeaderEntry> = 
serde_json::from_value(value)
++                                    .map_err(|e| {
++                                        de::Error::custom(format!("Invalid 
headers format: {e}"))
 +                                    })?;
 +                                let mut headers_map = HashMap::new();
 +                                for entry in entries {
 +                                    headers_map.insert(entry.key, 
entry.value);
 +                                }
 +                                user_headers = Some(headers_map);
                              }
 -                            user_headers = Some(headers_map);
                          }
                          _ => {
                              let _ = map.next_value::<de::IgnoredAny>()?;
diff --cc core/integration/tests/server/scenarios/encryption_scenario.rs
index 8eb2ecbf0,f03741c44..6881f5f8c
--- a/core/integration/tests/server/scenarios/encryption_scenario.rs
+++ b/core/integration/tests/server/scenarios/encryption_scenario.rs
@@@ -332,186 -317,3 +332,190 @@@ async fn should_fill_data_with_headers_
          initial_messages_size.as_bytes_u64()
      );
  }
 +
 +#[test_matrix(
 +    [TransportProtocol::Tcp, TransportProtocol::Http, 
TransportProtocol::Quic, TransportProtocol::WebSocket]
 +)]
 +#[tokio::test]
 +#[parallel]
 +async fn should_encrypt_and_decrypt_headers_with_client_side_encryption(
 +    transport: TransportProtocol,
 +) {
 +    let mut harness = TestHarness::builder()
 +        .server(TestServerConfig::default())
 +        .build()
 +        .unwrap();
 +
 +    harness.start().await.unwrap();
 +
 +    let setup_client = harness.tcp_root_client().await.unwrap();
 +    let stream_name = format!("client-enc-{transport}");
 +    let topic_name = "enc-topic";
 +
 +    setup_client.create_stream(&stream_name).await.unwrap();
 +    setup_client
 +        .create_topic(
 +            &Identifier::named(&stream_name).unwrap(),
 +            topic_name,
 +            1,
 +            CompressionAlgorithm::default(),
 +            None,
 +            IggyExpiry::NeverExpire,
 +            MaxTopicSize::ServerDefault,
 +        )
 +        .await
 +        .unwrap();
 +
 +    let encryptor = Arc::new(EncryptorKind::Aes256Gcm(
 +        Aes256GcmEncryptor::new(&[42u8; 32]).unwrap(),
 +    ));
 +
 +    let encrypting_client = harness
 +        .client_builder_for(transport)
 +        .unwrap()
 +        .with_encryptor(encryptor)
 +        .with_root_login()
 +        .connect()
 +        .await
 +        .unwrap();
 +
 +    let stream_id = Identifier::named(&stream_name).unwrap();
 +    let topic_id = Identifier::named(topic_name).unwrap();
 +
 +    let mut messages = Vec::new();
 +    for i in 0..10i64 {
 +        let mut headers = HashMap::new();
 +        headers.insert(HeaderKey::try_from("index").unwrap(), 
HeaderValue::from(i));
 +        headers.insert(
 +            HeaderKey::try_from("transport").unwrap(),
 +            HeaderValue::try_from(transport.to_string().as_str()).unwrap(),
 +        );
 +
 +        messages.push(
 +            IggyMessage::builder()
 +                .id((i + 1) as u128)
 +                .payload(Bytes::from(format!("client encrypted msg {i}")))
 +                .user_headers(headers)
 +                .build()
 +                .unwrap(),
 +        );
 +    }
 +
 +    encrypting_client
-         .send_messages(&stream_id, &topic_id, &Partitioning::partition_id(0), 
&mut messages)
++        .send_messages(
++            &stream_id,
++            &topic_id,
++            &Partitioning::partition_id(0),
++            &mut messages,
++        )
 +        .await
 +        .unwrap();
 +
 +    let polled = encrypting_client
 +        .poll_messages(
 +            &stream_id,
 +            &topic_id,
 +            Some(0),
 +            &Consumer::default(),
 +            &PollingStrategy::offset(0),
 +            10,
 +            false,
 +        )
 +        .await
 +        .unwrap();
 +
 +    assert_eq!(polled.messages.len(), 10);
 +    for (i, msg) in polled.messages.iter().enumerate() {
 +        assert_eq!(
 +            std::str::from_utf8(&msg.payload).unwrap(),
 +            format!("client encrypted msg {i}")
 +        );
 +
 +        let headers = msg.user_headers_map().unwrap().unwrap();
 +        assert_eq!(
 +            headers
 +                .get(&HeaderKey::try_from("index").unwrap())
 +                .unwrap()
 +                .as_int64()
 +                .unwrap(),
 +            i as i64
 +        );
 +        assert_eq!(
 +            headers
 +                .get(&HeaderKey::try_from("transport").unwrap())
 +                .unwrap()
 +                .as_str()
 +                .unwrap(),
 +            transport.to_string().as_str()
 +        );
 +    }
 +
-     // Poll with a plain client (no encryptor) — payload and headers should 
be unreadable
-     let plain_client = harness.root_client_for(transport).await.unwrap();
-     let polled_plain = plain_client
++    let client_without_encryptor = 
harness.root_client_for(transport).await.unwrap();
++    let polled_without_decryption = client_without_encryptor
 +        .poll_messages(
 +            &stream_id,
 +            &topic_id,
 +            Some(0),
 +            &Consumer::default(),
 +            &PollingStrategy::offset(0),
 +            10,
 +            false,
 +        )
 +        .await
 +        .unwrap();
 +
-     assert_eq!(polled_plain.messages.len(), 10);
-     for msg in &polled_plain.messages {
++    assert_eq!(polled_without_decryption.messages.len(), 10);
++    for msg in &polled_without_decryption.messages {
 +        let payload_text = std::str::from_utf8(&msg.payload).unwrap_or("");
 +        assert!(
 +            !payload_text.starts_with("client encrypted msg"),
 +            "Payload must not be readable without the encryptor"
 +        );
 +
 +        let headers = msg.user_headers_map().unwrap();
 +        assert!(
 +            headers.is_none(),
 +            "Headers must not be parseable without the encryptor"
 +        );
 +    }
 +}
 +
 +fn encryption_enabled() -> bool {
 +    true
 +}
 +
 +fn encryption_disabled() -> bool {
 +    false
 +}
 +
 +fn build_server_config(encryption: bool) -> TestServerConfig {
 +    let mut extra_envs = HashMap::new();
 +
 +    if encryption {
 +        extra_envs.insert(
 +            "IGGY_SYSTEM_ENCRYPTION_ENABLED".to_string(),
 +            "true".to_string(),
 +        );
 +        extra_envs.insert(
 +            "IGGY_SYSTEM_ENCRYPTION_KEY".to_string(),
 +            "/rvT1xP4V8u1EAhk4xDdqzqM2UOPXyy9XYkl4uRShgE=".to_string(),
 +        );
 +    }
 +
 +    TestServerConfig::builder().extra_envs(extra_envs).build()
 +}
 +
 +fn find_log_files(dir: &Path) -> Vec<PathBuf> {
 +    let mut result = Vec::new();
 +    if let Ok(entries) = std::fs::read_dir(dir) {
 +        for entry in entries.flatten() {
 +            let path = entry.path();
 +            if path.is_dir() {
 +                result.extend(find_log_files(&path));
 +            } else if path.extension().and_then(|e| e.to_str()) == 
Some("log") {
 +                result.push(path);
 +            }
 +        }
 +    }
 +    result
 +}

Reply via email to