This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e3029d6b refactor(consensus): consolidate VSR types in binary_protocol (#3014)
4e3029d6b is described below

commit 4e3029d6be764b6de0cc3db2c45ad743dfb9608b
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Mar 25 08:47:56 2026 +0100

    refactor(consensus): consolidate VSR types in binary_protocol (#3014)
---
 Cargo.lock                                    |   8 +-
 core/binary_protocol/src/consensus/message.rs | 113 +++-
 core/binary_protocol/src/consensus/mod.rs     |   1 +
 core/binary_protocol/src/lib.rs               |   6 +-
 core/common/Cargo.toml                        |   2 -
 core/common/src/lib.rs                        |   1 -
 core/common/src/types/consensus/header.rs     | 879 --------------------------
 core/common/src/types/consensus/message.rs    | 719 ---------------------
 core/common/src/types/consensus/mod.rs        |  19 -
 core/common/src/types/mod.rs                  |   1 -
 core/consensus/Cargo.toml                     |   1 +
 core/consensus/src/impls.rs                   |   7 +-
 core/consensus/src/lib.rs                     |   3 +-
 core/consensus/src/namespaced_pipeline.rs     |   5 +-
 core/consensus/src/plane_helpers.rs           |   7 +-
 core/message_bus/Cargo.toml                   |   1 +
 core/message_bus/src/lib.rs                   |   3 +-
 core/metadata/Cargo.toml                      |   1 +
 core/metadata/src/impls/metadata.rs           |   8 +-
 core/metadata/src/stm/mod.rs                  |   4 +-
 core/metadata/src/stm/mux.rs                  |   5 +-
 core/partitions/Cargo.toml                    |   1 +
 core/partitions/src/iggy_partition.rs         |   3 +-
 core/partitions/src/iggy_partitions.rs        |   9 +-
 core/partitions/src/journal.rs                |   7 +-
 core/partitions/src/lib.rs                    |   4 +-
 core/shard/Cargo.toml                         |   1 +
 core/shard/src/lib.rs                         |   7 +-
 core/shard/src/router.rs                      |   3 +-
 core/simulator/Cargo.toml                     |   1 +
 core/simulator/src/bus.rs                     |   3 +-
 core/simulator/src/client.rs                  |  11 +-
 core/simulator/src/deps.rs                    |   3 +-
 core/simulator/src/lib.rs                     |   9 +-
 core/simulator/src/main.rs                    |   3 +-
 core/simulator/src/network.rs                 |   2 +-
 core/simulator/src/packet.rs                  |   3 +-
 37 files changed, 173 insertions(+), 1691 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cdf2f962b..e69629314 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2231,6 +2231,7 @@ dependencies = [
  "bytemuck",
  "bytes",
  "futures",
+ "iggy_binary_protocol",
  "iggy_common",
  "message_bus",
  "rand 0.10.0",
@@ -5416,7 +5417,6 @@ dependencies = [
  "blake3",
  "bon",
  "byte-unit",
- "bytemuck",
  "bytes",
  "chrono",
  "clap",
@@ -5424,7 +5424,6 @@ dependencies = [
  "compio",
  "crossbeam",
  "derive_more",
- "enumset",
  "err_trail",
  "human-repr",
  "humantime",
@@ -6660,6 +6659,7 @@ dependencies = [
 name = "message_bus"
 version = "0.1.0"
 dependencies = [
+ "iggy_binary_protocol",
  "iggy_common",
  "rand 0.10.0",
 ]
@@ -6671,6 +6671,7 @@ dependencies = [
  "ahash 0.8.12",
  "bytes",
  "consensus",
+ "iggy_binary_protocol",
  "iggy_common",
  "journal",
  "left-right",
@@ -7648,6 +7649,7 @@ version = "0.1.0"
 dependencies = [
  "bytes",
  "consensus",
+ "iggy_binary_protocol",
  "iggy_common",
  "journal",
  "message_bus",
@@ -9833,6 +9835,7 @@ dependencies = [
  "crossfire",
  "futures",
  "hash32 1.0.0",
+ "iggy_binary_protocol",
  "iggy_common",
  "journal",
  "message_bus",
@@ -9942,6 +9945,7 @@ dependencies = [
  "consensus",
  "enumset",
  "futures",
+ "iggy_binary_protocol",
  "iggy_common",
  "journal",
  "message_bus",
diff --git a/core/binary_protocol/src/consensus/message.rs b/core/binary_protocol/src/consensus/message.rs
index d8f56d872..f63cf1c19 100644
--- a/core/binary_protocol/src/consensus/message.rs
+++ b/core/binary_protocol/src/consensus/message.rs
@@ -20,10 +20,26 @@
 //! `Message<H>` wraps a `Bytes` buffer and provides typed access to the
 //! header via `bytemuck` pointer cast (zero allocation, zero copy).
 
-use crate::consensus::{Command2, ConsensusError, ConsensusHeader, GenericHeader};
+use crate::consensus::{
+    Command2, ConsensusError, ConsensusHeader, GenericHeader, PrepareHeader, PrepareOkHeader,
+    RequestHeader,
+};
 use bytes::Bytes;
 use std::marker::PhantomData;
 
+/// Trait for types that provide access to a consensus header.
+pub trait ConsensusMessage<H: ConsensusHeader> {
+    fn header(&self) -> &H;
+}
+
+impl<H: ConsensusHeader> ConsensusMessage<H> for Message<H> {
+    fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::checked::try_from_bytes(header_bytes)
+            .expect("header validated at construction time")
+    }
+}
+
 /// Zero-copy message wrapping a `Bytes` buffer with a typed header.
 ///
 /// The header is accessed via `bytemuck::try_from_bytes` - a pointer cast,
@@ -197,6 +213,36 @@ impl<H: ConsensusHeader> Message<H> {
         Ok(unsafe { Message::<T>::from_buffer_unchecked(self.buffer) })
     }
 
+    /// Try to borrow as a different header type without consuming.
+    ///
+    /// # Errors
+    /// Returns `ConsensusError` if the header command doesn't match or validation fails.
+    pub fn try_as_typed<T: ConsensusHeader>(&self) -> Result<&Message<T>, ConsensusError> {
+        if self.buffer.len() < size_of::<T>() {
+            return Err(ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: Command2::Reserved,
+            });
+        }
+
+        let generic = self.as_generic();
+        if generic.header().command != T::COMMAND {
+            return Err(ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: generic.header().command,
+            });
+        }
+
+        let header_bytes = &self.buffer[..size_of::<T>()];
+        bytemuck::checked::try_from_bytes::<T>(header_bytes)
+            .map_err(|_| ConsensusError::InvalidBitPattern)?
+            .validate()?;
+
+        // SAFETY: Message<H> and Message<T> have identical layout (#[repr(C)],
+        // differ only in PhantomData). Header validated above.
+        Ok(unsafe { &*std::ptr::from_ref(self).cast::<Message<T>>() })
+    }
+
     /// Transmute the header to a different type, preserving the buffer.
     ///
     /// The callback receives the old header (by value) and a mutable reference
@@ -240,6 +286,71 @@ impl<H: ConsensusHeader> Message<H> {
     }
 }
 
+/// Type-erased message bag for dispatching incoming consensus messages.
+#[derive(Debug)]
+pub enum MessageBag {
+    Request(Message<RequestHeader>),
+    Prepare(Message<PrepareHeader>),
+    PrepareOk(Message<PrepareOkHeader>),
+}
+
+impl MessageBag {
+    #[must_use]
+    pub fn command(&self) -> Command2 {
+        match self {
+            Self::Request(m) => m.header().command,
+            Self::Prepare(m) => m.header().command,
+            Self::PrepareOk(m) => m.header().command,
+        }
+    }
+
+    #[must_use]
+    pub fn size(&self) -> u32 {
+        match self {
+            Self::Request(m) => m.header().size(),
+            Self::Prepare(m) => m.header().size(),
+            Self::PrepareOk(m) => m.header().size(),
+        }
+    }
+
+    #[must_use]
+    pub fn operation(&self) -> crate::consensus::Operation {
+        match self {
+            Self::Request(m) => m.header().operation,
+            Self::Prepare(m) => m.header().operation,
+            Self::PrepareOk(m) => m.header().operation,
+        }
+    }
+}
+
+impl<T: ConsensusHeader> TryFrom<Message<T>> for MessageBag {
+    type Error = ConsensusError;
+
+    fn try_from(value: Message<T>) -> Result<Self, Self::Error> {
+        let command = value.as_generic().header().command;
+        let buffer = value.into_inner();
+
+        match command {
+            Command2::Request => {
+                let msg = unsafe { Message::<RequestHeader>::from_buffer_unchecked(buffer) };
+                Ok(Self::Request(msg))
+            }
+            Command2::Prepare => {
+                let msg = unsafe { Message::<PrepareHeader>::from_buffer_unchecked(buffer) };
+                Ok(Self::Prepare(msg))
+            }
+            Command2::PrepareOk => {
+                let msg = unsafe { Message::<PrepareOkHeader>::from_buffer_unchecked(buffer) };
+                Ok(Self::PrepareOk(msg))
+            }
+            other => Err(ConsensusError::InvalidCommand {
+                expected: Command2::Reserved,
+                found: other,
+            }),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::Message;
diff --git a/core/binary_protocol/src/consensus/mod.rs b/core/binary_protocol/src/consensus/mod.rs
index 11bead1ef..3840eec82 100644
--- a/core/binary_protocol/src/consensus/mod.rs
+++ b/core/binary_protocol/src/consensus/mod.rs
@@ -51,4 +51,5 @@ pub use header::{
     CommitHeader, ConsensusHeader, DoViewChangeHeader, GenericHeader, HEADER_SIZE, PrepareHeader,
     PrepareOkHeader, ReplyHeader, RequestHeader, StartViewChangeHeader, StartViewHeader,
 };
+pub use message::{ConsensusMessage, MessageBag};
 pub use operation::Operation;
diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs
index 7ed4fdad0..cce1e5d56 100644
--- a/core/binary_protocol/src/lib.rs
+++ b/core/binary_protocol/src/lib.rs
@@ -68,9 +68,9 @@ pub mod responses;
 
 pub use codec::{WireDecode, WireEncode};
 pub use consensus::{
-    Command2, CommitHeader, ConsensusError, ConsensusHeader, DoViewChangeHeader, GenericHeader,
-    HEADER_SIZE, Operation, PrepareHeader, PrepareOkHeader, ReplyHeader, RequestHeader,
-    StartViewChangeHeader, StartViewHeader, message::Message,
+    Command2, CommitHeader, ConsensusError, ConsensusHeader, ConsensusMessage, DoViewChangeHeader,
+    GenericHeader, HEADER_SIZE, MessageBag, Operation, PrepareHeader, PrepareOkHeader, ReplyHeader,
+    RequestHeader, StartViewChangeHeader, StartViewHeader, message::Message,
 };
 pub use dispatch::{COMMAND_TABLE, CommandMeta, lookup_by_operation, lookup_command};
 pub use error::WireError;
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index b7e610181..3af8fa87f 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -37,7 +37,6 @@ base64 = { workspace = true }
 blake3 = { workspace = true }
 bon = { workspace = true }
 byte-unit = { workspace = true }
-bytemuck = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
 clap = { workspace = true }
@@ -45,7 +44,6 @@ comfy-table = { workspace = true }
 compio = { workspace = true }
 crossbeam = { workspace = true }
 derive_more = { workspace = true }
-enumset = { workspace = true }
 err_trail = { workspace = true }
 human-repr = { workspace = true }
 humantime = { workspace = true }
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index b1b8aeb09..c690b5a1c 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -96,7 +96,6 @@ pub use types::configuration::websocket_config::websocket_client_config::*;
 pub use types::configuration::websocket_config::websocket_client_config_builder::*;
 pub use types::configuration::websocket_config::websocket_client_reconnection_config::*;
 pub use types::configuration::websocket_config::websocket_connection_string_options::*;
-pub use types::consensus::*;
 pub use types::consumer::consumer_group::*;
 pub use types::consumer::consumer_group_id::*;
 pub use types::consumer::consumer_group_offsets::*;
diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs
deleted file mode 100644
index e7d62e758..000000000
--- a/core/common/src/types/consensus/header.rs
+++ /dev/null
@@ -1,879 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use bytemuck::{CheckedBitPattern, NoUninit};
-use enumset::EnumSetType;
-use thiserror::Error;
-
-const HEADER_SIZE: usize = 256;
-pub trait ConsensusHeader: Sized + CheckedBitPattern + NoUninit {
-    const COMMAND: Command2;
-
-    fn validate(&self) -> Result<(), ConsensusError>;
-    fn operation(&self) -> Operation;
-    fn command(&self) -> Command2;
-    fn size(&self) -> u32;
-}
-
-#[derive(Default, Debug, EnumSetType)]
-#[repr(u8)]
-pub enum Command2 {
-    #[default]
-    Reserved = 0,
-
-    Ping = 1,
-    Pong = 2,
-    PingClient = 3,
-    PongClient = 4,
-
-    Request = 5,
-    Prepare = 6,
-    PrepareOk = 7,
-    Reply = 8,
-    Commit = 9,
-
-    StartViewChange = 10,
-    DoViewChange = 11,
-    StartView = 12,
-}
-
-// SAFETY: Command2 is #[repr(u8)] with no padding bytes.
-unsafe impl NoUninit for Command2 {}
-
-// SAFETY: Command2 is #[repr(u8)]; is_valid_bit_pattern matches all defined discriminants.
-unsafe impl CheckedBitPattern for Command2 {
-    type Bits = u8;
-
-    fn is_valid_bit_pattern(bits: &u8) -> bool {
-        *bits <= 12
-    }
-}
-
-#[derive(Debug, Clone, Error, PartialEq, Eq)]
-pub enum ConsensusError {
-    #[error("invalid command: expected {expected:?}, found {found:?}")]
-    InvalidCommand { expected: Command2, found: Command2 },
-
-    #[error("invalid size: expected {expected:?}, found {found:?}")]
-    InvalidSize { expected: u32, found: u32 },
-
-    #[error("invalid checksum")]
-    InvalidChecksum,
-
-    #[error("invalid cluster ID")]
-    InvalidCluster,
-
-    #[error("invalid field: {0}")]
-    InvalidField(String),
-
-    #[error("parent_padding must be 0")]
-    PrepareParentPaddingNonZero,
-
-    #[error("request_checksum_padding must be 0")]
-    PrepareRequestChecksumPaddingNonZero,
-
-    #[error("command must be Commit")]
-    CommitInvalidCommand2,
-
-    #[error("size must be 256, found {0}")]
-    CommitInvalidSize(u32),
-
-    // ReplyHeader specific
-    #[error("command must be Reply")]
-    ReplyInvalidCommand2,
-
-    #[error("request_checksum_padding must be 0")]
-    ReplyRequestChecksumPaddingNonZero,
-
-    #[error("context_padding must be 0")]
-    ReplyContextPaddingNonZero,
-
-    #[error("invalid bit pattern in header (enum discriminant out of range)")]
-    InvalidBitPattern,
-}
-
-#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, NoUninit, CheckedBitPattern)]
-#[repr(u8)]
-pub enum Operation {
-    #[default]
-    Reserved = 0,
-    CreateStream = 128,
-    UpdateStream = 129,
-    DeleteStream = 130,
-    PurgeStream = 131,
-    CreateTopic = 132,
-    UpdateTopic = 133,
-    DeleteTopic = 134,
-    PurgeTopic = 135,
-    CreatePartitions = 136,
-    DeletePartitions = 137,
-    DeleteSegments = 138,
-    CreateConsumerGroup = 139,
-    DeleteConsumerGroup = 140,
-    CreateUser = 141,
-    UpdateUser = 142,
-    DeleteUser = 143,
-    ChangePassword = 144,
-    UpdatePermissions = 145,
-    CreatePersonalAccessToken = 146,
-    DeletePersonalAccessToken = 147,
-
-    // Partition operations (replicated via consensus)
-    SendMessages = 160,
-    StoreConsumerOffset = 161,
-}
-
-impl Operation {
-    /// Returns `true` for metadata / control-plane operations (streams, topics,
-    /// users, consumer groups, etc.) that are always handled by shard 0.
-    #[inline]
-    pub fn is_metadata(&self) -> bool {
-        matches!(
-            self,
-            Operation::CreateStream
-                | Operation::UpdateStream
-                | Operation::DeleteStream
-                | Operation::PurgeStream
-                | Operation::CreateTopic
-                | Operation::UpdateTopic
-                | Operation::DeleteTopic
-                | Operation::PurgeTopic
-                | Operation::CreatePartitions
-                | Operation::DeletePartitions
-                | Operation::CreateConsumerGroup
-                | Operation::DeleteConsumerGroup
-                | Operation::CreateUser
-                | Operation::UpdateUser
-                | Operation::DeleteUser
-                | Operation::ChangePassword
-                | Operation::UpdatePermissions
-                | Operation::CreatePersonalAccessToken
-                | Operation::DeletePersonalAccessToken
-        )
-    }
-
-    /// Returns `true` for data-plane operations that are routed to the shard
-    /// owning the partition identified by the message's namespace.
-    #[inline]
-    pub fn is_partition(&self) -> bool {
-        matches!(
-            self,
-            Operation::SendMessages | Operation::StoreConsumerOffset | Operation::DeleteSegments
-        )
-    }
-}
-
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct GenericHeader {
-    pub checksum: u128,
-    pub checksum_body: u128,
-    pub cluster: u128,
-    pub size: u32,
-    pub view: u32,
-    pub release: u32,
-    pub command: Command2,
-    pub replica: u8,
-    pub reserved_frame: [u8; 66],
-
-    pub reserved_command: [u8; 128],
-}
-const _: () = {
-    assert!(core::mem::size_of::<GenericHeader>() == HEADER_SIZE);
-    // Ensure no implicit padding is inserted between reserved_frame and the body fields.
-    assert!(
-        core::mem::offset_of!(GenericHeader, reserved_command)
-            == core::mem::offset_of!(GenericHeader, reserved_frame)
-                + core::mem::size_of::<[u8; 66]>()
-    );
-    // Ensure no implicit tail padding is inserted after the explicit trailing bytes.
-    assert!(
-        core::mem::offset_of!(GenericHeader, reserved_command) + core::mem::size_of::<[u8; 128]>()
-            == HEADER_SIZE
-    );
-};
-
-impl ConsensusHeader for GenericHeader {
-    const COMMAND: Command2 = Command2::Reserved;
-
-    fn operation(&self) -> Operation {
-        Operation::Reserved
-    }
-
-    fn command(&self) -> Command2 {
-        self.command
-    }
-
-    fn validate(&self) -> Result<(), ConsensusError> {
-        Ok(())
-    }
-
-    fn size(&self) -> u32 {
-        self.size
-    }
-}
-
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct RequestHeader {
-    pub checksum: u128,
-    pub checksum_body: u128,
-    pub cluster: u128,
-    pub size: u32,
-    pub view: u32,
-    pub release: u32,
-    pub command: Command2,
-    pub replica: u8,
-    pub reserved_frame: [u8; 66],
-
-    pub client: u128,
-    pub request_checksum: u128,
-    pub timestamp: u64,
-    pub request: u64,
-    pub operation: Operation,
-    pub operation_padding: [u8; 7],
-    pub namespace: u64,
-    pub reserved: [u8; 64],
-}
-const _: () = {
-    assert!(core::mem::size_of::<RequestHeader>() == HEADER_SIZE);
-    // Ensure no implicit padding is inserted between reserved_frame and the body fields.
-    assert!(
-        core::mem::offset_of!(RequestHeader, client)
-            == core::mem::offset_of!(RequestHeader, reserved_frame)
-                + core::mem::size_of::<[u8; 66]>()
-    );
-    // Ensure no implicit tail padding is inserted after the explicit trailing bytes.
-    assert!(
-        core::mem::offset_of!(RequestHeader, reserved) + core::mem::size_of::<[u8; 64]>()
-            == HEADER_SIZE
-    );
-};
-
-impl Default for RequestHeader {
-    fn default() -> Self {
-        Self {
-            checksum: 0,
-            checksum_body: 0,
-            cluster: 0,
-            size: 0,
-            view: 0,
-            release: 0,
-            command: Command2::Reserved,
-            replica: 0,
-            reserved_frame: [0; 66],
-            client: 0,
-            request_checksum: 0,
-            timestamp: 0,
-            request: 0,
-            operation: Operation::Reserved,
-            operation_padding: [0; 7],
-            namespace: 0,
-            reserved: [0; 64],
-        }
-    }
-}
-
-impl ConsensusHeader for RequestHeader {
-    const COMMAND: Command2 = Command2::Request;
-
-    fn operation(&self) -> Operation {
-        self.operation
-    }
-
-    fn validate(&self) -> Result<(), ConsensusError> {
-        if self.command != Command2::Request {
-            return Err(ConsensusError::InvalidCommand {
-                expected: Command2::Request,
-                found: self.command,
-            });
-        }
-        Ok(())
-    }
-    fn command(&self) -> Command2 {
-        self.command
-    }
-
-    fn size(&self) -> u32 {
-        self.size
-    }
-}
-
-// TODO: Manually impl default (and use a const for the `release`)
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct PrepareHeader {
-    pub checksum: u128,
-    pub checksum_body: u128,
-    pub cluster: u128,
-    pub size: u32,
-    pub view: u32,
-    pub release: u32,
-    pub command: Command2,
-    pub replica: u8,
-    pub reserved_frame: [u8; 66],
-
-    pub client: u128,
-    pub parent: u128,
-    pub request_checksum: u128,
-    pub op: u64,
-    pub commit: u64,
-    pub timestamp: u64,
-    pub request: u64,
-    pub operation: Operation,
-    pub operation_padding: [u8; 7],
-    pub namespace: u64,
-    pub reserved: [u8; 32],
-}
-const _: () = {
-    assert!(core::mem::size_of::<PrepareHeader>() == HEADER_SIZE);
-    // Ensure no implicit padding is inserted between reserved_frame and the body fields.
-    assert!(
-        core::mem::offset_of!(PrepareHeader, client)
-            == core::mem::offset_of!(PrepareHeader, reserved_frame)
-                + core::mem::size_of::<[u8; 66]>()
-    );
-    // Ensure no implicit tail padding is inserted after the explicit trailing bytes.
-    assert!(
-        core::mem::offset_of!(PrepareHeader, reserved) + core::mem::size_of::<[u8; 32]>()
-            == HEADER_SIZE
-    );
-};
-
-impl ConsensusHeader for PrepareHeader {
-    const COMMAND: Command2 = Command2::Prepare;
-
-    fn operation(&self) -> Operation {
-        self.operation
-    }
-
-    fn validate(&self) -> Result<(), ConsensusError> {
-        if self.command != Command2::Prepare {
-            return Err(ConsensusError::InvalidCommand {
-                expected: Command2::Prepare,
-                found: self.command,
-            });
-        }
-        Ok(())
-    }
-    fn command(&self) -> Command2 {
-        self.command
-    }
-
-    fn size(&self) -> u32 {
-        self.size
-    }
-}
-
-impl Default for PrepareHeader {
-    fn default() -> Self {
-        Self {
-            checksum: 0,
-            checksum_body: 0,
-            cluster: 0,
-            size: 0,
-            view: 0,
-            release: 0,
-            command: Command2::Reserved,
-            replica: 0,
-            reserved_frame: [0; 66],
-            client: 0,
-            parent: 0,
-            request_checksum: 0,
-            op: 0,
-            commit: 0,
-            timestamp: 0,
-            request: 0,
-            operation: Operation::Reserved,
-            operation_padding: [0; 7],
-            namespace: 0,
-            reserved: [0; 32],
-        }
-    }
-}
-
-// TODO: Manually impl default (and use a const for the `release`)
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct PrepareOkHeader {
-    pub checksum: u128,
-    pub checksum_body: u128,
-    pub cluster: u128,
-    pub size: u32,
-    pub view: u32,
-    pub release: u32,
-    pub command: Command2,
-    pub replica: u8,
-    pub reserved_frame: [u8; 66],
-
-    pub parent: u128,
-    pub prepare_checksum: u128,
-    pub op: u64,
-    pub commit: u64,
-    pub timestamp: u64,
-    pub request: u64,
-    pub operation: Operation,
-    pub operation_padding: [u8; 7],
-    pub namespace: u64,
-    pub reserved: [u8; 48],
-}
-const _: () = {
-    assert!(core::mem::size_of::<PrepareOkHeader>() == HEADER_SIZE);
-    // Ensure no implicit padding is inserted between reserved_frame and the body fields.
-    assert!(
-        core::mem::offset_of!(PrepareOkHeader, parent)
-            == core::mem::offset_of!(PrepareOkHeader, reserved_frame)
-                + core::mem::size_of::<[u8; 66]>()
-    );
-    // Ensure no implicit tail padding is inserted after the explicit trailing bytes.
-    assert!(
-        core::mem::offset_of!(PrepareOkHeader, reserved) + core::mem::size_of::<[u8; 48]>()
-            == HEADER_SIZE
-    );
-};
-
-impl ConsensusHeader for PrepareOkHeader {
-    const COMMAND: Command2 = Command2::PrepareOk;
-
-    fn operation(&self) -> Operation {
-        self.operation
-    }
-    fn command(&self) -> Command2 {
-        self.command
-    }
-
-    fn validate(&self) -> Result<(), ConsensusError> {
-        if self.command != Command2::PrepareOk {
-            return Err(ConsensusError::InvalidCommand {
-                expected: Command2::PrepareOk,
-                found: self.command,
-            });
-        }
-        Ok(())
-    }
-
-    fn size(&self) -> u32 {
-        self.size
-    }
-}
-
-impl Default for PrepareOkHeader {
-    fn default() -> Self {
-        Self {
-            checksum: 0,
-            checksum_body: 0,
-            cluster: 0,
-            size: 0,
-            view: 0,
-            release: 0,
-            command: Command2::Reserved,
-            replica: 0,
-            reserved_frame: [0; 66],
-            parent: 0,
-            prepare_checksum: 0,
-            op: 0,
-            commit: 0,
-            timestamp: 0,
-            request: 0,
-            operation: Operation::Reserved,
-            operation_padding: [0; 7],
-            namespace: 0,
-            reserved: [0; 48],
-        }
-    }
-}
-
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct CommitHeader {
-    pub checksum: u128,
-    pub checksum_body: u128,
-    pub cluster: u128,
-    pub size: u32,
-    pub view: u32,
-    pub release: u32,
-    pub command: Command2,
-    pub replica: u8,
-    pub reserved_frame: [u8; 66],
-
-    pub commit_checksum: u128,
-    pub timestamp_monotonic: u64,
-    pub commit: u64,
-    pub checkpoint_op: u64,
-    pub namespace: u64,
-    pub reserved: [u8; 80],
-}
-const _: () = {
-    assert!(core::mem::size_of::<CommitHeader>() == HEADER_SIZE);
-    // Ensure no implicit padding is inserted between reserved_frame and the body fields.
-    assert!(
-        core::mem::offset_of!(CommitHeader, commit_checksum)
-            == core::mem::offset_of!(CommitHeader, reserved_frame)
-                + core::mem::size_of::<[u8; 66]>()
-    );
-    // Ensure no implicit tail padding is inserted after the explicit trailing bytes.
-    assert!(
-        core::mem::offset_of!(CommitHeader, reserved) + core::mem::size_of::<[u8; 80]>()
-            == HEADER_SIZE
-    );
-};
-
-impl ConsensusHeader for CommitHeader {
-    const COMMAND: Command2 = Command2::Commit;
-
-    fn operation(&self) -> Operation {
-        Operation::Reserved
-    }
-    fn command(&self) -> Command2 {
-        self.command
-    }
-
-    fn validate(&self) -> Result<(), ConsensusError> {
-        if self.command != Command2::Commit {
-            return Err(ConsensusError::CommitInvalidCommand2);
-        }
-        if self.size != 256 {
-            return Err(ConsensusError::CommitInvalidSize(self.size));
-        }
-        Ok(())
-    }
-
-    fn size(&self) -> u32 {
-        self.size
-    }
-}
-
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct ReplyHeader {
-    pub checksum: u128,
-    pub checksum_body: u128,
-    pub cluster: u128,
-    pub size: u32,
-    pub view: u32,
-    pub release: u32,
-    pub command: Command2,
-    pub replica: u8,
-    pub reserved_frame: [u8; 66],
-
-    pub request_checksum: u128,
-    pub context: u128,
-    pub op: u64,
-    pub commit: u64,
-    pub timestamp: u64,
-    pub request: u64,
-    pub operation: Operation,
-    pub operation_padding: [u8; 7],
-    pub namespace: u64,
-    pub reserved: [u8; 48],
-}
-const _: () = {
-    assert!(core::mem::size_of::<ReplyHeader>() == HEADER_SIZE);
-    // Ensure no implicit padding is inserted between reserved_frame and the body fields.
-    assert!(
-        core::mem::offset_of!(ReplyHeader, request_checksum)
-            == core::mem::offset_of!(ReplyHeader, reserved_frame)
-                + core::mem::size_of::<[u8; 66]>()
-    );
-    // Ensure no implicit tail padding is inserted after the explicit trailing bytes.
-    assert!(
-        core::mem::offset_of!(ReplyHeader, reserved) + core::mem::size_of::<[u8; 48]>()
-            == HEADER_SIZE
-    );
-};
-
-impl ConsensusHeader for ReplyHeader {
-    const COMMAND: Command2 = Command2::Reply;
-
-    fn operation(&self) -> Operation {
-        self.operation
-    }
-    fn command(&self) -> Command2 {
-        self.command
-    }
-
-    fn validate(&self) -> Result<(), ConsensusError> {
-        if self.command != Command2::Reply {
-            return Err(ConsensusError::ReplyInvalidCommand2);
-        }
-        Ok(())
-    }
-
-    fn size(&self) -> u32 {
-        self.size
-    }
-}
-
-impl Default for ReplyHeader {
-    fn default() -> Self {
-        Self {
-            checksum: 0,
-            checksum_body: 0,
-            cluster: 0,
-            size: 0,
-            view: 0,
-            release: 0,
-            command: Command2::Reserved,
-            replica: 0,
-            reserved_frame: [0; 66],
-            request_checksum: 0,
-            context: 0,
-            op: 0,
-            commit: 0,
-            timestamp: 0,
-            request: 0,
-            operation: Operation::Reserved,
-            operation_padding: [0; 7],
-            namespace: 0,
-            reserved: [0; 48],
-        }
-    }
-}
-
-/// StartViewChange message header.
-///
-/// Sent by a replica when it suspects the primary has failed.
-/// This is a header-only message with no body.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
-#[repr(C)]
-pub struct StartViewChangeHeader {
-    pub checksum: u128,
-    pub checksum_body: u128,
-    pub cluster: u128,
-    pub size: u32,
-    pub view: u32,
-    pub release: u32,
-    pub command: Command2,
-    pub replica: u8,
-    pub reserved_frame: [u8; 66],
-
-    pub namespace: u64,
-    pub reserved: [u8; 120],
-}
-const _: () = {
-    assert!(core::mem::size_of::<StartViewChangeHeader>() == HEADER_SIZE);
-    // Ensure no implicit padding is inserted between reserved_frame and the body fields.
-    assert!(
-        core::mem::offset_of!(StartViewChangeHeader, namespace)
-            == core::mem::offset_of!(StartViewChangeHeader, reserved_frame)
-                + core::mem::size_of::<[u8; 66]>()
-    );
-    // Ensure no implicit tail padding is inserted after the explicit trailing bytes.
-    assert!(
-        core::mem::offset_of!(StartViewChangeHeader, reserved) + core::mem::size_of::<[u8; 120]>()
-            == HEADER_SIZE
-    );
-};
-
-impl ConsensusHeader for StartViewChangeHeader {
-    const COMMAND: Command2 = Command2::StartViewChange;
-
-    fn operation(&self) -> Operation {
-        Operation::Reserved
-    }
-    fn command(&self) -> Command2 {
-        self.command
-    }
-
-    fn validate(&self) -> Result<(), ConsensusError> {
-        if self.command != Command2::StartViewChange {
-            return Err(ConsensusError::InvalidCommand {
-                expected: Command2::StartViewChange,
-                found: self.command,
-            });
-        }
-
-        if self.release != 0 {
-            return Err(ConsensusError::InvalidField("release != 
0".to_string()));
-        }
-        Ok(())
-    }
-
-    fn size(&self) -> u32 {
-        self.size
-    }
-}
-
-/// DoViewChange message header.
-///
-/// Sent by replicas to the primary candidate after collecting a quorum of
-/// StartViewChange messages.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
-#[repr(C)]
-pub struct DoViewChangeHeader {
-    pub checksum: u128,
-    pub checksum_body: u128,
-    pub cluster: u128,
-    pub size: u32,
-    pub view: u32,
-    pub release: u32,
-    pub command: Command2,
-    pub replica: u8,
-    pub reserved_frame: [u8; 66],
-
-    /// The highest op-number in this replica's log.
-    /// Used to select the most complete log when log_view values are equal.
-    pub op: u64,
-    /// The replica's commit number (highest committed op).
-    /// The new primary sets its commit to max(commit) across all DVCs.
-    pub commit: u64,
-    pub namespace: u64,
-    /// The view number when this replica's status was last normal.
-    /// This is the key field for log selection: the replica with the
-    /// highest log_view has the most authoritative log.
-    pub log_view: u32,
-    pub reserved: [u8; 100],
-}
-const _: () = {
-    assert!(core::mem::size_of::<DoViewChangeHeader>() == HEADER_SIZE);
-    // Ensure no implicit padding is inserted between reserved_frame and the 
body fields.
-    assert!(
-        core::mem::offset_of!(DoViewChangeHeader, op)
-            == core::mem::offset_of!(DoViewChangeHeader, reserved_frame)
-                + core::mem::size_of::<[u8; 66]>()
-    );
-    // Ensure no implicit tail padding is inserted after the explicit trailing 
bytes.
-    assert!(
-        core::mem::offset_of!(DoViewChangeHeader, reserved) + 
core::mem::size_of::<[u8; 100]>()
-            == HEADER_SIZE
-    );
-};
-
-impl ConsensusHeader for DoViewChangeHeader {
-    const COMMAND: Command2 = Command2::DoViewChange;
-
-    fn operation(&self) -> Operation {
-        Operation::Reserved
-    }
-    fn command(&self) -> Command2 {
-        self.command
-    }
-
-    fn validate(&self) -> Result<(), ConsensusError> {
-        if self.command != Command2::DoViewChange {
-            return Err(ConsensusError::InvalidCommand {
-                expected: Command2::DoViewChange,
-                found: self.command,
-            });
-        }
-
-        if self.release != 0 {
-            return Err(ConsensusError::InvalidField(
-                "release must be 0".to_string(),
-            ));
-        }
-
-        // log_view must be <= view (can't have been normal in a future view)
-        if self.log_view > self.view {
-            return Err(ConsensusError::InvalidField(
-                "log_view cannot exceed view".to_string(),
-            ));
-        }
-
-        // commit must be <= op (can't commit what we haven't seen)
-        if self.commit > self.op {
-            return Err(ConsensusError::InvalidField(
-                "commit cannot exceed op".to_string(),
-            ));
-        }
-        Ok(())
-    }
-
-    fn size(&self) -> u32 {
-        self.size
-    }
-}
-
-/// StartView message header.
-///
-/// Sent by the new primary to all replicas after collecting a quorum of
-/// DoViewChange messages.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
-#[repr(C)]
-pub struct StartViewHeader {
-    pub checksum: u128,
-    pub checksum_body: u128,
-    pub cluster: u128,
-    pub size: u32,
-    pub view: u32,
-    pub release: u32,
-    pub command: Command2,
-    pub replica: u8,
-    pub reserved_frame: [u8; 66],
-
-    /// The op-number of the highest entry in the new primary's log.
-    /// Backups set their op to this value.
-    pub op: u64,
-    /// The commit number.
-    /// This is max(commit) from all DVCs received by the primary.
-    /// Backups set their commit to this value.
-    pub commit: u64,
-    pub namespace: u64,
-    pub reserved: [u8; 104],
-}
-const _: () = {
-    assert!(core::mem::size_of::<StartViewHeader>() == HEADER_SIZE);
-    // Ensure no implicit padding is inserted between reserved_frame and the 
body fields.
-    assert!(
-        core::mem::offset_of!(StartViewHeader, op)
-            == core::mem::offset_of!(StartViewHeader, reserved_frame)
-                + core::mem::size_of::<[u8; 66]>()
-    );
-    // Ensure no implicit tail padding is inserted after the explicit trailing 
bytes.
-    assert!(
-        core::mem::offset_of!(StartViewHeader, reserved) + 
core::mem::size_of::<[u8; 104]>()
-            == HEADER_SIZE
-    );
-};
-
-impl ConsensusHeader for StartViewHeader {
-    const COMMAND: Command2 = Command2::StartView;
-
-    fn operation(&self) -> Operation {
-        Operation::Reserved
-    }
-    fn command(&self) -> Command2 {
-        self.command
-    }
-
-    fn validate(&self) -> Result<(), ConsensusError> {
-        if self.command != Command2::StartView {
-            return Err(ConsensusError::InvalidCommand {
-                expected: Command2::StartView,
-                found: self.command,
-            });
-        }
-
-        if self.release != 0 {
-            return Err(ConsensusError::InvalidField(
-                "release must be 0".to_string(),
-            ));
-        }
-
-        // commit must be <= op
-        if self.commit > self.op {
-            return Err(ConsensusError::InvalidField(
-                "commit cannot exceed op".to_string(),
-            ));
-        }
-        Ok(())
-    }
-
-    fn size(&self) -> u32 {
-        self.size
-    }
-}
diff --git a/core/common/src/types/consensus/message.rs 
b/core/common/src/types/consensus/message.rs
deleted file mode 100644
index 5a49671db..000000000
--- a/core/common/src/types/consensus/message.rs
+++ /dev/null
@@ -1,719 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::{
-    header::RequestHeader,
-    types::consensus::header::{self, ConsensusHeader, PrepareHeader, 
PrepareOkHeader},
-};
-use bytes::Bytes;
-use std::marker::PhantomData;
-
-// TODO: Rename this to Message and ConsensusHeader to Header.
-pub trait ConsensusMessage<H>
-where
-    H: ConsensusHeader,
-{
-    // TODO: fn body(&self) -> Something;
-    fn header(&self) -> &H;
-}
-
-impl<H> ConsensusMessage<H> for Message<H>
-where
-    H: ConsensusHeader,
-{
-    fn header(&self) -> &H {
-        let header_bytes = &self.buffer[..size_of::<H>()];
-        bytemuck::checked::try_from_bytes(header_bytes)
-            .expect("header validated at construction time")
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct Message<H: ConsensusHeader> {
-    buffer: Bytes,
-    _marker: PhantomData<H>,
-}
-
-impl<H> Message<H>
-where
-    H: ConsensusHeader,
-{
-    #[inline]
-    #[allow(unused)]
-    pub fn header(&self) -> &H {
-        let header_bytes = &self.buffer[..size_of::<H>()];
-        bytemuck::checked::try_from_bytes(header_bytes)
-            .expect("header validated at construction time")
-    }
-
-    /// Create a new message from a buffer.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if:
-    /// - buffer is too small for the header
-    /// - buffer contains invalid bit patterns (enum discriminants)
-    /// - buffer is too small for the size specified in the header
-    /// - header validation fails
-    #[allow(unused)]
-    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
-        // verify minimum size
-        if buffer.len() < size_of::<H>() {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: H::COMMAND,
-                found: header::Command2::Reserved,
-            });
-        }
-
-        // Validate bit patterns (enum discriminants) via try_from_bytes
-        let header_bytes = &buffer[..size_of::<H>()];
-        let header = bytemuck::checked::try_from_bytes::<H>(header_bytes)
-            .map_err(|_| header::ConsensusError::InvalidBitPattern)?;
-
-        // validate the header
-        header.validate()?;
-
-        // verify buffer size matches header.size
-        let header_size = header.size() as usize;
-        if buffer.len() < header_size {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: H::COMMAND,
-                found: header::Command2::Reserved,
-            });
-        }
-
-        Ok(Self {
-            buffer,
-            _marker: PhantomData,
-        })
-    }
-
-    /// Create a new message with a specific size, initializing the buffer 
with zeros.
-    ///
-    /// The header will be zeroed and must be initialized by the caller.
-    #[allow(unused)]
-    pub fn new(size: usize) -> Self {
-        assert!(size >= size_of::<H>(), "Size must be at least header size");
-        let buffer = Bytes::from(vec![0u8; size]);
-        Self {
-            buffer,
-            _marker: PhantomData,
-        }
-    }
-
-    /// Transmute the header to a different type, using the provided function 
to modify the new header.
-    pub fn transmute_header<T: ConsensusHeader>(self, f: impl FnOnce(H, &mut 
T)) -> Message<T> {
-        assert_eq!(size_of::<H>(), size_of::<T>());
-
-        // Copy old header to stack to avoid UB.
-        let old_header = *self.header();
-
-        // Safety: We ensured that size_of::<H>() == size_of::<T>()
-        // On top of that, there is going to be only one reference to buffer 
during this function call
-        // so no other references can observe the mutation.
-        // In the future we can replace the `Bytes` buffer with something that 
does not allow sharing between different threads.
-        let buffer = self.into_inner();
-        unsafe {
-            let ptr = buffer.as_ptr() as *mut u8;
-            let slice = std::slice::from_raw_parts_mut(ptr, size_of::<T>());
-            // Zero out to ensure valid bit patterns before creating a typed 
reference.
-            slice.fill(0);
-            let new_header =
-                bytemuck::checked::try_from_bytes_mut(slice).expect("zeroed 
bytes are valid");
-            f(old_header, new_header);
-        }
-
-        Message {
-            buffer,
-            _marker: PhantomData,
-        }
-    }
-
-    /// Get a reference to the message body (everything after the header).
-    ///
-    /// Returns an empty slice if there is no body.
-    #[inline]
-    #[allow(unused)]
-    pub fn body(&self) -> &[u8] {
-        let header_size = size_of::<H>();
-        let total_size = self.header().size() as usize;
-
-        if total_size > header_size {
-            &self.buffer[header_size..total_size]
-        } else {
-            &[]
-        }
-    }
-
-    /// Get the message body as zero-copy `Bytes`.
-    ///
-    /// Returns an empty `Bytes` if there is no body.
-    #[inline]
-    #[allow(unused)]
-    pub fn body_bytes(&self) -> Bytes {
-        let header_size = size_of::<H>();
-        let total_size = self.header().size() as usize;
-
-        if total_size > header_size {
-            self.buffer.slice(header_size..total_size)
-        } else {
-            Bytes::new()
-        }
-    }
-
-    /// Get the complete message as bytes (header + body).
-    #[inline]
-    #[allow(unused)]
-    pub fn as_bytes(&self) -> &[u8] {
-        let total_size = self.header().size() as usize;
-        &self.buffer[..total_size]
-    }
-
-    /// Convert into the underlying buffer.
-    #[inline]
-    #[allow(unused)]
-    pub fn into_inner(self) -> Bytes {
-        self.buffer
-    }
-
-    /// Create a message from a buffer without validation.
-    ///
-    /// # Safety
-    ///
-    /// This is private and skips validation. Use only when:
-    /// - The buffer is already validated
-    /// - If doing a zero-cost type conversion (like to GenericHeader)
-    #[inline]
-    #[allow(unused)]
-    unsafe fn from_buffer_unchecked(buffer: Bytes) -> Self {
-        Self {
-            buffer,
-            _marker: PhantomData,
-        }
-    }
-
-    /// Convert to a generic message (erasing the specific header type).
-    ///
-    /// This allows treating any message as a generic message for common 
operations.
-    #[allow(unused)]
-    pub fn into_generic(self) -> Message<header::GenericHeader> {
-        unsafe { Message::from_buffer_unchecked(self.buffer) }
-    }
-
-    /// Get a reference to this message as a generic message.
-    #[allow(unused)]
-    pub fn as_generic(&self) -> &Message<header::GenericHeader> {
-        // SAFETY: Message<H> and Message<GenericHeader> have the same memory 
layout
-        // because they only differ in the PhantomData type parameter
-        unsafe { &*(self as *const Self as *const 
Message<header::GenericHeader>) }
-    }
-
-    /// Try to convert this message to a different header type.
-    ///
-    /// This validates that the command in the header matches the target type's
-    /// expected command before performing the conversion.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if:
-    /// - The command doesn't match the target type
-    /// - The target header validation fails
-    #[allow(unused)]
-    pub fn try_into_typed<T>(self) -> Result<Message<T>, 
header::ConsensusError>
-    where
-        T: ConsensusHeader,
-    {
-        if self.buffer.len() < size_of::<T>() {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: T::COMMAND,
-                found: header::Command2::Reserved,
-            });
-        }
-
-        let generic = self.as_generic();
-        if generic.header().command != T::COMMAND {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: T::COMMAND,
-                found: generic.header().command,
-            });
-        }
-
-        // Validate bit patterns for the target type
-        let header_bytes = &self.buffer[..size_of::<T>()];
-        let header = bytemuck::checked::try_from_bytes::<T>(header_bytes)
-            .map_err(|_| header::ConsensusError::InvalidBitPattern)?;
-
-        header.validate()?;
-
-        Ok(unsafe { Message::<T>::from_buffer_unchecked(self.buffer) })
-    }
-
-    /// Try to get a reference to this message as a different header type.
-    ///
-    /// This is similar to `try_into_typed` but borrows instead of consuming.
-    #[allow(unused)]
-    pub fn try_as_typed<T>(&self) -> Result<&Message<T>, 
header::ConsensusError>
-    where
-        T: ConsensusHeader,
-    {
-        // check buffer size
-        if self.buffer.len() < size_of::<T>() {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: T::COMMAND,
-                found: header::Command2::Reserved,
-            });
-        }
-
-        // check the command matches
-        let generic = self.as_generic();
-        if generic.header().command != T::COMMAND {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: T::COMMAND,
-                found: generic.header().command,
-            });
-        }
-
-        // Validate bit patterns for the target type
-        let header_bytes = &self.buffer[..size_of::<T>()];
-        bytemuck::checked::try_from_bytes::<T>(header_bytes)
-            .map_err(|_| header::ConsensusError::InvalidBitPattern)?
-            .validate()?;
-
-        let typed_message = unsafe { &*(self as *const Self as *const 
Message<T>) };
-
-        Ok(typed_message)
-    }
-}
-
-#[derive(Debug)]
-#[allow(unused)]
-pub enum MessageBag {
-    Request(Message<RequestHeader>),
-    Prepare(Message<PrepareHeader>),
-    PrepareOk(Message<PrepareOkHeader>),
-}
-
-impl MessageBag {
-    #[allow(unused)]
-    pub fn command(&self) -> header::Command2 {
-        match self {
-            MessageBag::Request(message) => message.header().command,
-            MessageBag::Prepare(message) => message.header().command,
-            MessageBag::PrepareOk(message) => message.header().command,
-        }
-    }
-
-    #[allow(unused)]
-    pub fn size(&self) -> u32 {
-        match self {
-            MessageBag::Request(message) => message.header().size(),
-            MessageBag::Prepare(message) => message.header().size(),
-            MessageBag::PrepareOk(message) => message.header().size(),
-        }
-    }
-
-    #[allow(unused)]
-    pub fn operation(&self) -> header::Operation {
-        match self {
-            MessageBag::Request(message) => message.header().operation,
-            MessageBag::Prepare(message) => message.header().operation,
-            MessageBag::PrepareOk(message) => message.header().operation,
-        }
-    }
-}
-
-impl<T> TryFrom<Message<T>> for MessageBag
-where
-    T: ConsensusHeader,
-{
-    type Error = header::ConsensusError;
-
-    fn try_from(value: Message<T>) -> Result<Self, Self::Error> {
-        let command = value.as_generic().header().command;
-        let buffer = value.into_inner();
-
-        // SAFETY: All Message<H> types have identical memory layout (only 
PhantomData differs).
-        // We've validated the command when the original message was created.
-        match command {
-            header::Command2::Prepare => {
-                let msg =
-                    unsafe { 
Message::<header::PrepareHeader>::from_buffer_unchecked(buffer) };
-                Ok(MessageBag::Prepare(msg))
-            }
-            header::Command2::Request => {
-                let msg =
-                    unsafe { 
Message::<header::RequestHeader>::from_buffer_unchecked(buffer) };
-                Ok(MessageBag::Request(msg))
-            }
-            header::Command2::PrepareOk => {
-                let msg =
-                    unsafe { 
Message::<header::PrepareOkHeader>::from_buffer_unchecked(buffer) };
-                Ok(MessageBag::PrepareOk(msg))
-            }
-            other => Err(header::ConsensusError::InvalidCommand {
-                expected: header::Command2::Reserved,
-                found: other,
-            }),
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use bytes::BytesMut;
-
-    use super::*;
-
-    trait MessageFactory: ConsensusHeader + Sized {
-        fn create_test() -> Message<Self>;
-    }
-
-    impl MessageFactory for header::GenericHeader {
-        fn create_test() -> Message<Self> {
-            let header_size = size_of::<Self>();
-            let body_size = 128;
-            let total_size = header_size + body_size;
-
-            let mut buffer = BytesMut::zeroed(total_size);
-
-            let header = bytemuck::checked::try_from_bytes_mut::<Self>(&mut 
buffer[..header_size])
-                .expect("zeroed bytes are valid");
-
-            header.checksum = 123456;
-            header.cluster = 12345;
-            header.size = total_size as u32;
-            header.command = header::Command2::Reserved;
-
-            for (i, item) in buffer
-                .iter_mut()
-                .enumerate()
-                .take(total_size)
-                .skip(header_size)
-            {
-                *item = (i % 256) as u8;
-            }
-
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
-        }
-    }
-
-    impl MessageFactory for header::PrepareHeader {
-        fn create_test() -> Message<Self> {
-            let header_size = size_of::<Self>();
-            let body_size = 64;
-            let total_size = header_size + body_size;
-
-            let mut buffer = BytesMut::zeroed(total_size);
-
-            // Zeroed bytes are valid (Command2::Reserved=0, 
Operation::Reserved=0).
-            let header = bytemuck::checked::try_from_bytes_mut::<Self>(&mut 
buffer[..header_size])
-                .expect("zeroed bytes are valid");
-
-            header.checksum = 123456;
-            header.checksum_body = 789012;
-            header.cluster = 12345;
-            header.size = total_size as u32;
-            header.view = 1;
-            header.command = header::Command2::Prepare;
-            header.replica = 1;
-            header.op = 100;
-            header.commit = 99;
-            header.timestamp = 1234567890;
-            header.operation = header::Operation::CreateStream;
-
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
-        }
-    }
-
-    impl MessageFactory for header::CommitHeader {
-        fn create_test() -> Message<Self> {
-            let header_size = size_of::<Self>();
-            let total_size = 256;
-
-            let mut buffer = BytesMut::zeroed(total_size);
-
-            let header = bytemuck::checked::try_from_bytes_mut::<Self>(&mut 
buffer[..header_size])
-                .expect("zeroed bytes are valid");
-
-            header.checksum = 123456;
-            header.cluster = 12345;
-            header.size = 256;
-            header.view = 1;
-            header.command = header::Command2::Commit;
-            header.replica = 2;
-            header.commit = 50;
-
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
-        }
-    }
-
-    impl MessageFactory for header::ReplyHeader {
-        fn create_test() -> Message<Self> {
-            let header_size = size_of::<Self>();
-            let body_size = 32;
-            let total_size = header_size + body_size;
-
-            let mut buffer = BytesMut::zeroed(total_size);
-
-            let header = bytemuck::checked::try_from_bytes_mut::<Self>(&mut 
buffer[..header_size])
-                .expect("zeroed bytes are valid");
-
-            header.checksum = 123456;
-            header.cluster = 12345;
-            header.size = total_size as u32;
-            header.view = 1;
-            header.command = header::Command2::Reply;
-            header.replica = 3;
-            header.op = 100;
-            header.commit = 99;
-            header.operation = header::Operation::CreateStream;
-
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
-        }
-    }
-
-    #[test]
-    fn test_message_creation_and_access() {
-        let message = header::GenericHeader::create_test();
-
-        assert_eq!(message.header().cluster, 12345);
-        assert_eq!(message.header().command, header::Command2::Reserved);
-        assert_eq!(
-            message.body().len(),
-            message.header().size() as usize - 
size_of::<header::GenericHeader>()
-        );
-
-        let body = message.body();
-        let header_size = size_of::<header::GenericHeader>();
-        for (i, &byte) in body.iter().enumerate() {
-            let expected = ((i + header_size) % 256) as u8;
-            assert_eq!(byte, expected);
-        }
-    }
-
-    #[test]
-    fn test_message_conversion() {
-        let prepare_message = header::PrepareHeader::create_test();
-
-        let original_bytes = prepare_message.as_bytes().to_vec();
-
-        let generic_message = prepare_message.into_generic();
-        assert_eq!(generic_message.header().command, 
header::Command2::Prepare);
-
-        let prepare_again: Message<header::PrepareHeader> =
-            generic_message.try_into_typed().unwrap();
-
-        assert_eq!(prepare_again.header().op, 100);
-        assert_eq!(prepare_again.header().view, 1);
-        assert_eq!(prepare_again.header().cluster, 12345);
-
-        let roundtrip_bytes = prepare_again.as_bytes().to_vec();
-
-        assert_eq!(
-            original_bytes, roundtrip_bytes,
-            "Bytes should be identical after round-trip conversion"
-        );
-    }
-
-    #[test]
-    fn test_message_bag_from_prepare() {
-        let prepare = header::PrepareHeader::create_test();
-        let bag = MessageBag::try_from(prepare).expect("valid prepare 
message");
-
-        assert_eq!(bag.command(), header::Command2::Prepare);
-        assert!(matches!(bag, MessageBag::Prepare(_)));
-    }
-}
-
-// TODO: Header generic
-// TODO: We will have to impl something like this (NOT ONE TO ONE JUST A 
SKETCH) as we will use `bytemuck`:
-/*
-// Generic Message type
-#[repr(C)]
-pub struct Message<H: Header = GenericHeader> {
-    buffer: AlignedBuffer<ALIGNED_TO_HEADER_SIZE>,
-    _phantom: PhantomData<H>,
-}
-
-// Trait that all headers must implement
-pub trait Header: Sized {
-    const COMMAND: Command2;
-
-    fn size(&self) -> u32;
-    fn command(&self) -> Command2;
-    fn checksum(&self) -> u128;
-}
-
-// Command2 enum (simplified)
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-#[repr(u8)]
-pub enum Command2 {
-    reserved = 0,
-
-    ping = 1,
-    pong = 2,
-
-    request = 5,
-    prepare = 6,
-    prepare_ok = 7,
-    reply = 8,
-    commit = 9,
-
-    start_view_change = 10,
-    do_view_change = 11,
-    start_view = 24,
-    // ... etc
-}
-
-// Generic header for base Message
-#[repr(C)]
-pub struct GenericHeader {
-    checksum: u128,
-    command: Command2,
-    size: u32,
-    // ... other fields
-}
-
-// first 128 bytes GenericHeader (checksum, command, size, etc....)
-// second 128 bytes specific header (PrepareHeader, CommitHeader, etc....)
-
-impl Header for GenericHeader {
-    const COMMAND: Command2 = Command::Reserved;
-
-    fn size(&self) -> u32 { self.size }
-    fn command(&self) -> Command2 { self.command }
-    fn checksum(&self) -> u128 { self.checksum }
-}
-
-// Specific header types
-#[repr(C)]
-pub struct PrepareHeader {
-    checksum: u128,
-    command: Command2,
-    size: u32,
-    // ... prepare-specific fields
-    view: u32,
-    op: u64,
-    commit: u64,
-}
-
-impl Header for PrepareHeader {
-    const COMMAND: Command2 = Command::Prepare;
-
-    fn size(&self) -> u32 { self.size }
-    fn command(&self) -> Command2 { self.command }
-    fn checksum(&self) -> u128 { self.checksum }
-}
-*/
-
-// And then for Message impl
-/*
-impl<H: Header> Message<H> {
-    // Access header (no stored pointer needed!)
-    pub fn header(&self) -> &H {
-        assert!(size_of::<H>() <= ALIGNED_TO_HEADER_SIZE);
-        unsafe { &*(self.buffer.as_ptr() as *const H) }
-    }
-
-    pub fn header_mut(&mut self) -> &mut H {
-        unsafe { &mut *(self.buffer.as_mut_ptr() as *mut H) }
-    }
-
-    pub fn body(&self) -> &[u8] {
-        let header_size = size_of::<H>();
-        let total_size = self.header().size() as usize;
-        &self.buffer[header_size..total_size]
-    }
-
-    pub fn body_mut(&mut self) -> &mut [u8] {
-        let header_size = size_of::<H>();
-        let total_size = self.header().size() as usize;
-        &mut self.buffer.as_mut()[header_size..total_size]
-    }
-
-    pub fn as_bytes(&self) -> &[u8] {
-        &self.buffer[..self.header().size() as usize]
-    }
-
-    pub fn base(&self) -> &Message<GenericHeader> {
-        unsafe { &*(self as *const Self as *const Message<GenericHeader>) }
-    }
-
-    pub fn base_mut(&mut self) -> &mut Message<GenericHeader> {
-        unsafe { &mut *(self as *mut Self as *mut Message<GenericHeader>) }
-    }
-
-    pub fn try_into<T: Header>(self) -> Option<Message<T>> {
-        if self.header().command() == T::COMMAND {
-            Some(unsafe { std::mem::transmute(self) })
-        } else {
-            None
-        }
-    }
-}
-*/
-
-// Then for the `Request` header we will have to store an `Operation` enum
-/*
- Define your operation enum
-#[repr(u8)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum Operation {
-    // Metadata operations
-    CreateTopic = 1,
-    DeleteTopic = 2,
-    ListTopics = 3,
-    CreatePartition = 4,
-
-    // Partition operations
-    Produce = 100,
-    Consume = 101,
-    Fetch = 102,
-}
-
-// Specific header types
-#[repr(C)]
-pub struct PrepareHeader {
-    checksum: u128,
-    command: Command2,
-    size: u32,
-    // ... prepare-specific fields
-    view: u32,
-    op: u64,
-    commit: u64,
-
-    // second 128 bytes
-    operation: Operation,
-}
-*/
-
-// Which will have an method that returns discriminator between Metadata and 
Partition requests
-
-// TODO: Fill this enum
-// #[expect(unused)]
-// pub enum MessageBag {
-//     Void,
-// }
-
-// impl<H> From<Message<H>> for MessageBag
-// where
-//     H: ConsensusHeader,
-// {
-//     fn from(_value: Message<H>) -> Self {
-//         MessageBag::Void
-//     }
-// }
diff --git a/core/common/src/types/consensus/mod.rs 
b/core/common/src/types/consensus/mod.rs
deleted file mode 100644
index 61bfb7bcb..000000000
--- a/core/common/src/types/consensus/mod.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod header;
-pub mod message;
diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs
index 1aaf2aae7..01b19de7b 100644
--- a/core/common/src/types/mod.rs
+++ b/core/common/src/types/mod.rs
@@ -22,7 +22,6 @@ pub(crate) mod cluster;
 pub(crate) mod command;
 pub(crate) mod compression;
 pub(crate) mod configuration;
-pub(crate) mod consensus;
 pub(crate) mod consumer;
 pub(crate) mod diagnostic;
 pub(crate) mod either;
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 2695f624c..ddf7a6764 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -31,6 +31,7 @@ readme = "../../../README.md"
 bit-set = { workspace = true }
 bytemuck = { workspace = true }
 bytes = { workspace = true }
+iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
 message_bus = { workspace = true }
 rand = { workspace = true }
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 882340d30..ea2ac7c61 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -21,11 +21,10 @@ use crate::{
     dvc_quorum_array_empty, dvc_record, dvc_reset, dvc_select_winner,
 };
 use bit_set::BitSet;
-use iggy_common::header::{
-    Command2, ConsensusHeader, DoViewChangeHeader, GenericHeader, 
PrepareHeader, PrepareOkHeader,
-    RequestHeader, StartViewChangeHeader, StartViewHeader,
+use iggy_binary_protocol::{
+    Command2, ConsensusHeader, DoViewChangeHeader, GenericHeader, Message, 
PrepareHeader,
+    PrepareOkHeader, RequestHeader, StartViewChangeHeader, StartViewHeader,
 };
-use iggy_common::message::Message;
 use message_bus::IggyMessageBus;
 use message_bus::MessageBus;
 use std::cell::{Cell, RefCell};
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index dcc5cbc6d..31ef7c6dd 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use iggy_common::header::ConsensusHeader;
-use iggy_common::message::ConsensusMessage;
+use iggy_binary_protocol::{ConsensusHeader, ConsensusMessage};
 use message_bus::MessageBus;
 
 pub trait Project<T, C: Consensus> {
diff --git a/core/consensus/src/namespaced_pipeline.rs b/core/consensus/src/namespaced_pipeline.rs
index 2eb90125b..7e77d3e16 100644
--- a/core/consensus/src/namespaced_pipeline.rs
+++ b/core/consensus/src/namespaced_pipeline.rs
@@ -17,8 +17,7 @@
 
 use crate::Pipeline;
 use crate::impls::{PIPELINE_PREPARE_QUEUE_MAX, PipelineEntry};
-use iggy_common::header::PrepareHeader;
-use iggy_common::message::Message;
+use iggy_binary_protocol::{Message, PrepareHeader};
 use std::collections::{HashMap, VecDeque};
 
 /// Pipeline that partitions entries by namespace for independent commit draining.
@@ -313,7 +312,7 @@ impl Pipeline for NamespacedPipeline {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use iggy_common::header::Command2;
+    use iggy_binary_protocol::Command2;
 
     fn make_prepare(
         op: u64,
diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs
index 31356a448..fdfc42c31 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -16,8 +16,9 @@
 // under the License.
 
 use crate::{Consensus, Pipeline, PipelineEntry, Sequencer, Status, VsrConsensus};
-use iggy_common::header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader};
-use iggy_common::message::Message;
+use iggy_binary_protocol::{
+    Command2, GenericHeader, Message, PrepareHeader, PrepareOkHeader, ReplyHeader,
+};
 use message_bus::MessageBus;
 use std::ops::AsyncFnOnce;
 
@@ -477,7 +478,7 @@ mod tests {
 
     #[test]
     fn loopback_cleared_on_complete_view_change_as_primary() {
-        use iggy_common::header::{DoViewChangeHeader, StartViewChangeHeader};
+        use iggy_binary_protocol::{DoViewChangeHeader, StartViewChangeHeader};
 
         // 3 replicas, replica 0 is primary for view 0 (and view 3: 3 % 3 = 0).
         let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new());
diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml
index aabb56543..d43225bfb 100644
--- a/core/message_bus/Cargo.toml
+++ b/core/message_bus/Cargo.toml
@@ -28,6 +28,7 @@ repository = "https://github.com/apache/iggy";
 readme = "../../../README.md"
 
 [dependencies]
+iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
 rand = { workspace = true }
 
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 876061bc4..efb3a7e6c 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -19,7 +19,8 @@ mod cache;
 use crate::cache::connection::{
     ConnectionCache, Coordinator, LeastLoadedStrategy, ShardedConnections,
 };
-use iggy_common::{IggyError, SenderKind, TcpSender, header::GenericHeader, message::Message};
+use iggy_binary_protocol::{GenericHeader, Message};
+use iggy_common::{IggyError, SenderKind, TcpSender};
 use std::{collections::HashMap, rc::Rc};
 
 /// Message bus parameterized by allocation strategy and sharded state
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index e0d024aaf..3b12f52e1 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -31,6 +31,7 @@ readme = "../../../README.md"
 ahash = { workspace = true }
 bytes = { workspace = true }
 consensus = { workspace = true }
+iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
 journal = { workspace = true }
 left-right = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 9751eb0c7..668f49766 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -23,11 +23,9 @@ use consensus::{
     pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
     send_prepare_ok as send_prepare_ok_common,
 };
-use iggy_common::{
-    header::{
-        Command2, ConsensusHeader, GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader,
-    },
-    message::Message,
+use iggy_binary_protocol::{
+    Command2, ConsensusHeader, GenericHeader, Message, PrepareHeader, PrepareOkHeader,
+    RequestHeader,
 };
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index e25aaaf65..68f36f3a0 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -241,13 +241,13 @@ macro_rules! collect_handlers {
 
             impl $crate::stm::Command for [<$state Inner>] {
                 type Cmd = [<$state Command>];
-                type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
+                type Input = ::iggy_binary_protocol::Message<::iggy_binary_protocol::PrepareHeader>;
                 type Error = ::iggy_common::IggyError;
 
                 fn parse(input: Self::Input) -> Result<::iggy_common::Either<Self::Cmd, Self::Input>, Self::Error> {
                     use ::iggy_common::BytesSerializable;
                     use ::iggy_common::Either;
-                    use ::iggy_common::header::Operation;
+                    use ::iggy_binary_protocol::Operation;
                     match input.header().operation {
                         $(
                             Operation::$operation => {
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 4b6127873..89a545297 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -16,9 +16,9 @@
 // under the License.
 
 use crate::stm::snapshot::{FillSnapshot, RestoreSnapshot, SnapshotError};
+use iggy_binary_protocol::{Message, PrepareHeader};
 use iggy_common::Either;
 use iggy_common::variadic;
-use iggy_common::{header::PrepareHeader, message::Message};
 
 use crate::stm::{State, StateMachine};
 
@@ -143,8 +143,7 @@ mod tests {
         use crate::stm::mux::MuxStateMachine;
         use crate::stm::stream::{Streams, StreamsInner};
         use crate::stm::user::{Users, UsersInner};
-        use iggy_common::header::PrepareHeader;
-        use iggy_common::message::Message;
+        use iggy_binary_protocol::{Message, PrepareHeader};
 
         let users: Users = UsersInner::new().into();
         let streams: Streams = StreamsInner::new().into();
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
index c355ea509..2362abb73 100644
--- a/core/partitions/Cargo.toml
+++ b/core/partitions/Cargo.toml
@@ -30,6 +30,7 @@ readme = "../../README.md"
 [dependencies]
 bytes = { workspace = true }
 consensus = { workspace = true }
+iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
 journal = { workspace = true }
 message_bus = { workspace = true }
diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs
index 0413cb472..393e59fc1 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -23,12 +23,11 @@ use crate::{
     AppendResult, Partition, PartitionOffsets, PollingArgs, PollingConsumer,
     decode_send_messages_batch,
 };
+use iggy_binary_protocol::{Message, Operation, PrepareHeader};
 use iggy_common::{
     ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets,
     IggyByteSize, IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet, IggyTimestamp,
     PartitionStats, PollingKind,
-    header::{Operation, PrepareHeader},
-    message::Message,
 };
 use journal::Journal as _;
 use std::sync::Arc;
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index d219c905e..4fb9fc34f 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -29,13 +29,12 @@ use consensus::{
     pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
     send_prepare_ok as send_prepare_ok_common,
 };
-use iggy_common::header::Command2;
+use iggy_binary_protocol::{
+    Command2, ConsensusHeader, GenericHeader, Message, Operation, PrepareHeader, PrepareOkHeader,
+    RequestHeader,
+};
 use iggy_common::{
     IggyByteSize, PartitionStats, Segment, SegmentStorage,
-    header::{
-        ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader,
-    },
-    message::Message,
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
 use message_bus::MessageBus;
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index d3b779fee..5be236975 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -16,11 +16,8 @@
 // under the License.
 
 use bytes::Bytes;
-use iggy_common::{
-    IggyMessagesBatchMut, IggyMessagesBatchSet,
-    header::{Operation, PrepareHeader},
-    message::Message,
-};
+use iggy_binary_protocol::{Message, Operation, PrepareHeader};
+use iggy_common::{IggyMessagesBatchMut, IggyMessagesBatchSet};
 use journal::{Journal, Storage};
 use std::{
     cell::UnsafeCell,
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index 833c712bd..67bb0b8c3 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -24,9 +24,9 @@ mod log;
 mod types;
 
 use bytes::{Bytes, BytesMut};
+use iggy_binary_protocol::{Message, PrepareHeader};
 use iggy_common::{
-    INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet,
-    PooledBuffer, header::PrepareHeader, message::Message,
+    INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, PooledBuffer,
 };
 pub use iggy_partition::IggyPartition;
 pub use iggy_partitions::IggyPartitions;
diff --git a/core/shard/Cargo.toml b/core/shard/Cargo.toml
index 387f5f7ef..f5cb07c4e 100644
--- a/core/shard/Cargo.toml
+++ b/core/shard/Cargo.toml
@@ -26,6 +26,7 @@ consensus = { path = "../consensus" }
 crossfire = { workspace = true }
 futures = { workspace = true }
 hash32 = { workspace = true }
+iggy_binary_protocol = { path = "../binary_protocol" }
 iggy_common = { path = "../common" }
 journal = { path = "../journal" }
 message_bus = { path = "../message_bus" }
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 4e03d0a4f..c6863a25d 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -19,8 +19,9 @@ mod router;
 pub mod shards_table;
 
 use consensus::{MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, VsrConsensus};
-use iggy_common::header::{GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader};
-use iggy_common::message::{Message, MessageBag};
+use iggy_binary_protocol::{
+    GenericHeader, Message, MessageBag, PrepareHeader, PrepareOkHeader, RequestHeader,
+};
 use iggy_common::sharding::IggyNamespace;
 use iggy_common::variadic;
 use journal::{Journal, JournalHandle};
@@ -301,7 +302,7 @@ where
     where
         B: MessageBus<
                 Replica = u8,
-                Data = iggy_common::message::Message<iggy_common::header::GenericHeader>,
+                Data = iggy_binary_protocol::Message<iggy_binary_protocol::GenericHeader>,
                 Client = u128,
             >,
     {
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 5e75cb9dc..7398e9e2d 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -18,8 +18,7 @@
 use crate::shards_table::ShardsTable;
 use crate::{IggyShard, Receiver, ShardFrame};
 use futures::FutureExt;
-use iggy_common::header::{ConsensusError, GenericHeader, PrepareHeader};
-use iggy_common::message::{Message, MessageBag};
+use iggy_binary_protocol::{ConsensusError, GenericHeader, Message, MessageBag, PrepareHeader};
 use iggy_common::sharding::IggyNamespace;
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
index 559f5fae9..556e47388 100644
--- a/core/simulator/Cargo.toml
+++ b/core/simulator/Cargo.toml
@@ -26,6 +26,7 @@ bytes = { workspace = true }
 consensus = { path = "../consensus" }
 enumset = { workspace = true }
 futures = { workspace = true }
+iggy_binary_protocol = { path = "../binary_protocol" }
 iggy_common = { path = "../common" }
 journal = { path = "../journal" }
 message_bus = { path = "../message_bus" }
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
index 371258aef..93c9ef0f6 100644
--- a/core/simulator/src/bus.rs
+++ b/core/simulator/src/bus.rs
@@ -15,7 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use iggy_common::{IggyError, header::GenericHeader, message::Message};
+use iggy_binary_protocol::{GenericHeader, Message};
+use iggy_common::IggyError;
 use message_bus::MessageBus;
 use std::collections::{HashSet, VecDeque};
 use std::ops::Deref;
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index 6dc9e1eab..2a73de834 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -15,13 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use iggy_binary_protocol::{Message, Operation, RequestHeader};
 use iggy_common::{
     BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, Identifier,
-    create_stream::CreateStream,
-    delete_stream::DeleteStream,
-    header::{Operation, RequestHeader},
-    message::Message,
-    sharding::IggyNamespace,
+    create_stream::CreateStream, delete_stream::DeleteStream, sharding::IggyNamespace,
 };
 use std::cell::Cell;
 
@@ -136,7 +133,7 @@ impl SimClient {
         let total_size = header_size + payload.len();
 
         let header = RequestHeader {
-            command: iggy_common::header::Command2::Request,
+            command: iggy_binary_protocol::Command2::Request,
             operation,
             size: total_size as u32,
             cluster: 0,
@@ -171,7 +168,7 @@ impl SimClient {
         let total_size = header_size + payload.len();
 
         let header = RequestHeader {
-            command: iggy_common::header::Command2::Request,
+            command: iggy_binary_protocol::Command2::Request,
             operation,
             size: total_size as u32,
             cluster: 0, // TODO: Get from config
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 9f4d64b79..76fdd1ae6 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -16,8 +16,7 @@
 // under the License.
 
 use bytes::Bytes;
-use iggy_common::header::PrepareHeader;
-use iggy_common::message::Message;
+use iggy_binary_protocol::{Message, PrepareHeader};
 use iggy_common::variadic;
 use journal::{Journal, JournalHandle, Storage};
 use metadata::MuxStateMachine;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 5e64aca81..21effc797 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -25,8 +25,7 @@ pub mod replica;
 
 use bus::MemBus;
 use consensus::PartitionsHandle;
-use iggy_common::header::ReplyHeader;
-use iggy_common::message::Message;
+use iggy_binary_protocol::{GenericHeader, Message, ReplyHeader};
 use iggy_common::sharding::IggyNamespace;
 use iggy_common::{IggyError, IggyMessagesBatchSet};
 use message_bus::MessageBus;
@@ -126,11 +125,7 @@ impl Simulator {
     }
 
     #[allow(clippy::future_not_send)]
-    async fn dispatch_to_replica(
-        &self,
-        replica: &Replica,
-        message: Message<iggy_common::header::GenericHeader>,
-    ) {
+    async fn dispatch_to_replica(&self, replica: &Replica, message: Message<GenericHeader>) {
         replica.on_message(message).await;
 
         let mut buf = Vec::new();
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
index 1ff966253..cffd0a38f 100644
--- a/core/simulator/src/main.rs
+++ b/core/simulator/src/main.rs
@@ -15,9 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use iggy_binary_protocol::{Message, ReplyHeader};
 use iggy_common::PollingStrategy;
-use iggy_common::header::ReplyHeader;
-use iggy_common::message::Message;
 use iggy_common::sharding::IggyNamespace;
 use iggy_common::{IggyByteSize, MemoryPool, MemoryPoolConfigOther};
 use message_bus::MessageBus;
diff --git a/core/simulator/src/network.rs b/core/simulator/src/network.rs
index d5b7b7b61..687add486 100644
--- a/core/simulator/src/network.rs
+++ b/core/simulator/src/network.rs
@@ -24,7 +24,7 @@
 use crate::packet::{
     ALLOW_ALL, BLOCK_ALL, LinkFilter, Packet, PacketSimulator, PacketSimulatorOptions, ProcessId,
 };
-use iggy_common::{header::GenericHeader, message::Message};
+use iggy_binary_protocol::{GenericHeader, Message};
 
 /// Network layer for the cluster simulation.
 ///
diff --git a/core/simulator/src/packet.rs b/core/simulator/src/packet.rs
index 9aba6e17a..1658628de 100644
--- a/core/simulator/src/packet.rs
+++ b/core/simulator/src/packet.rs
@@ -40,8 +40,7 @@
 
 use crate::ready_queue::{Ready, ReadyQueue};
 use enumset::EnumSet;
-use iggy_common::header::{Command2, GenericHeader};
-use iggy_common::message::Message;
+use iggy_binary_protocol::{Command2, GenericHeader, Message};
 use rand::RngExt;
 use rand_xoshiro::Xoshiro256Plus;
 use rand_xoshiro::rand_core::SeedableRng;

Reply via email to