This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch partition_remaster
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 1de4e23a862c80de0e3bd9b4f28dffb8e4ede30a
Author: numinex <[email protected]>
AuthorDate: Tue Mar 24 08:05:31 2026 +0100

    feat(partitions): rework partitions using prefix/tail message
---
 Cargo.lock                                         |   8 +
 Cargo.toml                                         |   1 +
 core/common/Cargo.toml                             |   1 +
 core/common/src/types/consensus/message.rs         | 199 ++++---
 .../src/types/segment_storage/messages_writer.rs   |   1 -
 core/consensus/Cargo.toml                          |   1 +
 core/consensus/src/plane_helpers.rs                |  29 +-
 core/iobuf/Cargo.toml                              |   1 +
 core/iobuf/src/lib.rs                              | 599 ++++++++++++++-------
 core/message_bus/src/lib.rs                        |  16 +-
 core/metadata/Cargo.toml                           |   1 +
 core/metadata/src/impls/metadata.rs                |   9 +
 core/metadata/src/stm/mod.rs                       |   3 +-
 core/partitions/Cargo.toml                         |   3 +
 core/partitions/src/frozen_messages_writer.rs      | 116 ++++
 core/partitions/src/iggy_partition.rs              | 105 ++--
 core/partitions/src/iggy_partitions.rs             |  96 +++-
 core/partitions/src/journal.rs                     | 167 +++---
 core/partitions/src/lib.rs                         |  38 +-
 core/partitions/src/send_messages2.rs              | 552 +++++++++++++++++++
 core/simulator/Cargo.toml                          |   1 +
 core/simulator/src/bus.rs                          |  16 +-
 core/simulator/src/client.rs                       |  13 +-
 core/simulator/src/deps.rs                         |  13 +-
 core/simulator/src/lib.rs                          |   6 +-
 core/simulator/src/packet.rs                       |   6 +-
 26 files changed, 1462 insertions(+), 539 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 63420b7b5..6450c9bec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2232,6 +2232,7 @@ dependencies = [
  "bytes",
  "futures",
  "iggy_common",
+ "iobuf",
  "message_bus",
  "rand 0.10.0",
  "rand_xoshiro",
@@ -5427,6 +5428,7 @@ dependencies = [
  "err_trail",
  "human-repr",
  "humantime",
+ "iobuf",
  "lending-iterator",
  "moka",
  "nix",
@@ -5922,6 +5924,7 @@ name = "iobuf"
 version = "0.1.0"
 dependencies = [
  "aligned-vec",
+ "compio-buf",
 ]
 
 [[package]]
@@ -6671,6 +6674,7 @@ dependencies = [
  "bytes",
  "consensus",
  "iggy_common",
+ "iobuf",
  "journal",
  "left-right",
  "message_bus",
@@ -7645,9 +7649,12 @@ dependencies = [
 name = "partitions"
 version = "0.1.0"
 dependencies = [
+ "bytemuck",
  "bytes",
+ "compio",
  "consensus",
  "iggy_common",
+ "iobuf",
  "journal",
  "message_bus",
  "ringbuffer",
@@ -9942,6 +9949,7 @@ dependencies = [
  "enumset",
  "futures",
  "iggy_common",
+ "iobuf",
  "journal",
  "message_bus",
  "metadata",
diff --git a/Cargo.toml b/Cargo.toml
index 3843f5649..af4c45d95 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -157,6 +157,7 @@ hostname = "0.4.2"
 human-repr = "1.1.0"
 humantime = "2.3.0"
 hwlocality = "1.0.0-alpha.11"
+iobuf = { path = "core/iobuf" }
 iceberg = "0.9.0"
 iceberg-catalog-rest = "0.9.0"
 iceberg-storage-opendal = "0.9.0"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index d3d3794ba..26f5591cf 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -48,6 +48,7 @@ enumset = { workspace = true }
 err_trail = { workspace = true }
 human-repr = { workspace = true }
 humantime = { workspace = true }
+iobuf = { workspace = true }
 lending-iterator = { workspace = true }
 moka = { workspace = true }
 once_cell = { workspace = true }
diff --git a/core/common/src/types/consensus/message.rs b/core/common/src/types/consensus/message.rs
index 5a49671db..bd30a5323 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -19,9 +19,13 @@ use crate::{
     header::RequestHeader,
     types::consensus::header::{self, ConsensusHeader, PrepareHeader, PrepareOkHeader},
 };
-use bytes::Bytes;
+use iobuf::{Frozen, Owned, Prefix};
 use std::marker::PhantomData;
 
+const MESSAGE_ALIGN: usize = 4096;
+type MessageOwned = Owned<MESSAGE_ALIGN>;
+type MessageBuffer = (Prefix<MESSAGE_ALIGN>, Frozen<MESSAGE_ALIGN>);
+
 // TODO: Rename this to Message and ConsensusHeader to Header.
 pub trait ConsensusMessage<H>
 where
@@ -36,7 +40,7 @@ where
     H: ConsensusHeader,
 {
     fn header(&self) -> &H {
-        let header_bytes = &self.buffer[..size_of::<H>()];
+        let header_bytes = &self.prefix[..size_of::<H>()];
         bytemuck::checked::try_from_bytes(header_bytes)
             .expect("header validated at construction time")
     }
@@ -44,7 +48,8 @@ where
 
 #[derive(Debug, Clone)]
 pub struct Message<H: ConsensusHeader> {
-    buffer: Bytes,
+    prefix: Prefix<MESSAGE_ALIGN>,
+    tail: Frozen<MESSAGE_ALIGN>,
     _marker: PhantomData<H>,
 }
 
@@ -55,41 +60,30 @@ where
     #[inline]
     #[allow(unused)]
     pub fn header(&self) -> &H {
-        let header_bytes = &self.buffer[..size_of::<H>()];
+        let header_bytes = &self.prefix[..size_of::<H>()];
         bytemuck::checked::try_from_bytes(header_bytes)
             .expect("header validated at construction time")
     }
 
-    /// Create a new message from a buffer.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if:
-    /// - buffer is too small for the header
-    /// - buffer contains invalid bit patterns (enum discriminants)
-    /// - buffer is too small for the size specified in the header
-    /// - header validation fails
     #[allow(unused)]
-    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
-        // verify minimum size
-        if buffer.len() < size_of::<H>() {
+    pub fn from_inner(buffer: MessageBuffer) -> Result<Self, header::ConsensusError> {
+        let (prefix, tail) = buffer;
+        let total_len = prefix.len() + tail.len();
+
+        if total_len < size_of::<H>() || prefix.len() < size_of::<H>() {
             return Err(header::ConsensusError::InvalidCommand {
                 expected: H::COMMAND,
                 found: header::Command2::Reserved,
             });
         }
 
-        // Validate bit patterns (enum discriminants) via try_from_bytes
-        let header_bytes = &buffer[..size_of::<H>()];
+        let header_bytes = &prefix[..size_of::<H>()];
         let header = bytemuck::checked::try_from_bytes::<H>(header_bytes)
             .map_err(|_| header::ConsensusError::InvalidBitPattern)?;
 
-        // validate the header
         header.validate()?;
 
-        // verify buffer size matches header.size
-        let header_size = header.size() as usize;
-        if buffer.len() < header_size {
+        if total_len < header.size() as usize {
             return Err(header::ConsensusError::InvalidCommand {
                 expected: H::COMMAND,
                 found: header::Command2::Reserved,
@@ -97,7 +91,8 @@ where
         }
 
         Ok(Self {
-            buffer,
+            prefix,
+            tail,
             _marker: PhantomData,
         })
     }
@@ -108,9 +103,10 @@ where
     #[allow(unused)]
     pub fn new(size: usize) -> Self {
         assert!(size >= size_of::<H>(), "Size must be at least header size");
-        let buffer = Bytes::from(vec![0u8; size]);
+        let (prefix, tail) = MessageOwned::zeroed(size).split_at(size_of::<H>());
         Self {
-            buffer,
+            prefix,
+            tail,
             _marker: PhantomData,
         }
     }
@@ -123,71 +119,27 @@ where
         let old_header = *self.header();
 
         // Safety: We ensured that size_of::<H>() == size_of::<T>()
-        // On top of that, there is going to be only one reference to buffer during this function call
-        // so no other references can observe the mutation.
-        // In the future we can replace the `Bytes` buffer with something that does not allow sharing between different threads.
-        let buffer = self.into_inner();
-        unsafe {
-            let ptr = buffer.as_ptr() as *mut u8;
-            let slice = std::slice::from_raw_parts_mut(ptr, size_of::<T>());
-            // Zero out to ensure valid bit patterns before creating a typed reference.
-            slice.fill(0);
-            let new_header =
-                bytemuck::checked::try_from_bytes_mut(slice).expect("zeroed bytes are valid");
-            f(old_header, new_header);
-        }
+        // Only the consensus header bytes are mutated. The command-specific
+        // prefix and immutable tail remain untouched.
+        let (mut prefix, tail) = self.into_inner();
+        let slice = &mut prefix[..size_of::<T>()];
+        slice.fill(0);
+        let new_header =
+            bytemuck::checked::try_from_bytes_mut(slice).expect("zeroed bytes are valid");
+        f(old_header, new_header);
 
         Message {
-            buffer,
+            prefix,
+            tail,
             _marker: PhantomData,
         }
     }
 
-    /// Get a reference to the message body (everything after the header).
-    ///
-    /// Returns an empty slice if there is no body.
-    #[inline]
-    #[allow(unused)]
-    pub fn body(&self) -> &[u8] {
-        let header_size = size_of::<H>();
-        let total_size = self.header().size() as usize;
-
-        if total_size > header_size {
-            &self.buffer[header_size..total_size]
-        } else {
-            &[]
-        }
-    }
-
-    /// Get the message body as zero-copy `Bytes`.
-    ///
-    /// Returns an empty `Bytes` if there is no body.
-    #[inline]
-    #[allow(unused)]
-    pub fn body_bytes(&self) -> Bytes {
-        let header_size = size_of::<H>();
-        let total_size = self.header().size() as usize;
-
-        if total_size > header_size {
-            self.buffer.slice(header_size..total_size)
-        } else {
-            Bytes::new()
-        }
-    }
-
-    /// Get the complete message as bytes (header + body).
-    #[inline]
-    #[allow(unused)]
-    pub fn as_bytes(&self) -> &[u8] {
-        let total_size = self.header().size() as usize;
-        &self.buffer[..total_size]
-    }
-
     /// Convert into the underlying buffer.
     #[inline]
     #[allow(unused)]
-    pub fn into_inner(self) -> Bytes {
-        self.buffer
+    pub fn into_inner(self) -> MessageBuffer {
+        (self.prefix, self.tail)
     }
 
     /// Create a message from a buffer without validation.
@@ -199,9 +151,11 @@ where
     /// - If doing a zero-cost type conversion (like to GenericHeader)
     #[inline]
     #[allow(unused)]
-    unsafe fn from_buffer_unchecked(buffer: Bytes) -> Self {
+    unsafe fn from_buffer_unchecked(buffer: MessageBuffer) -> Self {
+        let (prefix, tail) = buffer;
         Self {
-            buffer,
+            prefix,
+            tail,
             _marker: PhantomData,
         }
     }
@@ -211,7 +165,7 @@ where
     /// This allows treating any message as a generic message for common operations.
     #[allow(unused)]
     pub fn into_generic(self) -> Message<header::GenericHeader> {
-        unsafe { Message::from_buffer_unchecked(self.buffer) }
+        unsafe { Message::from_buffer_unchecked((self.prefix, self.tail)) }
     }
 
     /// Get a reference to this message as a generic message.
@@ -237,7 +191,7 @@ where
     where
         T: ConsensusHeader,
     {
-        if self.buffer.len() < size_of::<T>() {
+        if self.total_len() < size_of::<T>() || self.prefix.len() < size_of::<T>() {
             return Err(header::ConsensusError::InvalidCommand {
                 expected: T::COMMAND,
                 found: header::Command2::Reserved,
@@ -253,13 +207,13 @@ where
         }
 
         // Validate bit patterns for the target type
-        let header_bytes = &self.buffer[..size_of::<T>()];
+        let header_bytes = &self.prefix[..size_of::<T>()];
         let header = bytemuck::checked::try_from_bytes::<T>(header_bytes)
             .map_err(|_| header::ConsensusError::InvalidBitPattern)?;
 
         header.validate()?;
 
-        Ok(unsafe { Message::<T>::from_buffer_unchecked(self.buffer) })
+        Ok(unsafe { Message::<T>::from_buffer_unchecked((self.prefix, self.tail)) })
     }
 
     /// Try to get a reference to this message as a different header type.
@@ -271,7 +225,7 @@ where
         T: ConsensusHeader,
     {
         // check buffer size
-        if self.buffer.len() < size_of::<T>() {
+        if self.total_len() < size_of::<T>() || self.prefix.len() < size_of::<T>() {
             return Err(header::ConsensusError::InvalidCommand {
                 expected: T::COMMAND,
                 found: header::Command2::Reserved,
@@ -288,7 +242,7 @@ where
         }
 
         // Validate bit patterns for the target type
-        let header_bytes = &self.buffer[..size_of::<T>()];
+        let header_bytes = &self.prefix[..size_of::<T>()];
         bytemuck::checked::try_from_bytes::<T>(header_bytes)
             .map_err(|_| header::ConsensusError::InvalidBitPattern)?
             .validate()?;
@@ -297,6 +251,21 @@ where
 
         Ok(typed_message)
     }
+
+    fn total_len(&self) -> usize {
+        self.prefix.len() + self.tail.len()
+    }
+}
+
+impl<H> TryFrom<MessageBuffer> for Message<H>
+where
+    H: ConsensusHeader,
+{
+    type Error = header::ConsensusError;
+
+    fn try_from(buffer: MessageBuffer) -> Result<Self, Self::Error> {
+        Self::from_inner(buffer)
+    }
 }
 
 #[derive(Debug)]
@@ -378,6 +347,28 @@ mod tests {
 
     use super::*;
 
+    fn merged_body<H>(message: &Message<H>) -> Vec<u8>
+    where
+        H: ConsensusHeader,
+    {
+        let total_size = message.header().size() as usize;
+        let header_size = size_of::<H>();
+        let split_at = message.prefix.len();
+
+        if total_size <= split_at {
+            return message.prefix[header_size..total_size].to_vec();
+        }
+
+        if split_at <= header_size {
+            return message.tail[..total_size - header_size].to_vec();
+        }
+
+        let mut body = Vec::with_capacity(total_size - header_size);
+        body.extend_from_slice(&message.prefix[header_size..split_at]);
+        body.extend_from_slice(&message.tail[..total_size - split_at]);
+        body
+    }
+
     trait MessageFactory: ConsensusHeader + Sized {
         fn create_test() -> Message<Self>;
     }
@@ -407,7 +398,8 @@ mod tests {
                 *item = (i % 256) as u8;
             }
 
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
+            Message::try_from(MessageOwned::copy_from_slice(buffer.as_ref()).split_at(header_size))
+                .expect("test buffer must contain a valid consensus message")
         }
     }
 
@@ -435,7 +427,8 @@ mod tests {
             header.timestamp = 1234567890;
             header.operation = header::Operation::CreateStream;
 
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
+            Message::try_from(MessageOwned::copy_from_slice(buffer.as_ref()).split_at(header_size))
+                .expect("test buffer must contain a valid consensus message")
         }
     }
 
@@ -457,7 +450,8 @@ mod tests {
             header.replica = 2;
             header.commit = 50;
 
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
+            Message::try_from(MessageOwned::copy_from_slice(buffer.as_ref()).split_at(header_size))
+                .expect("test buffer must contain a valid consensus message")
         }
     }
 
@@ -482,7 +476,8 @@ mod tests {
             header.commit = 99;
             header.operation = header::Operation::CreateStream;
 
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
+            Message::try_from(MessageOwned::copy_from_slice(buffer.as_ref()).split_at(header_size))
+                .expect("test buffer must contain a valid consensus message")
         }
     }
 
@@ -493,13 +488,13 @@ mod tests {
         assert_eq!(message.header().cluster, 12345);
         assert_eq!(message.header().command, header::Command2::Reserved);
         assert_eq!(
-            message.body().len(),
+            merged_body(&message).len(),
             message.header().size() as usize - size_of::<header::GenericHeader>()
         );
 
-        let body = message.body();
+        let body = merged_body(&message);
         let header_size = size_of::<header::GenericHeader>();
-        for (i, &byte) in body.iter().enumerate() {
+        for (i, byte) in body.into_iter().enumerate() {
             let expected = ((i + header_size) % 256) as u8;
             assert_eq!(byte, expected);
         }
@@ -508,8 +503,8 @@ mod tests {
     #[test]
     fn test_message_conversion() {
         let prepare_message = header::PrepareHeader::create_test();
-
-        let original_bytes = prepare_message.as_bytes().to_vec();
+        let original_header = *prepare_message.header();
+        let original_body = merged_body(&prepare_message);
 
         let generic_message = prepare_message.into_generic();
         assert_eq!(generic_message.header().command, header::Command2::Prepare);
@@ -521,12 +516,12 @@ mod tests {
         assert_eq!(prepare_again.header().view, 1);
         assert_eq!(prepare_again.header().cluster, 12345);
 
-        let roundtrip_bytes = prepare_again.as_bytes().to_vec();
-
         assert_eq!(
-            original_bytes, roundtrip_bytes,
-            "Bytes should be identical after round-trip conversion"
+            bytemuck::bytes_of(&original_header),
+            bytemuck::bytes_of(prepare_again.header()),
+            "Header should be identical after round-trip conversion"
         );
+        assert_eq!(original_body, merged_body(&prepare_again));
     }
 
     #[test]
diff --git a/core/common/src/types/segment_storage/messages_writer.rs b/core/common/src/types/segment_storage/messages_writer.rs
index 6f81f7752..ae8864699 100644
--- a/core/common/src/types/segment_storage/messages_writer.rs
+++ b/core/common/src/types/segment_storage/messages_writer.rs
@@ -117,7 +117,6 @@ impl MessagesWriter {
 
         Ok(IggyByteSize::from(messages_size))
     }
-
     pub fn path(&self) -> String {
         self.file_path.clone()
     }
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 2695f624c..c4e3e13ad 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -32,6 +32,7 @@ bit-set = { workspace = true }
 bytemuck = { workspace = true }
 bytes = { workspace = true }
 iggy_common = { workspace = true }
+iobuf = { workspace = true }
 message_bus = { workspace = true }
 rand = { workspace = true }
 rand_xoshiro = { workspace = true }
diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs
index 31356a448..0fdeb5ff8 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -18,6 +18,7 @@
 use crate::{Consensus, Pipeline, PipelineEntry, Sequencer, Status, VsrConsensus};
 use iggy_common::header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader};
 use iggy_common::message::Message;
+use iobuf::Owned;
 use message_bus::MessageBus;
 use std::ops::AsyncFnOnce;
 
@@ -267,8 +268,8 @@ where
         buffer[header_size..].copy_from_slice(&body);
     }
 
-    Message::<ReplyHeader>::from_bytes(buffer.freeze())
-        .expect("build_reply_message: constructed header must be valid")
+    Message::try_from(Owned::<4096>::copy_from_slice(buffer.as_ref()).split_at(header_size))
+        .expect("reply buffer must contain a valid reply message")
 }
 
 /// Verify hash chain would not break if we add this header.
@@ -388,17 +389,17 @@ mod tests {
         async fn send_to_client(
             &self,
             _client_id: Self::Client,
-            _data: Self::Data,
-        ) -> Result<(), IggyError> {
-            Ok(())
+            data: Self::Data,
+        ) -> Result<Self::Data, IggyError> {
+            Ok(data)
         }
 
         async fn send_to_replica(
             &self,
             _replica: Self::Replica,
-            _data: Self::Data,
-        ) -> Result<(), IggyError> {
-            Ok(())
+            data: Self::Data,
+        ) -> Result<Self::Data, IggyError> {
+            Ok(data)
         }
     }
 
@@ -596,17 +597,17 @@ mod tests {
         async fn send_to_client(
             &self,
             _client_id: Self::Client,
-            _data: Self::Data,
-        ) -> Result<(), IggyError> {
-            Ok(())
+            data: Self::Data,
+        ) -> Result<Self::Data, IggyError> {
+            Ok(data)
         }
         async fn send_to_replica(
             &self,
             replica: Self::Replica,
             data: Self::Data,
-        ) -> Result<(), IggyError> {
-            self.sent.borrow_mut().push((replica, data));
-            Ok(())
+        ) -> Result<Self::Data, IggyError> {
+            self.sent.borrow_mut().push((replica, data.clone()));
+            Ok(data)
         }
     }
 
diff --git a/core/iobuf/Cargo.toml b/core/iobuf/Cargo.toml
index 62a8b8cc1..9ec4e8cd6 100644
--- a/core/iobuf/Cargo.toml
+++ b/core/iobuf/Cargo.toml
@@ -23,3 +23,4 @@ license = "Apache-2.0"
 
 [dependencies]
 aligned-vec = "0.6"
+compio-buf = "0.8.0"
diff --git a/core/iobuf/src/lib.rs b/core/iobuf/src/lib.rs
index 4b9006672..43ddcb22c 100644
--- a/core/iobuf/src/lib.rs
+++ b/core/iobuf/src/lib.rs
@@ -15,12 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::mem::ManuallyDrop;
+use std::ops::{Deref, DerefMut, RangeBounds};
 use std::ptr::NonNull;
 use std::slice;
 use std::sync::atomic::{AtomicUsize, Ordering, fence};
 
 use aligned_vec::{AVec, ConstAlign};
+use compio_buf::IoBuf;
+
+const fn assert_even_alignment<const ALIGN: usize>() {
+    assert!(
+        ALIGN % 2 == 0,
+        "ALIGN must be divisible by 2 for control-block pointer tagging"
+    );
+}
 
 #[derive(Debug)]
 pub struct Owned<const ALIGN: usize = 4096> {
@@ -40,6 +48,20 @@ impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
 }
 
 impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn zeroed(len: usize) -> Self {
+        const { assert_even_alignment::<ALIGN>() };
+        let mut inner: AVec<u8, ConstAlign<ALIGN>> = AVec::new(ALIGN);
+        inner.resize(len, 0);
+        Self { inner }
+    }
+
+    pub fn copy_from_slice(data: &[u8]) -> Self {
+        const { assert_even_alignment::<ALIGN>() };
+        let mut inner: AVec<u8, ConstAlign<ALIGN>> = AVec::new(ALIGN);
+        inner.extend_from_slice(data);
+        Self { inner }
+    }
+
     pub fn as_slice(&self) -> &[u8] {
         &self.inner
     }
@@ -48,11 +70,16 @@ impl<const ALIGN: usize> Owned<ALIGN> {
         &mut self.inner
     }
 
-    /// Split `Owned` buffer into two halves
+    /// Split `Owned` into a mutable copied-on-clone prefix and an immutable shared tail.
     ///
     /// # Panics
     /// Panics if `split_at > self.len()`.
-    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+    pub fn split_at(self, split_at: usize) -> (Prefix<ALIGN>, Frozen<ALIGN>) {
+        let (prefix, tail) = self.split_extent_pair(split_at);
+        (Prefix { inner: prefix }, Frozen { inner: tail })
+    }
+
+    fn split_extent_pair(self, split_at: usize) -> (Extent<ALIGN>, Extent<ALIGN>) {
         assert!(split_at <= self.inner.len());
 
         // Take ownership of the AVec's allocation. After this, we are 
responsible
@@ -68,143 +95,132 @@ impl<const ALIGN: usize> Owned<ALIGN> {
         // We need to increment the ref_count as the resulting halves will 
both point to the same control block.
         unsafe { ctrlb.as_ref().ref_count.fetch_add(1, Ordering::Relaxed) };
 
-        TwoHalves {
-            inner: (
-                Extent {
-                    ptr: base,
-                    len: split_at,
-                    ctrlb,
-                    _pad: 0,
-                },
-                Extent {
-                    ptr: tail,
-                    len: len - split_at,
-                    ctrlb,
-                    _pad: 0,
-                },
-            ),
-        }
+        (
+            Extent {
+                ptr: base,
+                len: split_at,
+                ctrlb,
+                _pad: 0,
+            },
+            Extent {
+                ptr: tail,
+                len: len - split_at,
+                ctrlb,
+                _pad: 0,
+            },
+        )
     }
 }
 
-pub struct TwoHalves<const ALIGN: usize> {
-    inner: (Extent<ALIGN>, Extent<ALIGN>),
+pub trait TryMerge: Sized {
+    type Output;
+
+    /// # Safety
+    ///
+    /// The caller must guarantee that the second part is the suffix of the
+    /// original frame allocation, beginning exactly after the first part.
+    unsafe fn try_merge(self) -> Result<Self::Output, Self>;
 }
 
-// SAFETY: `TwoHalves` can be sent across threads as long as the caller 
ensures that the head half is not shared between
-// threads (e.g. by cloning, which creates a new head half),
-// and that the tail half is only shared immutably (e.g. by cloning, which 
shares the tail half immutably).
-unsafe impl<const ALIGN: usize> Send for TwoHalves<ALIGN> {}
+impl<const ALIGN: usize> TryMerge for (Prefix<ALIGN>, Frozen<ALIGN>) {
+    type Output = Owned<ALIGN>;
 
-impl<const ALIGN: usize> TwoHalves<ALIGN> {
-    pub fn head(&self) -> &[u8] {
-        self.inner.0.as_slice()
+    unsafe fn try_merge(self) -> Result<Self::Output, Self> {
+        let (prefix, tail) = self;
+        match unsafe { try_merge_extents(prefix.inner, tail.inner) } {
+            Ok(owned) => Ok(owned),
+            Err((prefix, tail)) => Err((Prefix { inner: prefix }, Frozen { inner: tail })),
+        }
     }
+}
 
-    pub fn head_mut(&mut self) -> &mut [u8] {
-        // SAFETY: We are accessing the head half mutably, this is the only 
correct operation, as the head is not shared between clones,
-        // instead it gets copied.
-        unsafe { self.inner.0.as_mut_slice() }
-    }
+pub struct Prefix<const ALIGN: usize> {
+    inner: Extent<ALIGN>,
+}
 
-    pub fn tail(&self) -> &[u8] {
-        self.inner.1.as_slice()
-    }
+// SAFETY: `Prefix` owns the mutable front extent. Clone copies the bytes into a new
+// allocation, so the mutable prefix is never shared across threads by aliasing.
+unsafe impl<const ALIGN: usize> Send for Prefix<ALIGN> {}
 
-    pub fn split_point(&self) -> usize {
-        self.inner.0.len
+impl<const ALIGN: usize> Prefix<ALIGN> {
+    pub fn copy_from_slice(src: &[u8]) -> Self {
+        Self {
+            inner: Extent::copy_from_slice(src),
+        }
     }
 
-    pub fn total_len(&self) -> usize {
-        self.inner.0.len + self.inner.1.len
+    pub fn len(&self) -> usize {
+        self.inner.len
     }
 
-    pub fn try_merge(self) -> Result<Owned<ALIGN>, Self> {
-        let ctrlb_eq = std::ptr::addr_eq(self.inner.0.ctrlb.as_ptr(), 
self.inner.1.ctrlb.as_ptr());
-        // SAFETY: `inner.1.ctrlb` points to a live control block while `self` 
is alive.
-        let ref_count = unsafe {
-            self.inner
-                .1
-                .ctrlb
-                .as_ref()
-                .ref_count
-                .load(Ordering::Acquire)
-        };
-        // When ctrlb_eq, both extents share the same control block with 
refcount 2.
-        // When !ctrlb_eq (after clone), the tail has its own refcount.
-        let is_unique = if ctrlb_eq {
-            ref_count == 2
-        } else {
-            ref_count == 1
-        };
+    pub fn is_empty(&self) -> bool {
+        self.inner.len == 0
+    }
 
-        if !is_unique {
-            return Err(self);
+    fn ensure_mutable(&mut self) {
+        if !is_copy_on_write_tagged(self.inner.ctrlb) {
+            return;
         }
 
-        // Transfer ownership to prevent Extent::drop from running.
-        // SAFETY: We read the inner tuple out of ManuallyDrop, which won't 
run the compiler-generated drop.
-        let this = ManuallyDrop::new(self);
-        let (head, tail) = unsafe { std::ptr::read(&this.inner) };
-        let split_at = head.len;
-
-        // SAFETY: `tail.ctrlb` is unique at this point (is_unique checked 
above),
-        // If `head.ctrlb != tail.ctrlb`, the head owns a standalone allocation
-        // that must be released after copying.
-        unsafe {
-            if !ctrlb_eq {
-                let dst_ctrlb = tail.ctrlb.as_ref();
-
-                // We are patching up the original allocation, with the 
current head data, so that the resulting `Owned` has correct content.
-                let dst = slice::from_raw_parts_mut(dst_ctrlb.base.as_ptr(), 
split_at);
-                dst.copy_from_slice(head.as_slice());
-            }
-
-            // Dropping the head in `ctrlb_eq` case, should decrease the 
refcount to 1, so it's safe to reuse tail control block.
-            // In case when head was it's own allocation, we guarantee that 
it's always unique.
-            drop(head);
-            let tail_ctrlb = tail.ctrlb;
-            // Prevent tail Drop from running, we're taking ownership of the 
control block.
-            std::mem::forget(tail);
-
-            let ctrlb = reclaim_unique_control_block(tail_ctrlb);
-            // SAFETY: `ctrlb.base,capacity` were captured from an `AVec<u8>` 
allocation and
-            // are now exclusively owned by this path.
-            let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, 
ctrlb.len, ctrlb.capacity);
-            Ok(Owned { inner })
+        let ctrlb = clear_copy_on_write_tag(self.inner.ctrlb);
+        // Prefix reconstructed from Frozen may alias other overlapping views.
+        // Detach unless the only remaining aliases are the prefix itself and its paired tail.
+        let ref_count = unsafe { ctrlb.as_ref().ref_count.load(Ordering::Acquire) };
+        if ref_count > 2 {
+            self.inner = Extent::copy_from_slice(self.inner.as_slice());
+            return;
         }
+
+        self.inner.ctrlb = ctrlb;
     }
 }
 
-impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> {
+impl<const ALIGN: usize> Clone for Prefix<ALIGN> {
     fn clone(&self) -> Self {
         Self {
-            inner: (
-                Extent::<ALIGN>::copy_from_slice(self.head()),
-                self.inner.1.clone(),
-            ),
+            inner: Extent::copy_from_slice(self.inner.as_slice()),
         }
     }
 }
 
-impl<const ALIGN: usize> std::fmt::Debug for TwoHalves<ALIGN> {
+impl<const ALIGN: usize> std::fmt::Debug for Prefix<ALIGN> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("TwoHalves")
-            .field("split_point", &self.split_point())
-            .field("head_len", &self.inner.0.len)
-            .field("tail_len", &self.inner.1.len)
-            .field("halves_alias", &(self.inner.0.ctrlb == self.inner.1.ctrlb))
+        f.debug_struct("Prefix")
+            .field("len", &self.len())
+            .field("copy_on_write", &is_copy_on_write_tagged(self.inner.ctrlb))
             .finish()
     }
 }
 
+impl<const ALIGN: usize> Deref for Prefix<ALIGN> {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        self.inner.as_slice()
+    }
+}
+
+impl<const ALIGN: usize> DerefMut for Prefix<ALIGN> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.ensure_mutable();
+        // SAFETY: Prefix either comes from `Owned::split_at` and is safe to mutate
+        // in place, or it has been detached into its own allocation by `ensure_mutable`.
+        unsafe { self.inner.as_mut_slice() }
+    }
+}
+
 #[derive(Clone)]
 pub struct Frozen<const ALIGN: usize> {
     inner: Extent<ALIGN>,
 }
 
+// SAFETY: `Frozen` provides immutable access only, and clones share the underlying
+// extent immutably.
+unsafe impl<const ALIGN: usize> Send for Frozen<ALIGN> {}
+
 impl<const ALIGN: usize> From<Owned<ALIGN>> for Frozen<ALIGN> {
     fn from(value: Owned<ALIGN>) -> Self {
+        const { assert_even_alignment::<ALIGN>() };
         let inner = value.inner;
         let (ptr, _, len, capacity) = inner.into_raw_parts();
 
@@ -226,6 +242,78 @@ impl<const ALIGN: usize> Frozen<ALIGN> {
     pub fn as_slice(&self) -> &[u8] {
         self.inner.as_slice()
     }
+
+    /// Split `Frozen` into a copy-on-write prefix and immutable tail view.
+    pub fn split_at(self, split_at: usize) -> (Prefix<ALIGN>, Frozen<ALIGN>) {
+        const { assert_even_alignment::<ALIGN>() };
+        assert!(split_at <= self.inner.len);
+
+        let mut prefix = self.inner.clone();
+        prefix.len = split_at;
+        prefix.ctrlb = set_copy_on_write_tag(prefix.ctrlb);
+
+        let mut tail = self.inner;
+        // SAFETY: `split_at` is bounds-checked against the extent length.
+        tail.ptr = unsafe { 
NonNull::new_unchecked(tail.ptr.as_ptr().add(split_at)) };
+        tail.len -= split_at;
+
+        (Prefix { inner: prefix }, Frozen { inner: tail })
+    }
+
+    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
+        use std::ops::Bound;
+
+        let begin = match range.start_bound() {
+            Bound::Included(&n) => n,
+            Bound::Excluded(&n) => n + 1,
+            Bound::Unbounded => 0,
+        };
+
+        let end = match range.end_bound() {
+            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
+            Bound::Excluded(&n) => n,
+            Bound::Unbounded => self.len(),
+        };
+
+        assert!(begin <= self.len());
+        assert!(begin <= end);
+        assert!(end <= self.len());
+
+        let mut sliced = self.clone();
+        // SAFETY: begin/end are bounds-checked against the current extent 
length.
+        let ptr = unsafe { 
NonNull::new_unchecked(sliced.inner.ptr.as_ptr().add(begin)) };
+        sliced.inner.ptr = ptr;
+        sliced.inner.len = end - begin;
+        sliced
+    }
+
+    pub fn len(&self) -> usize {
+        self.inner.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.inner.len == 0
+    }
+}
+
+impl<const ALIGN: usize> std::fmt::Debug for Frozen<ALIGN> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Frozen").field("len", &self.len()).finish()
+    }
+}
+
+impl<const ALIGN: usize> Deref for Frozen<ALIGN> {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        self.as_slice()
+    }
+}
+
+impl<const ALIGN: usize> IoBuf for Frozen<ALIGN> {
+    fn as_init(&self) -> &[u8] {
+        self.as_slice()
+    }
 }
 
 #[repr(C, align(64))]
@@ -253,7 +341,7 @@ struct Extent<const ALIGN: usize> {
     ptr: NonNull<u8>,
     len: usize,
     ctrlb: NonNull<ControlBlock>,
-    // Padded to 32 bytes in order to avoid false sharing when used by 
`TwoHalves`
+    // Padded to 32 bytes in order to avoid false sharing when used by split 
frame pairs.
     // If `Extent` would be smaller than 32 bytes, two `Extent`s that are 
adjacent in memory
     // could potentially share the same cache line + some extra
     // that extra could lead to false sharing, in case of invalidation of extra
@@ -263,7 +351,7 @@ struct Extent<const ALIGN: usize> {
 impl<const ALIGN: usize> Drop for Extent<ALIGN> {
     fn drop(&mut self) {
         // SAFETY: `self.ctrlb` points to a live control block while `self` is 
alive.
-        unsafe { release_control_block_w_allocation::<ALIGN>(self.ctrlb) }
+        unsafe { 
release_control_block_w_allocation::<ALIGN>(clear_copy_on_write_tag(self.ctrlb))
 }
     }
 }
 
@@ -279,6 +367,7 @@ impl<const ALIGN: usize> Extent<ALIGN> {
     }
 
     fn copy_from_slice(src: &[u8]) -> Self {
+        const { assert_even_alignment::<ALIGN>() };
         let mut v: AVec<u8, ConstAlign<ALIGN>> = AVec::new(ALIGN);
         v.extend_from_slice(src);
 
@@ -298,12 +387,10 @@ impl<const ALIGN: usize> Extent<ALIGN> {
 
 impl<const ALIGN: usize> Clone for Extent<ALIGN> {
     fn clone(&self) -> Self {
+        let ctrlb = clear_copy_on_write_tag(self.ctrlb);
         // SAFETY: `self.ctrlb` points to a live control block while `self` is 
alive.
         unsafe {
-            self.ctrlb
-                .as_ref()
-                .ref_count
-                .fetch_add(1, Ordering::Relaxed);
+            ctrlb.as_ref().ref_count.fetch_add(1, Ordering::Relaxed);
         }
         Self {
             ptr: self.ptr,
@@ -314,7 +401,24 @@ impl<const ALIGN: usize> Clone for Extent<ALIGN> {
     }
 }
 
+const COPY_ON_WRITE_TAG: usize = 1;
+
+fn is_copy_on_write_tagged(ctrlb: NonNull<ControlBlock>) -> bool {
+    ctrlb.as_ptr().addr() & COPY_ON_WRITE_TAG != 0
+}
+
+fn set_copy_on_write_tag(ctrlb: NonNull<ControlBlock>) -> 
NonNull<ControlBlock> {
+    // SAFETY: tagging only toggles the low alignment bit while preserving 
provenance.
+    unsafe { NonNull::new_unchecked(ctrlb.as_ptr().map_addr(|addr| addr | 
COPY_ON_WRITE_TAG)) }
+}
+
+fn clear_copy_on_write_tag(ctrlb: NonNull<ControlBlock>) -> 
NonNull<ControlBlock> {
+    // SAFETY: clearing only strips the low tag bit while preserving 
provenance.
+    unsafe { NonNull::new_unchecked(ctrlb.as_ptr().map_addr(|addr| addr & 
!COPY_ON_WRITE_TAG)) }
+}
+
 unsafe fn release_control_block_w_allocation<const ALIGN: usize>(ctrlb: 
NonNull<ControlBlock>) {
+    const { assert_even_alignment::<ALIGN>() };
     // SAFETY: ctrlb is valid per function preconditions
     let old = unsafe { ctrlb.as_ref() }
         .ref_count
@@ -371,12 +475,65 @@ unsafe fn reclaim_unique_control_block(ctrlb: 
NonNull<ControlBlock>) -> ControlB
     unsafe { *Box::from_raw(ctrlb.as_ptr()) }
 }
 
+/// # Safety
+///
+/// The caller must guarantee that `tail` is the suffix of the original frame
+/// allocation and begins exactly after `prefix`.
+unsafe fn try_merge_extents<const ALIGN: usize>(
+    prefix: Extent<ALIGN>,
+    tail: Extent<ALIGN>,
+) -> Result<Owned<ALIGN>, (Extent<ALIGN>, Extent<ALIGN>)> {
+    const { assert_even_alignment::<ALIGN>() };
+    let split_at = prefix.len;
+    // Strip the copy-on-write tag from both pointers first: the tag lives in the
+    // low address bit, so the raw pointers must be untagged before they are
+    // compared or dereferenced below (they stay live while the extents are alive).
+    let tail_ctrlb = clear_copy_on_write_tag(tail.ctrlb);
+    let prefix_ctrlb = clear_copy_on_write_tag(prefix.ctrlb);
+    let ctrlb_eq = std::ptr::addr_eq(prefix_ctrlb.as_ptr(), 
tail_ctrlb.as_ptr());
+    let ref_count = unsafe { 
tail_ctrlb.as_ref().ref_count.load(Ordering::Acquire) };
+
+    // When ctrlb_eq, `prefix` and `tail` share one control block, so they are the
+    // sole owners iff the refcount is exactly 2. When !ctrlb_eq, the tail owns the
+    // original frame and must be the unique owner (refcount exactly 1) for the
+    // merge to reclaim its allocation.
+    let is_unique = if ctrlb_eq {
+        ref_count == 2
+    } else {
+        ref_count == 1
+    };
+    if !is_unique {
+        return Err((prefix, tail));
+    }
+
+    unsafe {
+        if !ctrlb_eq {
+            let dst_ctrlb = tail_ctrlb.as_ref();
+            // We are patching up the original allocation, with the current 
prefix data,
+            // so that the resulting `Owned` has correct content.
+            let dst = slice::from_raw_parts_mut(dst_ctrlb.base.as_ptr(), 
split_at);
+            dst.copy_from_slice(prefix.as_slice());
+        }
+
+        // In the `ctrlb_eq` case, dropping the prefix brings the shared refcount
+        // down to 1, leaving the tail as the unique owner of the control block.
+        // In the !ctrlb_eq case the prefix owns its own (always-unique) allocation,
+        // so dropping it frees that side entirely.
+        drop(prefix);
+        let tail_ctrlb = tail_ctrlb;
+        // Prevent tail Drop from running, we're taking ownership of the 
control block.
+        std::mem::forget(tail);
+
+        let ctrlb = reclaim_unique_control_block(tail_ctrlb);
+        // SAFETY: `ctrlb.base`, `ctrlb.len`, and `ctrlb.capacity` were captured
+        // from the original `AVec<u8>` allocation, and that allocation is now
+        // exclusively owned by this path (uniqueness was checked above).
+        let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, 
ctrlb.len, ctrlb.capacity);
+        Ok(Owned { inner })
+    }
+}
+
 // TODO: Better tests & miri.
 #[cfg(test)]
 mod tests {
-    use super::Owned;
+    use super::{Frozen, Owned, Prefix, TryMerge};
     use aligned_vec::AVec;
     use aligned_vec::ConstAlign;
+    use std::mem;
 
     fn make_owned(data: &[u8]) -> Owned {
         let mut v: AVec<u8, ConstAlign<4096>> = AVec::new(4096);
@@ -385,72 +542,71 @@ mod tests {
     }
 
     #[test]
-    fn split_exposes_head_and_tail() {
+    fn split_exposes_prefix_and_tail() {
         let owned = make_owned(&[1, 2, 3, 4, 5]);
-        let mut buffer = owned.split_at(2);
+        let (mut prefix, tail) = owned.split_at(2);
 
-        assert_eq!(buffer.head(), &[1, 2]);
-        assert_eq!(buffer.tail(), &[3, 4, 5]);
-        assert_eq!(buffer.split_point(), 2);
-        assert_eq!(buffer.total_len(), 5);
+        assert_eq!(&*prefix, &[1, 2]);
+        assert_eq!(&*tail, &[3, 4, 5]);
 
-        buffer.head_mut().copy_from_slice(&[9, 8]);
-        assert_eq!(buffer.head(), &[9, 8]);
-        assert_eq!(buffer.tail(), &[3, 4, 5]);
+        prefix.copy_from_slice(&[9, 8]);
+        assert_eq!(&*prefix, &[9, 8]);
+        assert_eq!(&*tail, &[3, 4, 5]);
     }
 
     #[test]
-    fn clone_copies_head_and_shares_tail() {
+    fn clone_copies_prefix_and_shares_tail() {
         let owned = make_owned(&[1, 2, 3, 4, 5]);
-        let mut original = owned.split_at(2);
-        let mut cloned = original.clone();
+        let (mut original_prefix, original_tail) = owned.split_at(2);
+        let (mut cloned_prefix, cloned_tail) = (original_prefix.clone(), 
original_tail.clone());
 
-        original.head_mut().copy_from_slice(&[9, 9]);
-        cloned.head_mut().copy_from_slice(&[7, 7]);
+        original_prefix.copy_from_slice(&[9, 9]);
+        cloned_prefix.copy_from_slice(&[7, 7]);
 
-        assert_eq!(original.head(), &[9, 9]);
-        assert_eq!(cloned.head(), &[7, 7]);
-        assert_eq!(original.tail(), &[3, 4, 5]);
-        assert_eq!(cloned.tail(), &[3, 4, 5]);
+        assert_eq!(&*original_prefix, &[9, 9]);
+        assert_eq!(&*cloned_prefix, &[7, 7]);
+        assert_eq!(&*original_tail, &[3, 4, 5]);
+        assert_eq!(&*cloned_tail, &[3, 4, 5]);
     }
 
     #[test]
     fn try_merge_reuses_original_frame_when_unique() {
         let owned = make_owned(&[1, 2, 3, 4, 5]);
-        let mut buffer = owned.split_at(2);
-        buffer.head_mut().copy_from_slice(&[8, 9]);
+        let (mut prefix, tail) = owned.split_at(2);
+        prefix.copy_from_slice(&[8, 9]);
 
-        let merged: AVec<u8, ConstAlign<4096>> = 
buffer.try_merge().unwrap().into();
+        let merged: AVec<u8, ConstAlign<4096>> =
+            unsafe { (prefix, tail).try_merge() }.unwrap().into();
         assert_eq!(merged.as_slice(), &[8, 9, 3, 4, 5]);
     }
 
     #[test]
     fn try_merge_fails_while_tail_is_shared() {
         let owned = make_owned(&[1, 2, 3, 4, 5]);
-        let buffer = owned.split_at(2);
-        let clone = buffer.clone();
+        let parts = owned.split_at(2);
+        let clone = parts.clone();
 
         // Merge fails because tail is shared
-        let buffer = buffer.try_merge().unwrap_err();
+        let parts = unsafe { parts.try_merge() }.unwrap_err();
 
         drop(clone);
 
         // Now merge succeeds
-        let merged: AVec<u8, ConstAlign<4096>> = 
buffer.try_merge().unwrap().into();
+        let merged: AVec<u8, ConstAlign<4096>> = unsafe { parts.try_merge() 
}.unwrap().into();
         assert_eq!(merged.as_slice(), &[1, 2, 3, 4, 5]);
     }
 
     #[test]
-    fn merge_after_cloned_head_mutation_writes_back_to_original_frame() {
+    fn merge_after_cloned_prefix_mutation_writes_back_to_original_frame() {
         let owned = make_owned(&[1, 2, 3, 4, 5]);
-        let buffer = owned.split_at(2);
-        let mut clone = buffer.clone();
+        let parts = owned.split_at(2);
+        let mut clone = parts.clone();
 
-        drop(buffer);
+        drop(parts);
 
-        clone.head_mut().copy_from_slice(&[4, 2]);
+        clone.0.copy_from_slice(&[4, 2]);
 
-        let merged: AVec<u8, ConstAlign<4096>> = 
clone.try_merge().unwrap().into();
+        let merged: AVec<u8, ConstAlign<4096>> = unsafe { clone.try_merge() 
}.unwrap().into();
         assert_eq!(merged.as_slice(), &[4, 2, 3, 4, 5]);
     }
 
@@ -458,13 +614,13 @@ mod tests {
     fn zero_length_splits_work() {
         let owned = make_owned(&[1, 2, 3]);
         let left_empty = owned.split_at(0);
-        assert_eq!(left_empty.head(), &[]);
-        assert_eq!(left_empty.tail(), &[1, 2, 3]);
+        assert_eq!(&*left_empty.0, &[]);
+        assert_eq!(&*left_empty.1, &[1, 2, 3]);
 
         let owned = make_owned(&[1, 2, 3]);
         let right_empty = owned.split_at(3);
-        assert_eq!(right_empty.head(), &[1, 2, 3]);
-        assert_eq!(right_empty.tail(), &[]);
+        assert_eq!(&*right_empty.0, &[1, 2, 3]);
+        assert_eq!(&*right_empty.1, &[]);
     }
 
     #[test]
@@ -474,8 +630,8 @@ mod tests {
         let clone1 = original.clone();
         let _clone2 = clone1.clone();
 
-        // All clones share tail, so merge should fail
-        assert!(original.try_merge().is_err());
+        // All clones share tail, so merge should fail.
+        assert!(unsafe { original.try_merge() }.is_err());
     }
 
     #[test]
@@ -514,91 +670,164 @@ mod tests {
         owned.as_mut_slice()[0] = 99;
         owned.as_mut_slice()[4] = 88;
 
-        let buffer = owned.split_at(2);
-        assert_eq!(buffer.head(), &[99, 2]);
-        assert_eq!(buffer.tail(), &[3, 4, 88]);
+        let (prefix, tail) = owned.split_at(2);
+        assert_eq!(&*prefix, &[99, 2]);
+        assert_eq!(&*tail, &[3, 4, 88]);
     }
 
     #[test]
-    fn two_halves_head_returns_correct_slice() {
+    fn prefix_returns_correct_slice() {
         let owned = make_owned(&[10, 20, 30, 40, 50]);
-        let buffer = owned.split_at(3);
+        let (prefix, _) = owned.split_at(3);
 
-        assert_eq!(buffer.head(), &[10, 20, 30]);
+        assert_eq!(&*prefix, &[10, 20, 30]);
     }
 
     #[test]
-    fn two_halves_tail_returns_correct_slice() {
+    fn tail_returns_correct_slice() {
         let owned = make_owned(&[10, 20, 30, 40, 50]);
-        let buffer = owned.split_at(3);
+        let (_, tail) = owned.split_at(3);
 
-        assert_eq!(buffer.tail(), &[40, 50]);
+        assert_eq!(&*tail, &[40, 50]);
     }
 
     #[test]
-    fn two_halves_head_mut_allows_modification() {
+    fn prefix_allows_modification() {
         let owned = make_owned(&[1, 2, 3, 4, 5]);
-        let mut buffer = owned.split_at(3);
+        let (mut prefix, tail) = owned.split_at(3);
 
-        buffer.head_mut()[0] = 100;
-        buffer.head_mut()[2] = 200;
+        prefix[0] = 100;
+        prefix[2] = 200;
 
-        assert_eq!(buffer.head(), &[100, 2, 200]);
-        assert_eq!(buffer.tail(), &[4, 5]);
+        assert_eq!(&*prefix, &[100, 2, 200]);
+        assert_eq!(&*tail, &[4, 5]);
     }
 
     #[test]
-    fn two_halves_head_mut_full_overwrite() {
+    fn prefix_full_overwrite() {
         let owned = make_owned(&[1, 2, 3, 4, 5]);
-        let mut buffer = owned.split_at(3);
+        let (mut prefix, tail) = owned.split_at(3);
 
-        buffer.head_mut().copy_from_slice(&[7, 8, 9]);
+        prefix.copy_from_slice(&[7, 8, 9]);
 
-        assert_eq!(buffer.head(), &[7, 8, 9]);
-        assert_eq!(buffer.tail(), &[4, 5]);
+        assert_eq!(&*prefix, &[7, 8, 9]);
+        assert_eq!(&*tail, &[4, 5]);
     }
 
     #[test]
-    fn two_halves_head_empty_slice() {
+    fn prefix_empty_slice() {
         let owned = make_owned(&[1, 2, 3]);
-        let buffer = owned.split_at(0);
+        let (prefix, tail) = owned.split_at(0);
 
-        assert_eq!(buffer.head(), &[]);
-        assert_eq!(buffer.tail(), &[1, 2, 3]);
+        assert_eq!(&*prefix, &[]);
+        assert_eq!(&*tail, &[1, 2, 3]);
     }
 
     #[test]
-    fn two_halves_tail_empty_slice() {
+    fn tail_empty_slice() {
         let owned = make_owned(&[1, 2, 3]);
-        let buffer = owned.split_at(3);
+        let (prefix, tail) = owned.split_at(3);
 
-        assert_eq!(buffer.head(), &[1, 2, 3]);
-        assert_eq!(buffer.tail(), &[]);
+        assert_eq!(&*prefix, &[1, 2, 3]);
+        assert_eq!(&*tail, &[]);
     }
 
     #[test]
-    fn two_halves_head_mut_does_not_affect_tail() {
+    fn prefix_mutation_does_not_affect_tail() {
         let owned = make_owned(&[1, 2, 3, 4, 5]);
-        let mut buffer = owned.split_at(2);
+        let (mut prefix, tail) = owned.split_at(2);
 
-        let original_tail: Vec<u8> = buffer.tail().to_vec();
-        buffer.head_mut().copy_from_slice(&[99, 99]);
+        let original_tail: Vec<u8> = tail.to_vec();
+        prefix.copy_from_slice(&[99, 99]);
 
-        assert_eq!(buffer.tail(), original_tail.as_slice());
+        assert_eq!(&*tail, original_tail.as_slice());
     }
 
     #[test]
-    fn two_halves_cloned_head_mut_independent() {
+    fn cloned_prefix_mutations_are_independent() {
         let owned = make_owned(&[1, 2, 3, 4, 5]);
         let mut original = owned.split_at(2);
         let mut cloned = original.clone();
 
-        original.head_mut().copy_from_slice(&[10, 20]);
-        cloned.head_mut().copy_from_slice(&[30, 40]);
+        original.0.copy_from_slice(&[10, 20]);
+        cloned.0.copy_from_slice(&[30, 40]);
 
-        assert_eq!(original.head(), &[10, 20]);
-        assert_eq!(cloned.head(), &[30, 40]);
-        // Tail is shared, both should see the same data
-        assert_eq!(original.tail(), cloned.tail());
+        assert_eq!(&*original.0, &[10, 20]);
+        assert_eq!(&*cloned.0, &[30, 40]);
+        assert_eq!(&*original.1, &*cloned.1);
+    }
+
+    #[test]
+    fn prefix_clone_copies_and_frozen_clone_shares() {
+        let owned = make_owned(&[1, 2, 3, 4, 5]);
+        let (mut prefix, tail) = owned.split_at(2);
+        let mut prefix_clone = prefix.clone();
+        let tail_clone = tail.clone();
+
+        prefix.copy_from_slice(&[7, 7]);
+        prefix_clone.copy_from_slice(&[8, 8]);
+
+        assert_eq!(&*prefix, &[7, 7]);
+        assert_eq!(&*prefix_clone, &[8, 8]);
+        assert_eq!(&*tail, &[3, 4, 5]);
+        assert_eq!(&*tail_clone, &[3, 4, 5]);
+    }
+
+    #[test]
+    fn split_try_merge_reuses_original_frame_when_unique() {
+        let owned = make_owned(&[1, 2, 3, 4, 5]);
+        let (mut prefix, tail) = owned.split_at(2);
+        prefix.copy_from_slice(&[8, 9]);
+
+        let merged: AVec<u8, ConstAlign<4096>> =
+            unsafe { (prefix, tail).try_merge() }.unwrap().into();
+        assert_eq!(merged.as_slice(), &[8, 9, 3, 4, 5]);
+    }
+
+    #[test]
+    fn split_try_merge_fails_while_tail_is_shared() {
+        let owned = make_owned(&[1, 2, 3, 4, 5]);
+        let parts = owned.split_at(2);
+        let tail_clone = parts.1.clone();
+
+        let parts = unsafe { parts.try_merge() }.unwrap_err();
+        drop(tail_clone);
+
+        let merged: AVec<u8, ConstAlign<4096>> = unsafe { parts.try_merge() 
}.unwrap().into();
+        assert_eq!(merged.as_slice(), &[1, 2, 3, 4, 5]);
+    }
+
+    #[test]
+    fn frozen_can_be_split_into_prefix_and_tail() {
+        let frozen = Frozen::from(make_owned(&[1, 2, 3, 4, 5]));
+        let (prefix, tail) = frozen.split_at(2);
+
+        let merged: AVec<u8, ConstAlign<4096>> =
+            unsafe { (prefix, tail).try_merge() }.unwrap().into();
+        assert_eq!(merged.as_slice(), &[1, 2, 3, 4, 5]);
+    }
+
+    #[test]
+    fn frozen_split_prefix_detaches_before_mutation() {
+        let frozen = Frozen::from(make_owned(&[1, 2, 3, 4, 5]));
+        let alias = frozen.clone();
+        let (mut prefix, tail) = frozen.split_at(2);
+
+        prefix.copy_from_slice(&[9, 8]);
+
+        assert_eq!(&alias[..], &[1, 2, 3, 4, 5]);
+        assert_eq!(&*prefix, &[9, 8]);
+        assert_eq!(&*tail, &[3, 4, 5]);
+
+        drop(alias);
+
+        let merged: AVec<u8, ConstAlign<4096>> =
+            unsafe { (prefix, tail).try_merge() }.unwrap().into();
+        assert_eq!(merged.as_slice(), &[9, 8, 3, 4, 5]);
+    }
+
+    #[test]
+    fn prefix_stays_32_bytes() {
+        assert_eq!(mem::size_of::<Prefix<4096>>(), 32);
     }
 }
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 876061bc4..2a3132786 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -40,12 +40,12 @@ pub trait MessageBus {
         &self,
         client_id: Self::Client,
         data: Self::Data,
-    ) -> impl Future<Output = Result<(), IggyError>>;
+    ) -> impl Future<Output = Result<Self::Data, IggyError>>;
     fn send_to_replica(
         &self,
         replica: Self::Replica,
         data: Self::Data,
-    ) -> impl Future<Output = Result<(), IggyError>>;
+    ) -> impl Future<Output = Result<Self::Data, IggyError>>;
 }
 
 // TODO: explore generics for Strategy
@@ -112,25 +112,25 @@ impl MessageBus for IggyMessageBus {
     async fn send_to_client(
         &self,
         client_id: Self::Client,
-        _message: Self::Data,
-    ) -> Result<(), IggyError> {
+        message: Self::Data,
+    ) -> Result<Self::Data, IggyError> {
         #[allow(clippy::cast_possible_truncation)] // 
IggyError::ClientNotFound takes u32
         let _sender = self
             .clients
             .get(&client_id)
             .ok_or(IggyError::ClientNotFound(client_id as u32))?;
-        Ok(())
+        Ok(message)
     }
 
     async fn send_to_replica(
         &self,
         replica: Self::Replica,
-        _message: Self::Data,
-    ) -> Result<(), IggyError> {
+        message: Self::Data,
+    ) -> Result<Self::Data, IggyError> {
         // TODO: Handle lazily creating the connection.
         let _connection = self
             .get_replica_connection(replica)
             .ok_or(IggyError::ResourceNotFound(format!("Replica {replica}")))?;
-        Ok(())
+        Ok(message)
     }
 }
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index e0d024aaf..41ae29517 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -32,6 +32,7 @@ ahash = { workspace = true }
 bytes = { workspace = true }
 consensus = { workspace = true }
 iggy_common = { workspace = true }
+iobuf = { workspace = true }
 journal = { workspace = true }
 left-right = { workspace = true }
 message_bus = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 9751eb0c7..09be2e6e8 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -29,10 +29,18 @@ use iggy_common::{
     },
     message::Message,
 };
+use iobuf::TryMerge;
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
 use tracing::{debug, warn};
 
+fn prepare_for_update(message: Message<PrepareHeader>) -> 
Message<PrepareHeader> {
+    let owned = unsafe { message.into_inner().try_merge() }
+        .expect("metadata state machine update expects a unique message 
buffer");
+    Message::try_from(owned.split_at(std::mem::size_of::<PrepareHeader>()))
+        .expect("normalized metadata prepare must remain valid")
+}
+
 #[derive(Debug, Clone)]
 #[allow(unused)]
 pub struct IggySnapshot {
@@ -228,6 +236,7 @@ where
                         )
                     });
 
+                let prepare = prepare_for_update(prepare);
                 let response = 
self.mux_stm.update(prepare).unwrap_or_else(|err| {
                     warn!(
                         "on_ack: state machine error for op={}: {err}",
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index e25aaaf65..e6e37a800 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -251,7 +251,8 @@ macro_rules! collect_handlers {
                     match input.header().operation {
                         $(
                             Operation::$operation => {
-                                let body = input.body_bytes();
+                                let (_, tail) = input.into_inner();
+                                let body = 
::bytes::Bytes::copy_from_slice(&tail);
                                 let cmd = $operation::from_bytes(body)?;
                                 Ok(Either::Left([<$state 
Command>]::$operation(cmd)))
                             },
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
index c355ea509..bca5a4d60 100644
--- a/core/partitions/Cargo.toml
+++ b/core/partitions/Cargo.toml
@@ -28,8 +28,11 @@ repository = "https://github.com/apache/iggy";
 readme = "../../README.md"
 
 [dependencies]
+bytemuck = { workspace = true }
 bytes = { workspace = true }
+compio = { workspace = true }
 consensus = { workspace = true }
+iobuf = { workspace = true }
 iggy_common = { workspace = true }
 journal = { workspace = true }
 message_bus = { workspace = true }
diff --git a/core/partitions/src/frozen_messages_writer.rs 
b/core/partitions/src/frozen_messages_writer.rs
new file mode 100644
index 000000000..7db6a86af
--- /dev/null
+++ b/core/partitions/src/frozen_messages_writer.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use compio::{
+    buf::IoBuf,
+    fs::{File, OpenOptions},
+    io::AsyncWriteAtExt,
+};
+use iggy_common::{IggyByteSize, IggyError};
+use iobuf::Frozen;
+use tracing::error;
+
+const MAX_IOV_COUNT: usize = 1024;
+
+#[derive(Debug, Clone)]
+pub struct FrozenMessagesWriter {
+    file_path: String,
+    fsync: bool,
+}
+
+impl FrozenMessagesWriter {
+    pub fn new(file_path: String, fsync: bool) -> Self {
+        Self { file_path, fsync }
+    }
+
+    pub async fn save<const ALIGN: usize>(
+        &self,
+        position: u64,
+        buffers: &[Frozen<ALIGN>],
+        strip_prefix: usize,
+    ) -> Result<IggyByteSize, IggyError> {
+        let messages_size = buffers.iter().try_fold(0u64, |size, buffer| {
+            let written = buffer
+                .len()
+                .checked_sub(strip_prefix)
+                .ok_or(IggyError::CannotWriteToFile)?;
+            Ok::<_, IggyError>(size + written as u64)
+        })?;
+
+        if messages_size == 0 {
+            return Ok(IggyByteSize::from(0));
+        }
+
+        let file = OpenOptions::new()
+            .create(true)
+            .write(true)
+            .open(&self.file_path)
+            .await
+            .map_err(|_| IggyError::CannotReadFile)?;
+
+        write_frozen_chunked(&file, &self.file_path, position, buffers, 
strip_prefix).await?;
+
+        if self.fsync {
+            file.sync_all()
+                .await
+                .map_err(|_| IggyError::CannotWriteToFile)?;
+        }
+
+        Ok(IggyByteSize::from(messages_size))
+    }
+}
+
+async fn write_frozen_chunked<const ALIGN: usize>(
+    file: &File,
+    file_path: &str,
+    mut position: u64,
+    buffers: &[Frozen<ALIGN>],
+    strip_prefix: usize,
+) -> Result<(), IggyError> {
+    for chunk in buffers.chunks(MAX_IOV_COUNT) {
+        let chunk_size = chunk.iter().try_fold(0usize, |size, buffer| {
+            let written = buffer
+                .len()
+                .checked_sub(strip_prefix)
+                .ok_or(IggyError::CannotWriteToFile)?;
+            Ok::<_, IggyError>(size + written)
+        })?;
+
+        let chunk_vec: Vec<_> = chunk
+            .iter()
+            .cloned()
+            .map(|buffer| buffer.slice(strip_prefix..))
+            .collect();
+
+        let (result, _) = (&*file)
+            .write_vectored_all_at(chunk_vec, position)
+            .await
+            .into();
+        result.map_err(|err| {
+            error!(
+                file = file_path,
+                %err,
+                "failed to write frozen messages to segment file"
+            );
+            IggyError::CannotWriteToFile
+        })?;
+
+        position += chunk_size as u64;
+    }
+
+    Ok(())
+}
diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
index 5addf1516..241cff0d9 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -21,12 +21,11 @@ use crate::journal::{
 use crate::log::SegmentedLog;
 use crate::{
     AppendResult, Partition, PartitionOffsets, PollingArgs, PollingConsumer,
-    decode_send_messages_batch,
+    send_messages2::{IggyMessages2, stamp_prepare_for_persistence},
 };
 use iggy_common::{
     ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, 
ConsumerOffsets,
-    IggyByteSize, IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet, 
IggyTimestamp,
-    PartitionStats, PollingKind,
+    IggyByteSize, IggyError, IggyTimestamp, PartitionStats, PollingKind,
     header::{Operation, PrepareHeader},
     message::Message,
 };
@@ -55,36 +54,6 @@ pub struct IggyPartition {
 }
 
 impl IggyPartition {
-    fn prepare_message_from_batch(
-        mut header: PrepareHeader,
-        batch: &IggyMessagesBatchMut,
-    ) -> Message<PrepareHeader> {
-        let indexes = batch.indexes();
-        let count = batch.count();
-        let body_len = 4 + indexes.len() + batch.len();
-        let total_size = std::mem::size_of::<PrepareHeader>() + body_len;
-        header.size = u32::try_from(total_size)
-            .expect("prepare_message_from_batch: batch size exceeds u32::MAX");
-
-        let message = 
Message::<PrepareHeader>::new(total_size).transmute_header(|_old, new| {
-            *new = header;
-        });
-
-        let mut bytes = message
-            .into_inner()
-            .try_into_mut()
-            .expect("prepare_message_from_batch: expected unique bytes 
buffer");
-        let header_size = std::mem::size_of::<PrepareHeader>();
-        bytes[header_size..header_size + 
4].copy_from_slice(&count.to_le_bytes());
-        let mut position = header_size + 4;
-        bytes[position..position + indexes.len()].copy_from_slice(indexes);
-        position += indexes.len();
-        bytes[position..position + batch.len()].copy_from_slice(batch);
-
-        Message::<PrepareHeader>::from_bytes(bytes.freeze())
-            .expect("prepare_message_from_batch: invalid prepared message 
bytes")
-    }
-
     pub fn new(stats: Arc<PartitionStats>) -> Self {
         Self {
             log: SegmentedLog::default(),
@@ -111,29 +80,29 @@ impl Partition for IggyPartition {
             return Err(IggyError::CannotAppendMessage);
         }
 
-        let mut batch = decode_send_messages_batch(message.body_bytes())
-            .ok_or(IggyError::CannotAppendMessage)?;
-
-        if batch.count() == 0 {
-            return Ok(AppendResult::new(0, 0, 0));
-        }
-
         let dirty_offset = if self.should_increment_offset {
             self.dirty_offset.load(Ordering::Relaxed) + 1
         } else {
             0
         };
 
-        let segment = self.log.active_segment();
-        let segment_start_offset = segment.start_offset;
-        let current_position = segment.current_position;
-
-        batch
-            .prepare_for_persistence(segment_start_offset, dirty_offset, 
current_position, None)
-            .await;
+        let current_max_timestamp = self
+            .log
+            .journal()
+            .info
+            .end_timestamp
+            .max(self.log.active_segment().end_timestamp);
+        let batch_timestamp = 
IggyTimestamp::now().as_micros().max(current_max_timestamp);
+        let (message, batch) =
+            stamp_prepare_for_persistence(message, dirty_offset, 
batch_timestamp)
+                .map_err(|_| IggyError::CannotAppendMessage)?;
+
+        if batch.record_count == 0 {
+            return Ok(AppendResult::new(0, 0, 0));
+        }
 
-        let batch_messages_count = batch.count();
-        let batch_messages_size = batch.size();
+        let batch_messages_count = batch.record_count;
+        let batch_messages_size = batch.total_size() as u32;
 
         let last_dirty_offset = if batch_messages_count == 0 {
             dirty_offset
@@ -154,16 +123,10 @@ impl Partition for IggyPartition {
         journal.info.messages_count += batch_messages_count;
         journal.info.size += 
IggyByteSize::from(u64::from(batch_messages_size));
         journal.info.current_offset = last_dirty_offset;
-        if let Some(ts) = batch.first_timestamp()
-            && journal.info.first_timestamp == 0
-        {
-            journal.info.first_timestamp = ts;
+        if journal.info.first_timestamp == 0 {
+            journal.info.first_timestamp = batch.base_timestamp;
         }
-        if let Some(ts) = batch.last_timestamp() {
-            journal.info.end_timestamp = ts;
-        }
-
-        let message = Self::prepare_message_from_batch(header, &batch);
+        journal.info.end_timestamp = batch.base_timestamp;
         journal.inner.append(message).await;
 
         Ok(AppendResult::new(
@@ -177,9 +140,9 @@ impl Partition for IggyPartition {
         &self,
         consumer: PollingConsumer,
         args: PollingArgs,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
+    ) -> Result<IggyMessages2, IggyError> {
         if !self.should_increment_offset || args.count == 0 {
-            return Ok(IggyMessagesBatchSet::empty());
+            return Ok(IggyMessages2::empty());
         }
 
         let committed_offset = self.offset.load(Ordering::Relaxed);
@@ -198,15 +161,15 @@ impl Partition for IggyPartition {
                         count: args.count,
                     })
                     .await;
-                let batch_set = 
result.unwrap_or_else(IggyMessagesBatchSet::empty);
-                if let Some(first) = batch_set.first_offset() {
+                let messages = result.unwrap_or_else(IggyMessages2::empty);
+                if let Some(first) = messages.first_offset() {
                     if first > committed_offset {
-                        return Ok(IggyMessagesBatchSet::empty());
+                        return Ok(IggyMessages2::empty());
                     }
                     let max_count = u32::try_from(committed_offset - first + 
1).unwrap_or(u32::MAX);
-                    return Ok(batch_set.get_by_offset(first, 
batch_set.count().min(max_count)));
+                    return Ok(messages.limit(max_count));
                 }
-                return Ok(batch_set);
+                return Ok(messages);
             }
             PollingKind::Next => self
                 .get_consumer_offset(consumer)
@@ -214,7 +177,7 @@ impl Partition for IggyPartition {
         };
 
         if start_offset > committed_offset {
-            return Ok(IggyMessagesBatchSet::empty());
+            return Ok(IggyMessages2::empty());
         }
 
         let max_count = u32::try_from(committed_offset - start_offset + 
1).unwrap_or(u32::MAX);
@@ -230,10 +193,12 @@ impl Partition for IggyPartition {
             })
             .await;
 
-        let batch_set = result.unwrap_or_else(IggyMessagesBatchSet::empty);
+        let messages = result.unwrap_or_else(IggyMessages2::empty);
 
-        if args.auto_commit && !batch_set.is_empty() {
-            let last_offset = start_offset + u64::from(batch_set.count()) - 1;
+        if args.auto_commit && !messages.is_empty() {
+            let last_offset = messages
+                .last_offset()
+                .expect("non-empty poll result must have a last offset");
             if let Err(err) = self.store_consumer_offset(consumer, 
last_offset) {
                 // warning for now.
                 warn!(
@@ -245,7 +210,7 @@ impl Partition for IggyPartition {
             }
         }
 
-        Ok(batch_set)
+        Ok(messages)
     }
 
     #[allow(clippy::cast_possible_truncation)]
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index d219c905e..bc2cceb84 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -20,7 +20,9 @@
 use crate::IggyPartition;
 use crate::Partition;
 use crate::PollingConsumer;
+use crate::frozen_messages_writer::FrozenMessagesWriter;
 use crate::log::JournalInfo;
+use crate::send_messages2::convert_request_message;
 use crate::types::PartitionsConfig;
 use consensus::PlaneIdentity;
 use consensus::{
@@ -38,6 +40,7 @@ use iggy_common::{
     message::Message,
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
+use iobuf::{Frozen, TryMerge};
 use message_bus::MessageBus;
 use std::cell::UnsafeCell;
 use std::collections::{HashMap, HashSet};
@@ -350,6 +353,17 @@ where
             .expect("on_request: consensus not initialized");
 
         debug!(?namespace, "handling partition request");
+        let message = if message.header().operation == Operation::SendMessages 
{
+            match convert_request_message(namespace, message) {
+                Ok(message) => message,
+                Err(error) => {
+                    warn!(?namespace, %error, "on_request: failed to convert 
SendMessages");
+                    return;
+                }
+            }
+        } else {
+            message
+        };
         let prepare = message.project(consensus);
         pipeline_prepare_common(consensus, prepare, |prepare| 
self.on_replicate(prepare)).await;
     }
@@ -563,8 +577,10 @@ where
                 );
             }
             Operation::StoreConsumerOffset => {
-                let body = message.body_bytes();
-                let body = body.as_ref();
+                let total_size = header.size() as usize;
+                let owned = unsafe { message.into_inner().try_merge() }
+                    .expect("consumer offset prepare expects a unique message 
buffer");
+                let body = 
&owned.as_slice()[std::mem::size_of::<PrepareHeader>()..total_size];
                 let consumer_kind = body[0];
                 let consumer_id = 
u32::from_le_bytes(body[1..5].try_into().unwrap()) as usize;
                 let offset = 
u64::from_le_bytes(body[5..13].try_into().unwrap());
@@ -689,22 +705,53 @@ where
 
         if is_full || unsaved_messages_count_exceeded || 
unsaved_messages_size_exceeded {
             // Freeze journal batches.
-            let frozen_batches = {
-                let batches = partition.log.journal_mut().inner.commit();
+            let (frozen_batches, batch_count) = {
+                let entries = partition.log.journal_mut().inner.commit();
+                let segment = partition.log.active_segment();
+                let segment_start_offset = segment.start_offset;
+                let segment_base_timestamp = if segment.start_timestamp == 0 {
+                    journal_info.first_timestamp
+                } else {
+                    segment.start_timestamp
+                };
+                let mut file_position = segment.size.as_bytes_u64() as u32;
                 partition.log.ensure_indexes();
-                
batches.append_indexes_to(partition.log.active_indexes_mut().unwrap());
-
-                let frozen: Vec<_> = batches
-                    .into_inner()
-                    .into_iter()
-                    .map(|mut b| b.freeze())
-                    .collect();
-                partition.log.set_in_flight(frozen.clone());
-                frozen
+                let indexes = partition.log.active_indexes_mut().unwrap();
+                let mut frozen = Vec::with_capacity(entries.len());
+                let mut batch_count = 0u32;
+
+                for entry in entries {
+                    let owned = unsafe { entry.into_inner().try_merge() }
+                        .expect("journal commit expects mergeable prepare 
buffers");
+                    let Ok(batch) = 
crate::send_messages2::decode_prepare_slice(owned.as_slice())
+                    else {
+                        continue;
+                    };
+                    if batch.header.record_count == 0 {
+                        continue;
+                    }
+
+                    let relative_offset =
+                        u32::try_from(batch.header.base_offset - 
segment_start_offset)
+                            .expect("relative batch offset exceeds u32::MAX");
+                    let relative_timestamp =
+                        u32::try_from(batch.header.base_timestamp - 
segment_base_timestamp)
+                            .expect("relative batch timestamp exceeds 
u32::MAX");
+                    indexes.insert(
+                        relative_offset,
+                        file_position,
+                        u64::from(relative_timestamp),
+                    );
+                    file_position += batch.header.total_size() as u32;
+                    batch_count += batch.header.record_count;
+                    frozen.push(owned.into());
+                }
+
+                (frozen, batch_count)
             };
 
             // Persist to disk.
-            self.persist_frozen_batches_to_disk(namespace, frozen_batches)
+            self.persist_frozen_batches_to_disk(namespace, frozen_batches, 
batch_count)
                 .await;
 
             if is_full {
@@ -731,13 +778,9 @@ where
     async fn persist_frozen_batches_to_disk(
         &self,
         namespace: &IggyNamespace,
-        frozen_batches: Vec<iggy_common::IggyMessagesBatch>,
+        frozen_batches: Vec<Frozen<4096>>,
+        batch_count: u32,
     ) {
-        let batch_count: u32 = frozen_batches
-            .iter()
-            .map(iggy_common::IggyMessagesBatch::count)
-            .sum();
-
         if batch_count == 0 {
             return;
         }
@@ -750,13 +793,13 @@ where
             return;
         }
 
-        let messages_writer = partition
+        let messages_path = partition
             .log
             .active_storage()
             .messages_writer
             .as_ref()
             .expect("Messages writer not initialized")
-            .clone();
+            .path();
         let index_writer = partition
             .log
             .active_storage()
@@ -764,10 +807,15 @@ where
             .as_ref()
             .expect("Index writer not initialized")
             .clone();
+        let position = partition.log.active_segment().size.as_bytes_u64();
+        let messages_writer = FrozenMessagesWriter::new(messages_path, 
self.config.enforce_fsync);
 
         let saved = messages_writer
-            .as_ref()
-            .save_frozen_batches(&frozen_batches)
+            .save(
+                position,
+                &frozen_batches,
+                std::mem::size_of::<PrepareHeader>(),
+            )
             .await
             .expect("persist: failed to save frozen batches");
 
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index d3b779fee..fa9450ee8 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -15,19 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::Bytes;
 use iggy_common::{
-    IggyMessagesBatchMut, IggyMessagesBatchSet,
     header::{Operation, PrepareHeader},
     message::Message,
 };
+use iobuf::{Frozen, Owned, TryMerge};
 use journal::{Journal, Storage};
 use std::{
     cell::UnsafeCell,
     collections::{BTreeMap, HashMap},
 };
 
+use crate::send_messages2::{IggyMessages2, PREPARE_SPLIT_POINT, 
decode_prepare_slice};
+
 const ZERO_LEN: usize = 0;
+type JournalBuffer = Frozen<4096>;
+
+fn message_from_frozen(
+    inner: JournalBuffer,
+) -> Result<Message<PrepareHeader>, iggy_common::header::ConsensusError> {
+    let split_at = PREPARE_SPLIT_POINT.min(inner.len());
+    Message::try_from(inner.split_at(split_at))
+}
 
 /// Lookup key for querying messages from the journal.
 #[derive(Debug, Clone, Copy)]
@@ -53,12 +62,12 @@ where
 {
     type Query;
 
-    fn get(&self, query: &Self::Query) -> impl Future<Output = 
Option<IggyMessagesBatchSet>>;
+    fn get(&self, query: &Self::Query) -> impl Future<Output = 
Option<IggyMessages2>>;
 }
 
 #[derive(Debug, Default)]
 pub struct PartitionJournalMemStorage {
-    entries: UnsafeCell<Vec<Bytes>>,
+    entries: UnsafeCell<Vec<JournalBuffer>>,
     /// Maps byte offset (as if disk-backed) to index in entries Vec
     offset_to_index: UnsafeCell<HashMap<usize, usize>>,
     /// Current write position (cumulative byte offset)
@@ -66,7 +75,7 @@ pub struct PartitionJournalMemStorage {
 }
 
 impl Storage for PartitionJournalMemStorage {
-    type Buffer = Bytes;
+    type Buffer = JournalBuffer;
 
     async fn write(&self, buf: Self::Buffer) -> usize {
         let len = buf.len();
@@ -87,17 +96,20 @@ impl Storage for PartitionJournalMemStorage {
     async fn read(&self, offset: usize, _len: usize) -> Self::Buffer {
         let offset_to_index = unsafe { &*self.offset_to_index.get() };
         let Some(&index) = offset_to_index.get(&offset) else {
-            return Bytes::new();
+            return Owned::<4096>::zeroed(0).into();
         };
 
         let entries = unsafe { &*self.entries.get() };
-        entries.get(index).cloned().unwrap_or_default()
+        entries
+            .get(index)
+            .cloned()
+            .unwrap_or_else(|| Owned::<4096>::zeroed(0).into())
     }
 }
 
 pub struct PartitionJournal<S>
 where
-    S: Storage<Buffer = Bytes>,
+    S: Storage<Buffer = JournalBuffer>,
 {
     /// Maps op -> storage byte offset (for all entries)
     op_to_storage_offset: UnsafeCell<BTreeMap<u64, usize>>,
@@ -111,7 +123,7 @@ where
 
 impl<S> Default for PartitionJournal<S>
 where
-    S: Storage<Buffer = Bytes> + Default,
+    S: Storage<Buffer = JournalBuffer> + Default,
 {
     fn default() -> Self {
         Self {
@@ -128,7 +140,7 @@ where
 
 impl<S> std::fmt::Debug for PartitionJournal<S>
 where
-    S: Storage<Buffer = Bytes>,
+    S: Storage<Buffer = JournalBuffer>,
 {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("PartitionJournal2Impl").finish()
@@ -137,13 +149,13 @@ where
 
 struct JournalInner<S>
 where
-    S: Storage<Buffer = Bytes>,
+    S: Storage<Buffer = JournalBuffer>,
 {
     storage: S,
 }
 
 impl PartitionJournalMemStorage {
-    fn drain(&self) -> Vec<Bytes> {
+    fn drain(&self) -> Vec<JournalBuffer> {
         let entries = unsafe { &mut *self.entries.get() };
         let offset_to_index = unsafe { &mut *self.offset_to_index.get() };
         let current_offset = unsafe { &mut *self.current_offset.get() };
@@ -162,19 +174,12 @@ impl PartitionJournalMemStorage {
 
 impl PartitionJournal<PartitionJournalMemStorage> {
     /// Drain all accumulated batches, matching the legacy `PartitionJournal` 
API.
-    pub fn commit(&self) -> IggyMessagesBatchSet {
+    pub fn commit(&self) -> Vec<Message<PrepareHeader>> {
         let entries = {
             let inner = unsafe { &*self.inner.get() };
             inner.storage.drain()
         };
 
-        let mut messages = Vec::with_capacity(entries.len());
-        for bytes in entries {
-            if let Ok(message) = Message::from_bytes(bytes) {
-                messages.push(message);
-            }
-        }
-
         let headers = unsafe { &mut *self.headers.get() };
         headers.clear();
         let op_to_storage_offset = unsafe { &mut 
*self.op_to_storage_offset.get() };
@@ -184,7 +189,13 @@ impl PartitionJournal<PartitionJournalMemStorage> {
         let timestamp_to_op = unsafe { &mut *self.timestamp_to_op.get() };
         timestamp_to_op.clear();
 
-        Self::messages_to_batch_set(&messages)
+        entries
+            .into_iter()
+            .map(|entry| {
+                message_from_frozen(entry)
+                    .expect("partition journal storage must contain valid 
prepare batches")
+            })
+            .collect()
     }
 
     pub fn is_empty(&self) -> bool {
@@ -195,28 +206,8 @@ impl PartitionJournal<PartitionJournalMemStorage> {
 
 impl<S> PartitionJournal<S>
 where
-    S: Storage<Buffer = Bytes>,
+    S: Storage<Buffer = JournalBuffer>,
 {
-    fn message_to_batch(message: &Message<PrepareHeader>) -> 
Option<IggyMessagesBatchMut> {
-        if message.header().operation != Operation::SendMessages {
-            return None;
-        }
-
-        crate::decode_send_messages_batch(message.body_bytes())
-    }
-
-    fn messages_to_batch_set(messages: &[Message<PrepareHeader>]) -> 
IggyMessagesBatchSet {
-        let mut batch_set = IggyMessagesBatchSet::empty();
-
-        for message in messages {
-            if let Some(batch) = Self::message_to_batch(message) {
-                batch_set.add_batch(batch);
-            }
-        }
-
-        batch_set
-    }
-
     #[allow(dead_code)]
     fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> {
         match query {
@@ -255,8 +246,8 @@ where
         }
 
         Some(
-            Message::from_bytes(bytes)
-                .expect("partition.journal.storage.read: invalid bytes for 
message"),
+            message_from_frozen(bytes)
+                .expect("partition journal storage must contain a valid 
prepare batch"),
         )
     }
 
@@ -264,10 +255,14 @@ where
     async fn load_messages_from_storage(
         &self,
         start_op: u64,
-        count: u32,
-    ) -> Vec<Message<PrepareHeader>> {
+        query: MessageLookup,
+    ) -> IggyMessages2 {
+        let count = match query {
+            MessageLookup::Offset { count, .. } | MessageLookup::Timestamp { 
count, .. } => count,
+        };
+
         if count == 0 {
-            return Vec::new();
+            return IggyMessages2::empty();
         }
 
         // Get (op, storage_offset) pairs directly from the mapping
@@ -280,11 +275,10 @@ where
                 .collect()
         };
 
-        let mut messages = Vec::new();
-        let mut loaded_messages = 0u32;
+        let mut messages = IggyMessages2::with_capacity(count as usize);
 
         for (_, storage_offset) in op_offsets {
-            if loaded_messages >= count {
+            if messages.count() >= count {
                 break;
             }
 
@@ -297,12 +291,36 @@ where
                 continue;
             }
 
-            let message = Message::from_bytes(bytes)
-                .expect("partition.journal.storage.read: invalid bytes for 
message");
+            let header_bytes = &bytes[..std::mem::size_of::<PrepareHeader>()];
+            let header = 
*bytemuck::checked::try_from_bytes::<PrepareHeader>(header_bytes)
+                .expect("partition journal storage must contain a valid 
prepare header");
+            if header.operation != Operation::SendMessages {
+                continue;
+            }
+            let Ok(batch) = decode_prepare_slice(bytes.as_slice()) else {
+                continue;
+            };
 
-            if let Some(batch) = Self::message_to_batch(&message) {
-                loaded_messages = 
loaded_messages.saturating_add(batch.count());
-                messages.push(message);
+            for record in batch.iter() {
+                if messages.count() >= count {
+                    break;
+                }
+
+                let owned = record.to_owned(&batch.header);
+                match query {
+                    MessageLookup::Offset { offset, .. } => {
+                        if owned.header.offset < offset {
+                            continue;
+                        }
+                    }
+                    MessageLookup::Timestamp { timestamp, .. } => {
+                        if owned.header.timestamp < timestamp {
+                            continue;
+                        }
+                    }
+                }
+
+                messages.push(owned);
             }
         }
 
@@ -312,10 +330,10 @@ where
 
 impl<S> Journal<S> for PartitionJournal<S>
 where
-    S: Storage<Buffer = Bytes>,
+    S: Storage<Buffer = JournalBuffer>,
 {
     type Header = PrepareHeader;
-    type Entry = Message<Self::Header>;
+    type Entry = Message<PrepareHeader>;
     #[rustfmt::skip] // Scuffed formatter.
     type HeaderRef<'a> = &'a Self::Header where S: 'a;
 
@@ -335,18 +353,27 @@ where
     }
 
     async fn append(&self, entry: Self::Entry) {
-        let first_offset_and_timestamp = Self::message_to_batch(&entry)
-            .and_then(|batch| Some((batch.first_offset()?, 
batch.first_timestamp()?)));
-
         let header = *entry.header();
         let op = header.op;
+        let owned = unsafe { entry.into_inner().try_merge() }
+            .expect("partition journal append expects a unique message 
buffer");
+        let first_offset_and_timestamp = if header.operation == 
Operation::SendMessages {
+            decode_prepare_slice(owned.as_slice())
+                .ok()
+                .and_then(|batch| {
+                    (batch.header.record_count != 0)
+                        .then_some((batch.header.base_offset, 
batch.header.base_timestamp))
+                })
+        } else {
+            None
+        };
 
         {
             let headers = unsafe { &mut *self.headers.get() };
             headers.push(header);
         };
 
-        let bytes = entry.into_inner();
+        let bytes: JournalBuffer = owned.into();
         let storage_offset = {
             let inner = unsafe { &*self.inner.get() };
             inner.storage.write(bytes).await
@@ -373,26 +400,14 @@ where
 
 impl<S> QueryableJournal<S> for PartitionJournal<S>
 where
-    S: Storage<Buffer = Bytes>,
+    S: Storage<Buffer = JournalBuffer>,
 {
     type Query = MessageLookup;
 
-    async fn get(&self, query: &Self::Query) -> Option<IggyMessagesBatchSet> {
+    async fn get(&self, query: &Self::Query) -> Option<IggyMessages2> {
         let query = *query;
         let start_op = self.candidate_start_op(&query)?;
-        let count = match query {
-            MessageLookup::Offset { count, .. } | MessageLookup::Timestamp { 
count, .. } => count,
-        };
-
-        let messages = self.load_messages_from_storage(start_op, count).await;
-
-        let batch_set = Self::messages_to_batch_set(&messages);
-        let result = match query {
-            MessageLookup::Offset { offset, count } => 
batch_set.get_by_offset(offset, count),
-            MessageLookup::Timestamp { timestamp, count } => {
-                batch_set.get_by_timestamp(timestamp, count)
-            }
-        };
+        let result = self.load_messages_from_storage(start_op, query).await;
 
         if result.is_empty() {
             None
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index 1f18e32ed..4747d0ed5 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -17,51 +17,23 @@
 
 #![allow(clippy::future_not_send)]
 
+mod frozen_messages_writer;
 mod iggy_partition;
 mod iggy_partitions;
 mod journal;
 mod log;
+mod send_messages2;
 mod types;
 
-use bytes::{Bytes, BytesMut};
-use iggy_common::{
-    INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut, 
IggyMessagesBatchSet,
-    PooledBuffer, header::PrepareHeader, message::Message,
-};
+use iggy_common::{IggyError, header::PrepareHeader, message::Message};
 pub use iggy_partition::IggyPartition;
 pub use iggy_partitions::IggyPartitions;
+pub use send_messages2::{IggyMessage2, IggyMessage2Header, IggyMessages2};
 pub use types::{
     AppendResult, PartitionOffsets, PartitionsConfig, PollingArgs, 
PollingConsumer,
     SendMessagesResult,
 };
 
-pub(crate) fn decode_send_messages_batch(body: Bytes) -> 
Option<IggyMessagesBatchMut> {
-    // TODO: This very is bad, IGGY-114 Fixes this.
-    let mut body = body
-        .try_into_mut()
-        .unwrap_or_else(|body| BytesMut::from(body.as_ref()));
-
-    if body.len() < 4 {
-        return None;
-    }
-
-    let count_bytes = body.split_to(4);
-    let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?);
-    let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?;
-
-    if body.len() < indexes_len {
-        return None;
-    }
-
-    let indexes_bytes = body.split_to(indexes_len);
-    let indexes = 
IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0);
-    let messages = PooledBuffer::from(body);
-
-    Some(IggyMessagesBatchMut::from_indexes_and_messages(
-        indexes, messages,
-    ))
-}
-
 /// Partition-level data plane operations.
 ///
 /// `send_messages` MUST only append to the partition journal (prepare phase),
@@ -76,7 +48,7 @@ pub trait Partition {
         &self,
         consumer: PollingConsumer,
         args: PollingArgs,
-    ) -> impl Future<Output = Result<IggyMessagesBatchSet, IggyError>> {
+    ) -> impl Future<Output = Result<IggyMessages2, IggyError>> {
         let _ = (consumer, args);
         async { Err(IggyError::FeatureUnavailable) }
     }
diff --git a/core/partitions/src/send_messages2.rs 
b/core/partitions/src/send_messages2.rs
new file mode 100644
index 000000000..4e31c33d2
--- /dev/null
+++ b/core/partitions/src/send_messages2.rs
@@ -0,0 +1,552 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{BufMut, Bytes, BytesMut};
+use iggy_common::{
+    INDEX_SIZE, IggyError, calculate_checksum,
+    header::{PrepareHeader, RequestHeader},
+    message::Message,
+    random_id,
+    sharding::IggyNamespace,
+};
+use iobuf::{Owned, TryMerge};
+
/// Alignment parameter for the `Owned` I/O buffers that hold encoded requests.
const MESSAGE_ALIGN: usize = 4096;
/// Fixed size of the `SendMessages2` command-header region; only the first
/// 48 bytes are encoded, the rest is reserved and zero-filled.
pub const COMMAND_HEADER_SIZE: usize = 256;
/// Byte offset at which a request/prepare buffer is split into (prefix, tail);
/// the message blob starts here.
/// NOTE(review): encoding assumes `size_of::<RequestHeader>() +
/// COMMAND_HEADER_SIZE == PREPARE_SPLIT_POINT` — confirm for `RequestHeader`.
pub const PREPARE_SPLIT_POINT: usize = 512;
/// Size of one encoded per-message header in the new wire format.
const MESSAGE_HEADER_SIZE: usize = 48;
/// Size of one per-message header in the legacy `SendMessages` format.
const LEGACY_MESSAGE_HEADER_SIZE: usize = 64;
/// Magic bytes `"SMG2"` identifying a `SendMessages2` batch.
const SEND_MESSAGES2_MAGIC: u32 = u32::from_le_bytes(*b"SMG2");
/// Current `SendMessages2` wire-format version.
const SEND_MESSAGES2_VERSION: u16 = 1;
+
/// Fixed-size batch header encoded little-endian at the start of a
/// `SendMessages2` command body (48 bytes of fields inside a
/// `COMMAND_HEADER_SIZE` region whose remainder is zero-filled).
#[derive(Debug, Clone, Copy, Default)]
pub struct SendMessages2Header {
    /// Magic constant `b"SMG2"` identifying the format.
    pub magic: u32,
    /// Wire-format version (currently 1).
    pub version: u16,
    /// Reserved flag bits; always 0 today.
    pub flags: u16,
    /// Target partition identifier.
    pub partition_id: u64,
    /// First assigned offset; 0 until stamped during persistence.
    pub base_offset: u64,
    /// Batch timestamp; 0 until stamped during persistence.
    pub base_timestamp: u64,
    /// Minimum client-supplied origin timestamp across the batch.
    pub origin_timestamp: u64,
    /// Number of messages in the blob that follows.
    pub record_count: u32,
    /// Length in bytes of the message blob that follows the header region.
    pub body_len: u32,
}
+
impl SendMessages2Header {
    /// Creates a header for a new batch. `base_offset` and `base_timestamp`
    /// start at 0 and are filled in later by `stamp_prepare_for_persistence`.
    pub fn new(partition_id: u64, origin_timestamp: u64, record_count: u32, body_len: u32) -> Self {
        Self {
            magic: SEND_MESSAGES2_MAGIC,
            version: SEND_MESSAGES2_VERSION,
            flags: 0,
            partition_id,
            base_offset: 0,
            base_timestamp: 0,
            origin_timestamp,
            record_count,
            body_len,
        }
    }

    /// Decodes a header from the first `COMMAND_HEADER_SIZE` bytes of
    /// `bytes`; all fields are little-endian.
    ///
    /// # Errors
    /// `InvalidCommand` when the buffer is shorter than
    /// `COMMAND_HEADER_SIZE` or the magic/version does not match.
    pub fn decode(bytes: &[u8]) -> Result<Self, IggyError> {
        if bytes.len() < COMMAND_HEADER_SIZE {
            return Err(IggyError::InvalidCommand);
        }

        // Validate the identity fields before decoding the rest.
        let magic = read_u32(bytes, 0)?;
        let version = read_u16(bytes, 4)?;
        if magic != SEND_MESSAGES2_MAGIC || version != SEND_MESSAGES2_VERSION {
            return Err(IggyError::InvalidCommand);
        }

        Ok(Self {
            magic,
            version,
            flags: read_u16(bytes, 6)?,
            partition_id: read_u64(bytes, 8)?,
            base_offset: read_u64(bytes, 16)?,
            base_timestamp: read_u64(bytes, 24)?,
            origin_timestamp: read_u64(bytes, 32)?,
            record_count: read_u32(bytes, 40)?,
            body_len: read_u32(bytes, 44)?,
        })
    }

    /// Writes the header into the start of `bytes`, zero-filling the whole
    /// `COMMAND_HEADER_SIZE` region first so reserved bytes stay zero.
    ///
    /// # Panics
    /// Panics when `bytes` is shorter than `COMMAND_HEADER_SIZE`.
    pub fn encode_into(&self, bytes: &mut [u8]) {
        assert!(bytes.len() >= COMMAND_HEADER_SIZE);
        bytes[..COMMAND_HEADER_SIZE].fill(0);
        bytes[0..4].copy_from_slice(&self.magic.to_le_bytes());
        bytes[4..6].copy_from_slice(&self.version.to_le_bytes());
        bytes[6..8].copy_from_slice(&self.flags.to_le_bytes());
        bytes[8..16].copy_from_slice(&self.partition_id.to_le_bytes());
        bytes[16..24].copy_from_slice(&self.base_offset.to_le_bytes());
        bytes[24..32].copy_from_slice(&self.base_timestamp.to_le_bytes());
        bytes[32..40].copy_from_slice(&self.origin_timestamp.to_le_bytes());
        bytes[40..44].copy_from_slice(&self.record_count.to_le_bytes());
        bytes[44..48].copy_from_slice(&self.body_len.to_le_bytes());
    }

    /// Total encoded size: the fixed header region plus the message blob.
    pub fn total_size(&self) -> usize {
        COMMAND_HEADER_SIZE + self.body_len as usize
    }
}
+
/// An owned, fully re-encoded `SendMessages2` batch: the command header plus
/// the serialized message blob it describes.
#[derive(Debug, Clone)]
pub struct SendMessages2Owned {
    pub header: SendMessages2Header,
    /// Encoded messages; exactly `header.body_len` bytes.
    pub blob: Bytes,
}
+
impl SendMessages2Owned {
    /// Converts a legacy `SendMessages` request body into the new batch
    /// format.
    ///
    /// Walks the legacy messages once to find the minimum origin timestamp,
    /// then re-encodes every message with the 48-byte header layout
    /// (checksum `0..8`, id `8..24`, offset delta `24..28`, timestamp delta
    /// `28..32`, user-headers length `32..36`, payload length `36..40`,
    /// reserved `40..48` left zero). Messages with a zero id get a fresh
    /// UUID.
    ///
    /// # Errors
    /// `InvalidCommand` when the legacy body is malformed (count/length
    /// mismatch) or a timestamp delta does not fit in `u32`.
    pub fn from_legacy_request(namespace: IggyNamespace, body: &[u8]) -> Result<Self, IggyError> {
        let (message_count, messages) = legacy_messages_slice(body)?;
        let mut parsed = Vec::with_capacity(message_count as usize);
        // Track the minimum origin timestamp so per-message deltas stay small.
        let mut origin_timestamp = u64::MAX;
        let mut cursor = 0usize;

        while cursor < messages.len() && parsed.len() < message_count as usize {
            let legacy = LegacyMessageRef::decode(&messages[cursor..])?;
            origin_timestamp = origin_timestamp.min(legacy.origin_timestamp);
            cursor += legacy.total_size;
            parsed.push(legacy);
        }

        // The declared count must match exactly and the body must be fully
        // consumed; anything else is a malformed request.
        if parsed.len() != message_count as usize || cursor != messages.len() {
            return Err(IggyError::InvalidCommand);
        }

        // Empty batch: nothing updated the minimum, normalize to 0.
        if origin_timestamp == u64::MAX {
            origin_timestamp = 0;
        }

        // Capacity is an upper bound: legacy headers (64 B) are larger than
        // the re-encoded ones (48 B).
        let mut blob = BytesMut::with_capacity(messages.len());
        for (index, legacy) in parsed.iter().enumerate() {
            let id = if legacy.id == 0 {
                random_id::get_uuid()
            } else {
                legacy.id
            };
            // Delta is always >= 0 because origin_timestamp is the batch
            // minimum, but it still has to fit in u32.
            let timestamp_delta = legacy
                .origin_timestamp
                .checked_sub(origin_timestamp)
                .and_then(|delta| u32::try_from(delta).ok())
                .ok_or(IggyError::InvalidCommand)?;

            let mut header = [0u8; MESSAGE_HEADER_SIZE];
            header[8..24].copy_from_slice(&id.to_le_bytes());
            header[24..28].copy_from_slice(&(index as u32).to_le_bytes());
            header[28..32].copy_from_slice(&timestamp_delta.to_le_bytes());
            header[32..36].copy_from_slice(&(legacy.user_headers.len() as u32).to_le_bytes());
            header[36..40].copy_from_slice(&(legacy.payload.len() as u32).to_le_bytes());

            // The checksum covers everything after the checksum field itself.
            let checksum =
                calculate_checksum_parts(&header[8..], legacy.user_headers, legacy.payload);
            header[0..8].copy_from_slice(&checksum.to_le_bytes());

            blob.extend_from_slice(&header);
            blob.extend_from_slice(legacy.user_headers);
            blob.extend_from_slice(legacy.payload);
        }

        let blob = blob.freeze();
        let header = SendMessages2Header::new(
            namespace.partition_id() as u64,
            origin_timestamp,
            message_count,
            blob.len() as u32,
        );

        Ok(Self { header, blob })
    }

    /// Serializes the batch into a request message: request header, command
    /// header, then the blob starting at `PREPARE_SPLIT_POINT`.
    ///
    /// NOTE(review): the blob copy is only in bounds when
    /// `size_of::<RequestHeader>() + COMMAND_HEADER_SIZE ==
    /// PREPARE_SPLIT_POINT`; confirm that invariant for `RequestHeader`.
    ///
    /// # Errors
    /// `InvalidCommand` when the buffer cannot be turned into a `Message`.
    pub fn encode_request(
        self,
        request_header: RequestHeader,
    ) -> Result<Message<RequestHeader>, IggyError> {
        let total_size = std::mem::size_of::<RequestHeader>() + self.header.total_size();
        let mut buffer = Owned::<MESSAGE_ALIGN>::zeroed(total_size);
        let bytes = buffer.as_mut_slice();
        bytes[0..std::mem::size_of::<RequestHeader>()]
            .copy_from_slice(bytemuck::bytes_of(&request_header));
        self.header.encode_into(
            &mut bytes[std::mem::size_of::<RequestHeader>()
                ..std::mem::size_of::<RequestHeader>() + COMMAND_HEADER_SIZE],
        );
        bytes[PREPARE_SPLIT_POINT..PREPARE_SPLIT_POINT + self.blob.len()]
            .copy_from_slice(&self.blob);

        // Split so the fixed-size headers and the message blob become the
        // (prefix, tail) halves of the outgoing message.
        Message::try_from(buffer.split_at(PREPARE_SPLIT_POINT))
            .map_err(|_| IggyError::InvalidCommand)
    }
}
+
/// Fully resolved per-message header: the on-wire deltas have already been
/// applied against the batch-level header (see
/// `SendMessages2MessageView::to_owned`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct IggyMessage2Header {
    pub checksum: u64,
    pub id: u128,
    /// Absolute offset (`base_offset + offset_delta`).
    pub offset: u64,
    pub timestamp: u64,
    /// Absolute origin timestamp (`origin_timestamp + timestamp_delta`).
    pub origin_timestamp: u64,
    pub user_headers_length: u32,
    pub payload_length: u32,
}

/// A single decoded message with owned payload and user-header bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IggyMessage2 {
    pub header: IggyMessage2Header,
    pub payload: Bytes,
    /// `None` when the message carries no user headers.
    pub user_headers: Option<Bytes>,
}

/// An ordered collection of decoded messages, as returned from polling.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IggyMessages2 {
    messages: Vec<IggyMessage2>,
}
+
+impl IggyMessages2 {
+    pub fn empty() -> Self {
+        Self::default()
+    }
+
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            messages: Vec::with_capacity(capacity),
+        }
+    }
+
+    pub fn push(&mut self, message: IggyMessage2) {
+        self.messages.push(message);
+    }
+
+    pub fn count(&self) -> u32 {
+        self.messages.len() as u32
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.messages.is_empty()
+    }
+
+    pub fn first_offset(&self) -> Option<u64> {
+        self.messages.first().map(|message| message.header.offset)
+    }
+
+    pub fn last_offset(&self) -> Option<u64> {
+        self.messages.last().map(|message| message.header.offset)
+    }
+
+    pub fn limit(self, count: u32) -> Self {
+        let mut messages = self.messages;
+        messages.truncate(count as usize);
+        Self { messages }
+    }
+
+    pub fn iter(&self) -> std::slice::Iter<'_, IggyMessage2> {
+        self.messages.iter()
+    }
+}
+
/// Consuming iteration over owned messages.
impl IntoIterator for IggyMessages2 {
    type Item = IggyMessage2;
    type IntoIter = std::vec::IntoIter<IggyMessage2>;

    fn into_iter(self) -> Self::IntoIter {
        self.messages.into_iter()
    }
}

/// Borrowing iteration, so `for message in &batch` works without consuming
/// the batch.
impl<'a> IntoIterator for &'a IggyMessages2 {
    type Item = &'a IggyMessage2;
    type IntoIter = std::slice::Iter<'a, IggyMessage2>;

    fn into_iter(self) -> Self::IntoIter {
        self.messages.iter()
    }
}
+
/// Borrowed view over an encoded `SendMessages2` batch: the decoded command
/// header plus the raw message blob it describes.
#[derive(Debug, Clone, Copy)]
pub struct SendMessages2Ref<'a> {
    pub header: SendMessages2Header,
    /// Encoded messages; exactly `header.body_len` bytes (see
    /// `decode_prepare_slice`).
    blob: &'a [u8],
}

impl<'a> SendMessages2Ref<'a> {
    /// Iterates over the individual encoded messages in the blob.
    pub fn iter(&self) -> SendMessages2Iterator<'a> {
        SendMessages2Iterator {
            blob: self.blob,
            position: 0,
        }
    }
}
+
/// Per-message wire header (48 bytes, little-endian): checksum `0..8`,
/// id `8..24`, offset delta `24..28`, timestamp delta `28..32`,
/// user-headers length `32..36`, payload length `36..40`, reserved
/// (must be zero) `40..48`.
#[derive(Debug, Clone, Copy)]
pub struct SendMessages2MessageHeader {
    pub checksum: u64,
    pub id: u128,
    /// Offset relative to the batch `base_offset`.
    pub offset_delta: u32,
    /// Timestamp relative to the batch `origin_timestamp`.
    pub timestamp_delta: u32,
    pub user_headers_length: u32,
    pub payload_length: u32,
}

impl SendMessages2MessageHeader {
    /// Decodes one message header from the start of `bytes`.
    ///
    /// # Errors
    /// `InvalidCommand` when fewer than `MESSAGE_HEADER_SIZE` bytes are
    /// available or the reserved trailing bytes are non-zero.
    fn decode(bytes: &[u8]) -> Result<Self, IggyError> {
        if bytes.len() < MESSAGE_HEADER_SIZE {
            return Err(IggyError::InvalidCommand);
        }

        // Reject non-zero reserved bytes so they stay usable for future
        // format extensions.
        let reserved = read_u64(bytes, 40)?;
        if reserved != 0 {
            return Err(IggyError::InvalidCommand);
        }

        Ok(Self {
            checksum: read_u64(bytes, 0)?,
            id: read_u128(bytes, 8)?,
            offset_delta: read_u32(bytes, 24)?,
            timestamp_delta: read_u32(bytes, 28)?,
            user_headers_length: read_u32(bytes, 32)?,
            payload_length: read_u32(bytes, 36)?,
        })
    }

    /// Bytes this message occupies in the blob: the fixed header plus the
    /// variable user-headers and payload sections.
    fn total_size(&self) -> usize {
        MESSAGE_HEADER_SIZE + self.user_headers_length as usize + self.payload_length as usize
    }
}
+
/// Borrowed view of one encoded message inside a batch blob.
#[derive(Debug, Clone, Copy)]
pub struct SendMessages2MessageView<'a> {
    pub header: SendMessages2MessageHeader,
    pub user_headers: &'a [u8],
    pub payload: &'a [u8],
}

impl SendMessages2MessageView<'_> {
    /// Materializes an owned message, resolving the stored deltas against
    /// the batch-level header.
    ///
    /// NOTE(review): `timestamp` is copied straight from
    /// `batch.base_timestamp` (no per-message delta), and the
    /// `base_offset`/`origin_timestamp` additions are unchecked — this
    /// assumes stamped base values keep the sums in range; confirm with the
    /// persistence path.
    pub fn to_owned(&self, batch: &SendMessages2Header) -> IggyMessage2 {
        IggyMessage2 {
            header: IggyMessage2Header {
                checksum: self.header.checksum,
                id: self.header.id,
                offset: batch.base_offset + u64::from(self.header.offset_delta),
                timestamp: batch.base_timestamp,
                origin_timestamp: batch.origin_timestamp + u64::from(self.header.timestamp_delta),
                user_headers_length: self.header.user_headers_length,
                payload_length: self.header.payload_length,
            },
            payload: Bytes::copy_from_slice(self.payload),
            // Empty user headers are normalized to `None`.
            user_headers: (!self.user_headers.is_empty())
                .then(|| Bytes::copy_from_slice(self.user_headers)),
        }
    }
}
+
+pub struct SendMessages2Iterator<'a> {
+    blob: &'a [u8],
+    position: usize,
+}
+
+impl<'a> Iterator for SendMessages2Iterator<'a> {
+    type Item = SendMessages2MessageView<'a>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.position >= self.blob.len() {
+            return None;
+        }
+
+        let header = 
SendMessages2MessageHeader::decode(&self.blob[self.position..]).ok()?;
+        let start = self.position + MESSAGE_HEADER_SIZE;
+        let headers_end = start + header.user_headers_length as usize;
+        let payload_end = headers_end + header.payload_length as usize;
+        let user_headers = self.blob.get(start..headers_end)?;
+        let payload = self.blob.get(headers_end..payload_end)?;
+        self.position += header.total_size();
+        Some(SendMessages2MessageView {
+            header,
+            user_headers,
+            payload,
+        })
+    }
+}
+
/// Rewrites a legacy `SendMessages` request into the `SendMessages2` wire
/// format, preserving the original request header.
///
/// # Errors
/// `InvalidCommand` when the legacy body cannot be parsed or the re-encoded
/// request cannot be assembled.
///
/// # Panics
/// Panics when the underlying buffer is not uniquely owned (`try_merge`
/// fails) or when `request_header.size` exceeds the merged buffer length.
pub fn convert_request_message(
    namespace: IggyNamespace,
    message: Message<RequestHeader>,
) -> Result<Message<RequestHeader>, IggyError> {
    let request_header = *message.header();
    // Total request length as declared by the header — assumed to be within
    // the buffer; TODO(review): confirm upstream validation of `size`.
    let total_size = request_header.size as usize;
    // SAFETY: `try_merge` requires a uniquely owned buffer; the message is
    // taken by value here and the `expect` below surfaces any violation.
    // TODO(review): confirm the exact contract in `iobuf::TryMerge`.
    let owned = unsafe { message.into_inner().try_merge() }
        .expect("request conversion expects a unique message buffer");
    // Skip the request header; the remainder is the legacy command body.
    let body = &owned.as_slice()[std::mem::size_of::<RequestHeader>()..total_size];
    SendMessages2Owned::from_legacy_request(namespace, body)?.encode_request(request_header)
}
+
+pub fn decode_prepare_slice(bytes: &[u8]) -> Result<SendMessages2Ref<'_>, 
IggyError> {
+    let header_size = std::mem::size_of::<PrepareHeader>();
+    if bytes.len() < header_size {
+        return Err(IggyError::InvalidCommand);
+    }
+
+    let prepare = 
bytemuck::checked::try_from_bytes::<PrepareHeader>(&bytes[..header_size])
+        .map_err(|_| IggyError::InvalidCommand)?;
+    let total_size = prepare.size as usize;
+    if bytes.len() < total_size {
+        return Err(IggyError::InvalidCommand);
+    }
+
+    let body = &bytes[header_size..total_size];
+    if body.len() < COMMAND_HEADER_SIZE {
+        return Err(IggyError::InvalidCommand);
+    }
+
+    let header = SendMessages2Header::decode(&body[..COMMAND_HEADER_SIZE])?;
+    let blob = &body[COMMAND_HEADER_SIZE..];
+    if blob.len() < header.body_len as usize {
+        return Err(IggyError::InvalidCommand);
+    }
+
+    Ok(SendMessages2Ref {
+        header,
+        blob: &blob[..header.body_len as usize],
+    })
+}
+
/// Stamps `base_offset`/`base_timestamp` into an already-encoded prepare
/// message in place, returning the updated message together with the decoded
/// command header.
///
/// # Errors
/// `InvalidCommand` when the prefix is shorter than `PREPARE_SPLIT_POINT`,
/// the embedded command header fails validation, or the message cannot be
/// reassembled.
pub fn stamp_prepare_for_persistence(
    message: Message<PrepareHeader>,
    base_offset: u64,
    base_timestamp: u64,
) -> Result<(Message<PrepareHeader>, SendMessages2Header), IggyError> {
    let (mut prefix, tail) = message.into_inner();
    if prefix.len() < PREPARE_SPLIT_POINT {
        return Err(IggyError::InvalidCommand);
    }

    // NOTE(review): the slice below is only in bounds when
    // size_of::<PrepareHeader>() + COMMAND_HEADER_SIZE <= PREPARE_SPLIT_POINT;
    // confirm that invariant holds for `PrepareHeader`.
    let header_offset = std::mem::size_of::<PrepareHeader>();
    let header_bytes = &mut prefix[header_offset..header_offset + COMMAND_HEADER_SIZE];
    let mut command = SendMessages2Header::decode(header_bytes)?;
    command.base_offset = base_offset;
    command.base_timestamp = base_timestamp;
    // Re-encode in place with the stamped fields; layout otherwise unchanged.
    command.encode_into(header_bytes);

    let message = Message::from_inner((prefix, tail)).map_err(|_| IggyError::InvalidCommand)?;
    Ok((message, command))
}
+
/// Splits a legacy `SendMessages` body into (message count, raw messages).
///
/// Layout assumed by this parser (NOTE(review): inferred from the code —
/// confirm against the legacy command definition): a `u32` metadata length,
/// `metadata_length` bytes of metadata whose final `u32` is the message
/// count, then `count * INDEX_SIZE` bytes of client-built indexes (skipped),
/// then the serialized messages themselves.
fn legacy_messages_slice(body: &[u8]) -> Result<(u32, &[u8]), IggyError> {
    if body.len() < 4 {
        return Err(IggyError::InvalidCommand);
    }

    let metadata_length = read_u32(body, 0)? as usize;
    let metadata_end = 4usize
        .checked_add(metadata_length)
        .ok_or(IggyError::InvalidCommand)?;
    // `metadata_end < 4` can only happen on overflow, which `checked_add`
    // already rejects; kept as a defensive guard.
    if metadata_end < 4 || body.len() < metadata_end {
        return Err(IggyError::InvalidCommand);
    }

    // The count is the last 4 bytes of the metadata region.
    let message_count = read_u32(body, metadata_end - 4)?;
    let indexes_len = usize::try_from(message_count)
        .ok()
        .and_then(|count| count.checked_mul(INDEX_SIZE))
        .ok_or(IggyError::InvalidCommand)?;
    let messages_start = metadata_end
        .checked_add(indexes_len)
        .ok_or(IggyError::InvalidCommand)?;
    if body.len() < messages_start {
        return Err(IggyError::InvalidCommand);
    }

    Ok((message_count, &body[messages_start..]))
}
+
/// Borrowed view of one message in the legacy wire format.
///
/// Legacy layout (64-byte header): id at `8..24`, origin timestamp at
/// `40..48`, user-headers length at `48..52`, payload length at `52..56`;
/// the payload follows the header immediately and the user headers come
/// after the payload (note the order is reversed relative to the new
/// format).
#[derive(Clone, Copy)]
struct LegacyMessageRef<'a> {
    id: u128,
    origin_timestamp: u64,
    user_headers: &'a [u8],
    payload: &'a [u8],
    /// Total bytes this message occupies, used to advance the parse cursor.
    total_size: usize,
}

impl<'a> LegacyMessageRef<'a> {
    /// Decodes one legacy message from the start of `bytes`.
    ///
    /// # Errors
    /// `InvalidCommand` when the buffer is shorter than the declared sizes
    /// or the combined lengths overflow `usize`.
    fn decode(bytes: &'a [u8]) -> Result<Self, IggyError> {
        if bytes.len() < LEGACY_MESSAGE_HEADER_SIZE {
            return Err(IggyError::InvalidCommand);
        }

        let user_headers_length = read_u32(bytes, 48)? as usize;
        let payload_length = read_u32(bytes, 52)? as usize;
        let total_size = LEGACY_MESSAGE_HEADER_SIZE
            .checked_add(payload_length)
            .and_then(|size| size.checked_add(user_headers_length))
            .ok_or(IggyError::InvalidCommand)?;
        if bytes.len() < total_size {
            return Err(IggyError::InvalidCommand);
        }

        // Payload first, then user headers, per the legacy layout.
        let payload_start = LEGACY_MESSAGE_HEADER_SIZE;
        let payload_end = payload_start + payload_length;
        let headers_end = payload_end + user_headers_length;

        Ok(Self {
            id: read_u128(bytes, 8)?,
            origin_timestamp: read_u64(bytes, 40)?,
            user_headers: &bytes[payload_end..headers_end],
            payload: &bytes[payload_start..payload_end],
            total_size,
        })
    }
}
+
+fn calculate_checksum_parts(header_tail: &[u8], user_headers: &[u8], payload: 
&[u8]) -> u64 {
+    let mut bytes = BytesMut::with_capacity(header_tail.len() + 
user_headers.len() + payload.len());
+    bytes.put_slice(header_tail);
+    bytes.put_slice(user_headers);
+    bytes.put_slice(payload);
+    calculate_checksum(&bytes.freeze())
+}
+
+fn read_u16(bytes: &[u8], offset: usize) -> Result<u16, IggyError> {
+    bytes
+        .get(offset..offset + 2)
+        .and_then(|slice| slice.try_into().ok())
+        .map(u16::from_le_bytes)
+        .ok_or(IggyError::InvalidNumberEncoding)
+}
+
+fn read_u32(bytes: &[u8], offset: usize) -> Result<u32, IggyError> {
+    bytes
+        .get(offset..offset + 4)
+        .and_then(|slice| slice.try_into().ok())
+        .map(u32::from_le_bytes)
+        .ok_or(IggyError::InvalidNumberEncoding)
+}
+
+fn read_u64(bytes: &[u8], offset: usize) -> Result<u64, IggyError> {
+    bytes
+        .get(offset..offset + 8)
+        .and_then(|slice| slice.try_into().ok())
+        .map(u64::from_le_bytes)
+        .ok_or(IggyError::InvalidNumberEncoding)
+}
+
+fn read_u128(bytes: &[u8], offset: usize) -> Result<u128, IggyError> {
+    bytes
+        .get(offset..offset + 16)
+        .and_then(|slice| slice.try_into().ok())
+        .map(u128::from_le_bytes)
+        .ok_or(IggyError::InvalidNumberEncoding)
+}
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
index 559f5fae9..5e4d36d42 100644
--- a/core/simulator/Cargo.toml
+++ b/core/simulator/Cargo.toml
@@ -27,6 +27,7 @@ consensus = { path = "../consensus" }
 enumset = { workspace = true }
 futures = { workspace = true }
 iggy_common = { path = "../common" }
+iobuf = { workspace = true }
 journal = { path = "../journal" }
 message_bus = { path = "../message_bus" }
 metadata = { path = "../metadata" }
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
index 371258aef..adca4ca80 100644
--- a/core/simulator/src/bus.rs
+++ b/core/simulator/src/bus.rs
@@ -94,7 +94,7 @@ impl MessageBus for MemBus {
         &self,
         client_id: Self::Client,
         message: Self::Data,
-    ) -> Result<(), IggyError> {
+    ) -> Result<Self::Data, IggyError> {
         if !self.clients.lock().unwrap().contains(&client_id) {
             #[allow(clippy::cast_possible_truncation)]
             return Err(IggyError::ClientNotFound(client_id as u32));
@@ -104,17 +104,17 @@ impl MessageBus for MemBus {
             from_replica: None,
             to_replica: None,
             to_client: Some(client_id),
-            message,
+            message: message.clone(),
         });
 
-        Ok(())
+        Ok(message)
     }
 
     async fn send_to_replica(
         &self,
         replica: Self::Replica,
         message: Self::Data,
-    ) -> Result<(), IggyError> {
+    ) -> Result<Self::Data, IggyError> {
         if !self.replicas.lock().unwrap().contains(&replica) {
             return Err(IggyError::ResourceNotFound(format!("Replica 
{replica}")));
         }
@@ -123,10 +123,10 @@ impl MessageBus for MemBus {
             from_replica: None,
             to_replica: Some(replica),
             to_client: None,
-            message,
+            message: message.clone(),
         });
 
-        Ok(())
+        Ok(message)
     }
 }
 
@@ -167,7 +167,7 @@ impl MessageBus for SharedMemBus {
         &self,
         client_id: Self::Client,
         message: Self::Data,
-    ) -> Result<(), IggyError> {
+    ) -> Result<Self::Data, IggyError> {
         self.0.send_to_client(client_id, message).await
     }
 
@@ -175,7 +175,7 @@ impl MessageBus for SharedMemBus {
         &self,
         replica: Self::Replica,
         message: Self::Data,
-    ) -> Result<(), IggyError> {
+    ) -> Result<Self::Data, IggyError> {
         self.0.send_to_replica(replica, message).await
     }
 }
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index 6dc9e1eab..23c57915d 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -23,6 +23,7 @@ use iggy_common::{
     message::Message,
     sharding::IggyNamespace,
 };
+use iobuf::Owned;
 use std::cell::Cell;
 
 // TODO: Proper client which implements the full client SDK API
@@ -130,8 +131,6 @@ impl SimClient {
         payload: &[u8],
         namespace: IggyNamespace,
     ) -> Message<RequestHeader> {
-        use bytes::Bytes;
-
         let header_size = std::mem::size_of::<RequestHeader>();
         let total_size = header_size + payload.len();
 
@@ -159,14 +158,12 @@ impl SimClient {
         buffer.extend_from_slice(header_bytes);
         buffer.extend_from_slice(payload);
 
-        Message::<RequestHeader>::from_bytes(Bytes::from(buffer))
-            .expect("failed to build request message")
+        
Message::try_from(Owned::<4096>::copy_from_slice(&buffer).split_at(header_size))
+            .expect("request buffer must contain a valid request message")
     }
 
     #[allow(clippy::cast_possible_truncation)]
     fn build_request(&self, operation: Operation, payload: &[u8]) -> 
Message<RequestHeader> {
-        use bytes::Bytes;
-
         let header_size = std::mem::size_of::<RequestHeader>();
         let total_size = header_size + payload.len();
 
@@ -193,7 +190,7 @@ impl SimClient {
         buffer.extend_from_slice(header_bytes);
         buffer.extend_from_slice(payload);
 
-        Message::<RequestHeader>::from_bytes(Bytes::from(buffer))
-            .expect("failed to build request message")
+        
Message::try_from(Owned::<4096>::copy_from_slice(&buffer).split_at(header_size))
+            .expect("request buffer must contain a valid request message")
     }
 }
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 9f4d64b79..a7f0a9630 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::Bytes;
 use iggy_common::header::PrepareHeader;
 use iggy_common::message::Message;
 use iggy_common::variadic;
+use iobuf::{Owned, TryMerge};
 use journal::{Journal, JournalHandle, Storage};
 use metadata::MuxStateMachine;
 use metadata::stm::consumer_group::ConsumerGroups;
@@ -108,8 +108,10 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for 
SimJournal<S> {
         let offset = *offsets.get(&header.op)?;
 
         let buffer = self.storage.read(offset, header.size as usize).await;
-        let message =
-            Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes 
should be valid");
+        let message = Message::try_from(
+            
Owned::<4096>::copy_from_slice(&buffer).split_at(std::mem::size_of::<PrepareHeader>()),
+        )
+        .expect("prepare buffer must contain a valid prepare message");
         Some(message)
     }
 
@@ -122,9 +124,10 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for 
SimJournal<S> {
 
     async fn append(&self, entry: Self::Entry) {
         let header = *entry.header();
-        let message_bytes = entry.as_bytes();
+        let message_bytes = unsafe { entry.into_inner().try_merge() }
+            .expect("simulator journal append expects a unique message 
buffer");
 
-        let bytes_written = self.storage.write(message_bytes.to_vec()).await;
+        let bytes_written = 
self.storage.write(message_bytes.as_slice().to_vec()).await;
 
         let offset = self.write_offset.get();
         unsafe { &mut *self.headers.get() }.insert(header.op, header);
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 5e64aca81..093f11aae 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -25,12 +25,12 @@ pub mod replica;
 
 use bus::MemBus;
 use consensus::PartitionsHandle;
+use iggy_common::IggyError;
 use iggy_common::header::ReplyHeader;
 use iggy_common::message::Message;
 use iggy_common::sharding::IggyNamespace;
-use iggy_common::{IggyError, IggyMessagesBatchSet};
 use message_bus::MessageBus;
-use partitions::{Partition, PartitionOffsets, PollingArgs, PollingConsumer};
+use partitions::{IggyMessages2, Partition, PartitionOffsets, PollingArgs, 
PollingConsumer};
 use replica::{Replica, new_replica};
 use std::sync::Arc;
 
@@ -155,7 +155,7 @@ impl Simulator {
         namespace: IggyNamespace,
         consumer: PollingConsumer,
         args: PollingArgs,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
+    ) -> Result<IggyMessages2, IggyError> {
         let replica = &self.replicas[replica_idx];
         let partition =
             replica
diff --git a/core/simulator/src/packet.rs b/core/simulator/src/packet.rs
index 9aba6e17a..95e84d617 100644
--- a/core/simulator/src/packet.rs
+++ b/core/simulator/src/packet.rs
@@ -744,7 +744,11 @@ mod tests {
         let header: &mut GenericHeader =
             bytemuck::checked::try_from_bytes_mut(&mut buf).expect("zeroed 
bytes are valid");
         header.command = command;
-        Message::<GenericHeader>::from_bytes(bytes::Bytes::from(buf)).unwrap()
+        Message::try_from(
+            iobuf::Owned::<4096>::copy_from_slice(&buf)
+                .split_at(std::mem::size_of::<GenericHeader>()),
+        )
+        .expect("generic test buffer must contain a valid generic message")
     }
 
     /// Helper: disable all links to/from a given replica (isolate it).

Reply via email to