This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 4e3029d6b refactor(consensus): consolidate VSR types in binary_protocol (#3014)
4e3029d6b is described below
commit 4e3029d6be764b6de0cc3db2c45ad743dfb9608b
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Mar 25 08:47:56 2026 +0100
refactor(consensus): consolidate VSR types in binary_protocol (#3014)
---
Cargo.lock | 8 +-
core/binary_protocol/src/consensus/message.rs | 113 +++-
core/binary_protocol/src/consensus/mod.rs | 1 +
core/binary_protocol/src/lib.rs | 6 +-
core/common/Cargo.toml | 2 -
core/common/src/lib.rs | 1 -
core/common/src/types/consensus/header.rs | 879 --------------------------
core/common/src/types/consensus/message.rs | 719 ---------------------
core/common/src/types/consensus/mod.rs | 19 -
core/common/src/types/mod.rs | 1 -
core/consensus/Cargo.toml | 1 +
core/consensus/src/impls.rs | 7 +-
core/consensus/src/lib.rs | 3 +-
core/consensus/src/namespaced_pipeline.rs | 5 +-
core/consensus/src/plane_helpers.rs | 7 +-
core/message_bus/Cargo.toml | 1 +
core/message_bus/src/lib.rs | 3 +-
core/metadata/Cargo.toml | 1 +
core/metadata/src/impls/metadata.rs | 8 +-
core/metadata/src/stm/mod.rs | 4 +-
core/metadata/src/stm/mux.rs | 5 +-
core/partitions/Cargo.toml | 1 +
core/partitions/src/iggy_partition.rs | 3 +-
core/partitions/src/iggy_partitions.rs | 9 +-
core/partitions/src/journal.rs | 7 +-
core/partitions/src/lib.rs | 4 +-
core/shard/Cargo.toml | 1 +
core/shard/src/lib.rs | 7 +-
core/shard/src/router.rs | 3 +-
core/simulator/Cargo.toml | 1 +
core/simulator/src/bus.rs | 3 +-
core/simulator/src/client.rs | 11 +-
core/simulator/src/deps.rs | 3 +-
core/simulator/src/lib.rs | 9 +-
core/simulator/src/main.rs | 3 +-
core/simulator/src/network.rs | 2 +-
core/simulator/src/packet.rs | 3 +-
37 files changed, 173 insertions(+), 1691 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index cdf2f962b..e69629314 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2231,6 +2231,7 @@ dependencies = [
"bytemuck",
"bytes",
"futures",
+ "iggy_binary_protocol",
"iggy_common",
"message_bus",
"rand 0.10.0",
@@ -5416,7 +5417,6 @@ dependencies = [
"blake3",
"bon",
"byte-unit",
- "bytemuck",
"bytes",
"chrono",
"clap",
@@ -5424,7 +5424,6 @@ dependencies = [
"compio",
"crossbeam",
"derive_more",
- "enumset",
"err_trail",
"human-repr",
"humantime",
@@ -6660,6 +6659,7 @@ dependencies = [
name = "message_bus"
version = "0.1.0"
dependencies = [
+ "iggy_binary_protocol",
"iggy_common",
"rand 0.10.0",
]
@@ -6671,6 +6671,7 @@ dependencies = [
"ahash 0.8.12",
"bytes",
"consensus",
+ "iggy_binary_protocol",
"iggy_common",
"journal",
"left-right",
@@ -7648,6 +7649,7 @@ version = "0.1.0"
dependencies = [
"bytes",
"consensus",
+ "iggy_binary_protocol",
"iggy_common",
"journal",
"message_bus",
@@ -9833,6 +9835,7 @@ dependencies = [
"crossfire",
"futures",
"hash32 1.0.0",
+ "iggy_binary_protocol",
"iggy_common",
"journal",
"message_bus",
@@ -9942,6 +9945,7 @@ dependencies = [
"consensus",
"enumset",
"futures",
+ "iggy_binary_protocol",
"iggy_common",
"journal",
"message_bus",
diff --git a/core/binary_protocol/src/consensus/message.rs b/core/binary_protocol/src/consensus/message.rs
index d8f56d872..f63cf1c19 100644
--- a/core/binary_protocol/src/consensus/message.rs
+++ b/core/binary_protocol/src/consensus/message.rs
@@ -20,10 +20,26 @@
//! `Message<H>` wraps a `Bytes` buffer and provides typed access to the
//! header via `bytemuck` pointer cast (zero allocation, zero copy).
-use crate::consensus::{Command2, ConsensusError, ConsensusHeader, GenericHeader};
+use crate::consensus::{
+ Command2, ConsensusError, ConsensusHeader, GenericHeader, PrepareHeader, PrepareOkHeader,
+ RequestHeader,
+};
use bytes::Bytes;
use std::marker::PhantomData;
+/// Trait for types that provide access to a consensus header.
+pub trait ConsensusMessage<H: ConsensusHeader> {
+ fn header(&self) -> &H;
+}
+
+impl<H: ConsensusHeader> ConsensusMessage<H> for Message<H> {
+ fn header(&self) -> &H {
+ let header_bytes = &self.buffer[..size_of::<H>()];
+ bytemuck::checked::try_from_bytes(header_bytes)
+ .expect("header validated at construction time")
+ }
+}
+
/// Zero-copy message wrapping a `Bytes` buffer with a typed header.
///
/// The header is accessed via `bytemuck::try_from_bytes` - a pointer cast,
@@ -197,6 +213,36 @@ impl<H: ConsensusHeader> Message<H> {
Ok(unsafe { Message::<T>::from_buffer_unchecked(self.buffer) })
}
+ /// Try to borrow as a different header type without consuming.
+ ///
+ /// # Errors
+ /// Returns `ConsensusError` if the header command doesn't match or validation fails.
+ pub fn try_as_typed<T: ConsensusHeader>(&self) -> Result<&Message<T>, ConsensusError> {
+ if self.buffer.len() < size_of::<T>() {
+ return Err(ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: Command2::Reserved,
+ });
+ }
+
+ let generic = self.as_generic();
+ if generic.header().command != T::COMMAND {
+ return Err(ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: generic.header().command,
+ });
+ }
+
+ let header_bytes = &self.buffer[..size_of::<T>()];
+ bytemuck::checked::try_from_bytes::<T>(header_bytes)
+ .map_err(|_| ConsensusError::InvalidBitPattern)?
+ .validate()?;
+
+ // SAFETY: Message<H> and Message<T> have identical layout (#[repr(C)],
+ // differ only in PhantomData). Header validated above.
+ Ok(unsafe { &*std::ptr::from_ref(self).cast::<Message<T>>() })
+ }
+
/// Transmute the header to a different type, preserving the buffer.
///
/// The callback receives the old header (by value) and a mutable reference
@@ -240,6 +286,71 @@ impl<H: ConsensusHeader> Message<H> {
}
}
+/// Type-erased message bag for dispatching incoming consensus messages.
+#[derive(Debug)]
+pub enum MessageBag {
+ Request(Message<RequestHeader>),
+ Prepare(Message<PrepareHeader>),
+ PrepareOk(Message<PrepareOkHeader>),
+}
+
+impl MessageBag {
+ #[must_use]
+ pub fn command(&self) -> Command2 {
+ match self {
+ Self::Request(m) => m.header().command,
+ Self::Prepare(m) => m.header().command,
+ Self::PrepareOk(m) => m.header().command,
+ }
+ }
+
+ #[must_use]
+ pub fn size(&self) -> u32 {
+ match self {
+ Self::Request(m) => m.header().size(),
+ Self::Prepare(m) => m.header().size(),
+ Self::PrepareOk(m) => m.header().size(),
+ }
+ }
+
+ #[must_use]
+ pub fn operation(&self) -> crate::consensus::Operation {
+ match self {
+ Self::Request(m) => m.header().operation,
+ Self::Prepare(m) => m.header().operation,
+ Self::PrepareOk(m) => m.header().operation,
+ }
+ }
+}
+
+impl<T: ConsensusHeader> TryFrom<Message<T>> for MessageBag {
+ type Error = ConsensusError;
+
+ fn try_from(value: Message<T>) -> Result<Self, Self::Error> {
+ let command = value.as_generic().header().command;
+ let buffer = value.into_inner();
+
+ match command {
+ Command2::Request => {
+ let msg = unsafe { Message::<RequestHeader>::from_buffer_unchecked(buffer) };
+ Ok(Self::Request(msg))
+ }
+ Command2::Prepare => {
+ let msg = unsafe { Message::<PrepareHeader>::from_buffer_unchecked(buffer) };
+ Ok(Self::Prepare(msg))
+ }
+ Command2::PrepareOk => {
+ let msg = unsafe { Message::<PrepareOkHeader>::from_buffer_unchecked(buffer) };
+ Ok(Self::PrepareOk(msg))
+ }
+ other => Err(ConsensusError::InvalidCommand {
+ expected: Command2::Reserved,
+ found: other,
+ }),
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::Message;
diff --git a/core/binary_protocol/src/consensus/mod.rs b/core/binary_protocol/src/consensus/mod.rs
index 11bead1ef..3840eec82 100644
--- a/core/binary_protocol/src/consensus/mod.rs
+++ b/core/binary_protocol/src/consensus/mod.rs
@@ -51,4 +51,5 @@ pub use header::{
CommitHeader, ConsensusHeader, DoViewChangeHeader, GenericHeader, HEADER_SIZE, PrepareHeader,
PrepareOkHeader, ReplyHeader, RequestHeader, StartViewChangeHeader, StartViewHeader,
};
+pub use message::{ConsensusMessage, MessageBag};
pub use operation::Operation;
diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs
index 7ed4fdad0..cce1e5d56 100644
--- a/core/binary_protocol/src/lib.rs
+++ b/core/binary_protocol/src/lib.rs
@@ -68,9 +68,9 @@ pub mod responses;
pub use codec::{WireDecode, WireEncode};
pub use consensus::{
- Command2, CommitHeader, ConsensusError, ConsensusHeader, DoViewChangeHeader, GenericHeader,
- HEADER_SIZE, Operation, PrepareHeader, PrepareOkHeader, ReplyHeader, RequestHeader,
- StartViewChangeHeader, StartViewHeader, message::Message,
+ Command2, CommitHeader, ConsensusError, ConsensusHeader, ConsensusMessage, DoViewChangeHeader,
+ GenericHeader, HEADER_SIZE, MessageBag, Operation, PrepareHeader, PrepareOkHeader, ReplyHeader,
+ RequestHeader, StartViewChangeHeader, StartViewHeader, message::Message,
};
pub use dispatch::{COMMAND_TABLE, CommandMeta, lookup_by_operation, lookup_command};
pub use error::WireError;
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index b7e610181..3af8fa87f 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -37,7 +37,6 @@ base64 = { workspace = true }
blake3 = { workspace = true }
bon = { workspace = true }
byte-unit = { workspace = true }
-bytemuck = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
@@ -45,7 +44,6 @@ comfy-table = { workspace = true }
compio = { workspace = true }
crossbeam = { workspace = true }
derive_more = { workspace = true }
-enumset = { workspace = true }
err_trail = { workspace = true }
human-repr = { workspace = true }
humantime = { workspace = true }
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index b1b8aeb09..c690b5a1c 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -96,7 +96,6 @@ pub use types::configuration::websocket_config::websocket_client_config::*;
pub use types::configuration::websocket_config::websocket_client_config_builder::*;
pub use types::configuration::websocket_config::websocket_client_reconnection_config::*;
pub use types::configuration::websocket_config::websocket_connection_string_options::*;
-pub use types::consensus::*;
pub use types::consumer::consumer_group::*;
pub use types::consumer::consumer_group_id::*;
pub use types::consumer::consumer_group_offsets::*;
diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs
deleted file mode 100644
index e7d62e758..000000000
--- a/core/common/src/types/consensus/header.rs
+++ /dev/null
@@ -1,879 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use bytemuck::{CheckedBitPattern, NoUninit};
-use enumset::EnumSetType;
-use thiserror::Error;
-
-const HEADER_SIZE: usize = 256;
-pub trait ConsensusHeader: Sized + CheckedBitPattern + NoUninit {
- const COMMAND: Command2;
-
- fn validate(&self) -> Result<(), ConsensusError>;
- fn operation(&self) -> Operation;
- fn command(&self) -> Command2;
- fn size(&self) -> u32;
-}
-
-#[derive(Default, Debug, EnumSetType)]
-#[repr(u8)]
-pub enum Command2 {
- #[default]
- Reserved = 0,
-
- Ping = 1,
- Pong = 2,
- PingClient = 3,
- PongClient = 4,
-
- Request = 5,
- Prepare = 6,
- PrepareOk = 7,
- Reply = 8,
- Commit = 9,
-
- StartViewChange = 10,
- DoViewChange = 11,
- StartView = 12,
-}
-
-// SAFETY: Command2 is #[repr(u8)] with no padding bytes.
-unsafe impl NoUninit for Command2 {}
-
-// SAFETY: Command2 is #[repr(u8)]; is_valid_bit_pattern matches all defined
discriminants.
-unsafe impl CheckedBitPattern for Command2 {
- type Bits = u8;
-
- fn is_valid_bit_pattern(bits: &u8) -> bool {
- *bits <= 12
- }
-}
-
-#[derive(Debug, Clone, Error, PartialEq, Eq)]
-pub enum ConsensusError {
- #[error("invalid command: expected {expected:?}, found {found:?}")]
- InvalidCommand { expected: Command2, found: Command2 },
-
- #[error("invalid size: expected {expected:?}, found {found:?}")]
- InvalidSize { expected: u32, found: u32 },
-
- #[error("invalid checksum")]
- InvalidChecksum,
-
- #[error("invalid cluster ID")]
- InvalidCluster,
-
- #[error("invalid field: {0}")]
- InvalidField(String),
-
- #[error("parent_padding must be 0")]
- PrepareParentPaddingNonZero,
-
- #[error("request_checksum_padding must be 0")]
- PrepareRequestChecksumPaddingNonZero,
-
- #[error("command must be Commit")]
- CommitInvalidCommand2,
-
- #[error("size must be 256, found {0}")]
- CommitInvalidSize(u32),
-
- // ReplyHeader specific
- #[error("command must be Reply")]
- ReplyInvalidCommand2,
-
- #[error("request_checksum_padding must be 0")]
- ReplyRequestChecksumPaddingNonZero,
-
- #[error("context_padding must be 0")]
- ReplyContextPaddingNonZero,
-
- #[error("invalid bit pattern in header (enum discriminant out of range)")]
- InvalidBitPattern,
-}
-
-#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, NoUninit,
CheckedBitPattern)]
-#[repr(u8)]
-pub enum Operation {
- #[default]
- Reserved = 0,
- CreateStream = 128,
- UpdateStream = 129,
- DeleteStream = 130,
- PurgeStream = 131,
- CreateTopic = 132,
- UpdateTopic = 133,
- DeleteTopic = 134,
- PurgeTopic = 135,
- CreatePartitions = 136,
- DeletePartitions = 137,
- DeleteSegments = 138,
- CreateConsumerGroup = 139,
- DeleteConsumerGroup = 140,
- CreateUser = 141,
- UpdateUser = 142,
- DeleteUser = 143,
- ChangePassword = 144,
- UpdatePermissions = 145,
- CreatePersonalAccessToken = 146,
- DeletePersonalAccessToken = 147,
-
- // Partition operations (replicated via consensus)
- SendMessages = 160,
- StoreConsumerOffset = 161,
-}
-
-impl Operation {
- /// Returns `true` for metadata / control-plane operations (streams,
topics,
- /// users, consumer groups, etc.) that are always handled by shard 0.
- #[inline]
- pub fn is_metadata(&self) -> bool {
- matches!(
- self,
- Operation::CreateStream
- | Operation::UpdateStream
- | Operation::DeleteStream
- | Operation::PurgeStream
- | Operation::CreateTopic
- | Operation::UpdateTopic
- | Operation::DeleteTopic
- | Operation::PurgeTopic
- | Operation::CreatePartitions
- | Operation::DeletePartitions
- | Operation::CreateConsumerGroup
- | Operation::DeleteConsumerGroup
- | Operation::CreateUser
- | Operation::UpdateUser
- | Operation::DeleteUser
- | Operation::ChangePassword
- | Operation::UpdatePermissions
- | Operation::CreatePersonalAccessToken
- | Operation::DeletePersonalAccessToken
- )
- }
-
- /// Returns `true` for data-plane operations that are routed to the shard
- /// owning the partition identified by the message's namespace.
- #[inline]
- pub fn is_partition(&self) -> bool {
- matches!(
- self,
- Operation::SendMessages | Operation::StoreConsumerOffset |
Operation::DeleteSegments
- )
- }
-}
-
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct GenericHeader {
- pub checksum: u128,
- pub checksum_body: u128,
- pub cluster: u128,
- pub size: u32,
- pub view: u32,
- pub release: u32,
- pub command: Command2,
- pub replica: u8,
- pub reserved_frame: [u8; 66],
-
- pub reserved_command: [u8; 128],
-}
-const _: () = {
- assert!(core::mem::size_of::<GenericHeader>() == HEADER_SIZE);
- // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
- assert!(
- core::mem::offset_of!(GenericHeader, reserved_command)
- == core::mem::offset_of!(GenericHeader, reserved_frame)
- + core::mem::size_of::<[u8; 66]>()
- );
- // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
- assert!(
- core::mem::offset_of!(GenericHeader, reserved_command) +
core::mem::size_of::<[u8; 128]>()
- == HEADER_SIZE
- );
-};
-
-impl ConsensusHeader for GenericHeader {
- const COMMAND: Command2 = Command2::Reserved;
-
- fn operation(&self) -> Operation {
- Operation::Reserved
- }
-
- fn command(&self) -> Command2 {
- self.command
- }
-
- fn validate(&self) -> Result<(), ConsensusError> {
- Ok(())
- }
-
- fn size(&self) -> u32 {
- self.size
- }
-}
-
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct RequestHeader {
- pub checksum: u128,
- pub checksum_body: u128,
- pub cluster: u128,
- pub size: u32,
- pub view: u32,
- pub release: u32,
- pub command: Command2,
- pub replica: u8,
- pub reserved_frame: [u8; 66],
-
- pub client: u128,
- pub request_checksum: u128,
- pub timestamp: u64,
- pub request: u64,
- pub operation: Operation,
- pub operation_padding: [u8; 7],
- pub namespace: u64,
- pub reserved: [u8; 64],
-}
-const _: () = {
- assert!(core::mem::size_of::<RequestHeader>() == HEADER_SIZE);
- // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
- assert!(
- core::mem::offset_of!(RequestHeader, client)
- == core::mem::offset_of!(RequestHeader, reserved_frame)
- + core::mem::size_of::<[u8; 66]>()
- );
- // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
- assert!(
- core::mem::offset_of!(RequestHeader, reserved) +
core::mem::size_of::<[u8; 64]>()
- == HEADER_SIZE
- );
-};
-
-impl Default for RequestHeader {
- fn default() -> Self {
- Self {
- checksum: 0,
- checksum_body: 0,
- cluster: 0,
- size: 0,
- view: 0,
- release: 0,
- command: Command2::Reserved,
- replica: 0,
- reserved_frame: [0; 66],
- client: 0,
- request_checksum: 0,
- timestamp: 0,
- request: 0,
- operation: Operation::Reserved,
- operation_padding: [0; 7],
- namespace: 0,
- reserved: [0; 64],
- }
- }
-}
-
-impl ConsensusHeader for RequestHeader {
- const COMMAND: Command2 = Command2::Request;
-
- fn operation(&self) -> Operation {
- self.operation
- }
-
- fn validate(&self) -> Result<(), ConsensusError> {
- if self.command != Command2::Request {
- return Err(ConsensusError::InvalidCommand {
- expected: Command2::Request,
- found: self.command,
- });
- }
- Ok(())
- }
- fn command(&self) -> Command2 {
- self.command
- }
-
- fn size(&self) -> u32 {
- self.size
- }
-}
-
-// TODO: Manually impl default (and use a const for the `release`)
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct PrepareHeader {
- pub checksum: u128,
- pub checksum_body: u128,
- pub cluster: u128,
- pub size: u32,
- pub view: u32,
- pub release: u32,
- pub command: Command2,
- pub replica: u8,
- pub reserved_frame: [u8; 66],
-
- pub client: u128,
- pub parent: u128,
- pub request_checksum: u128,
- pub op: u64,
- pub commit: u64,
- pub timestamp: u64,
- pub request: u64,
- pub operation: Operation,
- pub operation_padding: [u8; 7],
- pub namespace: u64,
- pub reserved: [u8; 32],
-}
-const _: () = {
- assert!(core::mem::size_of::<PrepareHeader>() == HEADER_SIZE);
- // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
- assert!(
- core::mem::offset_of!(PrepareHeader, client)
- == core::mem::offset_of!(PrepareHeader, reserved_frame)
- + core::mem::size_of::<[u8; 66]>()
- );
- // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
- assert!(
- core::mem::offset_of!(PrepareHeader, reserved) +
core::mem::size_of::<[u8; 32]>()
- == HEADER_SIZE
- );
-};
-
-impl ConsensusHeader for PrepareHeader {
- const COMMAND: Command2 = Command2::Prepare;
-
- fn operation(&self) -> Operation {
- self.operation
- }
-
- fn validate(&self) -> Result<(), ConsensusError> {
- if self.command != Command2::Prepare {
- return Err(ConsensusError::InvalidCommand {
- expected: Command2::Prepare,
- found: self.command,
- });
- }
- Ok(())
- }
- fn command(&self) -> Command2 {
- self.command
- }
-
- fn size(&self) -> u32 {
- self.size
- }
-}
-
-impl Default for PrepareHeader {
- fn default() -> Self {
- Self {
- checksum: 0,
- checksum_body: 0,
- cluster: 0,
- size: 0,
- view: 0,
- release: 0,
- command: Command2::Reserved,
- replica: 0,
- reserved_frame: [0; 66],
- client: 0,
- parent: 0,
- request_checksum: 0,
- op: 0,
- commit: 0,
- timestamp: 0,
- request: 0,
- operation: Operation::Reserved,
- operation_padding: [0; 7],
- namespace: 0,
- reserved: [0; 32],
- }
- }
-}
-
-// TODO: Manually impl default (and use a const for the `release`)
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct PrepareOkHeader {
- pub checksum: u128,
- pub checksum_body: u128,
- pub cluster: u128,
- pub size: u32,
- pub view: u32,
- pub release: u32,
- pub command: Command2,
- pub replica: u8,
- pub reserved_frame: [u8; 66],
-
- pub parent: u128,
- pub prepare_checksum: u128,
- pub op: u64,
- pub commit: u64,
- pub timestamp: u64,
- pub request: u64,
- pub operation: Operation,
- pub operation_padding: [u8; 7],
- pub namespace: u64,
- pub reserved: [u8; 48],
-}
-const _: () = {
- assert!(core::mem::size_of::<PrepareOkHeader>() == HEADER_SIZE);
- // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
- assert!(
- core::mem::offset_of!(PrepareOkHeader, parent)
- == core::mem::offset_of!(PrepareOkHeader, reserved_frame)
- + core::mem::size_of::<[u8; 66]>()
- );
- // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
- assert!(
- core::mem::offset_of!(PrepareOkHeader, reserved) +
core::mem::size_of::<[u8; 48]>()
- == HEADER_SIZE
- );
-};
-
-impl ConsensusHeader for PrepareOkHeader {
- const COMMAND: Command2 = Command2::PrepareOk;
-
- fn operation(&self) -> Operation {
- self.operation
- }
- fn command(&self) -> Command2 {
- self.command
- }
-
- fn validate(&self) -> Result<(), ConsensusError> {
- if self.command != Command2::PrepareOk {
- return Err(ConsensusError::InvalidCommand {
- expected: Command2::PrepareOk,
- found: self.command,
- });
- }
- Ok(())
- }
-
- fn size(&self) -> u32 {
- self.size
- }
-}
-
-impl Default for PrepareOkHeader {
- fn default() -> Self {
- Self {
- checksum: 0,
- checksum_body: 0,
- cluster: 0,
- size: 0,
- view: 0,
- release: 0,
- command: Command2::Reserved,
- replica: 0,
- reserved_frame: [0; 66],
- parent: 0,
- prepare_checksum: 0,
- op: 0,
- commit: 0,
- timestamp: 0,
- request: 0,
- operation: Operation::Reserved,
- operation_padding: [0; 7],
- namespace: 0,
- reserved: [0; 48],
- }
- }
-}
-
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct CommitHeader {
- pub checksum: u128,
- pub checksum_body: u128,
- pub cluster: u128,
- pub size: u32,
- pub view: u32,
- pub release: u32,
- pub command: Command2,
- pub replica: u8,
- pub reserved_frame: [u8; 66],
-
- pub commit_checksum: u128,
- pub timestamp_monotonic: u64,
- pub commit: u64,
- pub checkpoint_op: u64,
- pub namespace: u64,
- pub reserved: [u8; 80],
-}
-const _: () = {
- assert!(core::mem::size_of::<CommitHeader>() == HEADER_SIZE);
- // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
- assert!(
- core::mem::offset_of!(CommitHeader, commit_checksum)
- == core::mem::offset_of!(CommitHeader, reserved_frame)
- + core::mem::size_of::<[u8; 66]>()
- );
- // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
- assert!(
- core::mem::offset_of!(CommitHeader, reserved) +
core::mem::size_of::<[u8; 80]>()
- == HEADER_SIZE
- );
-};
-
-impl ConsensusHeader for CommitHeader {
- const COMMAND: Command2 = Command2::Commit;
-
- fn operation(&self) -> Operation {
- Operation::Reserved
- }
- fn command(&self) -> Command2 {
- self.command
- }
-
- fn validate(&self) -> Result<(), ConsensusError> {
- if self.command != Command2::Commit {
- return Err(ConsensusError::CommitInvalidCommand2);
- }
- if self.size != 256 {
- return Err(ConsensusError::CommitInvalidSize(self.size));
- }
- Ok(())
- }
-
- fn size(&self) -> u32 {
- self.size
- }
-}
-
-#[repr(C)]
-#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
-pub struct ReplyHeader {
- pub checksum: u128,
- pub checksum_body: u128,
- pub cluster: u128,
- pub size: u32,
- pub view: u32,
- pub release: u32,
- pub command: Command2,
- pub replica: u8,
- pub reserved_frame: [u8; 66],
-
- pub request_checksum: u128,
- pub context: u128,
- pub op: u64,
- pub commit: u64,
- pub timestamp: u64,
- pub request: u64,
- pub operation: Operation,
- pub operation_padding: [u8; 7],
- pub namespace: u64,
- pub reserved: [u8; 48],
-}
-const _: () = {
- assert!(core::mem::size_of::<ReplyHeader>() == HEADER_SIZE);
- // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
- assert!(
- core::mem::offset_of!(ReplyHeader, request_checksum)
- == core::mem::offset_of!(ReplyHeader, reserved_frame)
- + core::mem::size_of::<[u8; 66]>()
- );
- // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
- assert!(
- core::mem::offset_of!(ReplyHeader, reserved) +
core::mem::size_of::<[u8; 48]>()
- == HEADER_SIZE
- );
-};
-
-impl ConsensusHeader for ReplyHeader {
- const COMMAND: Command2 = Command2::Reply;
-
- fn operation(&self) -> Operation {
- self.operation
- }
- fn command(&self) -> Command2 {
- self.command
- }
-
- fn validate(&self) -> Result<(), ConsensusError> {
- if self.command != Command2::Reply {
- return Err(ConsensusError::ReplyInvalidCommand2);
- }
- Ok(())
- }
-
- fn size(&self) -> u32 {
- self.size
- }
-}
-
-impl Default for ReplyHeader {
- fn default() -> Self {
- Self {
- checksum: 0,
- checksum_body: 0,
- cluster: 0,
- size: 0,
- view: 0,
- release: 0,
- command: Command2::Reserved,
- replica: 0,
- reserved_frame: [0; 66],
- request_checksum: 0,
- context: 0,
- op: 0,
- commit: 0,
- timestamp: 0,
- request: 0,
- operation: Operation::Reserved,
- operation_padding: [0; 7],
- namespace: 0,
- reserved: [0; 48],
- }
- }
-}
-
-/// StartViewChange message header.
-///
-/// Sent by a replica when it suspects the primary has failed.
-/// This is a header-only message with no body.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
-#[repr(C)]
-pub struct StartViewChangeHeader {
- pub checksum: u128,
- pub checksum_body: u128,
- pub cluster: u128,
- pub size: u32,
- pub view: u32,
- pub release: u32,
- pub command: Command2,
- pub replica: u8,
- pub reserved_frame: [u8; 66],
-
- pub namespace: u64,
- pub reserved: [u8; 120],
-}
-const _: () = {
- assert!(core::mem::size_of::<StartViewChangeHeader>() == HEADER_SIZE);
- // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
- assert!(
- core::mem::offset_of!(StartViewChangeHeader, namespace)
- == core::mem::offset_of!(StartViewChangeHeader, reserved_frame)
- + core::mem::size_of::<[u8; 66]>()
- );
- // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
- assert!(
- core::mem::offset_of!(StartViewChangeHeader, reserved) +
core::mem::size_of::<[u8; 120]>()
- == HEADER_SIZE
- );
-};
-
-impl ConsensusHeader for StartViewChangeHeader {
- const COMMAND: Command2 = Command2::StartViewChange;
-
- fn operation(&self) -> Operation {
- Operation::Reserved
- }
- fn command(&self) -> Command2 {
- self.command
- }
-
- fn validate(&self) -> Result<(), ConsensusError> {
- if self.command != Command2::StartViewChange {
- return Err(ConsensusError::InvalidCommand {
- expected: Command2::StartViewChange,
- found: self.command,
- });
- }
-
- if self.release != 0 {
- return Err(ConsensusError::InvalidField("release != 0".to_string()));
- }
- Ok(())
- }
-
- fn size(&self) -> u32 {
- self.size
- }
-}
-
-/// DoViewChange message header.
-///
-/// Sent by replicas to the primary candidate after collecting a quorum of
-/// StartViewChange messages.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
-#[repr(C)]
-pub struct DoViewChangeHeader {
- pub checksum: u128,
- pub checksum_body: u128,
- pub cluster: u128,
- pub size: u32,
- pub view: u32,
- pub release: u32,
- pub command: Command2,
- pub replica: u8,
- pub reserved_frame: [u8; 66],
-
- /// The highest op-number in this replica's log.
- /// Used to select the most complete log when log_view values are equal.
- pub op: u64,
- /// The replica's commit number (highest committed op).
- /// The new primary sets its commit to max(commit) across all DVCs.
- pub commit: u64,
- pub namespace: u64,
- /// The view number when this replica's status was last normal.
- /// This is the key field for log selection: the replica with the
- /// highest log_view has the most authoritative log.
- pub log_view: u32,
- pub reserved: [u8; 100],
-}
-const _: () = {
- assert!(core::mem::size_of::<DoViewChangeHeader>() == HEADER_SIZE);
- // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
- assert!(
- core::mem::offset_of!(DoViewChangeHeader, op)
- == core::mem::offset_of!(DoViewChangeHeader, reserved_frame)
- + core::mem::size_of::<[u8; 66]>()
- );
- // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
- assert!(
- core::mem::offset_of!(DoViewChangeHeader, reserved) +
core::mem::size_of::<[u8; 100]>()
- == HEADER_SIZE
- );
-};
-
-impl ConsensusHeader for DoViewChangeHeader {
- const COMMAND: Command2 = Command2::DoViewChange;
-
- fn operation(&self) -> Operation {
- Operation::Reserved
- }
- fn command(&self) -> Command2 {
- self.command
- }
-
- fn validate(&self) -> Result<(), ConsensusError> {
- if self.command != Command2::DoViewChange {
- return Err(ConsensusError::InvalidCommand {
- expected: Command2::DoViewChange,
- found: self.command,
- });
- }
-
- if self.release != 0 {
- return Err(ConsensusError::InvalidField(
- "release must be 0".to_string(),
- ));
- }
-
- // log_view must be <= view (can't have been normal in a future view)
- if self.log_view > self.view {
- return Err(ConsensusError::InvalidField(
- "log_view cannot exceed view".to_string(),
- ));
- }
-
- // commit must be <= op (can't commit what we haven't seen)
- if self.commit > self.op {
- return Err(ConsensusError::InvalidField(
- "commit cannot exceed op".to_string(),
- ));
- }
- Ok(())
- }
-
- fn size(&self) -> u32 {
- self.size
- }
-}
-
-/// StartView message header.
-///
-/// Sent by the new primary to all replicas after collecting a quorum of
-/// DoViewChange messages.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
-#[repr(C)]
-pub struct StartViewHeader {
- pub checksum: u128,
- pub checksum_body: u128,
- pub cluster: u128,
- pub size: u32,
- pub view: u32,
- pub release: u32,
- pub command: Command2,
- pub replica: u8,
- pub reserved_frame: [u8; 66],
-
- /// The op-number of the highest entry in the new primary's log.
- /// Backups set their op to this value.
- pub op: u64,
- /// The commit number.
- /// This is max(commit) from all DVCs received by the primary.
- /// Backups set their commit to this value.
- pub commit: u64,
- pub namespace: u64,
- pub reserved: [u8; 104],
-}
-const _: () = {
- assert!(core::mem::size_of::<StartViewHeader>() == HEADER_SIZE);
- // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
- assert!(
- core::mem::offset_of!(StartViewHeader, op)
- == core::mem::offset_of!(StartViewHeader, reserved_frame)
- + core::mem::size_of::<[u8; 66]>()
- );
- // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
- assert!(
- core::mem::offset_of!(StartViewHeader, reserved) +
core::mem::size_of::<[u8; 104]>()
- == HEADER_SIZE
- );
-};
-
-impl ConsensusHeader for StartViewHeader {
- const COMMAND: Command2 = Command2::StartView;
-
- fn operation(&self) -> Operation {
- Operation::Reserved
- }
- fn command(&self) -> Command2 {
- self.command
- }
-
- fn validate(&self) -> Result<(), ConsensusError> {
- if self.command != Command2::StartView {
- return Err(ConsensusError::InvalidCommand {
- expected: Command2::StartView,
- found: self.command,
- });
- }
-
- if self.release != 0 {
- return Err(ConsensusError::InvalidField(
- "release must be 0".to_string(),
- ));
- }
-
- // commit must be <= op
- if self.commit > self.op {
- return Err(ConsensusError::InvalidField(
- "commit cannot exceed op".to_string(),
- ));
- }
- Ok(())
- }
-
- fn size(&self) -> u32 {
- self.size
- }
-}
diff --git a/core/common/src/types/consensus/message.rs b/core/common/src/types/consensus/message.rs
deleted file mode 100644
index 5a49671db..000000000
--- a/core/common/src/types/consensus/message.rs
+++ /dev/null
@@ -1,719 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::{
- header::RequestHeader,
- types::consensus::header::{self, ConsensusHeader, PrepareHeader,
PrepareOkHeader},
-};
-use bytes::Bytes;
-use std::marker::PhantomData;
-
-// TODO: Rename this to Message and ConsensusHeader to Header.
-pub trait ConsensusMessage<H>
-where
- H: ConsensusHeader,
-{
- // TODO: fn body(&self) -> Something;
- fn header(&self) -> &H;
-}
-
-impl<H> ConsensusMessage<H> for Message<H>
-where
- H: ConsensusHeader,
-{
- fn header(&self) -> &H {
- let header_bytes = &self.buffer[..size_of::<H>()];
- bytemuck::checked::try_from_bytes(header_bytes)
- .expect("header validated at construction time")
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct Message<H: ConsensusHeader> {
- buffer: Bytes,
- _marker: PhantomData<H>,
-}
-
-impl<H> Message<H>
-where
- H: ConsensusHeader,
-{
- #[inline]
- #[allow(unused)]
- pub fn header(&self) -> &H {
- let header_bytes = &self.buffer[..size_of::<H>()];
- bytemuck::checked::try_from_bytes(header_bytes)
- .expect("header validated at construction time")
- }
-
- /// Create a new message from a buffer.
- ///
- /// # Errors
- ///
- /// Returns an error if:
- /// - buffer is too small for the header
- /// - buffer contains invalid bit patterns (enum discriminants)
- /// - buffer is too small for the size specified in the header
- /// - header validation fails
- #[allow(unused)]
- pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
- // verify minimum size
- if buffer.len() < size_of::<H>() {
- return Err(header::ConsensusError::InvalidCommand {
- expected: H::COMMAND,
- found: header::Command2::Reserved,
- });
- }
-
- // Validate bit patterns (enum discriminants) via try_from_bytes
- let header_bytes = &buffer[..size_of::<H>()];
- let header = bytemuck::checked::try_from_bytes::<H>(header_bytes)
- .map_err(|_| header::ConsensusError::InvalidBitPattern)?;
-
- // validate the header
- header.validate()?;
-
- // verify buffer size matches header.size
- let header_size = header.size() as usize;
- if buffer.len() < header_size {
- return Err(header::ConsensusError::InvalidCommand {
- expected: H::COMMAND,
- found: header::Command2::Reserved,
- });
- }
-
- Ok(Self {
- buffer,
- _marker: PhantomData,
- })
- }
-
- /// Create a new message with a specific size, initializing the buffer
with zeros.
- ///
- /// The header will be zeroed and must be initialized by the caller.
- #[allow(unused)]
- pub fn new(size: usize) -> Self {
- assert!(size >= size_of::<H>(), "Size must be at least header size");
- let buffer = Bytes::from(vec![0u8; size]);
- Self {
- buffer,
- _marker: PhantomData,
- }
- }
-
- /// Transmute the header to a different type, using the provided function
to modify the new header.
- pub fn transmute_header<T: ConsensusHeader>(self, f: impl FnOnce(H, &mut
T)) -> Message<T> {
- assert_eq!(size_of::<H>(), size_of::<T>());
-
- // Copy old header to stack to avoid UB.
- let old_header = *self.header();
-
- // Safety: We ensured that size_of::<H>() == size_of::<T>()
- // On top of that, there is going to be only one reference to buffer
during this function call
- // so no other references can observe the mutation.
- // In the future we can replace the `Bytes` buffer with something that
does not allow sharing between different threads.
- let buffer = self.into_inner();
- unsafe {
- let ptr = buffer.as_ptr() as *mut u8;
- let slice = std::slice::from_raw_parts_mut(ptr, size_of::<T>());
- // Zero out to ensure valid bit patterns before creating a typed
reference.
- slice.fill(0);
- let new_header =
- bytemuck::checked::try_from_bytes_mut(slice).expect("zeroed
bytes are valid");
- f(old_header, new_header);
- }
-
- Message {
- buffer,
- _marker: PhantomData,
- }
- }
-
- /// Get a reference to the message body (everything after the header).
- ///
- /// Returns an empty slice if there is no body.
- #[inline]
- #[allow(unused)]
- pub fn body(&self) -> &[u8] {
- let header_size = size_of::<H>();
- let total_size = self.header().size() as usize;
-
- if total_size > header_size {
- &self.buffer[header_size..total_size]
- } else {
- &[]
- }
- }
-
- /// Get the message body as zero-copy `Bytes`.
- ///
- /// Returns an empty `Bytes` if there is no body.
- #[inline]
- #[allow(unused)]
- pub fn body_bytes(&self) -> Bytes {
- let header_size = size_of::<H>();
- let total_size = self.header().size() as usize;
-
- if total_size > header_size {
- self.buffer.slice(header_size..total_size)
- } else {
- Bytes::new()
- }
- }
-
- /// Get the complete message as bytes (header + body).
- #[inline]
- #[allow(unused)]
- pub fn as_bytes(&self) -> &[u8] {
- let total_size = self.header().size() as usize;
- &self.buffer[..total_size]
- }
-
- /// Convert into the underlying buffer.
- #[inline]
- #[allow(unused)]
- pub fn into_inner(self) -> Bytes {
- self.buffer
- }
-
- /// Create a message from a buffer without validation.
- ///
- /// # Safety
- ///
- /// This is private and skips validation. Use only when:
- /// - The buffer is already validated
- /// - If doing a zero-cost type conversion (like to GenericHeader)
- #[inline]
- #[allow(unused)]
- unsafe fn from_buffer_unchecked(buffer: Bytes) -> Self {
- Self {
- buffer,
- _marker: PhantomData,
- }
- }
-
- /// Convert to a generic message (erasing the specific header type).
- ///
- /// This allows treating any message as a generic message for common
operations.
- #[allow(unused)]
- pub fn into_generic(self) -> Message<header::GenericHeader> {
- unsafe { Message::from_buffer_unchecked(self.buffer) }
- }
-
- /// Get a reference to this message as a generic message.
- #[allow(unused)]
- pub fn as_generic(&self) -> &Message<header::GenericHeader> {
- // SAFETY: Message<H> and Message<GenericHeader> have the same memory
layout
- // because they only differ in the PhantomData type parameter
- unsafe { &*(self as *const Self as *const
Message<header::GenericHeader>) }
- }
-
- /// Try to convert this message to a different header type.
- ///
- /// This validates that the command in the header matches the target type's
- /// expected command before performing the conversion.
- ///
- /// # Errors
- ///
- /// Returns an error if:
- /// - The command doesn't match the target type
- /// - The target header validation fails
- #[allow(unused)]
- pub fn try_into_typed<T>(self) -> Result<Message<T>,
header::ConsensusError>
- where
- T: ConsensusHeader,
- {
- if self.buffer.len() < size_of::<T>() {
- return Err(header::ConsensusError::InvalidCommand {
- expected: T::COMMAND,
- found: header::Command2::Reserved,
- });
- }
-
- let generic = self.as_generic();
- if generic.header().command != T::COMMAND {
- return Err(header::ConsensusError::InvalidCommand {
- expected: T::COMMAND,
- found: generic.header().command,
- });
- }
-
- // Validate bit patterns for the target type
- let header_bytes = &self.buffer[..size_of::<T>()];
- let header = bytemuck::checked::try_from_bytes::<T>(header_bytes)
- .map_err(|_| header::ConsensusError::InvalidBitPattern)?;
-
- header.validate()?;
-
- Ok(unsafe { Message::<T>::from_buffer_unchecked(self.buffer) })
- }
-
- /// Try to get a reference to this message as a different header type.
- ///
- /// This is similar to `try_into_typed` but borrows instead of consuming.
- #[allow(unused)]
- pub fn try_as_typed<T>(&self) -> Result<&Message<T>,
header::ConsensusError>
- where
- T: ConsensusHeader,
- {
- // check buffer size
- if self.buffer.len() < size_of::<T>() {
- return Err(header::ConsensusError::InvalidCommand {
- expected: T::COMMAND,
- found: header::Command2::Reserved,
- });
- }
-
- // check the command matches
- let generic = self.as_generic();
- if generic.header().command != T::COMMAND {
- return Err(header::ConsensusError::InvalidCommand {
- expected: T::COMMAND,
- found: generic.header().command,
- });
- }
-
- // Validate bit patterns for the target type
- let header_bytes = &self.buffer[..size_of::<T>()];
- bytemuck::checked::try_from_bytes::<T>(header_bytes)
- .map_err(|_| header::ConsensusError::InvalidBitPattern)?
- .validate()?;
-
- let typed_message = unsafe { &*(self as *const Self as *const
Message<T>) };
-
- Ok(typed_message)
- }
-}
-
-#[derive(Debug)]
-#[allow(unused)]
-pub enum MessageBag {
- Request(Message<RequestHeader>),
- Prepare(Message<PrepareHeader>),
- PrepareOk(Message<PrepareOkHeader>),
-}
-
-impl MessageBag {
- #[allow(unused)]
- pub fn command(&self) -> header::Command2 {
- match self {
- MessageBag::Request(message) => message.header().command,
- MessageBag::Prepare(message) => message.header().command,
- MessageBag::PrepareOk(message) => message.header().command,
- }
- }
-
- #[allow(unused)]
- pub fn size(&self) -> u32 {
- match self {
- MessageBag::Request(message) => message.header().size(),
- MessageBag::Prepare(message) => message.header().size(),
- MessageBag::PrepareOk(message) => message.header().size(),
- }
- }
-
- #[allow(unused)]
- pub fn operation(&self) -> header::Operation {
- match self {
- MessageBag::Request(message) => message.header().operation,
- MessageBag::Prepare(message) => message.header().operation,
- MessageBag::PrepareOk(message) => message.header().operation,
- }
- }
-}
-
-impl<T> TryFrom<Message<T>> for MessageBag
-where
- T: ConsensusHeader,
-{
- type Error = header::ConsensusError;
-
- fn try_from(value: Message<T>) -> Result<Self, Self::Error> {
- let command = value.as_generic().header().command;
- let buffer = value.into_inner();
-
- // SAFETY: All Message<H> types have identical memory layout (only
PhantomData differs).
- // We've validated the command when the original message was created.
- match command {
- header::Command2::Prepare => {
- let msg =
- unsafe {
Message::<header::PrepareHeader>::from_buffer_unchecked(buffer) };
- Ok(MessageBag::Prepare(msg))
- }
- header::Command2::Request => {
- let msg =
- unsafe {
Message::<header::RequestHeader>::from_buffer_unchecked(buffer) };
- Ok(MessageBag::Request(msg))
- }
- header::Command2::PrepareOk => {
- let msg =
- unsafe {
Message::<header::PrepareOkHeader>::from_buffer_unchecked(buffer) };
- Ok(MessageBag::PrepareOk(msg))
- }
- other => Err(header::ConsensusError::InvalidCommand {
- expected: header::Command2::Reserved,
- found: other,
- }),
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use bytes::BytesMut;
-
- use super::*;
-
- trait MessageFactory: ConsensusHeader + Sized {
- fn create_test() -> Message<Self>;
- }
-
- impl MessageFactory for header::GenericHeader {
- fn create_test() -> Message<Self> {
- let header_size = size_of::<Self>();
- let body_size = 128;
- let total_size = header_size + body_size;
-
- let mut buffer = BytesMut::zeroed(total_size);
-
- let header = bytemuck::checked::try_from_bytes_mut::<Self>(&mut
buffer[..header_size])
- .expect("zeroed bytes are valid");
-
- header.checksum = 123456;
- header.cluster = 12345;
- header.size = total_size as u32;
- header.command = header::Command2::Reserved;
-
- for (i, item) in buffer
- .iter_mut()
- .enumerate()
- .take(total_size)
- .skip(header_size)
- {
- *item = (i % 256) as u8;
- }
-
- Message::<Self>::from_bytes(buffer.freeze()).unwrap()
- }
- }
-
- impl MessageFactory for header::PrepareHeader {
- fn create_test() -> Message<Self> {
- let header_size = size_of::<Self>();
- let body_size = 64;
- let total_size = header_size + body_size;
-
- let mut buffer = BytesMut::zeroed(total_size);
-
- // Zeroed bytes are valid (Command2::Reserved=0,
Operation::Reserved=0).
- let header = bytemuck::checked::try_from_bytes_mut::<Self>(&mut
buffer[..header_size])
- .expect("zeroed bytes are valid");
-
- header.checksum = 123456;
- header.checksum_body = 789012;
- header.cluster = 12345;
- header.size = total_size as u32;
- header.view = 1;
- header.command = header::Command2::Prepare;
- header.replica = 1;
- header.op = 100;
- header.commit = 99;
- header.timestamp = 1234567890;
- header.operation = header::Operation::CreateStream;
-
- Message::<Self>::from_bytes(buffer.freeze()).unwrap()
- }
- }
-
- impl MessageFactory for header::CommitHeader {
- fn create_test() -> Message<Self> {
- let header_size = size_of::<Self>();
- let total_size = 256;
-
- let mut buffer = BytesMut::zeroed(total_size);
-
- let header = bytemuck::checked::try_from_bytes_mut::<Self>(&mut
buffer[..header_size])
- .expect("zeroed bytes are valid");
-
- header.checksum = 123456;
- header.cluster = 12345;
- header.size = 256;
- header.view = 1;
- header.command = header::Command2::Commit;
- header.replica = 2;
- header.commit = 50;
-
- Message::<Self>::from_bytes(buffer.freeze()).unwrap()
- }
- }
-
- impl MessageFactory for header::ReplyHeader {
- fn create_test() -> Message<Self> {
- let header_size = size_of::<Self>();
- let body_size = 32;
- let total_size = header_size + body_size;
-
- let mut buffer = BytesMut::zeroed(total_size);
-
- let header = bytemuck::checked::try_from_bytes_mut::<Self>(&mut
buffer[..header_size])
- .expect("zeroed bytes are valid");
-
- header.checksum = 123456;
- header.cluster = 12345;
- header.size = total_size as u32;
- header.view = 1;
- header.command = header::Command2::Reply;
- header.replica = 3;
- header.op = 100;
- header.commit = 99;
- header.operation = header::Operation::CreateStream;
-
- Message::<Self>::from_bytes(buffer.freeze()).unwrap()
- }
- }
-
- #[test]
- fn test_message_creation_and_access() {
- let message = header::GenericHeader::create_test();
-
- assert_eq!(message.header().cluster, 12345);
- assert_eq!(message.header().command, header::Command2::Reserved);
- assert_eq!(
- message.body().len(),
- message.header().size() as usize -
size_of::<header::GenericHeader>()
- );
-
- let body = message.body();
- let header_size = size_of::<header::GenericHeader>();
- for (i, &byte) in body.iter().enumerate() {
- let expected = ((i + header_size) % 256) as u8;
- assert_eq!(byte, expected);
- }
- }
-
- #[test]
- fn test_message_conversion() {
- let prepare_message = header::PrepareHeader::create_test();
-
- let original_bytes = prepare_message.as_bytes().to_vec();
-
- let generic_message = prepare_message.into_generic();
- assert_eq!(generic_message.header().command,
header::Command2::Prepare);
-
- let prepare_again: Message<header::PrepareHeader> =
- generic_message.try_into_typed().unwrap();
-
- assert_eq!(prepare_again.header().op, 100);
- assert_eq!(prepare_again.header().view, 1);
- assert_eq!(prepare_again.header().cluster, 12345);
-
- let roundtrip_bytes = prepare_again.as_bytes().to_vec();
-
- assert_eq!(
- original_bytes, roundtrip_bytes,
- "Bytes should be identical after round-trip conversion"
- );
- }
-
- #[test]
- fn test_message_bag_from_prepare() {
- let prepare = header::PrepareHeader::create_test();
- let bag = MessageBag::try_from(prepare).expect("valid prepare
message");
-
- assert_eq!(bag.command(), header::Command2::Prepare);
- assert!(matches!(bag, MessageBag::Prepare(_)));
- }
-}
-
-// TODO: Header generic
-// TODO: We will have to impl something like this (NOT ONE TO ONE JUST A
SKETCH) as we will use `bytemuck`:
-/*
-// Generic Message type
-#[repr(C)]
-pub struct Message<H: Header = GenericHeader> {
- buffer: AlignedBuffer<ALIGNED_TO_HEADER_SIZE>,
- _phantom: PhantomData<H>,
-}
-
-// Trait that all headers must implement
-pub trait Header: Sized {
- const COMMAND: Command2;
-
- fn size(&self) -> u32;
- fn command(&self) -> Command2;
- fn checksum(&self) -> u128;
-}
-
-// Command2 enum (simplified)
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-#[repr(u8)]
-pub enum Command2 {
- reserved = 0,
-
- ping = 1,
- pong = 2,
-
- request = 5,
- prepare = 6,
- prepare_ok = 7,
- reply = 8,
- commit = 9,
-
- start_view_change = 10,
- do_view_change = 11,
- start_view = 24,
- // ... etc
-}
-
-// Generic header for base Message
-#[repr(C)]
-pub struct GenericHeader {
- checksum: u128,
- command: Command2,
- size: u32,
- // ... other fields
-}
-
-// first 128 bytes GenericHeader (checksum, command, size, etc....)
-// second 128 bytes specific header (PrepareHeader, CommitHeader, etc....)
-
-impl Header for GenericHeader {
- const COMMAND: Command2 = Command::Reserved;
-
- fn size(&self) -> u32 { self.size }
- fn command(&self) -> Command2 { self.command }
- fn checksum(&self) -> u128 { self.checksum }
-}
-
-// Specific header types
-#[repr(C)]
-pub struct PrepareHeader {
- checksum: u128,
- command: Command2,
- size: u32,
- // ... prepare-specific fields
- view: u32,
- op: u64,
- commit: u64,
-}
-
-impl Header for PrepareHeader {
- const COMMAND: Command2 = Command::Prepare;
-
- fn size(&self) -> u32 { self.size }
- fn command(&self) -> Command2 { self.command }
- fn checksum(&self) -> u128 { self.checksum }
-}
-*/
-
-// And then for Message impl
-/*
-impl<H: Header> Message<H> {
- // Access header (no stored pointer needed!)
- pub fn header(&self) -> &H {
- assert!(size_of::<H>() <= ALIGNED_TO_HEADER_SIZE);
- unsafe { &*(self.buffer.as_ptr() as *const H) }
- }
-
- pub fn header_mut(&mut self) -> &mut H {
- unsafe { &mut *(self.buffer.as_mut_ptr() as *mut H) }
- }
-
- pub fn body(&self) -> &[u8] {
- let header_size = size_of::<H>();
- let total_size = self.header().size() as usize;
- &self.buffer[header_size..total_size]
- }
-
- pub fn body_mut(&mut self) -> &mut [u8] {
- let header_size = size_of::<H>();
- let total_size = self.header().size() as usize;
- &mut self.buffer.as_mut()[header_size..total_size]
- }
-
- pub fn as_bytes(&self) -> &[u8] {
- &self.buffer[..self.header().size() as usize]
- }
-
- pub fn base(&self) -> &Message<GenericHeader> {
- unsafe { &*(self as *const Self as *const Message<GenericHeader>) }
- }
-
- pub fn base_mut(&mut self) -> &mut Message<GenericHeader> {
- unsafe { &mut *(self as *mut Self as *mut Message<GenericHeader>) }
- }
-
- pub fn try_into<T: Header>(self) -> Option<Message<T>> {
- if self.header().command() == T::COMMAND {
- Some(unsafe { std::mem::transmute(self) })
- } else {
- None
- }
- }
-}
-*/
-
-// Then for the `Request` header we will have to store an `Operation` enum
-/*
- Define your operation enum
-#[repr(u8)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum Operation {
- // Metadata operations
- CreateTopic = 1,
- DeleteTopic = 2,
- ListTopics = 3,
- CreatePartition = 4,
-
- // Partition operations
- Produce = 100,
- Consume = 101,
- Fetch = 102,
-}
-
-// Specific header types
-#[repr(C)]
-pub struct PrepareHeader {
- checksum: u128,
- command: Command2,
- size: u32,
- // ... prepare-specific fields
- view: u32,
- op: u64,
- commit: u64,
-
- // second 128 bytes
- operation: Operation,
-}
-*/
-
-// Which will have an method that returns discriminator between Metadata and
Partition requests
-
-// TODO: Fill this enum
-// #[expect(unused)]
-// pub enum MessageBag {
-// Void,
-// }
-
-// impl<H> From<Message<H>> for MessageBag
-// where
-// H: ConsensusHeader,
-// {
-// fn from(_value: Message<H>) -> Self {
-// MessageBag::Void
-// }
-// }
diff --git a/core/common/src/types/consensus/mod.rs
b/core/common/src/types/consensus/mod.rs
deleted file mode 100644
index 61bfb7bcb..000000000
--- a/core/common/src/types/consensus/mod.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod header;
-pub mod message;
diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs
index 1aaf2aae7..01b19de7b 100644
--- a/core/common/src/types/mod.rs
+++ b/core/common/src/types/mod.rs
@@ -22,7 +22,6 @@ pub(crate) mod cluster;
pub(crate) mod command;
pub(crate) mod compression;
pub(crate) mod configuration;
-pub(crate) mod consensus;
pub(crate) mod consumer;
pub(crate) mod diagnostic;
pub(crate) mod either;
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 2695f624c..ddf7a6764 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -31,6 +31,7 @@ readme = "../../../README.md"
bit-set = { workspace = true }
bytemuck = { workspace = true }
bytes = { workspace = true }
+iggy_binary_protocol = { workspace = true }
iggy_common = { workspace = true }
message_bus = { workspace = true }
rand = { workspace = true }
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 882340d30..ea2ac7c61 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -21,11 +21,10 @@ use crate::{
dvc_quorum_array_empty, dvc_record, dvc_reset, dvc_select_winner,
};
use bit_set::BitSet;
-use iggy_common::header::{
- Command2, ConsensusHeader, DoViewChangeHeader, GenericHeader,
PrepareHeader, PrepareOkHeader,
- RequestHeader, StartViewChangeHeader, StartViewHeader,
+use iggy_binary_protocol::{
+ Command2, ConsensusHeader, DoViewChangeHeader, GenericHeader, Message,
PrepareHeader,
+ PrepareOkHeader, RequestHeader, StartViewChangeHeader, StartViewHeader,
};
-use iggy_common::message::Message;
use message_bus::IggyMessageBus;
use message_bus::MessageBus;
use std::cell::{Cell, RefCell};
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index dcc5cbc6d..31ef7c6dd 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use iggy_common::header::ConsensusHeader;
-use iggy_common::message::ConsensusMessage;
+use iggy_binary_protocol::{ConsensusHeader, ConsensusMessage};
use message_bus::MessageBus;
pub trait Project<T, C: Consensus> {
diff --git a/core/consensus/src/namespaced_pipeline.rs
b/core/consensus/src/namespaced_pipeline.rs
index 2eb90125b..7e77d3e16 100644
--- a/core/consensus/src/namespaced_pipeline.rs
+++ b/core/consensus/src/namespaced_pipeline.rs
@@ -17,8 +17,7 @@
use crate::Pipeline;
use crate::impls::{PIPELINE_PREPARE_QUEUE_MAX, PipelineEntry};
-use iggy_common::header::PrepareHeader;
-use iggy_common::message::Message;
+use iggy_binary_protocol::{Message, PrepareHeader};
use std::collections::{HashMap, VecDeque};
/// Pipeline that partitions entries by namespace for independent commit
draining.
@@ -313,7 +312,7 @@ impl Pipeline for NamespacedPipeline {
#[cfg(test)]
mod tests {
use super::*;
- use iggy_common::header::Command2;
+ use iggy_binary_protocol::Command2;
fn make_prepare(
op: u64,
diff --git a/core/consensus/src/plane_helpers.rs
b/core/consensus/src/plane_helpers.rs
index 31356a448..fdfc42c31 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -16,8 +16,9 @@
// under the License.
use crate::{Consensus, Pipeline, PipelineEntry, Sequencer, Status,
VsrConsensus};
-use iggy_common::header::{Command2, GenericHeader, PrepareHeader,
PrepareOkHeader, ReplyHeader};
-use iggy_common::message::Message;
+use iggy_binary_protocol::{
+ Command2, GenericHeader, Message, PrepareHeader, PrepareOkHeader,
ReplyHeader,
+};
use message_bus::MessageBus;
use std::ops::AsyncFnOnce;
@@ -477,7 +478,7 @@ mod tests {
#[test]
fn loopback_cleared_on_complete_view_change_as_primary() {
- use iggy_common::header::{DoViewChangeHeader, StartViewChangeHeader};
+ use iggy_binary_protocol::{DoViewChangeHeader, StartViewChangeHeader};
// 3 replicas, replica 0 is primary for view 0 (and view 3: 3 % 3 = 0).
let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus,
LocalPipeline::new());
diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml
index aabb56543..d43225bfb 100644
--- a/core/message_bus/Cargo.toml
+++ b/core/message_bus/Cargo.toml
@@ -28,6 +28,7 @@ repository = "https://github.com/apache/iggy"
readme = "../../../README.md"
[dependencies]
+iggy_binary_protocol = { workspace = true }
iggy_common = { workspace = true }
rand = { workspace = true }
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 876061bc4..efb3a7e6c 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -19,7 +19,8 @@ mod cache;
use crate::cache::connection::{
ConnectionCache, Coordinator, LeastLoadedStrategy, ShardedConnections,
};
-use iggy_common::{IggyError, SenderKind, TcpSender, header::GenericHeader,
message::Message};
+use iggy_binary_protocol::{GenericHeader, Message};
+use iggy_common::{IggyError, SenderKind, TcpSender};
use std::{collections::HashMap, rc::Rc};
/// Message bus parameterized by allocation strategy and sharded state
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index e0d024aaf..3b12f52e1 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -31,6 +31,7 @@ readme = "../../../README.md"
ahash = { workspace = true }
bytes = { workspace = true }
consensus = { workspace = true }
+iggy_binary_protocol = { workspace = true }
iggy_common = { workspace = true }
journal = { workspace = true }
left-right = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 9751eb0c7..668f49766 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -23,11 +23,9 @@ use consensus::{
pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
send_prepare_ok as send_prepare_ok_common,
};
-use iggy_common::{
- header::{
- Command2, ConsensusHeader, GenericHeader, PrepareHeader,
PrepareOkHeader, RequestHeader,
- },
- message::Message,
+use iggy_binary_protocol::{
+ Command2, ConsensusHeader, GenericHeader, Message, PrepareHeader,
PrepareOkHeader,
+ RequestHeader,
};
use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index e25aaaf65..68f36f3a0 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -241,13 +241,13 @@ macro_rules! collect_handlers {
impl $crate::stm::Command for [<$state Inner>] {
type Cmd = [<$state Command>];
- type Input =
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
+ type Input =
::iggy_binary_protocol::Message<::iggy_binary_protocol::PrepareHeader>;
type Error = ::iggy_common::IggyError;
fn parse(input: Self::Input) ->
Result<::iggy_common::Either<Self::Cmd, Self::Input>, Self::Error> {
use ::iggy_common::BytesSerializable;
use ::iggy_common::Either;
- use ::iggy_common::header::Operation;
+ use ::iggy_binary_protocol::Operation;
match input.header().operation {
$(
Operation::$operation => {
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 4b6127873..89a545297 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -16,9 +16,9 @@
// under the License.
use crate::stm::snapshot::{FillSnapshot, RestoreSnapshot, SnapshotError};
+use iggy_binary_protocol::{Message, PrepareHeader};
use iggy_common::Either;
use iggy_common::variadic;
-use iggy_common::{header::PrepareHeader, message::Message};
use crate::stm::{State, StateMachine};
@@ -143,8 +143,7 @@ mod tests {
use crate::stm::mux::MuxStateMachine;
use crate::stm::stream::{Streams, StreamsInner};
use crate::stm::user::{Users, UsersInner};
- use iggy_common::header::PrepareHeader;
- use iggy_common::message::Message;
+ use iggy_binary_protocol::{Message, PrepareHeader};
let users: Users = UsersInner::new().into();
let streams: Streams = StreamsInner::new().into();
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
index c355ea509..2362abb73 100644
--- a/core/partitions/Cargo.toml
+++ b/core/partitions/Cargo.toml
@@ -30,6 +30,7 @@ readme = "../../README.md"
[dependencies]
bytes = { workspace = true }
consensus = { workspace = true }
+iggy_binary_protocol = { workspace = true }
iggy_common = { workspace = true }
journal = { workspace = true }
message_bus = { workspace = true }
diff --git a/core/partitions/src/iggy_partition.rs
b/core/partitions/src/iggy_partition.rs
index 0413cb472..393e59fc1 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -23,12 +23,11 @@ use crate::{
AppendResult, Partition, PartitionOffsets, PollingArgs, PollingConsumer,
decode_send_messages_batch,
};
+use iggy_binary_protocol::{Message, Operation, PrepareHeader};
use iggy_common::{
ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset,
ConsumerOffsets,
IggyByteSize, IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet,
IggyTimestamp,
PartitionStats, PollingKind,
- header::{Operation, PrepareHeader},
- message::Message,
};
use journal::Journal as _;
use std::sync::Arc;
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index d219c905e..4fb9fc34f 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -29,13 +29,12 @@ use consensus::{
pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
send_prepare_ok as send_prepare_ok_common,
};
-use iggy_common::header::Command2;
+use iggy_binary_protocol::{
+ Command2, ConsensusHeader, GenericHeader, Message, Operation,
PrepareHeader, PrepareOkHeader,
+ RequestHeader,
+};
use iggy_common::{
IggyByteSize, PartitionStats, Segment, SegmentStorage,
- header::{
- ConsensusHeader, GenericHeader, Operation, PrepareHeader,
PrepareOkHeader, RequestHeader,
- },
- message::Message,
sharding::{IggyNamespace, LocalIdx, ShardId},
};
use message_bus::MessageBus;
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index d3b779fee..5be236975 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -16,11 +16,8 @@
// under the License.
use bytes::Bytes;
-use iggy_common::{
- IggyMessagesBatchMut, IggyMessagesBatchSet,
- header::{Operation, PrepareHeader},
- message::Message,
-};
+use iggy_binary_protocol::{Message, Operation, PrepareHeader};
+use iggy_common::{IggyMessagesBatchMut, IggyMessagesBatchSet};
use journal::{Journal, Storage};
use std::{
cell::UnsafeCell,
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index 833c712bd..67bb0b8c3 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -24,9 +24,9 @@ mod log;
mod types;
use bytes::{Bytes, BytesMut};
+use iggy_binary_protocol::{Message, PrepareHeader};
use iggy_common::{
- INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut,
IggyMessagesBatchSet,
- PooledBuffer, header::PrepareHeader, message::Message,
+ INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut,
IggyMessagesBatchSet, PooledBuffer,
};
pub use iggy_partition::IggyPartition;
pub use iggy_partitions::IggyPartitions;
diff --git a/core/shard/Cargo.toml b/core/shard/Cargo.toml
index 387f5f7ef..f5cb07c4e 100644
--- a/core/shard/Cargo.toml
+++ b/core/shard/Cargo.toml
@@ -26,6 +26,7 @@ consensus = { path = "../consensus" }
crossfire = { workspace = true }
futures = { workspace = true }
hash32 = { workspace = true }
+iggy_binary_protocol = { path = "../binary_protocol" }
iggy_common = { path = "../common" }
journal = { path = "../journal" }
message_bus = { path = "../message_bus" }
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 4e03d0a4f..c6863a25d 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -19,8 +19,9 @@ mod router;
pub mod shards_table;
use consensus::{MuxPlane, NamespacedPipeline, PartitionsHandle, Plane,
VsrConsensus};
-use iggy_common::header::{GenericHeader, PrepareHeader, PrepareOkHeader,
RequestHeader};
-use iggy_common::message::{Message, MessageBag};
+use iggy_binary_protocol::{
+ GenericHeader, Message, MessageBag, PrepareHeader, PrepareOkHeader,
RequestHeader,
+};
use iggy_common::sharding::IggyNamespace;
use iggy_common::variadic;
use journal::{Journal, JournalHandle};
@@ -301,7 +302,7 @@ where
where
B: MessageBus<
Replica = u8,
- Data =
iggy_common::message::Message<iggy_common::header::GenericHeader>,
+ Data =
iggy_binary_protocol::Message<iggy_binary_protocol::GenericHeader>,
Client = u128,
>,
{
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 5e75cb9dc..7398e9e2d 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -18,8 +18,7 @@
use crate::shards_table::ShardsTable;
use crate::{IggyShard, Receiver, ShardFrame};
use futures::FutureExt;
-use iggy_common::header::{ConsensusError, GenericHeader, PrepareHeader};
-use iggy_common::message::{Message, MessageBag};
+use iggy_binary_protocol::{ConsensusError, GenericHeader, Message, MessageBag,
PrepareHeader};
use iggy_common::sharding::IggyNamespace;
use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
index 559f5fae9..556e47388 100644
--- a/core/simulator/Cargo.toml
+++ b/core/simulator/Cargo.toml
@@ -26,6 +26,7 @@ bytes = { workspace = true }
consensus = { path = "../consensus" }
enumset = { workspace = true }
futures = { workspace = true }
+iggy_binary_protocol = { path = "../binary_protocol" }
iggy_common = { path = "../common" }
journal = { path = "../journal" }
message_bus = { path = "../message_bus" }
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
index 371258aef..93c9ef0f6 100644
--- a/core/simulator/src/bus.rs
+++ b/core/simulator/src/bus.rs
@@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use iggy_common::{IggyError, header::GenericHeader, message::Message};
+use iggy_binary_protocol::{GenericHeader, Message};
+use iggy_common::IggyError;
use message_bus::MessageBus;
use std::collections::{HashSet, VecDeque};
use std::ops::Deref;
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index 6dc9e1eab..2a73de834 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -15,13 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+use iggy_binary_protocol::{Message, Operation, RequestHeader};
use iggy_common::{
BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, Identifier,
- create_stream::CreateStream,
- delete_stream::DeleteStream,
- header::{Operation, RequestHeader},
- message::Message,
- sharding::IggyNamespace,
+ create_stream::CreateStream, delete_stream::DeleteStream,
sharding::IggyNamespace,
};
use std::cell::Cell;
@@ -136,7 +133,7 @@ impl SimClient {
let total_size = header_size + payload.len();
let header = RequestHeader {
- command: iggy_common::header::Command2::Request,
+ command: iggy_binary_protocol::Command2::Request,
operation,
size: total_size as u32,
cluster: 0,
@@ -171,7 +168,7 @@ impl SimClient {
let total_size = header_size + payload.len();
let header = RequestHeader {
- command: iggy_common::header::Command2::Request,
+ command: iggy_binary_protocol::Command2::Request,
operation,
size: total_size as u32,
cluster: 0, // TODO: Get from config
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 9f4d64b79..76fdd1ae6 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -16,8 +16,7 @@
// under the License.
use bytes::Bytes;
-use iggy_common::header::PrepareHeader;
-use iggy_common::message::Message;
+use iggy_binary_protocol::{Message, PrepareHeader};
use iggy_common::variadic;
use journal::{Journal, JournalHandle, Storage};
use metadata::MuxStateMachine;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 5e64aca81..21effc797 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -25,8 +25,7 @@ pub mod replica;
use bus::MemBus;
use consensus::PartitionsHandle;
-use iggy_common::header::ReplyHeader;
-use iggy_common::message::Message;
+use iggy_binary_protocol::{GenericHeader, Message, ReplyHeader};
use iggy_common::sharding::IggyNamespace;
use iggy_common::{IggyError, IggyMessagesBatchSet};
use message_bus::MessageBus;
@@ -126,11 +125,7 @@ impl Simulator {
}
#[allow(clippy::future_not_send)]
- async fn dispatch_to_replica(
- &self,
- replica: &Replica,
- message: Message<iggy_common::header::GenericHeader>,
- ) {
+ async fn dispatch_to_replica(&self, replica: &Replica, message:
Message<GenericHeader>) {
replica.on_message(message).await;
let mut buf = Vec::new();
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
index 1ff966253..cffd0a38f 100644
--- a/core/simulator/src/main.rs
+++ b/core/simulator/src/main.rs
@@ -15,9 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use iggy_binary_protocol::{Message, ReplyHeader};
use iggy_common::PollingStrategy;
-use iggy_common::header::ReplyHeader;
-use iggy_common::message::Message;
use iggy_common::sharding::IggyNamespace;
use iggy_common::{IggyByteSize, MemoryPool, MemoryPoolConfigOther};
use message_bus::MessageBus;
diff --git a/core/simulator/src/network.rs b/core/simulator/src/network.rs
index d5b7b7b61..687add486 100644
--- a/core/simulator/src/network.rs
+++ b/core/simulator/src/network.rs
@@ -24,7 +24,7 @@
use crate::packet::{
ALLOW_ALL, BLOCK_ALL, LinkFilter, Packet, PacketSimulator,
PacketSimulatorOptions, ProcessId,
};
-use iggy_common::{header::GenericHeader, message::Message};
+use iggy_binary_protocol::{GenericHeader, Message};
/// Network layer for the cluster simulation.
///
diff --git a/core/simulator/src/packet.rs b/core/simulator/src/packet.rs
index 9aba6e17a..1658628de 100644
--- a/core/simulator/src/packet.rs
+++ b/core/simulator/src/packet.rs
@@ -40,8 +40,7 @@
use crate::ready_queue::{Ready, ReadyQueue};
use enumset::EnumSet;
-use iggy_common::header::{Command2, GenericHeader};
-use iggy_common::message::Message;
+use iggy_binary_protocol::{Command2, GenericHeader, Message};
use rand::RngExt;
use rand_xoshiro::Xoshiro256Plus;
use rand_xoshiro::rand_core::SeedableRng;