This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch refactor-binary-7-http
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 6d16e4d32644dc4118e81f7b40c0adfdad3bd9b5 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Mar 27 15:22:32 2026 +0100 fix(server,sdk): fix O(n^2) memory in HTTP SendMessages and missing transport codes HTTP SendMessages serialization allocated a HashMap per message using the total batch count as capacity instead of the actual 3 keys per map. A 10K batch wasted ~9GB of heap. Replace HashMap with serde_json::json! macro which builds the object directly without hashing. Also add missing HTTP (3) and WebSocket (4) transport code mappings in wire_conversions, which returned "Unknown" for these protocols. --- core/common/src/http/messages/send_messages.rs | 18 +++++++++--------- core/common/src/wire_conversions.rs | 2 ++ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/common/src/http/messages/send_messages.rs b/core/common/src/http/messages/send_messages.rs index 0f6b137c6..d01049dae 100644 --- a/core/common/src/http/messages/send_messages.rs +++ b/core/common/src/http/messages/send_messages.rs @@ -25,6 +25,7 @@ use crate::types::message::HeaderEntry; use crate::types::message::partitioning::Partitioning; use crate::{IggyMessage, IggyMessagesBatch}; use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64}; +use bytes::Bytes; use serde::de::{self, MapAccess, Visitor}; use serde::ser::SerializeStruct; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -88,15 +89,14 @@ impl Serialize for SendMessages { // - messages as an array of {id, payload, headers} // We don't expose stream_id and topic_id via JSON as they're in URL path - let messages: Vec<HashMap<&str, serde_json::Value>> = self + let messages: Vec<serde_json::Value> = self .batch .iter() .map(|msg_view: IggyMessageView<'_>| { - let mut map = HashMap::with_capacity(self.batch.count() as usize); - map.insert("id", serde_json::to_value(msg_view.header().id()).unwrap()); - - let payload_base64 = BASE64.encode(msg_view.payload()); - map.insert("payload", 
serde_json::to_value(payload_base64).unwrap()); + let mut obj = serde_json::json!({ + "id": msg_view.header().id(), + "payload": BASE64.encode(msg_view.payload()), + }); match msg_view.user_headers_map() { Ok(Some(headers)) => { @@ -104,16 +104,16 @@ impl Serialize for SendMessages { .into_iter() .map(|(k, v)| HeaderEntry { key: k, value: v }) .collect(); - map.insert("user_headers", serde_json::to_value(&entries).unwrap()); + obj["user_headers"] = serde_json::to_value(&entries).unwrap(); } _ if msg_view.user_headers().is_some() => { let raw_base64 = BASE64.encode(msg_view.user_headers().unwrap()); - map.insert("user_headers", serde_json::to_value(raw_base64).unwrap()); + obj["user_headers"] = serde_json::to_value(raw_base64).unwrap(); } _ => {} } - map + obj }) .collect(); diff --git a/core/common/src/wire_conversions.rs b/core/common/src/wire_conversions.rs index eaa728cdb..fc2cc2333 100644 --- a/core/common/src/wire_conversions.rs +++ b/core/common/src/wire_conversions.rs @@ -262,6 +262,8 @@ impl From<ClientResponse> for ClientInfo { let transport = match w.transport { 1 => "TCP", 2 => "QUIC", + 3 => "HTTP", + 4 => "WebSocket", _ => "Unknown", } .to_string();
