This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch refactor-binary-7-http
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 056cff621975181226c91804d2999eb5837e73ba
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Mar 27 11:34:03 2026 +0100

    refactor(sdk): delete BytesSerializable trait entirely
    
    BytesSerializable conflated wire protocol encoding with
    domain type serialization, duplicating WireEncode/WireDecode,
    which already existed in iggy_binary_protocol. SDK poll/send
    paths built request bytes through 4-5 intermediate allocations
    per BytesSerializable::to_bytes() call.
    
    SDK poll_messages now constructs PollMessagesRequest directly
    from wire conversion functions. SDK send_messages uses the
    zero-copy SendMessagesEncoder with borrowed RawMessage refs.
    IggyMessage/IggyMessageHeader retain to_bytes/from_bytes as
    inherent methods for CLI and encryption callers. HashMap
    header serialization moves to the standalone functions
    user_headers_to_bytes/user_headers_from_bytes, since inherent
    methods cannot be added to foreign types.
---
 .../src/commands/binary_message/poll_messages.rs   |   8 +-
 .../src/commands/binary_message/send_messages.rs   |   4 +-
 core/common/src/http/messages/poll_messages.rs     |  44 +---
 core/common/src/http/messages/send_messages.rs     |  78 +------
 core/common/src/lib.rs                             |   1 -
 core/common/src/traits/binary_impls/messages.rs    |  68 ++++--
 core/common/src/traits/bytes_serializable.rs       |  41 ----
 core/common/src/traits/mod.rs                      |   1 -
 core/common/src/types/consumer/consumer_kind.rs    |  27 ---
 core/common/src/types/identifier/mod.rs            |  76 +-----
 core/common/src/types/message/iggy_message.rs      | 137 ++++++-----
 core/common/src/types/message/message_header.rs    |   8 +-
 core/common/src/types/message/message_view.rs      |   4 +-
 core/common/src/types/message/messages_batch.rs    |   4 +-
 .../common/src/types/message/messages_batch_mut.rs |   6 +-
 core/common/src/types/message/mod.rs               |   3 +-
 core/common/src/types/message/partitioning.rs      |  71 +-----
 core/common/src/types/message/polled_messages.rs   |   4 +-
 core/common/src/types/message/polling_strategy.rs  |  27 ---
 core/common/src/types/message/user_headers.rs      | 178 +++++++-------
 .../src/types/permissions/permissions_global.rs    | 258 ---------------------
 core/common/src/wire_conversions.rs                |  27 +++
 .../tests/server/scenarios/offset_scenario.rs      |   4 +-
 .../tests/server/scenarios/timestamp_scenario.rs   |   4 +-
 core/sdk/src/prelude.rs                            |   9 +-
 core/server/src/shard/system/messages.rs           |   3 +-
 core/server/src/state/models.rs                    |   2 +-
 .../message-compression/consumer/main.rs           |   2 +-
 28 files changed, 266 insertions(+), 833 deletions(-)

diff --git a/core/cli/src/commands/binary_message/poll_messages.rs 
b/core/cli/src/commands/binary_message/poll_messages.rs
index 3cb4c4abe..1b0a18422 100644
--- a/core/cli/src/commands/binary_message/poll_messages.rs
+++ b/core/cli/src/commands/binary_message/poll_messages.rs
@@ -22,10 +22,10 @@ use async_trait::async_trait;
 use comfy_table::{Cell, CellAlignment, Row, Table};
 use iggy_common::Client;
 use iggy_common::{
-    BytesSerializable, Consumer, HeaderKey, HeaderKind, HeaderValue, 
Identifier, IggyByteSize,
-    IggyDuration, IggyMessage, IggyTimestamp, PollMessages, PollingStrategy, 
Sizeable,
+    Consumer, HeaderKey, HeaderKind, Identifier, IggyByteSize, IggyDuration, 
IggyMessage,
+    IggyTimestamp, PollMessages, PollingStrategy, Sizeable, 
user_headers_from_bytes,
 };
-use std::collections::{HashMap, HashSet};
+use std::collections::HashSet;
 use tokio::io::AsyncWriteExt;
 use tracing::{Level, event};
 
@@ -85,7 +85,7 @@ impl PollMessagesCmd {
             .iter()
             .flat_map(|m| {
                 if let Some(user_headers) = &m.user_headers {
-                    match HashMap::<HeaderKey, 
HeaderValue>::from_bytes(user_headers.clone()) {
+                    match user_headers_from_bytes(user_headers.clone()) {
                         Ok(headers) => headers
                             .iter()
                             .map(|(k, v)| (k.clone(), v.kind()))
diff --git a/core/cli/src/commands/binary_message/send_messages.rs 
b/core/cli/src/commands/binary_message/send_messages.rs
index 9dbdcbe22..ebbd13c5d 100644
--- a/core/cli/src/commands/binary_message/send_messages.rs
+++ b/core/cli/src/commands/binary_message/send_messages.rs
@@ -21,9 +21,7 @@ use anyhow::Context;
 use async_trait::async_trait;
 use bytes::Bytes;
 use iggy_common::Client;
-use iggy_common::{
-    BytesSerializable, HeaderKey, HeaderValue, Identifier, IggyMessage, 
Partitioning, Sizeable,
-};
+use iggy_common::{HeaderKey, HeaderValue, Identifier, IggyMessage, 
Partitioning, Sizeable};
 use std::collections::HashMap;
 use std::io::{self, Read};
 use tokio::io::AsyncReadExt;
diff --git a/core/common/src/http/messages/poll_messages.rs 
b/core/common/src/http/messages/poll_messages.rs
index e8ca86539..d0d2e7c0a 100644
--- a/core/common/src/http/messages/poll_messages.rs
+++ b/core/common/src/http/messages/poll_messages.rs
@@ -18,8 +18,7 @@
 
 use crate::Consumer;
 use crate::error::IggyError;
-use crate::{BytesSerializable, Identifier, PollingStrategy, Validatable};
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::{Identifier, PollingStrategy, Validatable};
 use serde::{Deserialize, Serialize};
 
 pub const DEFAULT_PARTITION_ID: u32 = 0;
@@ -60,47 +59,6 @@ pub struct PollMessages {
 }
 
 impl PollMessages {
-    pub fn bytes(
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        partition_id: Option<u32>,
-        consumer: &Consumer,
-        strategy: &PollingStrategy,
-        count: u32,
-        auto_commit: bool,
-    ) -> Bytes {
-        let consumer_bytes = consumer.to_bytes();
-        let stream_id_bytes = stream_id.to_bytes();
-        let topic_id_bytes = topic_id.to_bytes();
-        let strategy_bytes = strategy.to_bytes();
-        let mut bytes = BytesMut::with_capacity(
-            10 + consumer_bytes.len()
-                + stream_id_bytes.len()
-                + topic_id_bytes.len()
-                + strategy_bytes.len(),
-        );
-        bytes.put_slice(&consumer_bytes);
-        bytes.put_slice(&stream_id_bytes);
-        bytes.put_slice(&topic_id_bytes);
-        // Encode partition_id with a flag byte: 1 = Some, 0 = None
-        if let Some(partition_id) = partition_id {
-            bytes.put_u8(1);
-            bytes.put_u32_le(partition_id);
-        } else {
-            bytes.put_u8(0);
-            bytes.put_u32_le(0); // Padding to keep structure consistent
-        }
-        bytes.put_slice(&strategy_bytes);
-        bytes.put_u32_le(count);
-        if auto_commit {
-            bytes.put_u8(1);
-        } else {
-            bytes.put_u8(0);
-        }
-
-        bytes.freeze()
-    }
-
     pub fn default_number_of_messages_to_poll() -> u32 {
         DEFAULT_NUMBER_OF_MESSAGES_TO_POLL
     }
diff --git a/core/common/src/http/messages/send_messages.rs 
b/core/common/src/http/messages/send_messages.rs
index 0168caa2d..0f6b137c6 100644
--- a/core/common/src/http/messages/send_messages.rs
+++ b/core/common/src/http/messages/send_messages.rs
@@ -16,18 +16,15 @@
  * under the License.
  */
 
-use crate::BytesSerializable;
 use crate::Identifier;
 use crate::IggyMessageView;
 use crate::PartitioningKind;
-use crate::Sizeable;
 use crate::Validatable;
 use crate::error::IggyError;
 use crate::types::message::HeaderEntry;
 use crate::types::message::partitioning::Partitioning;
-use crate::{INDEX_SIZE, IggyMessage, IggyMessagesBatch};
+use crate::{IggyMessage, IggyMessagesBatch};
 use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
-use bytes::{BufMut, Bytes, BytesMut};
 use serde::de::{self, MapAccess, Visitor};
 use serde::ser::SerializeStruct;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -54,79 +51,6 @@ pub struct SendMessages {
     pub batch: IggyMessagesBatch,
 }
 
-impl SendMessages {
-    pub fn bytes(
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        partitioning: &Partitioning,
-        messages: &[IggyMessage],
-    ) -> Bytes {
-        let stream_id_field_size = stream_id.get_buffer_size();
-        let topic_id_field_size = topic_id.get_buffer_size();
-        let partitioning_field_size = partitioning.get_buffer_size();
-        let metadata_length_field_size = size_of::<u32>();
-        let messages_count = messages.len();
-        let messages_count_field_size = size_of::<u32>();
-        let metadata_length = stream_id_field_size
-            + topic_id_field_size
-            + partitioning_field_size
-            + messages_count_field_size;
-        let indexes_size = messages_count * INDEX_SIZE;
-        let messages_size = messages
-            .iter()
-            .map(|m| m.get_size_bytes().as_bytes_usize())
-            .sum::<usize>();
-
-        let total_size = metadata_length_field_size
-            + stream_id_field_size
-            + topic_id_field_size
-            + partitioning_field_size
-            + messages_count_field_size
-            + indexes_size
-            + messages_size;
-
-        let mut bytes = BytesMut::with_capacity(total_size);
-
-        bytes.put_u32_le(metadata_length as u32);
-        stream_id.write_to_buffer(&mut bytes);
-        topic_id.write_to_buffer(&mut bytes);
-        partitioning.write_to_buffer(&mut bytes);
-        bytes.put_u32_le(messages_count as u32);
-
-        let mut current_position = bytes.len();
-
-        bytes.put_bytes(0, indexes_size);
-
-        let mut msg_size: u32 = 0;
-        for message in messages.iter() {
-            message.write_to_buffer(&mut bytes);
-            msg_size += message.get_size_bytes().as_bytes_u64() as u32;
-            write_value_at(&mut bytes, 0u64.to_le_bytes(), current_position);
-            write_value_at(&mut bytes, msg_size.to_le_bytes(), 
current_position + 4);
-            write_value_at(&mut bytes, 0u64.to_le_bytes(), current_position + 
8);
-            current_position += INDEX_SIZE;
-        }
-
-        let out = bytes.freeze();
-
-        debug_assert_eq!(
-            total_size,
-            out.len(),
-            "Calculated SendMessages command byte size doesn't match actual 
command size",
-        );
-
-        out
-    }
-}
-
-fn write_value_at<const N: usize>(slice: &mut [u8], value: [u8; N], position: 
usize) {
-    let slice = &mut slice[position..position + N];
-    let ptr = slice.as_mut_ptr();
-    unsafe {
-        std::ptr::copy_nonoverlapping(value.as_ptr(), ptr, N);
-    }
-}
-
 impl Default for SendMessages {
     fn default() -> Self {
         SendMessages {
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index f094461f3..dca080e70 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -54,7 +54,6 @@ pub use sender::{
 };
 pub use traits::binary_client::BinaryClient;
 pub use traits::binary_transport::BinaryTransport;
-pub use traits::bytes_serializable::BytesSerializable;
 pub use traits::client::Client;
 pub use traits::cluster_client::ClusterClient;
 pub use traits::consumer_group_client::ConsumerGroupClient;
diff --git a/core/common/src/traits/binary_impls/messages.rs 
b/core/common/src/traits/binary_impls/messages.rs
index 623f0af90..d77d07b61 100644
--- a/core/common/src/traits/binary_impls/messages.rs
+++ b/core/common/src/traits/binary_impls/messages.rs
@@ -17,16 +17,21 @@
  */
 use crate::BinaryClient;
 use crate::traits::binary_auth::fail_if_not_authenticated;
-use crate::wire_conversions::identifier_to_wire;
+use crate::wire_conversions::{
+    consumer_to_wire, identifier_to_wire, partitioning_to_wire, 
polling_strategy_to_wire,
+};
 use crate::{
-    Consumer, Identifier, IggyError, IggyMessage, MessageClient, Partitioning, 
PollMessages,
-    PolledMessages, PollingStrategy, SendMessages,
+    Consumer, Identifier, IggyError, IggyMessage, MessageClient, Partitioning, 
PolledMessages,
+    PollingStrategy,
 };
+use bytes::BytesMut;
 use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::codes::{
     FLUSH_UNSAVED_BUFFER_CODE, POLL_MESSAGES_CODE, SEND_MESSAGES_CODE,
 };
-use iggy_binary_protocol::requests::messages::FlushUnsavedBufferRequest;
+use iggy_binary_protocol::requests::messages::{
+    FlushUnsavedBufferRequest, PollMessagesRequest, RawMessage, 
SendMessagesEncoder,
+};
 
 #[async_trait::async_trait]
 impl<B: BinaryClient> MessageClient for B {
@@ -41,19 +46,17 @@ impl<B: BinaryClient> MessageClient for B {
         auto_commit: bool,
     ) -> Result<PolledMessages, IggyError> {
         fail_if_not_authenticated(self).await?;
+        let req = PollMessagesRequest {
+            consumer: consumer_to_wire(consumer)?,
+            stream_id: identifier_to_wire(stream_id)?,
+            topic_id: identifier_to_wire(topic_id)?,
+            partition_id,
+            strategy: polling_strategy_to_wire(strategy),
+            count,
+            auto_commit,
+        };
         let response = self
-            .send_raw_with_response(
-                POLL_MESSAGES_CODE,
-                PollMessages::bytes(
-                    stream_id,
-                    topic_id,
-                    partition_id,
-                    consumer,
-                    strategy,
-                    count,
-                    auto_commit,
-                ),
-            )
+            .send_raw_with_response(POLL_MESSAGES_CODE, req.to_bytes())
             .await?;
         PolledMessages::from_bytes(response)
     }
@@ -66,11 +69,34 @@ impl<B: BinaryClient> MessageClient for B {
         messages: &mut [IggyMessage],
     ) -> Result<(), IggyError> {
         fail_if_not_authenticated(self).await?;
-        self.send_raw_with_response(
-            SEND_MESSAGES_CODE,
-            SendMessages::bytes(stream_id, topic_id, partitioning, messages),
-        )
-        .await?;
+        let wire_stream_id = identifier_to_wire(stream_id)?;
+        let wire_topic_id = identifier_to_wire(topic_id)?;
+        let wire_partitioning = partitioning_to_wire(partitioning);
+        let raw_messages: Vec<RawMessage<'_>> = messages
+            .iter()
+            .map(|m| RawMessage {
+                id: m.header.id,
+                origin_timestamp: m.header.origin_timestamp,
+                headers: m.user_headers.as_deref(),
+                payload: &m.payload,
+            })
+            .collect();
+        let size = SendMessagesEncoder::encoded_size(
+            &wire_stream_id,
+            &wire_topic_id,
+            &wire_partitioning,
+            &raw_messages,
+        );
+        let mut buf = BytesMut::with_capacity(size);
+        SendMessagesEncoder::encode(
+            &mut buf,
+            &wire_stream_id,
+            &wire_topic_id,
+            &wire_partitioning,
+            &raw_messages,
+        );
+        self.send_raw_with_response(SEND_MESSAGES_CODE, buf.freeze())
+            .await?;
         Ok(())
     }
 
diff --git a/core/common/src/traits/bytes_serializable.rs 
b/core/common/src/traits/bytes_serializable.rs
deleted file mode 100644
index 06bbcaba2..000000000
--- a/core/common/src/traits/bytes_serializable.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::IggyError;
-use bytes::{Bytes, BytesMut};
-
-/// The trait represents the logic responsible for serializing and 
deserializing the struct to and from bytes.
-pub trait BytesSerializable {
-    /// Serializes the struct to bytes.
-    fn to_bytes(&self) -> Bytes;
-
-    /// Deserializes the struct from bytes.
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
-    where
-        Self: Sized;
-
-    /// Write the struct to a buffer.
-    fn write_to_buffer(&self, _buf: &mut BytesMut) {
-        unimplemented!();
-    }
-
-    /// Get the byte-size of the struct.
-    fn get_buffer_size(&self) -> usize {
-        unimplemented!();
-    }
-}
diff --git a/core/common/src/traits/mod.rs b/core/common/src/traits/mod.rs
index 7e30f99f1..1621fdb8a 100644
--- a/core/common/src/traits/mod.rs
+++ b/core/common/src/traits/mod.rs
@@ -19,7 +19,6 @@ pub(crate) mod binary_auth;
 pub(crate) mod binary_client;
 mod binary_impls;
 pub(crate) mod binary_transport;
-pub(crate) mod bytes_serializable;
 pub(crate) mod client;
 pub(crate) mod cluster_client;
 pub(crate) mod consumer_group_client;
diff --git a/core/common/src/types/consumer/consumer_kind.rs 
b/core/common/src/types/consumer/consumer_kind.rs
index 65a4ffd3e..96b3fbb7b 100644
--- a/core/common/src/types/consumer/consumer_kind.rs
+++ b/core/common/src/types/consumer/consumer_kind.rs
@@ -16,11 +16,9 @@
  * under the License.
  */
 
-use crate::BytesSerializable;
 use crate::Identifier;
 use crate::Validatable;
 use crate::error::IggyError;
-use bytes::{BufMut, Bytes, BytesMut};
 use clap::ValueEnum;
 use serde::{Deserialize, Deserializer, Serialize};
 use std::fmt::Display;
@@ -93,31 +91,6 @@ impl Consumer {
     }
 }
 
-impl BytesSerializable for Consumer {
-    fn to_bytes(&self) -> Bytes {
-        let id_bytes = self.id.to_bytes();
-        let mut bytes = BytesMut::with_capacity(1 + id_bytes.len());
-        bytes.put_u8(self.kind.as_code());
-        bytes.put_slice(&id_bytes);
-        bytes.freeze()
-    }
-
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
-    where
-        Self: Sized,
-    {
-        if bytes.len() < 4 {
-            return Err(IggyError::InvalidCommand);
-        }
-
-        let kind = ConsumerKind::from_code(bytes[0])?;
-        let id = Identifier::from_bytes(bytes.slice(1..))?;
-        let consumer = Consumer { kind, id };
-        consumer.validate()?;
-        Ok(consumer)
-    }
-}
-
 /// `ConsumerKind` is an enum that represents the type of consumer.
 impl ConsumerKind {
     /// Returns the code of the `ConsumerKind`.
diff --git a/core/common/src/types/identifier/mod.rs 
b/core/common/src/types/identifier/mod.rs
index 82c41eb8a..e80a39215 100644
--- a/core/common/src/types/identifier/mod.rs
+++ b/core/common/src/types/identifier/mod.rs
@@ -16,12 +16,10 @@
  * under the License.
  */
 
-use crate::BytesSerializable;
 use crate::Sizeable;
 use crate::Validatable;
 use crate::error::IggyError;
 use crate::utils::byte_size::IggyByteSize;
-use bytes::{BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
 use serde_with::base64::Base64;
 use serde_with::serde_as;
@@ -215,46 +213,6 @@ impl Sizeable for Identifier {
     }
 }
 
-impl BytesSerializable for Identifier {
-    fn to_bytes(&self) -> Bytes {
-        let mut bytes = BytesMut::with_capacity(2 + self.length as usize);
-        bytes.put_u8(self.kind.as_code());
-        bytes.put_u8(self.length);
-        bytes.put_slice(&self.value);
-        bytes.freeze()
-    }
-
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
-    where
-        Self: Sized,
-    {
-        let kind = 
IdKind::from_code(*bytes.first().ok_or(IggyError::InvalidIdentifier)?)?;
-        let length = *bytes.get(1).ok_or(IggyError::InvalidIdentifier)?;
-        let value = bytes
-            .get(2..2 + length as usize)
-            .ok_or(IggyError::InvalidIdentifier)?
-            .to_vec();
-
-        let identifier = Identifier {
-            kind,
-            length,
-            value,
-        };
-        identifier.validate()?;
-        Ok(identifier)
-    }
-
-    fn write_to_buffer(&self, bytes: &mut BytesMut) {
-        bytes.put_u8(self.kind.as_code());
-        bytes.put_u8(self.length);
-        bytes.put_slice(&self.value);
-    }
-
-    fn get_buffer_size(&self) -> usize {
-        2 + self.length as usize
-    }
-}
-
 impl IdKind {
     /// Returns the code of the identifier kind.
     pub fn as_code(&self) -> u8 {
@@ -378,33 +336,6 @@ mod tests {
         assert!(Identifier::named(&"a".repeat(256)).is_err());
     }
 
-    #[test]
-    fn from_bytes_should_fail_on_empty_input() {
-        assert!(Identifier::from_bytes(Bytes::new()).is_err());
-    }
-
-    #[test]
-    fn from_bytes_should_fail_on_truncated_input() {
-        let id = Identifier::numeric(42).unwrap();
-        let bytes = id.to_bytes();
-        for i in 0..bytes.len() - 1 {
-            let truncated = bytes.slice(..i);
-            assert!(
-                Identifier::from_bytes(truncated).is_err(),
-                "expected error for truncation at byte {i}"
-            );
-        }
-    }
-
-    #[test]
-    fn from_bytes_should_fail_on_corrupted_length() {
-        let mut buf = BytesMut::new();
-        buf.put_u8(1); // Numeric kind
-        buf.put_u8(255); // length = 255 but only 2 bytes of value follow
-        buf.put_u16_le(0);
-        assert!(Identifier::from_bytes(buf.freeze()).is_err());
-    }
-
     #[test]
     fn from_raw_bytes_should_fail_on_empty_input() {
         assert!(Identifier::from_raw_bytes(&[]).is_err());
@@ -413,10 +344,11 @@ mod tests {
     #[test]
     fn from_raw_bytes_should_fail_on_truncated_input() {
         let id = Identifier::numeric(42).unwrap();
-        let bytes = id.to_bytes();
-        for i in 0..bytes.len() - 1 {
+        let mut full = vec![id.kind.as_code(), id.length];
+        full.extend_from_slice(&id.value);
+        for i in 0..full.len() - 1 {
             assert!(
-                Identifier::from_raw_bytes(&bytes[..i]).is_err(),
+                Identifier::from_raw_bytes(&full[..i]).is_err(),
                 "expected error for truncation at byte {i}"
             );
         }
diff --git a/core/common/src/types/message/iggy_message.rs 
b/core/common/src/types/message/iggy_message.rs
index 4fc214d28..914fe27dd 100644
--- a/core/common/src/types/message/iggy_message.rs
+++ b/core/common/src/types/message/iggy_message.rs
@@ -17,8 +17,7 @@
  */
 
 use super::message_header::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader};
-use super::user_headers::get_user_headers_size;
-use crate::BytesSerializable;
+use super::user_headers::{get_user_headers_size, user_headers_from_bytes, 
user_headers_to_bytes};
 use crate::Sizeable;
 use crate::error::IggyError;
 use crate::utils::byte_size::IggyByteSize;
@@ -193,7 +192,7 @@ impl IggyMessage {
             reserved: 0,
         };
 
-        let user_headers = user_headers.map(|h| h.to_bytes());
+        let user_headers = user_headers.map(|h| user_headers_to_bytes(&h));
 
         Ok(Self {
             header,
@@ -236,7 +235,7 @@ impl IggyMessage {
     /// ```
     pub fn user_headers_map(&self) -> Result<Option<HashMap<HeaderKey, 
HeaderValue>>, IggyError> {
         if let Some(user_headers) = &self.user_headers {
-            match HashMap::<HeaderKey, 
HeaderValue>::from_bytes(user_headers.clone()) {
+            match user_headers_from_bytes(user_headers.clone()) {
                 Ok(h) => Ok(Some(h)),
                 Err(e) => {
                     warn!(
@@ -354,70 +353,8 @@ impl IggyMessage {
     pub fn payload_as_string(&self) -> Result<String, IggyError> {
         String::from_utf8(self.payload.to_vec()).map_err(|_| 
IggyError::InvalidUtf8)
     }
-}
-
-impl FromStr for IggyMessage {
-    type Err = IggyError;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        Self::builder().payload(Bytes::from(s.to_owned())).build()
-    }
-}
-
-impl std::fmt::Display for IggyMessage {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match String::from_utf8(self.payload.to_vec()) {
-            Ok(payload) => {
-                write!(
-                    f,
-                    "[{offset}] ID:{id} '{preview}'",
-                    offset = self.header.offset,
-                    id = self.header.id,
-                    preview = if payload.len() > 50 {
-                        format!("{}... ({}B)", &payload[..47], 
self.payload.len())
-                    } else {
-                        payload
-                    }
-                )
-            }
-            Err(_) => {
-                write!(
-                    f,
-                    "[{offset}] ID:{id} <binary {payload_len}B>",
-                    offset = self.header.offset,
-                    id = self.header.id,
-                    payload_len = self.payload.len()
-                )
-            }
-        }
-    }
-}
-
-impl Sizeable for IggyMessage {
-    fn get_size_bytes(&self) -> IggyByteSize {
-        let message_header_len = IGGY_MESSAGE_HEADER_SIZE;
-        let payload_len = self.payload.len();
-        let user_headers_len = self.user_headers.as_ref().map(|h| 
h.len()).unwrap_or(0);
-
-        #[cfg(debug_assertions)]
-        {
-            assert_eq!(
-                user_headers_len, self.header.user_headers_length as usize,
-                "user_headers.len() != header.user_headers_length"
-            );
-
-            assert_eq!(
-                payload_len, self.header.payload_length as usize,
-                "payload.len() != header.payload_length"
-            )
-        }
-
-        IggyByteSize::from((message_header_len + payload_len + 
user_headers_len) as u64)
-    }
-}
 
-impl BytesSerializable for IggyMessage {
-    fn to_bytes(&self) -> Bytes {
+    pub fn to_bytes(&self) -> Bytes {
         let mut bytes = 
BytesMut::with_capacity(self.get_size_bytes().as_bytes_usize());
         let message_header = self.header.to_bytes();
         bytes.put_slice(&message_header);
@@ -428,7 +365,7 @@ impl BytesSerializable for IggyMessage {
         bytes.freeze()
     }
 
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
+    pub fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
         if bytes.len() < IGGY_MESSAGE_HEADER_SIZE {
             return Err(IggyError::InvalidCommand);
         }
@@ -478,7 +415,7 @@ impl BytesSerializable for IggyMessage {
         })
     }
 
-    fn write_to_buffer(&self, buf: &mut BytesMut) {
+    pub fn write_to_buffer(&self, buf: &mut BytesMut) {
         buf.put_slice(&self.header.to_bytes());
         buf.put_slice(&self.payload);
         if let Some(user_headers) = &self.user_headers {
@@ -487,6 +424,66 @@ impl BytesSerializable for IggyMessage {
     }
 }
 
+impl FromStr for IggyMessage {
+    type Err = IggyError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Self::builder().payload(Bytes::from(s.to_owned())).build()
+    }
+}
+
+impl std::fmt::Display for IggyMessage {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match String::from_utf8(self.payload.to_vec()) {
+            Ok(payload) => {
+                write!(
+                    f,
+                    "[{offset}] ID:{id} '{preview}'",
+                    offset = self.header.offset,
+                    id = self.header.id,
+                    preview = if payload.len() > 50 {
+                        format!("{}... ({}B)", &payload[..47], 
self.payload.len())
+                    } else {
+                        payload
+                    }
+                )
+            }
+            Err(_) => {
+                write!(
+                    f,
+                    "[{offset}] ID:{id} <binary {payload_len}B>",
+                    offset = self.header.offset,
+                    id = self.header.id,
+                    payload_len = self.payload.len()
+                )
+            }
+        }
+    }
+}
+
+impl Sizeable for IggyMessage {
+    fn get_size_bytes(&self) -> IggyByteSize {
+        let message_header_len = IGGY_MESSAGE_HEADER_SIZE;
+        let payload_len = self.payload.len();
+        let user_headers_len = self.user_headers.as_ref().map(|h| 
h.len()).unwrap_or(0);
+
+        #[cfg(debug_assertions)]
+        {
+            assert_eq!(
+                user_headers_len, self.header.user_headers_length as usize,
+                "user_headers.len() != header.user_headers_length"
+            );
+
+            assert_eq!(
+                payload_len, self.header.payload_length as usize,
+                "payload.len() != header.payload_length"
+            )
+        }
+
+        IggyByteSize::from((message_header_len + payload_len + 
user_headers_len) as u64)
+    }
+}
+
 impl From<IggyMessage> for Bytes {
     fn from(message: IggyMessage) -> Self {
         message.to_bytes()
@@ -652,7 +649,7 @@ impl<'de> Deserialize<'de> for IggyMessage {
                 let user_headers_bytes = if let Some(raw) = raw_user_headers {
                     Some(raw)
                 } else {
-                    user_headers.map(|headers| headers.to_bytes())
+                    user_headers.map(|headers| user_headers_to_bytes(&headers))
                 };
 
                 let user_headers_length = user_headers_bytes
diff --git a/core/common/src/types/message/message_header.rs 
b/core/common/src/types/message/message_header.rs
index 0fc1121c3..8af4dbe51 100644
--- a/core/common/src/types/message/message_header.rs
+++ b/core/common/src/types/message/message_header.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::{BytesSerializable, Sizeable, error::IggyError, 
utils::byte_size::IggyByteSize};
+use crate::{Sizeable, error::IggyError, utils::byte_size::IggyByteSize};
 use bytes::{BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
 use std::ops::Range;
@@ -107,10 +107,8 @@ impl IggyMessageHeader {
             },
         })
     }
-}
 
-impl BytesSerializable for IggyMessageHeader {
-    fn to_bytes(&self) -> Bytes {
+    pub fn to_bytes(&self) -> Bytes {
         let mut bytes = 
BytesMut::with_capacity(self.get_size_bytes().as_bytes_usize());
         bytes.put_u64_le(self.checksum);
         bytes.put_u128_le(self.id);
@@ -123,7 +121,7 @@ impl BytesSerializable for IggyMessageHeader {
         bytes.freeze()
     }
 
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
+    pub fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
         if bytes.len() != IGGY_MESSAGE_HEADER_SIZE {
             return Err(IggyError::InvalidCommand);
         }
diff --git a/core/common/src/types/message/message_view.rs 
b/core/common/src/types/message/message_view.rs
index 23a63e711..c3db53154 100644
--- a/core/common/src/types/message/message_view.rs
+++ b/core/common/src/types/message/message_view.rs
@@ -19,7 +19,7 @@
 use super::HeaderValue;
 use super::message_boundaries::IggyMessageBoundaries;
 use super::message_header::*;
-use crate::BytesSerializable;
+use super::user_headers::user_headers_from_bytes;
 use crate::IggyByteSize;
 use crate::Sizeable;
 use crate::error::IggyError;
@@ -90,7 +90,7 @@ impl<'a> IggyMessageView<'a> {
         if let Some(headers) = self.user_headers() {
             let headers_bytes = Bytes::copy_from_slice(headers);
 
-            match HashMap::<HeaderKey, HeaderValue>::from_bytes(headers_bytes) 
{
+            match user_headers_from_bytes(headers_bytes) {
                 Ok(h) => Ok(Some(h)),
                 Err(e) => {
                     tracing::error!(
diff --git a/core/common/src/types/message/messages_batch.rs 
b/core/common/src/types/message/messages_batch.rs
index 6346af08c..176376d4e 100644
--- a/core/common/src/types/message/messages_batch.rs
+++ b/core/common/src/types/message/messages_batch.rs
@@ -18,8 +18,8 @@
 
 use super::message_boundaries::IggyMessageBoundaries;
 use crate::{
-    BytesSerializable, INDEX_SIZE, IggyByteSize, IggyIndexes, IggyMessage, 
IggyMessageView,
-    IggyMessageViewIterator, MAX_PAYLOAD_SIZE, Sizeable, Validatable, 
error::IggyError,
+    INDEX_SIZE, IggyByteSize, IggyIndexes, IggyMessage, IggyMessageView, 
IggyMessageViewIterator,
+    MAX_PAYLOAD_SIZE, Sizeable, Validatable, error::IggyError,
 };
 use bytes::{BufMut, Bytes, BytesMut};
 use std::ops::{Deref, Index};
diff --git a/core/common/src/types/message/messages_batch_mut.rs 
b/core/common/src/types/message/messages_batch_mut.rs
index 78ba9b3c0..ee0333bc4 100644
--- a/core/common/src/types/message/messages_batch_mut.rs
+++ b/core/common/src/types/message/messages_batch_mut.rs
@@ -20,9 +20,9 @@ use super::indexes_mut::IggyIndexesMut;
 use super::message_boundaries::IggyMessageBoundaries;
 use super::message_view_mut::IggyMessageViewMutIterator;
 use crate::{
-    BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize, 
IggyError,
-    IggyIndexView, IggyMessage, IggyMessageView, IggyMessageViewIterator, 
IggyMessagesBatch,
-    IggyTimestamp, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, 
Validatable,
+    IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize, IggyError, 
IggyIndexView, IggyMessage,
+    IggyMessageView, IggyMessageViewIterator, IggyMessagesBatch, 
IggyTimestamp, MAX_PAYLOAD_SIZE,
+    MAX_USER_HEADERS_SIZE, Sizeable, Validatable,
 };
 use crate::{MessageDeduplicator, PooledBuffer, random_id};
 use lending_iterator::prelude::*;
diff --git a/core/common/src/types/message/mod.rs 
b/core/common/src/types/message/mod.rs
index 281cc3273..bd5688926 100644
--- a/core/common/src/types/message/mod.rs
+++ b/core/common/src/types/message/mod.rs
@@ -71,5 +71,6 @@ pub use polling_kind::PollingKind;
 pub use polling_strategy::PollingStrategy;
 pub use user_headers::{
     HeaderEntry, HeaderField, HeaderKey, HeaderKind, HeaderValue, KeyMarker, 
UserHeaders,
-    ValueMarker, deserialize_headers, serialize_headers,
+    ValueMarker, deserialize_headers, serialize_headers, 
user_headers_from_bytes,
+    user_headers_to_bytes,
 };
diff --git a/core/common/src/types/message/partitioning.rs 
b/core/common/src/types/message/partitioning.rs
index f0b2af3b0..5d80273d5 100644
--- a/core/common/src/types/message/partitioning.rs
+++ b/core/common/src/types/message/partitioning.rs
@@ -17,8 +17,7 @@
  */
 
 use super::PartitioningKind;
-use crate::{BytesSerializable, IggyByteSize, Sizeable, error::IggyError};
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::{IggyByteSize, Sizeable, error::IggyError};
 use serde::{Deserialize, Serialize};
 use serde_with::base64::Base64;
 use serde_with::serde_as;
@@ -187,33 +186,6 @@ impl Sizeable for Partitioning {
     }
 }
 
-impl BytesSerializable for Partitioning {
-    fn to_bytes(&self) -> Bytes {
-        let mut bytes = BytesMut::with_capacity(2 + self.length as usize);
-        bytes.put_u8(self.kind.as_code());
-        bytes.put_u8(self.length);
-        bytes.put_slice(&self.value);
-        bytes.freeze()
-    }
-
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
-    where
-        Self: Sized,
-    {
-        Self::from_raw_bytes(&bytes)
-    }
-
-    fn write_to_buffer(&self, bytes: &mut BytesMut) {
-        bytes.put_u8(self.kind.as_code());
-        bytes.put_u8(self.length);
-        bytes.put_slice(&self.value);
-    }
-
-    fn get_buffer_size(&self) -> usize {
-        2 + self.length as usize
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -240,24 +212,11 @@ mod tests {
         assert_eq!(p.value, 42u32.to_le_bytes());
     }
 
-    #[test]
-    fn from_bytes_should_reject_partition_id_with_wrong_length() {
-        for bad_len in [0u8, 1, 2, 3, 5] {
-            let mut buf = BytesMut::new();
-            buf.put_u8(PartitioningKind::PartitionId.as_code());
-            buf.put_u8(bad_len);
-            buf.extend(vec![0u8; bad_len as usize]);
-            assert!(
-                Partitioning::from_bytes(buf.freeze()).is_err(),
-                "expected error for PartitionId with length={bad_len}"
-            );
-        }
-    }
-
     #[test]
     fn from_raw_bytes_should_fail_on_truncated_input() {
         let p = Partitioning::partition_id(99);
-        let full = p.to_bytes();
+        let mut full = vec![p.kind.as_code(), p.length];
+        full.extend_from_slice(&p.value);
         for i in 0..full.len() {
             assert!(
                 Partitioning::from_raw_bytes(&full[..i]).is_err(),
@@ -277,14 +236,6 @@ mod tests {
         assert!(s.contains("<invalid>"));
     }
 
-    #[test]
-    fn round_trip_partition_id() {
-        let original = Partitioning::partition_id(12345);
-        let bytes = original.to_bytes();
-        let restored = Partitioning::from_bytes(bytes).unwrap();
-        assert_eq!(original, restored);
-    }
-
     #[test]
     fn from_raw_bytes_should_reject_balanced_with_nonzero_length() {
         for bad_len in [1u8, 2, 4, 255] {
@@ -296,20 +247,4 @@ mod tests {
             );
         }
     }
-
-    #[test]
-    fn round_trip_balanced() {
-        let original = Partitioning::balanced();
-        let bytes = original.to_bytes();
-        let restored = Partitioning::from_bytes(bytes).unwrap();
-        assert_eq!(original, restored);
-    }
-
-    #[test]
-    fn round_trip_messages_key() {
-        let original = Partitioning::messages_key(b"my-key").unwrap();
-        let bytes = original.to_bytes();
-        let restored = Partitioning::from_bytes(bytes).unwrap();
-        assert_eq!(original, restored);
-    }
 }
diff --git a/core/common/src/types/message/polled_messages.rs 
b/core/common/src/types/message/polled_messages.rs
index 208de1f0a..198f78cd7 100644
--- a/core/common/src/types/message/polled_messages.rs
+++ b/core/common/src/types/message/polled_messages.rs
@@ -16,9 +16,7 @@
  * under the License.
  */
 
-use crate::{
-    BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, IggyMessage, 
IggyMessageHeader, error::IggyError,
-};
+use crate::{IGGY_MESSAGE_HEADER_SIZE, IggyMessage, IggyMessageHeader, 
error::IggyError};
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
 use tracing::error;
diff --git a/core/common/src/types/message/polling_strategy.rs 
b/core/common/src/types/message/polling_strategy.rs
index 35052a188..5035087a9 100644
--- a/core/common/src/types/message/polling_strategy.rs
+++ b/core/common/src/types/message/polling_strategy.rs
@@ -16,11 +16,8 @@
  * under the License.
  */
 
-use crate::BytesSerializable;
-use crate::error::IggyError;
 use crate::types::message::polling_kind::PollingKind;
 use crate::utils::timestamp::IggyTimestamp;
-use bytes::{BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
 use serde_with::{DisplayFromStr, serde_as};
 use std::fmt::Display;
@@ -121,27 +118,3 @@ impl PollingStrategy {
         PollingKind::default()
     }
 }
-
-impl BytesSerializable for PollingStrategy {
-    fn to_bytes(&self) -> Bytes {
-        let mut bytes = BytesMut::with_capacity(9);
-        bytes.put_u8(self.kind.as_code());
-        bytes.put_u64_le(self.value);
-        bytes.freeze()
-    }
-
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
-        if bytes.len() != 9 {
-            return Err(IggyError::InvalidCommand);
-        }
-
-        let kind = PollingKind::from_code(bytes[0])?;
-        let value = u64::from_le_bytes(
-            bytes[1..9]
-                .try_into()
-                .map_err(|_| IggyError::InvalidNumberEncoding)?,
-        );
-        let strategy = PollingStrategy { kind, value };
-        Ok(strategy)
-    }
-}
diff --git a/core/common/src/types/message/user_headers.rs 
b/core/common/src/types/message/user_headers.rs
index aaec4ffe0..396608199 100644
--- a/core/common/src/types/message/user_headers.rs
+++ b/core/common/src/types/message/user_headers.rs
@@ -16,7 +16,6 @@
  * under the License.
  */
 
-use crate::BytesSerializable;
 use crate::error::IggyError;
 use bytes::{BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
@@ -930,103 +929,98 @@ impl<T> TryFrom<&HeaderField<T>> for Vec<u8> {
     }
 }
 
-impl BytesSerializable for HashMap<HeaderKey, HeaderValue> {
-    fn to_bytes(&self) -> Bytes {
-        if self.is_empty() {
-            return Bytes::new();
-        }
-
-        let mut bytes = BytesMut::new();
-        for (key, value) in self {
-            bytes.put_u8(key.kind().as_code());
-            #[allow(clippy::cast_possible_truncation)]
-            bytes.put_u32_le(key.as_bytes().len() as u32);
-            bytes.put_slice(key.as_bytes());
-            bytes.put_u8(value.kind().as_code());
-            #[allow(clippy::cast_possible_truncation)]
-            bytes.put_u32_le(value.as_bytes().len() as u32);
-            bytes.put_slice(value.as_bytes());
-        }
+pub fn user_headers_to_bytes(headers: &HashMap<HeaderKey, HeaderValue>) -> 
Bytes {
+    if headers.is_empty() {
+        return Bytes::new();
+    }
 
-        bytes.freeze()
+    let mut bytes = BytesMut::new();
+    for (key, value) in headers {
+        bytes.put_u8(key.kind().as_code());
+        #[allow(clippy::cast_possible_truncation)]
+        bytes.put_u32_le(key.as_bytes().len() as u32);
+        bytes.put_slice(key.as_bytes());
+        bytes.put_u8(value.kind().as_code());
+        #[allow(clippy::cast_possible_truncation)]
+        bytes.put_u32_le(value.as_bytes().len() as u32);
+        bytes.put_slice(value.as_bytes());
     }
 
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
-    where
-        Self: Sized,
-    {
-        if bytes.is_empty() {
-            return Ok(Self::new());
-        }
+    bytes.freeze()
+}
 
-        let mut headers = Self::new();
-        let mut position = 0;
-        while position < bytes.len() {
-            let key_kind = HeaderKind::from_code(bytes[position])?;
-            position += 1;
+pub fn user_headers_from_bytes(bytes: Bytes) -> Result<HashMap<HeaderKey, 
HeaderValue>, IggyError> {
+    if bytes.is_empty() {
+        return Ok(HashMap::new());
+    }
 
-            if position + 4 > bytes.len() {
-                return Err(IggyError::InvalidHeaderKey);
-            }
-            let key_length = u32::from_le_bytes(
-                bytes[position..position + 4]
-                    .try_into()
-                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
-            ) as usize;
-            if key_length == 0 || key_length > 255 {
-                return Err(IggyError::InvalidHeaderKey);
-            }
-            position += 4;
+    let mut headers = HashMap::new();
+    let mut position = 0;
+    while position < bytes.len() {
+        let key_kind = HeaderKind::from_code(bytes[position])?;
+        position += 1;
 
-            if position + key_length > bytes.len() {
-                return Err(IggyError::InvalidHeaderKey);
-            }
-            if let Some(expected) = key_kind.expected_size()
-                && key_length != expected
-            {
-                return Err(IggyError::InvalidHeaderKey);
-            }
-            let key_value = bytes[position..position + key_length].to_vec();
-            position += key_length;
+        if position + 4 > bytes.len() {
+            return Err(IggyError::InvalidHeaderKey);
+        }
+        let key_length = u32::from_le_bytes(
+            bytes[position..position + 4]
+                .try_into()
+                .map_err(|_| IggyError::InvalidNumberEncoding)?,
+        ) as usize;
+        if key_length == 0 || key_length > 255 {
+            return Err(IggyError::InvalidHeaderKey);
+        }
+        position += 4;
 
-            if position >= bytes.len() {
-                return Err(IggyError::InvalidHeaderValue);
-            }
-            let value_kind = HeaderKind::from_code(bytes[position])?;
-            position += 1;
+        if position + key_length > bytes.len() {
+            return Err(IggyError::InvalidHeaderKey);
+        }
+        if let Some(expected) = key_kind.expected_size()
+            && key_length != expected
+        {
+            return Err(IggyError::InvalidHeaderKey);
+        }
+        let key_value = bytes[position..position + key_length].to_vec();
+        position += key_length;
 
-            if position + 4 > bytes.len() {
-                return Err(IggyError::InvalidHeaderValue);
-            }
-            let value_length = u32::from_le_bytes(
-                bytes[position..position + 4]
-                    .try_into()
-                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
-            ) as usize;
-            if value_length == 0 || value_length > 255 {
-                return Err(IggyError::InvalidHeaderValue);
-            }
-            position += 4;
+        if position >= bytes.len() {
+            return Err(IggyError::InvalidHeaderValue);
+        }
+        let value_kind = HeaderKind::from_code(bytes[position])?;
+        position += 1;
 
-            if position + value_length > bytes.len() {
-                return Err(IggyError::InvalidHeaderValue);
-            }
-            if let Some(expected) = value_kind.expected_size()
-                && value_length != expected
-            {
-                return Err(IggyError::InvalidHeaderValue);
-            }
-            let value_value = bytes[position..position + 
value_length].to_vec();
-            position += value_length;
+        if position + 4 > bytes.len() {
+            return Err(IggyError::InvalidHeaderValue);
+        }
+        let value_length = u32::from_le_bytes(
+            bytes[position..position + 4]
+                .try_into()
+                .map_err(|_| IggyError::InvalidNumberEncoding)?,
+        ) as usize;
+        if value_length == 0 || value_length > 255 {
+            return Err(IggyError::InvalidHeaderValue);
+        }
+        position += 4;
 
-            headers.insert(
-                HeaderKey::new_unchecked(key_kind, &key_value),
-                HeaderValue::new_unchecked(value_kind, &value_value),
-            );
+        if position + value_length > bytes.len() {
+            return Err(IggyError::InvalidHeaderValue);
         }
+        if let Some(expected) = value_kind.expected_size()
+            && value_length != expected
+        {
+            return Err(IggyError::InvalidHeaderValue);
+        }
+        let value_value = bytes[position..position + value_length].to_vec();
+        position += value_length;
 
-        Ok(headers)
+        headers.insert(
+            HeaderKey::new_unchecked(key_kind, &key_value),
+            HeaderValue::new_unchecked(value_kind, &value_value),
+        );
     }
+
+    Ok(headers)
 }
 
 pub fn get_user_headers_size(headers: &Option<HashMap<HeaderKey, 
HeaderValue>>) -> Option<u32> {
@@ -1370,7 +1364,7 @@ mod tests {
         headers.insert(HeaderKey::try_from("key 1").unwrap(), 12345u64.into());
         headers.insert(HeaderKey::try_from("key_3").unwrap(), true.into());
 
-        let bytes = headers.to_bytes();
+        let bytes = user_headers_to_bytes(&headers);
 
         let mut position = 0;
         let mut headers_count = 0;
@@ -1427,7 +1421,7 @@ mod tests {
             bytes.put_slice(&value.value);
         }
 
-        let deserialized_headers = HashMap::<HeaderKey, 
HeaderValue>::from_bytes(bytes.freeze());
+        let deserialized_headers = user_headers_from_bytes(bytes.freeze());
 
         assert!(deserialized_headers.is_ok());
         let deserialized_headers = deserialized_headers.unwrap();
@@ -1451,8 +1445,8 @@ mod tests {
         );
         headers.insert(999u64.into(), true.into());
 
-        let bytes = headers.to_bytes();
-        let deserialized = HashMap::<HeaderKey, 
HeaderValue>::from_bytes(bytes).unwrap();
+        let bytes = user_headers_to_bytes(&headers);
+        let deserialized = user_headers_from_bytes(bytes).unwrap();
 
         assert_eq!(deserialized.len(), headers.len());
         for (key, value) in &headers {
@@ -1661,7 +1655,7 @@ mod tests {
         bytes.put_u32_le(100); // key_len = 100 (lie!)
         bytes.put_slice(b"abc"); // only 3 bytes of key data
 
-        let result = HashMap::<HeaderKey, 
HeaderValue>::from_bytes(bytes.freeze());
+        let result = user_headers_from_bytes(bytes.freeze());
         assert!(result.is_err());
     }
 
@@ -1688,7 +1682,7 @@ mod tests {
         bytes.put_u32_le(4); // value_len = 4 (wrong, should be 2)
         bytes.put_slice(&[1, 2, 3, 4]);
 
-        let result = HashMap::<HeaderKey, 
HeaderValue>::from_bytes(bytes.freeze());
+        let result = user_headers_from_bytes(bytes.freeze());
         assert!(result.is_err());
     }
 
@@ -1703,7 +1697,7 @@ mod tests {
         bytes.put_u8(6); // value_kind = Int32
         // Missing value length bytes
 
-        let result = HashMap::<HeaderKey, 
HeaderValue>::from_bytes(bytes.freeze());
+        let result = user_headers_from_bytes(bytes.freeze());
         assert!(result.is_err());
     }
 
diff --git a/core/common/src/types/permissions/permissions_global.rs 
b/core/common/src/types/permissions/permissions_global.rs
index b0024e523..42f670715 100644
--- a/core/common/src/types/permissions/permissions_global.rs
+++ b/core/common/src/types/permissions/permissions_global.rs
@@ -16,9 +16,6 @@
  * under the License.
  */
 
-use crate::BytesSerializable;
-use crate::error::IggyError;
-use bytes::{Buf, BufMut, Bytes, BytesMut};
 use comfy_table::Table;
 use comfy_table::presets::ASCII_NO_BORDERS;
 use serde::{Deserialize, Serialize};
@@ -219,150 +216,6 @@ impl Display for Permissions {
     }
 }
 
-impl BytesSerializable for Permissions {
-    fn to_bytes(&self) -> Bytes {
-        let mut bytes = BytesMut::new();
-        bytes.put_u8(if self.global.manage_servers { 1 } else { 0 });
-        bytes.put_u8(if self.global.read_servers { 1 } else { 0 });
-        bytes.put_u8(if self.global.manage_users { 1 } else { 0 });
-        bytes.put_u8(if self.global.read_users { 1 } else { 0 });
-        bytes.put_u8(if self.global.manage_streams { 1 } else { 0 });
-        bytes.put_u8(if self.global.read_streams { 1 } else { 0 });
-        bytes.put_u8(if self.global.manage_topics { 1 } else { 0 });
-        bytes.put_u8(if self.global.read_topics { 1 } else { 0 });
-        bytes.put_u8(if self.global.poll_messages { 1 } else { 0 });
-        bytes.put_u8(if self.global.send_messages { 1 } else { 0 });
-        if let Some(streams) = self.streams.as_ref().filter(|s| !s.is_empty()) 
{
-            bytes.put_u8(1);
-            let streams_count = streams.len();
-            let mut current_stream = 1;
-            for (stream_id, stream) in streams {
-                bytes.put_u32_le(*stream_id as u32);
-                bytes.put_u8(if stream.manage_stream { 1 } else { 0 });
-                bytes.put_u8(if stream.read_stream { 1 } else { 0 });
-                bytes.put_u8(if stream.manage_topics { 1 } else { 0 });
-                bytes.put_u8(if stream.read_topics { 1 } else { 0 });
-                bytes.put_u8(if stream.poll_messages { 1 } else { 0 });
-                bytes.put_u8(if stream.send_messages { 1 } else { 0 });
-                if let Some(topics) = stream.topics.as_ref().filter(|t| 
!t.is_empty()) {
-                    bytes.put_u8(1);
-                    let topics_count = topics.len();
-                    let mut current_topic = 1;
-                    for (topic_id, topic) in topics {
-                        bytes.put_u32_le(*topic_id as u32);
-                        bytes.put_u8(if topic.manage_topic { 1 } else { 0 });
-                        bytes.put_u8(if topic.read_topic { 1 } else { 0 });
-                        bytes.put_u8(if topic.poll_messages { 1 } else { 0 });
-                        bytes.put_u8(if topic.send_messages { 1 } else { 0 });
-                        if current_topic < topics_count {
-                            current_topic += 1;
-                            bytes.put_u8(1);
-                        } else {
-                            bytes.put_u8(0);
-                        }
-                    }
-                } else {
-                    bytes.put_u8(0);
-                }
-                if current_stream < streams_count {
-                    current_stream += 1;
-                    bytes.put_u8(1);
-                } else {
-                    bytes.put_u8(0);
-                }
-            }
-        } else {
-            bytes.put_u8(0);
-        }
-        bytes.freeze()
-    }
-
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
-    where
-        Self: Sized,
-    {
-        let mut bytes = bytes;
-        let manage_servers = bytes.get_u8() == 1;
-        let read_servers = bytes.get_u8() == 1;
-        let manage_users = bytes.get_u8() == 1;
-        let read_users = bytes.get_u8() == 1;
-        let manage_streams = bytes.get_u8() == 1;
-        let read_streams = bytes.get_u8() == 1;
-        let manage_topics = bytes.get_u8() == 1;
-        let read_topics = bytes.get_u8() == 1;
-        let poll_messages = bytes.get_u8() == 1;
-        let send_messages = bytes.get_u8() == 1;
-        let mut streams = None;
-        if bytes.get_u8() == 1 {
-            let mut streams_map = BTreeMap::new();
-            loop {
-                let stream_id = bytes.get_u32_le() as usize;
-                let manage_stream = bytes.get_u8() == 1;
-                let read_stream = bytes.get_u8() == 1;
-                let manage_topics = bytes.get_u8() == 1;
-                let read_topics = bytes.get_u8() == 1;
-                let poll_messages = bytes.get_u8() == 1;
-                let send_messages = bytes.get_u8() == 1;
-                let mut topics = None;
-                if bytes.get_u8() == 1 {
-                    let mut topics_map = BTreeMap::new();
-                    loop {
-                        let topic_id = bytes.get_u32_le() as usize;
-                        let manage_topic = bytes.get_u8() == 1;
-                        let read_topic = bytes.get_u8() == 1;
-                        let poll_messages = bytes.get_u8() == 1;
-                        let send_messages = bytes.get_u8() == 1;
-                        topics_map.insert(
-                            topic_id,
-                            TopicPermissions {
-                                manage_topic,
-                                read_topic,
-                                poll_messages,
-                                send_messages,
-                            },
-                        );
-                        if bytes.get_u8() == 0 {
-                            break;
-                        }
-                    }
-                    topics = Some(topics_map);
-                }
-                streams_map.insert(
-                    stream_id,
-                    StreamPermissions {
-                        manage_stream,
-                        read_stream,
-                        manage_topics,
-                        read_topics,
-                        poll_messages,
-                        send_messages,
-                        topics,
-                    },
-                );
-                if bytes.get_u8() == 0 {
-                    break;
-                }
-            }
-            streams = Some(streams_map);
-        }
-        Ok(Self {
-            global: GlobalPermissions {
-                manage_servers,
-                read_servers,
-                manage_users,
-                read_users,
-                manage_streams,
-                read_streams,
-                manage_topics,
-                read_topics,
-                poll_messages,
-                send_messages,
-            },
-            streams,
-        })
-    }
-}
-
 impl From<GlobalPermissions> for Table {
     fn from(value: GlobalPermissions) -> Self {
         let mut table = Self::new();
@@ -470,114 +323,3 @@ impl From<&StreamPermissions> for Table {
         table
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn should_be_serialized_and_deserialized_from_bytes() {
-        let permissions = Permissions {
-            global: GlobalPermissions {
-                manage_servers: true,
-                read_servers: true,
-                manage_users: true,
-                read_users: true,
-                manage_streams: false,
-                read_streams: true,
-                manage_topics: false,
-                read_topics: true,
-                poll_messages: true,
-                send_messages: true,
-            },
-            streams: Some(BTreeMap::from([
-                (
-                    1,
-                    StreamPermissions {
-                        manage_stream: true,
-                        read_stream: true,
-                        manage_topics: true,
-                        read_topics: true,
-                        poll_messages: true,
-                        send_messages: true,
-                        topics: Some(BTreeMap::from([
-                            (
-                                1,
-                                TopicPermissions {
-                                    manage_topic: true,
-                                    read_topic: true,
-                                    poll_messages: true,
-                                    send_messages: true,
-                                },
-                            ),
-                            (
-                                2,
-                                TopicPermissions {
-                                    manage_topic: true,
-                                    read_topic: false,
-                                    poll_messages: true,
-                                    send_messages: false,
-                                },
-                            ),
-                        ])),
-                    },
-                ),
-                (
-                    2,
-                    StreamPermissions {
-                        manage_stream: false,
-                        read_stream: true,
-                        manage_topics: false,
-                        read_topics: true,
-                        poll_messages: true,
-                        send_messages: true,
-                        topics: None,
-                    },
-                ),
-            ])),
-        };
-
-        let bytes = permissions.to_bytes();
-        let deserialized_permissions = Permissions::from_bytes(bytes).unwrap();
-
-        assert_eq!(permissions, deserialized_permissions);
-    }
-
-    #[test]
-    fn should_handle_empty_streams_map() {
-        let permissions = Permissions {
-            global: GlobalPermissions::default(),
-            streams: Some(BTreeMap::new()),
-        };
-
-        let bytes = permissions.to_bytes();
-        let deserialized = Permissions::from_bytes(bytes).unwrap();
-
-        assert!(deserialized.streams.is_none());
-    }
-
-    #[test]
-    fn should_handle_empty_topics_map() {
-        let permissions = Permissions {
-            global: GlobalPermissions::default(),
-            streams: Some(BTreeMap::from([(
-                1,
-                StreamPermissions {
-                    manage_stream: true,
-                    read_stream: true,
-                    manage_topics: false,
-                    read_topics: false,
-                    poll_messages: false,
-                    send_messages: false,
-                    topics: Some(BTreeMap::new()),
-                },
-            )])),
-        };
-
-        let bytes = permissions.to_bytes();
-        let deserialized = Permissions::from_bytes(bytes).unwrap();
-
-        let stream = deserialized.streams.as_ref().unwrap().get(&1).unwrap();
-        assert!(stream.topics.is_none());
-    }
-}
diff --git a/core/common/src/wire_conversions.rs 
b/core/common/src/wire_conversions.rs
index 343172b61..c5a48916c 100644
--- a/core/common/src/wire_conversions.rs
+++ b/core/common/src/wire_conversions.rs
@@ -522,6 +522,33 @@ pub fn consumer_to_wire(consumer: &Consumer) -> 
Result<WireConsumer, IggyError>
     })
 }
 
+/// Convert a domain `PollingStrategy` to `WirePollingStrategy`.
+pub fn polling_strategy_to_wire(
+    strategy: &crate::PollingStrategy,
+) -> iggy_binary_protocol::primitives::polling_strategy::WirePollingStrategy {
+    iggy_binary_protocol::primitives::polling_strategy::WirePollingStrategy {
+        kind: strategy.kind.as_code(),
+        value: strategy.value,
+    }
+}
+
+/// Convert a domain `Partitioning` to `WirePartitioning`.
+pub fn partitioning_to_wire(
+    partitioning: &crate::Partitioning,
+) -> iggy_binary_protocol::primitives::partitioning::WirePartitioning {
+    use iggy_binary_protocol::primitives::partitioning::WirePartitioning;
+    match partitioning.kind {
+        crate::PartitioningKind::Balanced => WirePartitioning::Balanced,
+        crate::PartitioningKind::PartitionId => {
+            let id = 
u32::from_le_bytes(partitioning.value[..4].try_into().unwrap());
+            WirePartitioning::PartitionId(id)
+        }
+        crate::PartitioningKind::MessagesKey => {
+            WirePartitioning::MessagesKey(partitioning.value.clone())
+        }
+    }
+}
+
 // ---------------------------------------------------------------------------
 // Permissions (wire -> domain)
 // ---------------------------------------------------------------------------
diff --git a/core/integration/tests/server/scenarios/offset_scenario.rs 
b/core/integration/tests/server/scenarios/offset_scenario.rs
index c1544137b..282e355fa 100644
--- a/core/integration/tests/server/scenarios/offset_scenario.rs
+++ b/core/integration/tests/server/scenarios/offset_scenario.rs
@@ -19,6 +19,7 @@
 use super::PARTITION_ID;
 use bytes::BytesMut;
 use iggy::prelude::*;
+use iggy_common::user_headers_from_bytes;
 use integration::harness::TestHarness;
 use std::collections::HashMap;
 
@@ -403,8 +404,7 @@ async fn verify_message_content(
             );
 
             if let Some(headers) = &msg.user_headers {
-                let headers_map =
-                    HashMap::<HeaderKey, 
HeaderValue>::from_bytes(headers.clone()).unwrap();
+                let headers_map = 
user_headers_from_bytes(headers.clone()).unwrap();
                 assert_eq!(headers_map.len(), 3, "Expected 3 headers");
             }
         }
diff --git a/core/integration/tests/server/scenarios/timestamp_scenario.rs 
b/core/integration/tests/server/scenarios/timestamp_scenario.rs
index af6fb7796..661970dd6 100644
--- a/core/integration/tests/server/scenarios/timestamp_scenario.rs
+++ b/core/integration/tests/server/scenarios/timestamp_scenario.rs
@@ -19,6 +19,7 @@
 use super::PARTITION_ID;
 use bytes::BytesMut;
 use iggy::prelude::*;
+use iggy_common::user_headers_from_bytes;
 use integration::harness::TestHarness;
 use std::collections::HashMap;
 use tokio::time::{Duration, sleep};
@@ -427,8 +428,7 @@ async fn verify_message_content_by_timestamp(
             );
 
             if let Some(headers) = &msg.user_headers {
-                let headers_map =
-                    HashMap::<HeaderKey, 
HeaderValue>::from_bytes(headers.clone()).unwrap();
+                let headers_map = 
user_headers_from_bytes(headers.clone()).unwrap();
                 assert_eq!(headers_map.len(), 3, "Expected 3 headers");
             }
         }
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index e7824a06e..72e426ce5 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -49,10 +49,10 @@ pub use crate::stream_builder::{IggyStream, 
IggyStreamConfig};
 pub use crate::tcp::tcp_client::TcpClient;
 pub use crate::websocket::websocket_client::WebSocketClient;
 pub use iggy_common::{
-    Aes256GcmEncryptor, Args, ArgsOptional, AutoLogin, BytesSerializable, 
CacheMetrics,
-    CacheMetricsKey, ClientError, ClientInfoDetails, ClusterMetadata, 
ClusterNode, ClusterNodeRole,
-    ClusterNodeStatus, CompressionAlgorithm, Consumer, ConsumerGroupDetails, 
ConsumerKind,
-    EncryptorKind, GlobalPermissions, HeaderKey, HeaderKind, HeaderValue, 
HttpClientConfig,
+    Aes256GcmEncryptor, Args, ArgsOptional, AutoLogin, CacheMetrics, 
CacheMetricsKey, ClientError,
+    ClientInfoDetails, ClusterMetadata, ClusterNode, ClusterNodeRole, 
ClusterNodeStatus,
+    CompressionAlgorithm, Consumer, ConsumerGroupDetails, ConsumerKind, 
EncryptorKind,
+    GlobalPermissions, HeaderKey, HeaderKind, HeaderValue, HttpClientConfig,
     HttpClientConfigBuilder, IdKind, Identifier, IdentityInfo, IggyByteSize, 
IggyDuration,
     IggyError, IggyExpiry, IggyIndexView, IggyMessage, IggyMessageHeader, 
IggyMessageHeaderView,
     IggyMessageView, IggyMessageViewIterator, IggyTimestamp, MaxTopicSize, 
Partition, Partitioner,
@@ -63,6 +63,7 @@ pub use iggy_common::{
     TcpClientReconnectionConfig, Topic, TopicDetails, TopicPermissions, 
TransportEndpoints,
     TransportProtocol, UserId, UserStatus, Validatable, WebSocketClientConfig,
     WebSocketClientConfigBuilder, WebSocketClientReconnectionConfig, defaults, 
locking,
+    user_headers_from_bytes, user_headers_to_bytes,
 };
 pub use iggy_common::{
     Client, ClusterClient, ConsumerGroupClient, ConsumerOffsetClient, 
MessageClient,
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 21e342003..82a5d25b0 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -30,8 +30,7 @@ use iggy_common::IggyPollMetadata;
 use iggy_common::PooledBuffer;
 use iggy_common::sharding::IggyNamespace;
 use iggy_common::{
-    BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, 
Identifier, IggyError,
-    PollingStrategy,
+    Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, IggyError, 
PollingStrategy,
 };
 use std::sync::atomic::Ordering;
 use tracing::error;
diff --git a/core/server/src/state/models.rs b/core/server/src/state/models.rs
index 01a9309df..7d885b29d 100644
--- a/core/server/src/state/models.rs
+++ b/core/server/src/state/models.rs
@@ -107,7 +107,7 @@ impl Display for CreatePersonalAccessTokenWithHash {
 }
 
 // Wire format for WithId wrappers: id:u32_le | inner_length:u32_le | 
inner_bytes
-// This is identical to the previous BytesSerializable format.
+// This layout is identical to the one used before the WireEncode migration.
 
 impl WireEncode for CreateStreamWithId {
     fn encoded_size(&self) -> usize {
diff --git 
a/examples/rust/src/message-headers/message-compression/consumer/main.rs 
b/examples/rust/src/message-headers/message-compression/consumer/main.rs
index 32104d754..3dc14d308 100644
--- a/examples/rust/src/message-headers/message-compression/consumer/main.rs
+++ b/examples/rust/src/message-headers/message-compression/consumer/main.rs
@@ -76,7 +76,7 @@ fn handle_payload_compression(msg: &mut ReceivedMessage) -> 
Result<(), IggyError
         // remove the compression header since payload is now decompressed
         if let Ok(Some(mut headers_map)) = msg.message.user_headers_map() {
             headers_map.remove(&Codec::header_key());
-            let headers_bytes = headers_map.to_bytes();
+            let headers_bytes = user_headers_to_bytes(&headers_map);
             msg.message.header.user_headers_length = headers_bytes.len() as 
u32;
             msg.message.user_headers = if headers_map.is_empty() {
                 None

Reply via email to