This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch partition_remaster in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 1de4e23a862c80de0e3bd9b4f28dffb8e4ede30a Author: numinex <[email protected]> AuthorDate: Tue Mar 24 08:05:31 2026 +0100 feat(partitions): rework partitions using prefix/tail message --- Cargo.lock | 8 + Cargo.toml | 1 + core/common/Cargo.toml | 1 + core/common/src/types/consensus/message.rs | 199 ++++--- .../src/types/segment_storage/messages_writer.rs | 1 - core/consensus/Cargo.toml | 1 + core/consensus/src/plane_helpers.rs | 29 +- core/iobuf/Cargo.toml | 1 + core/iobuf/src/lib.rs | 599 ++++++++++++++------- core/message_bus/src/lib.rs | 16 +- core/metadata/Cargo.toml | 1 + core/metadata/src/impls/metadata.rs | 9 + core/metadata/src/stm/mod.rs | 3 +- core/partitions/Cargo.toml | 3 + core/partitions/src/frozen_messages_writer.rs | 116 ++++ core/partitions/src/iggy_partition.rs | 105 ++-- core/partitions/src/iggy_partitions.rs | 96 +++- core/partitions/src/journal.rs | 167 +++--- core/partitions/src/lib.rs | 38 +- core/partitions/src/send_messages2.rs | 552 +++++++++++++++++++ core/simulator/Cargo.toml | 1 + core/simulator/src/bus.rs | 16 +- core/simulator/src/client.rs | 13 +- core/simulator/src/deps.rs | 13 +- core/simulator/src/lib.rs | 6 +- core/simulator/src/packet.rs | 6 +- 26 files changed, 1462 insertions(+), 539 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63420b7b5..6450c9bec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2232,6 +2232,7 @@ dependencies = [ "bytes", "futures", "iggy_common", + "iobuf", "message_bus", "rand 0.10.0", "rand_xoshiro", @@ -5427,6 +5428,7 @@ dependencies = [ "err_trail", "human-repr", "humantime", + "iobuf", "lending-iterator", "moka", "nix", @@ -5922,6 +5924,7 @@ name = "iobuf" version = "0.1.0" dependencies = [ "aligned-vec", + "compio-buf", ] [[package]] @@ -6671,6 +6674,7 @@ dependencies = [ "bytes", "consensus", "iggy_common", + "iobuf", "journal", "left-right", "message_bus", @@ -7645,9 +7649,12 @@ dependencies = [ name = "partitions" version = "0.1.0" dependencies = [ + "bytemuck", "bytes", + "compio", "consensus", "iggy_common", + "iobuf", "journal", "message_bus", "ringbuffer", @@ -9942,6 +9949,7 @@ dependencies = [ "enumset", "futures", "iggy_common", + "iobuf", "journal", "message_bus", "metadata", diff --git a/Cargo.toml b/Cargo.toml index 3843f5649..af4c45d95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,6 +157,7 @@ hostname = "0.4.2" human-repr = "1.1.0" humantime = "2.3.0" hwlocality = "1.0.0-alpha.11" +iobuf = { path = "core/iobuf" } iceberg = "0.9.0" iceberg-catalog-rest = "0.9.0" iceberg-storage-opendal = "0.9.0" diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index d3d3794ba..26f5591cf 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -48,6 +48,7 @@ enumset = { workspace = true } err_trail = { workspace = true } human-repr = { workspace = true } humantime = { workspace = true } +iobuf = { workspace = true } lending-iterator = { workspace = true } moka = { workspace = true } once_cell = { workspace = true } diff --git a/core/common/src/types/consensus/message.rs b/core/common/src/types/consensus/message.rs index 5a49671db..bd30a5323 100644 --- a/core/common/src/types/consensus/message.rs +++ b/core/common/src/types/consensus/message.rs @@ -19,9 +19,13 @@ use crate::{ header::RequestHeader, types::consensus::header::{self, ConsensusHeader, PrepareHeader, PrepareOkHeader}, }; -use bytes::Bytes; +use iobuf::{Frozen, Owned, Prefix}; use std::marker::PhantomData; +const MESSAGE_ALIGN: usize = 4096; +type MessageOwned = Owned<MESSAGE_ALIGN>; +type MessageBuffer = (Prefix<MESSAGE_ALIGN>, Frozen<MESSAGE_ALIGN>); + // TODO: Rename this to Message and ConsensusHeader to Header. pub trait ConsensusMessage<H> where @@ -36,7 +40,7 @@ where H: ConsensusHeader, { fn header(&self) -> &H { - let header_bytes = &self.buffer[..size_of::<H>()]; + let header_bytes = &self.prefix[..size_of::<H>()]; bytemuck::checked::try_from_bytes(header_bytes) .expect("header validated at construction time") } @@ -44,7 +48,8 @@ where #[derive(Debug, Clone)] pub struct Message<H: ConsensusHeader> { - buffer: Bytes, + prefix: Prefix<MESSAGE_ALIGN>, + tail: Frozen<MESSAGE_ALIGN>, _marker: PhantomData<H>, } @@ -55,41 +60,30 @@ where #[inline] #[allow(unused)] pub fn header(&self) -> &H { - let header_bytes = &self.buffer[..size_of::<H>()]; + let header_bytes = &self.prefix[..size_of::<H>()]; bytemuck::checked::try_from_bytes(header_bytes) .expect("header validated at construction time") } - /// Create a new message from a buffer. - /// - /// # Errors - /// - /// Returns an error if: - /// - buffer is too small for the header - /// - buffer contains invalid bit patterns (enum discriminants) - /// - buffer is too small for the size specified in the header - /// - header validation fails #[allow(unused)] - pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> { - // verify minimum size - if buffer.len() < size_of::<H>() { + pub fn from_inner(buffer: MessageBuffer) -> Result<Self, header::ConsensusError> { + let (prefix, tail) = buffer; + let total_len = prefix.len() + tail.len(); + + if total_len < size_of::<H>() || prefix.len() < size_of::<H>() { return Err(header::ConsensusError::InvalidCommand { expected: H::COMMAND, found: header::Command2::Reserved, }); } - // Validate bit patterns (enum discriminants) via try_from_bytes - let header_bytes = &buffer[..size_of::<H>()]; + let header_bytes = &prefix[..size_of::<H>()]; let header = bytemuck::checked::try_from_bytes::<H>(header_bytes) .map_err(|_| header::ConsensusError::InvalidBitPattern)?; - // validate the header header.validate()?; - // verify buffer size matches header.size - let header_size = header.size() as usize; - if buffer.len() < header_size { + if total_len < header.size() as usize { return Err(header::ConsensusError::InvalidCommand { expected: H::COMMAND, found: header::Command2::Reserved, @@ -97,7 +91,8 @@ where } Ok(Self { - buffer, + prefix, + tail, _marker: PhantomData, }) } @@ -108,9 +103,10 @@ where #[allow(unused)] pub fn new(size: usize) -> Self { assert!(size >= size_of::<H>(), "Size must be at least header size"); - let buffer = Bytes::from(vec![0u8; size]); + let (prefix, tail) = MessageOwned::zeroed(size).split_at(size_of::<H>()); Self { - buffer, + prefix, + tail, _marker: PhantomData, } } @@ -123,71 +119,27 @@ where let old_header = *self.header(); // Safety: We ensured that size_of::<H>() == size_of::<T>() - // On top of that, there is going to be only one reference to buffer during this function call - // so no other references can observe the mutation. - // In the future we can replace the `Bytes` buffer with something that does not allow sharing between different threads. - let buffer = self.into_inner(); - unsafe { - let ptr = buffer.as_ptr() as *mut u8; - let slice = std::slice::from_raw_parts_mut(ptr, size_of::<T>()); - // Zero out to ensure valid bit patterns before creating a typed reference. - slice.fill(0); - let new_header = - bytemuck::checked::try_from_bytes_mut(slice).expect("zeroed bytes are valid"); - f(old_header, new_header); - } + // Only the consensus header bytes are mutated. The command-specific + // prefix and immutable tail remain untouched. + let (mut prefix, tail) = self.into_inner(); + let slice = &mut prefix[..size_of::<T>()]; + slice.fill(0); + let new_header = + bytemuck::checked::try_from_bytes_mut(slice).expect("zeroed bytes are valid"); + f(old_header, new_header); Message { - buffer, + prefix, + tail, _marker: PhantomData, } } - /// Get a reference to the message body (everything after the header). - /// - /// Returns an empty slice if there is no body. - #[inline] - #[allow(unused)] - pub fn body(&self) -> &[u8] { - let header_size = size_of::<H>(); - let total_size = self.header().size() as usize; - - if total_size > header_size { - &self.buffer[header_size..total_size] - } else { - &[] - } - } - - /// Get the message body as zero-copy `Bytes`. - /// - /// Returns an empty `Bytes` if there is no body. - #[inline] - #[allow(unused)] - pub fn body_bytes(&self) -> Bytes { - let header_size = size_of::<H>(); - let total_size = self.header().size() as usize; - - if total_size > header_size { - self.buffer.slice(header_size..total_size) - } else { - Bytes::new() - } - } - - /// Get the complete message as bytes (header + body). - #[inline] - #[allow(unused)] - pub fn as_bytes(&self) -> &[u8] { - let total_size = self.header().size() as usize; - &self.buffer[..total_size] - } - /// Convert into the underlying buffer. #[inline] #[allow(unused)] - pub fn into_inner(self) -> Bytes { - self.buffer + pub fn into_inner(self) -> MessageBuffer { + (self.prefix, self.tail) } /// Create a message from a buffer without validation. @@ -199,9 +151,11 @@ where /// - If doing a zero-cost type conversion (like to GenericHeader) #[inline] #[allow(unused)] - unsafe fn from_buffer_unchecked(buffer: Bytes) -> Self { + unsafe fn from_buffer_unchecked(buffer: MessageBuffer) -> Self { + let (prefix, tail) = buffer; Self { - buffer, + prefix, + tail, _marker: PhantomData, } } @@ -211,7 +165,7 @@ where /// This allows treating any message as a generic message for common operations. #[allow(unused)] pub fn into_generic(self) -> Message<header::GenericHeader> { - unsafe { Message::from_buffer_unchecked(self.buffer) } + unsafe { Message::from_buffer_unchecked((self.prefix, self.tail)) } } /// Get a reference to this message as a generic message. @@ -237,7 +191,7 @@ where where T: ConsensusHeader, { - if self.buffer.len() < size_of::<T>() { + if self.total_len() < size_of::<T>() || self.prefix.len() < size_of::<T>() { return Err(header::ConsensusError::InvalidCommand { expected: T::COMMAND, found: header::Command2::Reserved, @@ -253,13 +207,13 @@ where } // Validate bit patterns for the target type - let header_bytes = &self.buffer[..size_of::<T>()]; + let header_bytes = &self.prefix[..size_of::<T>()]; let header = bytemuck::checked::try_from_bytes::<T>(header_bytes) .map_err(|_| header::ConsensusError::InvalidBitPattern)?; header.validate()?; - Ok(unsafe { Message::<T>::from_buffer_unchecked(self.buffer) }) + Ok(unsafe { Message::<T>::from_buffer_unchecked((self.prefix, self.tail)) }) } /// Try to get a reference to this message as a different header type. @@ -271,7 +225,7 @@ where T: ConsensusHeader, { // check buffer size - if self.buffer.len() < size_of::<T>() { + if self.total_len() < size_of::<T>() || self.prefix.len() < size_of::<T>() { return Err(header::ConsensusError::InvalidCommand { expected: T::COMMAND, found: header::Command2::Reserved, @@ -288,7 +242,7 @@ where } // Validate bit patterns for the target type - let header_bytes = &self.buffer[..size_of::<T>()]; + let header_bytes = &self.prefix[..size_of::<T>()]; bytemuck::checked::try_from_bytes::<T>(header_bytes) .map_err(|_| header::ConsensusError::InvalidBitPattern)? .validate()?; @@ -297,6 +251,21 @@ where Ok(typed_message) } + + fn total_len(&self) -> usize { + self.prefix.len() + self.tail.len() + } +} + +impl<H> TryFrom<MessageBuffer> for Message<H> +where + H: ConsensusHeader, +{ + type Error = header::ConsensusError; + + fn try_from(buffer: MessageBuffer) -> Result<Self, Self::Error> { + Self::from_inner(buffer) + } } #[derive(Debug)] @@ -378,6 +347,28 @@ mod tests { use super::*; + fn merged_body<H>(message: &Message<H>) -> Vec<u8> + where + H: ConsensusHeader, + { + let total_size = message.header().size() as usize; + let header_size = size_of::<H>(); + let split_at = message.prefix.len(); + + if total_size <= split_at { + return message.prefix[header_size..total_size].to_vec(); + } + + if split_at <= header_size { + return message.tail[..total_size - header_size].to_vec(); + } + + let mut body = Vec::with_capacity(total_size - header_size); + body.extend_from_slice(&message.prefix[header_size..split_at]); + body.extend_from_slice(&message.tail[..total_size - split_at]); + body + } + trait MessageFactory: ConsensusHeader + Sized { fn create_test() -> Message<Self>; } @@ -407,7 +398,8 @@ mod tests { *item = (i % 256) as u8; } - Message::<Self>::from_bytes(buffer.freeze()).unwrap() + Message::try_from(MessageOwned::copy_from_slice(buffer.as_ref()).split_at(header_size)) + .expect("test buffer must contain a valid consensus message") } } @@ -435,7 +427,8 @@ mod tests { header.timestamp = 1234567890; header.operation = header::Operation::CreateStream; - Message::<Self>::from_bytes(buffer.freeze()).unwrap() + Message::try_from(MessageOwned::copy_from_slice(buffer.as_ref()).split_at(header_size)) + .expect("test buffer must contain a valid consensus message") } } @@ -457,7 +450,8 @@ mod tests { header.replica = 2; header.commit = 50; - Message::<Self>::from_bytes(buffer.freeze()).unwrap() + Message::try_from(MessageOwned::copy_from_slice(buffer.as_ref()).split_at(header_size)) + .expect("test buffer must contain a valid consensus message") } } @@ -482,7 +476,8 @@ mod tests { header.commit = 99; header.operation = header::Operation::CreateStream; - Message::<Self>::from_bytes(buffer.freeze()).unwrap() + Message::try_from(MessageOwned::copy_from_slice(buffer.as_ref()).split_at(header_size)) + .expect("test buffer must contain a valid consensus message") } } @@ -493,13 +488,13 @@ mod tests { assert_eq!(message.header().cluster, 12345); assert_eq!(message.header().command, header::Command2::Reserved); assert_eq!( - message.body().len(), + merged_body(&message).len(), message.header().size() as usize - size_of::<header::GenericHeader>() ); - let body = message.body(); + let body = merged_body(&message); let header_size = size_of::<header::GenericHeader>(); - for (i, &byte) in body.iter().enumerate() { + for (i, byte) in body.into_iter().enumerate() { let expected = ((i + header_size) % 256) as u8; assert_eq!(byte, expected); } @@ -508,8 +503,8 @@ mod tests { #[test] fn test_message_conversion() { let prepare_message = header::PrepareHeader::create_test(); - - let original_bytes = prepare_message.as_bytes().to_vec(); + let original_header = *prepare_message.header(); + let original_body = merged_body(&prepare_message); let generic_message = prepare_message.into_generic(); assert_eq!(generic_message.header().command, header::Command2::Prepare); @@ -521,12 +516,12 @@ mod tests { assert_eq!(prepare_again.header().view, 1); assert_eq!(prepare_again.header().cluster, 12345); - let roundtrip_bytes = prepare_again.as_bytes().to_vec(); - assert_eq!( - original_bytes, roundtrip_bytes, - "Bytes should be identical after round-trip conversion" + bytemuck::bytes_of(&original_header), + bytemuck::bytes_of(prepare_again.header()), + "Header should be identical after round-trip conversion" ); + assert_eq!(original_body, merged_body(&prepare_again)); } #[test] diff --git a/core/common/src/types/segment_storage/messages_writer.rs b/core/common/src/types/segment_storage/messages_writer.rs index 6f81f7752..ae8864699 100644 --- a/core/common/src/types/segment_storage/messages_writer.rs +++ b/core/common/src/types/segment_storage/messages_writer.rs @@ -117,7 +117,6 @@ impl MessagesWriter { Ok(IggyByteSize::from(messages_size)) } - pub fn path(&self) -> String { self.file_path.clone() } diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml index 2695f624c..c4e3e13ad 100644 --- a/core/consensus/Cargo.toml +++ b/core/consensus/Cargo.toml @@ -32,6 +32,7 @@ bit-set = { workspace = true } bytemuck = { workspace = true } bytes = { workspace = true } iggy_common = { workspace = true } +iobuf = { workspace = true } message_bus = { workspace = true } rand = { workspace = true } rand_xoshiro = { workspace = true } diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs index 31356a448..0fdeb5ff8 100644 --- a/core/consensus/src/plane_helpers.rs +++ b/core/consensus/src/plane_helpers.rs @@ -18,6 +18,7 @@ use crate::{Consensus, Pipeline, PipelineEntry, Sequencer, Status, VsrConsensus}; use iggy_common::header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader}; use iggy_common::message::Message; +use iobuf::Owned; use message_bus::MessageBus; use std::ops::AsyncFnOnce; @@ -267,8 +268,8 @@ where buffer[header_size..].copy_from_slice(&body); } - Message::<ReplyHeader>::from_bytes(buffer.freeze()) - .expect("build_reply_message: constructed header must be valid") + Message::try_from(Owned::<4096>::copy_from_slice(buffer.as_ref()).split_at(header_size)) + .expect("reply buffer must contain a valid reply message") } /// Verify hash chain would not break if we add this header. @@ -388,17 +389,17 @@ mod tests { async fn send_to_client( &self, _client_id: Self::Client, - _data: Self::Data, - ) -> Result<(), IggyError> { - Ok(()) + data: Self::Data, + ) -> Result<Self::Data, IggyError> { + Ok(data) } async fn send_to_replica( &self, _replica: Self::Replica, - _data: Self::Data, - ) -> Result<(), IggyError> { - Ok(()) + data: Self::Data, + ) -> Result<Self::Data, IggyError> { + Ok(data) } } @@ -596,17 +597,17 @@ mod tests { async fn send_to_client( &self, _client_id: Self::Client, - _data: Self::Data, - ) -> Result<(), IggyError> { - Ok(()) + data: Self::Data, + ) -> Result<Self::Data, IggyError> { + Ok(data) } async fn send_to_replica( &self, replica: Self::Replica, data: Self::Data, - ) -> Result<(), IggyError> { - self.sent.borrow_mut().push((replica, data)); - Ok(()) + ) -> Result<Self::Data, IggyError> { + self.sent.borrow_mut().push((replica, data.clone())); + Ok(data) } } diff --git a/core/iobuf/Cargo.toml b/core/iobuf/Cargo.toml index 62a8b8cc1..9ec4e8cd6 100644 --- a/core/iobuf/Cargo.toml +++ b/core/iobuf/Cargo.toml @@ -23,3 +23,4 @@ license = "Apache-2.0" [dependencies] aligned-vec = "0.6" +compio-buf = "0.8.0" diff --git a/core/iobuf/src/lib.rs b/core/iobuf/src/lib.rs index 4b9006672..43ddcb22c 100644 --- a/core/iobuf/src/lib.rs +++ b/core/iobuf/src/lib.rs @@ -15,12 +15,20 @@ // specific language governing permissions and limitations // under the License. -use std::mem::ManuallyDrop; +use std::ops::{Deref, DerefMut, RangeBounds}; use std::ptr::NonNull; use std::slice; use std::sync::atomic::{AtomicUsize, Ordering, fence}; use aligned_vec::{AVec, ConstAlign}; +use compio_buf::IoBuf; + +const fn assert_even_alignment<const ALIGN: usize>() { + assert!( + ALIGN % 2 == 0, + "ALIGN must be divisible by 2 for control-block pointer tagging" + ); +} #[derive(Debug)] pub struct Owned<const ALIGN: usize = 4096> { @@ -40,6 +48,20 @@ impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> { } impl<const ALIGN: usize> Owned<ALIGN> { + pub fn zeroed(len: usize) -> Self { + const { assert_even_alignment::<ALIGN>() }; + let mut inner: AVec<u8, ConstAlign<ALIGN>> = AVec::new(ALIGN); + inner.resize(len, 0); + Self { inner } + } + + pub fn copy_from_slice(data: &[u8]) -> Self { + const { assert_even_alignment::<ALIGN>() }; + let mut inner: AVec<u8, ConstAlign<ALIGN>> = AVec::new(ALIGN); + inner.extend_from_slice(data); + Self { inner } + } + pub fn as_slice(&self) -> &[u8] { &self.inner } @@ -48,11 +70,16 @@ impl<const ALIGN: usize> Owned<ALIGN> { &mut self.inner } - /// Split `Owned` buffer into two halves + /// Split `Owned` into a mutable copied-on-clone prefix and an immutable shared tail. /// /// # Panics /// Panics if `split_at > self.len()`. - pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> { + pub fn split_at(self, split_at: usize) -> (Prefix<ALIGN>, Frozen<ALIGN>) { + let (prefix, tail) = self.split_extent_pair(split_at); + (Prefix { inner: prefix }, Frozen { inner: tail }) + } + + fn split_extent_pair(self, split_at: usize) -> (Extent<ALIGN>, Extent<ALIGN>) { assert!(split_at <= self.inner.len()); // Take ownership of the AVec's allocation. After this, we are responsible @@ -68,143 +95,132 @@ impl<const ALIGN: usize> Owned<ALIGN> { // We need to increment the ref_count as the resulting halves will both point to the same control block. unsafe { ctrlb.as_ref().ref_count.fetch_add(1, Ordering::Relaxed) }; - TwoHalves { - inner: ( - Extent { - ptr: base, - len: split_at, - ctrlb, - _pad: 0, - }, - Extent { - ptr: tail, - len: len - split_at, - ctrlb, - _pad: 0, - }, - ), - } + ( + Extent { + ptr: base, + len: split_at, + ctrlb, + _pad: 0, + }, + Extent { + ptr: tail, + len: len - split_at, + ctrlb, + _pad: 0, + }, + ) } } -pub struct TwoHalves<const ALIGN: usize> { - inner: (Extent<ALIGN>, Extent<ALIGN>), +pub trait TryMerge: Sized { + type Output; + + /// # Safety + /// + /// The caller must guarantee that the second part is the suffix of the + /// original frame allocation, beginning exactly after the first part. + unsafe fn try_merge(self) -> Result<Self::Output, Self>; } -// SAFETY: `TwoHalves` can be sent across threads as long as the caller ensures that the head half is not shared between -// threads (e.g. by cloning, which creates a new head half), -// and that the tail half is only shared immutably (e.g. by cloning, which shares the tail half immutably). -unsafe impl<const ALIGN: usize> Send for TwoHalves<ALIGN> {} +impl<const ALIGN: usize> TryMerge for (Prefix<ALIGN>, Frozen<ALIGN>) { + type Output = Owned<ALIGN>; -impl<const ALIGN: usize> TwoHalves<ALIGN> { - pub fn head(&self) -> &[u8] { - self.inner.0.as_slice() + unsafe fn try_merge(self) -> Result<Self::Output, Self> { + let (prefix, tail) = self; + match unsafe { try_merge_extents(prefix.inner, tail.inner) } { + Ok(owned) => Ok(owned), + Err((prefix, tail)) => Err((Prefix { inner: prefix }, Frozen { inner: tail })), + } } +} - pub fn head_mut(&mut self) -> &mut [u8] { - // SAFETY: We are accessing the head half mutably, this is the only correct operation, as the head is not shared between clones, - // instead it gets copied. - unsafe { self.inner.0.as_mut_slice() } - } +pub struct Prefix<const ALIGN: usize> { + inner: Extent<ALIGN>, +} - pub fn tail(&self) -> &[u8] { - self.inner.1.as_slice() - } +// SAFETY: `Prefix` owns the mutable front extent. Clone copies the bytes into a new +// allocation, so the mutable prefix is never shared across threads by aliasing. +unsafe impl<const ALIGN: usize> Send for Prefix<ALIGN> {} - pub fn split_point(&self) -> usize { - self.inner.0.len +impl<const ALIGN: usize> Prefix<ALIGN> { + pub fn copy_from_slice(src: &[u8]) -> Self { + Self { + inner: Extent::copy_from_slice(src), + } } - pub fn total_len(&self) -> usize { - self.inner.0.len + self.inner.1.len + pub fn len(&self) -> usize { + self.inner.len } - pub fn try_merge(self) -> Result<Owned<ALIGN>, Self> { - let ctrlb_eq = std::ptr::addr_eq(self.inner.0.ctrlb.as_ptr(), self.inner.1.ctrlb.as_ptr()); - // SAFETY: `inner.1.ctrlb` points to a live control block while `self` is alive. - let ref_count = unsafe { - self.inner - .1 - .ctrlb - .as_ref() - .ref_count - .load(Ordering::Acquire) - }; - // When ctrlb_eq, both extents share the same control block with refcount 2. - // When !ctrlb_eq (after clone), the tail has its own refcount. - let is_unique = if ctrlb_eq { - ref_count == 2 - } else { - ref_count == 1 - }; + pub fn is_empty(&self) -> bool { + self.inner.len == 0 + } - if !is_unique { - return Err(self); + fn ensure_mutable(&mut self) { + if !is_copy_on_write_tagged(self.inner.ctrlb) { + return; } - // Transfer ownership to prevent Extent::drop from running. - // SAFETY: We read the inner tuple out of ManuallyDrop, which won't run the compiler-generated drop. - let this = ManuallyDrop::new(self); - let (head, tail) = unsafe { std::ptr::read(&this.inner) }; - let split_at = head.len; - - // SAFETY: `tail.ctrlb` is unique at this point (is_unique checked above), - // If `head.ctrlb != tail.ctrlb`, the head owns a standalone allocation - // that must be released after copying. - unsafe { - if !ctrlb_eq { - let dst_ctrlb = tail.ctrlb.as_ref(); - - // We are patching up the original allocation, with the current head data, so that the resulting `Owned` has correct content. - let dst = slice::from_raw_parts_mut(dst_ctrlb.base.as_ptr(), split_at); - dst.copy_from_slice(head.as_slice()); - } - - // Dropping the head in `ctrlb_eq` case, should decrease the refcount to 1, so it's safe to reuse tail control block. - // In case when head was it's own allocation, we guarantee that it's always unique. - drop(head); - let tail_ctrlb = tail.ctrlb; - // Prevent tail Drop from running, we're taking ownership of the control block. - std::mem::forget(tail); - - let ctrlb = reclaim_unique_control_block(tail_ctrlb); - // SAFETY: `ctrlb.base,capacity` were captured from an `AVec<u8>` allocation and - // are now exclusively owned by this path. - let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, ctrlb.len, ctrlb.capacity); - Ok(Owned { inner }) + let ctrlb = clear_copy_on_write_tag(self.inner.ctrlb); + // Prefix reconstructed from Frozen may alias other overlapping views. + // Detach unless the only remaining aliases are the prefix itself and its paired tail. + let ref_count = unsafe { ctrlb.as_ref().ref_count.load(Ordering::Acquire) }; + if ref_count > 2 { + self.inner = Extent::copy_from_slice(self.inner.as_slice()); + return; } + + self.inner.ctrlb = ctrlb; } } -impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> { +impl<const ALIGN: usize> Clone for Prefix<ALIGN> { fn clone(&self) -> Self { Self { - inner: ( - Extent::<ALIGN>::copy_from_slice(self.head()), - self.inner.1.clone(), - ), + inner: Extent::copy_from_slice(self.inner.as_slice()), } } } -impl<const ALIGN: usize> std::fmt::Debug for TwoHalves<ALIGN> { +impl<const ALIGN: usize> std::fmt::Debug for Prefix<ALIGN> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TwoHalves") - .field("split_point", &self.split_point()) - .field("head_len", &self.inner.0.len) - .field("tail_len", &self.inner.1.len) - .field("halves_alias", &(self.inner.0.ctrlb == self.inner.1.ctrlb)) + f.debug_struct("Prefix") + .field("len", &self.len()) + .field("copy_on_write", &is_copy_on_write_tagged(self.inner.ctrlb)) .finish() } } +impl<const ALIGN: usize> Deref for Prefix<ALIGN> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.inner.as_slice() + } +} + +impl<const ALIGN: usize> DerefMut for Prefix<ALIGN> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.ensure_mutable(); + // SAFETY: Prefix either comes from `Owned::split_at` and is safe to mutate + // in place, or it has been detached into its own allocation by `ensure_mutable`. + unsafe { self.inner.as_mut_slice() } + } +} + #[derive(Clone)] pub struct Frozen<const ALIGN: usize> { inner: Extent<ALIGN>, } +// SAFETY: `Frozen` provides immutable access only, and clones share the underlying +// extent immutably. +unsafe impl<const ALIGN: usize> Send for Frozen<ALIGN> {} + impl<const ALIGN: usize> From<Owned<ALIGN>> for Frozen<ALIGN> { fn from(value: Owned<ALIGN>) -> Self { + const { assert_even_alignment::<ALIGN>() }; let inner = value.inner; let (ptr, _, len, capacity) = inner.into_raw_parts(); @@ -226,6 +242,78 @@ impl<const ALIGN: usize> Frozen<ALIGN> { pub fn as_slice(&self) -> &[u8] { self.inner.as_slice() } + + /// Split `Frozen` into a copy-on-write prefix and immutable tail view. + pub fn split_at(self, split_at: usize) -> (Prefix<ALIGN>, Frozen<ALIGN>) { + const { assert_even_alignment::<ALIGN>() }; + assert!(split_at <= self.inner.len); + + let mut prefix = self.inner.clone(); + prefix.len = split_at; + prefix.ctrlb = set_copy_on_write_tag(prefix.ctrlb); + + let mut tail = self.inner; + // SAFETY: `split_at` is bounds-checked against the extent length. + tail.ptr = unsafe { NonNull::new_unchecked(tail.ptr.as_ptr().add(split_at)) }; + tail.len -= split_at; + + (Prefix { inner: prefix }, Frozen { inner: tail }) + } + + pub fn slice(&self, range: impl RangeBounds<usize>) -> Self { + use std::ops::Bound; + + let begin = match range.start_bound() { + Bound::Included(&n) => n, + Bound::Excluded(&n) => n + 1, + Bound::Unbounded => 0, + }; + + let end = match range.end_bound() { + Bound::Included(&n) => n.checked_add(1).expect("out of range"), + Bound::Excluded(&n) => n, + Bound::Unbounded => self.len(), + }; + + assert!(begin <= self.len()); + assert!(begin <= end); + assert!(end <= self.len()); + + let mut sliced = self.clone(); + // SAFETY: begin/end are bounds-checked against the current extent length. + let ptr = unsafe { NonNull::new_unchecked(sliced.inner.ptr.as_ptr().add(begin)) }; + sliced.inner.ptr = ptr; + sliced.inner.len = end - begin; + sliced + } + + pub fn len(&self) -> usize { + self.inner.len + } + + pub fn is_empty(&self) -> bool { + self.inner.len == 0 + } +} + +impl<const ALIGN: usize> std::fmt::Debug for Frozen<ALIGN> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Frozen").field("len", &self.len()).finish() + } +} + +impl<const ALIGN: usize> Deref for Frozen<ALIGN> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.as_slice() + } +} + +impl<const ALIGN: usize> IoBuf for Frozen<ALIGN> { + fn as_init(&self) -> &[u8] { + self.as_slice() + } } #[repr(C, align(64))] @@ -253,7 +341,7 @@ struct Extent<const ALIGN: usize> { ptr: NonNull<u8>, len: usize, ctrlb: NonNull<ControlBlock>, - // Padded to 32 bytes in order to avoid false sharing when used by `TwoHalves` + // Padded to 32 bytes in order to avoid false sharing when used by split frame pairs. // If `Extent` would be smaller than 32 bytes, two `Extent`s that are adjacent in memory // could potentially share the same cache line + some extra // that extra could lead to false sharing, in case of invalidation of extra @@ -263,7 +351,7 @@ struct Extent<const ALIGN: usize> { impl<const ALIGN: usize> Drop for Extent<ALIGN> { fn drop(&mut self) { // SAFETY: `self.ctrlb` points to a live control block while `self` is alive. - unsafe { release_control_block_w_allocation::<ALIGN>(self.ctrlb) } + unsafe { release_control_block_w_allocation::<ALIGN>(clear_copy_on_write_tag(self.ctrlb)) } } } @@ -279,6 +367,7 @@ impl<const ALIGN: usize> Extent<ALIGN> { } fn copy_from_slice(src: &[u8]) -> Self { + const { assert_even_alignment::<ALIGN>() }; let mut v: AVec<u8, ConstAlign<ALIGN>> = AVec::new(ALIGN); v.extend_from_slice(src); @@ -298,12 +387,10 @@ impl<const ALIGN: usize> Extent<ALIGN> { impl<const ALIGN: usize> Clone for Extent<ALIGN> { fn clone(&self) -> Self { + let ctrlb = clear_copy_on_write_tag(self.ctrlb); // SAFETY: `self.ctrlb` points to a live control block while `self` is alive. unsafe { - self.ctrlb - .as_ref() - .ref_count - .fetch_add(1, Ordering::Relaxed); + ctrlb.as_ref().ref_count.fetch_add(1, Ordering::Relaxed); } Self { ptr: self.ptr, @@ -314,7 +401,24 @@ impl<const ALIGN: usize> Clone for Extent<ALIGN> { } } +const COPY_ON_WRITE_TAG: usize = 1; + +fn is_copy_on_write_tagged(ctrlb: NonNull<ControlBlock>) -> bool { + ctrlb.as_ptr().addr() & COPY_ON_WRITE_TAG != 0 +} + +fn set_copy_on_write_tag(ctrlb: NonNull<ControlBlock>) -> NonNull<ControlBlock> { + // SAFETY: tagging only toggles the low alignment bit while preserving provenance. + unsafe { NonNull::new_unchecked(ctrlb.as_ptr().map_addr(|addr| addr | COPY_ON_WRITE_TAG)) } +} + +fn clear_copy_on_write_tag(ctrlb: NonNull<ControlBlock>) -> NonNull<ControlBlock> { + // SAFETY: clearing only strips the low tag bit while preserving provenance. + unsafe { NonNull::new_unchecked(ctrlb.as_ptr().map_addr(|addr| addr & !COPY_ON_WRITE_TAG)) } +} + unsafe fn release_control_block_w_allocation<const ALIGN: usize>(ctrlb: NonNull<ControlBlock>) { + const { assert_even_alignment::<ALIGN>() }; // SAFETY: ctrlb is valid per function preconditions let old = unsafe { ctrlb.as_ref() } .ref_count @@ -371,12 +475,65 @@ unsafe fn reclaim_unique_control_block(ctrlb: NonNull<ControlBlock>) -> ControlB unsafe { *Box::from_raw(ctrlb.as_ptr()) } } +/// # Safety +/// +/// The caller must guarantee that `tail` is the suffix of the original frame +/// allocation and begins exactly after `prefix`. +unsafe fn try_merge_extents<const ALIGN: usize>( + prefix: Extent<ALIGN>, + tail: Extent<ALIGN>, +) -> Result<Owned<ALIGN>, (Extent<ALIGN>, Extent<ALIGN>)> { + const { assert_even_alignment::<ALIGN>() }; + let split_at = prefix.len; + // SAFETY: `tail.ctrlb` points to a live control block while `tail` is alive. + let tail_ctrlb = clear_copy_on_write_tag(tail.ctrlb); + let prefix_ctrlb = clear_copy_on_write_tag(prefix.ctrlb); + let ctrlb_eq = std::ptr::addr_eq(prefix_ctrlb.as_ptr(), tail_ctrlb.as_ptr()); + let ref_count = unsafe { tail_ctrlb.as_ref().ref_count.load(Ordering::Acquire) }; + + // When ctrlb_eq, both extents share the same control block with refcount 2. + // When !ctrlb_eq, the tail must be unique for the merge to reclaim its allocation. + let is_unique = if ctrlb_eq { + ref_count == 2 + } else { + ref_count == 1 + }; + if !is_unique { + return Err((prefix, tail)); + } + + unsafe { + if !ctrlb_eq { + let dst_ctrlb = tail_ctrlb.as_ref(); + // We are patching up the original allocation, with the current prefix data, + // so that the resulting `Owned` has correct content. + let dst = slice::from_raw_parts_mut(dst_ctrlb.base.as_ptr(), split_at); + dst.copy_from_slice(prefix.as_slice()); + } + + // Dropping the prefix in `ctrlb_eq` case should decrease the refcount to 1, + // so it's safe to reuse the tail control block. In the case where the prefix + // owns its own allocation, we guarantee that it's always unique. + drop(prefix); + let tail_ctrlb = tail_ctrlb; + // Prevent tail Drop from running, we're taking ownership of the control block. + std::mem::forget(tail); + + let ctrlb = reclaim_unique_control_block(tail_ctrlb); + // SAFETY: `ctrlb.base,capacity` were captured from an `AVec<u8>` allocation and + // are now exclusively owned by this path. + let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, ctrlb.len, ctrlb.capacity); + Ok(Owned { inner }) + } +} + // TODO: Better tests & miri. #[cfg(test)] mod tests { - use super::Owned; + use super::{Frozen, Owned, Prefix, TryMerge}; use aligned_vec::AVec; use aligned_vec::ConstAlign; + use std::mem; fn make_owned(data: &[u8]) -> Owned { let mut v: AVec<u8, ConstAlign<4096>> = AVec::new(4096); @@ -385,72 +542,71 @@ mod tests { } #[test] - fn split_exposes_head_and_tail() { + fn split_exposes_prefix_and_tail() { let owned = make_owned(&[1, 2, 3, 4, 5]); - let mut buffer = owned.split_at(2); + let (mut prefix, tail) = owned.split_at(2); - assert_eq!(buffer.head(), &[1, 2]); - assert_eq!(buffer.tail(), &[3, 4, 5]); - assert_eq!(buffer.split_point(), 2); - assert_eq!(buffer.total_len(), 5); + assert_eq!(&*prefix, &[1, 2]); + assert_eq!(&*tail, &[3, 4, 5]); - buffer.head_mut().copy_from_slice(&[9, 8]); - assert_eq!(buffer.head(), &[9, 8]); - assert_eq!(buffer.tail(), &[3, 4, 5]); + prefix.copy_from_slice(&[9, 8]); + assert_eq!(&*prefix, &[9, 8]); + assert_eq!(&*tail, &[3, 4, 5]); } #[test] - fn clone_copies_head_and_shares_tail() { + fn clone_copies_prefix_and_shares_tail() { let owned = make_owned(&[1, 2, 3, 4, 5]); - let mut original = owned.split_at(2); - let mut cloned = original.clone(); + let (mut original_prefix, original_tail) = owned.split_at(2); + let (mut cloned_prefix, cloned_tail) = (original_prefix.clone(), original_tail.clone()); - original.head_mut().copy_from_slice(&[9, 9]); - cloned.head_mut().copy_from_slice(&[7, 7]); + original_prefix.copy_from_slice(&[9, 9]); + cloned_prefix.copy_from_slice(&[7, 7]); - assert_eq!(original.head(), &[9, 9]); - assert_eq!(cloned.head(), &[7, 7]); - assert_eq!(original.tail(), &[3, 4, 5]); - assert_eq!(cloned.tail(), &[3, 4, 5]); + assert_eq!(&*original_prefix, &[9, 9]); + assert_eq!(&*cloned_prefix, &[7, 7]); + assert_eq!(&*original_tail, &[3, 4, 5]); + assert_eq!(&*cloned_tail, &[3, 4, 5]); } #[test] fn try_merge_reuses_original_frame_when_unique() { let owned = make_owned(&[1, 2, 3, 4, 5]); - let mut buffer = owned.split_at(2); - buffer.head_mut().copy_from_slice(&[8, 9]); + let (mut prefix, tail) = owned.split_at(2); + prefix.copy_from_slice(&[8, 9]); - let merged: AVec<u8, ConstAlign<4096>> = buffer.try_merge().unwrap().into(); + let merged: AVec<u8, ConstAlign<4096>> = + unsafe { (prefix, tail).try_merge() }.unwrap().into(); assert_eq!(merged.as_slice(), &[8, 9, 3, 4, 5]); } #[test] fn try_merge_fails_while_tail_is_shared() { let owned = make_owned(&[1, 2, 3, 4, 5]); - let buffer = owned.split_at(2); - let clone = buffer.clone(); + let parts = owned.split_at(2); + let clone = parts.clone(); // Merge fails because tail is shared - let buffer = buffer.try_merge().unwrap_err(); + let parts = unsafe { parts.try_merge() }.unwrap_err(); drop(clone); // Now merge succeeds - let merged: AVec<u8, ConstAlign<4096>> = buffer.try_merge().unwrap().into(); + let merged: AVec<u8, ConstAlign<4096>> = unsafe { parts.try_merge() }.unwrap().into(); assert_eq!(merged.as_slice(), &[1, 2, 3, 4, 5]); } #[test] - fn merge_after_cloned_head_mutation_writes_back_to_original_frame() { + fn merge_after_cloned_prefix_mutation_writes_back_to_original_frame() { let owned = make_owned(&[1, 2, 3, 4, 5]); - let buffer = owned.split_at(2); - let mut clone = buffer.clone(); + let parts = owned.split_at(2); + let mut clone = parts.clone(); - drop(buffer); + drop(parts); - clone.head_mut().copy_from_slice(&[4, 2]); + clone.0.copy_from_slice(&[4, 2]); - let merged: AVec<u8, ConstAlign<4096>> = clone.try_merge().unwrap().into(); + let merged: AVec<u8, ConstAlign<4096>> = unsafe { clone.try_merge() }.unwrap().into(); assert_eq!(merged.as_slice(), &[4, 2, 3, 4, 5]); } @@ -458,13 +614,13 @@ mod tests { fn zero_length_splits_work() { let owned = make_owned(&[1, 2, 3]); let left_empty = owned.split_at(0); - assert_eq!(left_empty.head(), &[]); - assert_eq!(left_empty.tail(), &[1, 2, 3]); + assert_eq!(&*left_empty.0, &[]); + assert_eq!(&*left_empty.1, &[1, 2, 3]); let owned = make_owned(&[1, 2, 3]); let right_empty = owned.split_at(3); - assert_eq!(right_empty.head(), &[1, 2, 3]); - assert_eq!(right_empty.tail(), &[]); + assert_eq!(&*right_empty.0, &[1, 2, 3]); + assert_eq!(&*right_empty.1, &[]); } #[test] @@ -474,8 +630,8 @@ mod tests { let clone1 = original.clone(); let _clone2 = clone1.clone(); - // All clones share tail, so merge should fail - assert!(original.try_merge().is_err()); + // All clones share tail, so merge should fail. + assert!(unsafe { original.try_merge() }.is_err()); } #[test] @@ -514,91 +670,164 @@ mod tests { owned.as_mut_slice()[0] = 99; owned.as_mut_slice()[4] = 88; - let buffer = owned.split_at(2); - assert_eq!(buffer.head(), &[99, 2]); - assert_eq!(buffer.tail(), &[3, 4, 88]); + let (prefix, tail) = owned.split_at(2); + assert_eq!(&*prefix, &[99, 2]); + assert_eq!(&*tail, &[3, 4, 88]); } #[test] - fn two_halves_head_returns_correct_slice() { + fn prefix_returns_correct_slice() { let owned = make_owned(&[10, 20, 30, 40, 50]); - let buffer = owned.split_at(3); + let (prefix, _) = owned.split_at(3); - assert_eq!(buffer.head(), &[10, 20, 30]); + assert_eq!(&*prefix, &[10, 20, 30]); } #[test] - fn two_halves_tail_returns_correct_slice() { + fn tail_returns_correct_slice() { let owned = make_owned(&[10, 20, 30, 40, 50]); - let buffer = owned.split_at(3); + let (_, tail) = owned.split_at(3); - assert_eq!(buffer.tail(), &[40, 50]); + assert_eq!(&*tail, &[40, 50]); } #[test] - fn two_halves_head_mut_allows_modification() { + fn prefix_allows_modification() { let owned = make_owned(&[1, 2, 3, 4, 5]); - let mut buffer = owned.split_at(3); + let (mut prefix, tail) = owned.split_at(3); - buffer.head_mut()[0] = 100; - buffer.head_mut()[2] = 200; + prefix[0] = 100; + prefix[2] = 200; - assert_eq!(buffer.head(), &[100, 2, 200]); - assert_eq!(buffer.tail(), &[4, 5]); + assert_eq!(&*prefix, &[100, 2, 200]); + assert_eq!(&*tail, &[4, 5]); } #[test] - fn two_halves_head_mut_full_overwrite() { + fn prefix_full_overwrite() { let owned = make_owned(&[1, 2, 3, 4, 5]); - let mut buffer = owned.split_at(3); + let (mut prefix, tail) = owned.split_at(3); - buffer.head_mut().copy_from_slice(&[7, 8, 9]); + prefix.copy_from_slice(&[7, 8, 9]); - assert_eq!(buffer.head(), &[7, 8, 9]); - assert_eq!(buffer.tail(), &[4, 5]); + assert_eq!(&*prefix, &[7, 8, 9]); + assert_eq!(&*tail, &[4, 5]); } #[test] - fn two_halves_head_empty_slice() { + fn prefix_empty_slice() { let owned = make_owned(&[1, 2, 3]); - let buffer = owned.split_at(0); + let (prefix, tail) = owned.split_at(0); - assert_eq!(buffer.head(), &[]); - assert_eq!(buffer.tail(), &[1, 2, 3]); + assert_eq!(&*prefix, &[]); + assert_eq!(&*tail, &[1, 2, 3]); } #[test] - fn two_halves_tail_empty_slice() { + fn tail_empty_slice() { let owned = make_owned(&[1, 2, 3]); - let buffer = owned.split_at(3); + let (prefix, tail) = owned.split_at(3); - assert_eq!(buffer.head(), &[1, 2, 3]); - assert_eq!(buffer.tail(), &[]); + assert_eq!(&*prefix, &[1, 2, 3]); + assert_eq!(&*tail, &[]); } #[test] - fn two_halves_head_mut_does_not_affect_tail() { + fn prefix_mutation_does_not_affect_tail() { let owned = make_owned(&[1, 2, 3, 4, 5]); - let mut buffer = owned.split_at(2); + let (mut prefix, tail) = owned.split_at(2); - let original_tail: Vec<u8> = buffer.tail().to_vec(); - buffer.head_mut().copy_from_slice(&[99, 99]); + let original_tail: Vec<u8> = tail.to_vec(); + prefix.copy_from_slice(&[99, 99]); - assert_eq!(buffer.tail(), original_tail.as_slice()); + assert_eq!(&*tail, original_tail.as_slice()); } #[test] - fn two_halves_cloned_head_mut_independent() { + fn cloned_prefix_mutations_are_independent() { let owned = make_owned(&[1, 2, 3, 4, 5]); let mut original = owned.split_at(2); let mut cloned = original.clone(); - original.head_mut().copy_from_slice(&[10, 20]); - cloned.head_mut().copy_from_slice(&[30, 40]); + original.0.copy_from_slice(&[10, 20]); + cloned.0.copy_from_slice(&[30, 40]); - assert_eq!(original.head(), &[10, 20]); - assert_eq!(cloned.head(), &[30, 40]); - // Tail is shared, both should see the same data - assert_eq!(original.tail(), cloned.tail()); + assert_eq!(&*original.0, &[10, 20]); + assert_eq!(&*cloned.0, &[30, 40]); + assert_eq!(&*original.1, &*cloned.1); + } + + #[test] + fn prefix_clone_copies_and_frozen_clone_shares() { + let owned = make_owned(&[1, 2, 3, 4, 5]); + let (mut prefix, tail) = owned.split_at(2); + let mut prefix_clone = prefix.clone(); + let tail_clone = tail.clone(); + + prefix.copy_from_slice(&[7, 7]); + prefix_clone.copy_from_slice(&[8, 8]); + + assert_eq!(&*prefix, &[7, 7]); + assert_eq!(&*prefix_clone, &[8, 8]); + assert_eq!(&*tail, &[3, 4, 5]); + assert_eq!(&*tail_clone, &[3, 4, 5]); + } + + #[test] + fn split_try_merge_reuses_original_frame_when_unique() { + let owned = make_owned(&[1, 2, 3, 4, 5]); + let (mut prefix, tail) = owned.split_at(2); + prefix.copy_from_slice(&[8, 9]); + + let merged: AVec<u8, ConstAlign<4096>> = + unsafe { (prefix, tail).try_merge() }.unwrap().into(); + assert_eq!(merged.as_slice(), &[8, 9, 3, 4, 5]); + } + + #[test] + fn split_try_merge_fails_while_tail_is_shared() { + let owned = make_owned(&[1, 2, 3, 4, 5]); + let parts = owned.split_at(2); + let tail_clone = parts.1.clone(); + + let parts = unsafe { parts.try_merge() }.unwrap_err(); + drop(tail_clone); + + let merged: AVec<u8, ConstAlign<4096>> = unsafe { parts.try_merge() }.unwrap().into(); + assert_eq!(merged.as_slice(), &[1, 2, 3, 4, 5]); + } + + #[test] + fn frozen_can_be_split_into_prefix_and_tail() { + let frozen = Frozen::from(make_owned(&[1, 2, 3, 4, 5])); + let (prefix, tail) = frozen.split_at(2); + + let merged: AVec<u8, ConstAlign<4096>> = + unsafe { (prefix, tail).try_merge() }.unwrap().into(); + assert_eq!(merged.as_slice(), &[1, 2, 3, 4, 5]); + } + + #[test] + fn frozen_split_prefix_detaches_before_mutation() { + let frozen = Frozen::from(make_owned(&[1, 2, 3, 4, 5])); + let alias = frozen.clone(); + let (mut prefix, tail) = frozen.split_at(2); + + prefix.copy_from_slice(&[9, 8]); + + assert_eq!(&alias[..], &[1, 2, 3, 4, 5]); + assert_eq!(&*prefix, &[9, 8]); + assert_eq!(&*tail, &[3, 4, 5]); + + drop(alias); + + let merged: AVec<u8, ConstAlign<4096>> = + unsafe { (prefix, tail).try_merge() }.unwrap().into(); + assert_eq!(merged.as_slice(), &[9, 8, 3, 4, 5]); + } + + #[test] + fn prefix_stays_32_bytes() { + assert_eq!(mem::size_of::<Prefix<4096>>(), 32); } } diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs index 876061bc4..2a3132786 100644 --- a/core/message_bus/src/lib.rs +++ b/core/message_bus/src/lib.rs @@ -40,12 +40,12 @@ pub trait MessageBus { &self, client_id: Self::Client, data: Self::Data, - ) -> impl Future<Output = Result<(), IggyError>>; + ) -> impl Future<Output = Result<Self::Data, IggyError>>; fn send_to_replica( &self, replica: Self::Replica, data: Self::Data, - ) -> impl Future<Output = Result<(), IggyError>>; + ) -> impl Future<Output = Result<Self::Data, IggyError>>; } // TODO: explore generics for Strategy @@ -112,25 +112,25 @@ impl MessageBus for IggyMessageBus { async fn send_to_client( &self, client_id: Self::Client, - _message: Self::Data, - ) -> Result<(), IggyError> { + message: Self::Data, + ) -> Result<Self::Data, IggyError> { #[allow(clippy::cast_possible_truncation)] // IggyError::ClientNotFound takes u32 let _sender = self .clients .get(&client_id) .ok_or(IggyError::ClientNotFound(client_id as u32))?; - Ok(()) + Ok(message) } async fn send_to_replica( &self, replica: Self::Replica, - _message: Self::Data, - ) -> Result<(), IggyError> { + message: Self::Data, + ) -> Result<Self::Data, IggyError> { // TODO: Handle lazily creating the connection. let _connection = self .get_replica_connection(replica) .ok_or(IggyError::ResourceNotFound(format!("Replica {replica}")))?; - Ok(()) + Ok(message) } } diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml index e0d024aaf..41ae29517 100644 --- a/core/metadata/Cargo.toml +++ b/core/metadata/Cargo.toml @@ -32,6 +32,7 @@ ahash = { workspace = true } bytes = { workspace = true } consensus = { workspace = true } iggy_common = { workspace = true } +iobuf = { workspace = true } journal = { workspace = true } left-right = { workspace = true } message_bus = { workspace = true } diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 9751eb0c7..09be2e6e8 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -29,10 +29,18 @@ use iggy_common::{ }, message::Message, }; +use iobuf::TryMerge; use journal::{Journal, JournalHandle}; use message_bus::MessageBus; use tracing::{debug, warn}; +fn prepare_for_update(message: Message<PrepareHeader>) -> Message<PrepareHeader> { + let owned = unsafe { message.into_inner().try_merge() } + .expect("metadata state machine update expects a unique message buffer"); + Message::try_from(owned.split_at(std::mem::size_of::<PrepareHeader>())) + .expect("normalized metadata prepare must remain valid") +} + #[derive(Debug, Clone)] #[allow(unused)] pub struct IggySnapshot { @@ -228,6 +236,7 @@ where ) }); + let prepare = prepare_for_update(prepare); let response = self.mux_stm.update(prepare).unwrap_or_else(|err| { warn!( "on_ack: state machine error for op={}: {err}", diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index e25aaaf65..e6e37a800 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -251,7 +251,8 @@ macro_rules! collect_handlers { match input.header().operation { $( Operation::$operation => { - let body = input.body_bytes(); + let (_, tail) = input.into_inner(); + let body = ::bytes::Bytes::copy_from_slice(&tail); let cmd = $operation::from_bytes(body)?; Ok(Either::Left([<$state Command>]::$operation(cmd))) }, diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml index c355ea509..bca5a4d60 100644 --- a/core/partitions/Cargo.toml +++ b/core/partitions/Cargo.toml @@ -28,8 +28,11 @@ repository = "https://github.com/apache/iggy" readme = "../../README.md" [dependencies] +bytemuck = { workspace = true } bytes = { workspace = true } +compio = { workspace = true } consensus = { workspace = true } +iobuf = { workspace = true } iggy_common = { workspace = true } journal = { workspace = true } message_bus = { workspace = true } diff --git a/core/partitions/src/frozen_messages_writer.rs b/core/partitions/src/frozen_messages_writer.rs new file mode 100644 index 000000000..7db6a86af --- /dev/null +++ b/core/partitions/src/frozen_messages_writer.rs @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use compio::{ + buf::IoBuf, + fs::{File, OpenOptions}, + io::AsyncWriteAtExt, +}; +use iggy_common::{IggyByteSize, IggyError}; +use iobuf::Frozen; +use tracing::error; + +const MAX_IOV_COUNT: usize = 1024; + +#[derive(Debug, Clone)] +pub struct FrozenMessagesWriter { + file_path: String, + fsync: bool, +} + +impl FrozenMessagesWriter { + pub fn new(file_path: String, fsync: bool) -> Self { + Self { file_path, fsync } + } + + pub async fn save<const ALIGN: usize>( + &self, + position: u64, + buffers: &[Frozen<ALIGN>], + strip_prefix: usize, + ) -> Result<IggyByteSize, IggyError> { + let messages_size = buffers.iter().try_fold(0u64, |size, buffer| { + let written = buffer + .len() + .checked_sub(strip_prefix) + .ok_or(IggyError::CannotWriteToFile)?; + Ok::<_, IggyError>(size + written as u64) + })?; + + if messages_size == 0 { + return Ok(IggyByteSize::from(0)); + } + + let file = OpenOptions::new() + .create(true) + .write(true) + .open(&self.file_path) + .await + .map_err(|_| IggyError::CannotReadFile)?; + + write_frozen_chunked(&file, &self.file_path, position, buffers, strip_prefix).await?; + + if self.fsync { + file.sync_all() + .await + .map_err(|_| IggyError::CannotWriteToFile)?; + } + + Ok(IggyByteSize::from(messages_size)) + } +} + +async fn write_frozen_chunked<const ALIGN: usize>( + file: &File, + file_path: &str, + mut position: u64, + buffers: &[Frozen<ALIGN>], + strip_prefix: usize, +) -> Result<(), IggyError> { + for chunk in buffers.chunks(MAX_IOV_COUNT) { + let chunk_size = chunk.iter().try_fold(0usize, |size, buffer| { + let written = buffer + .len() + .checked_sub(strip_prefix) + .ok_or(IggyError::CannotWriteToFile)?; + Ok::<_, IggyError>(size + written) + })?; + + let chunk_vec: Vec<_> = chunk + .iter() + .cloned() + .map(|buffer| buffer.slice(strip_prefix..)) + .collect(); + + let (result, _) = (&*file) + .write_vectored_all_at(chunk_vec, position) + .await + .into(); + result.map_err(|err| { + error!( + file = file_path, + %err, + "failed to write frozen messages to segment file" + ); + IggyError::CannotWriteToFile + })?; + + position += chunk_size as u64; + } + + Ok(()) +} diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 5addf1516..241cff0d9 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -21,12 +21,11 @@ use crate::journal::{ use crate::log::SegmentedLog; use crate::{ AppendResult, Partition, PartitionOffsets, PollingArgs, PollingConsumer, - decode_send_messages_batch, + send_messages2::{IggyMessages2, stamp_prepare_for_persistence}, }; use iggy_common::{ ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets, - IggyByteSize, IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet, IggyTimestamp, - PartitionStats, PollingKind, + IggyByteSize, IggyError, IggyTimestamp, PartitionStats, PollingKind, header::{Operation, PrepareHeader}, message::Message, }; @@ -55,36 +54,6 @@ pub struct IggyPartition { } impl IggyPartition { - fn prepare_message_from_batch( - mut header: PrepareHeader, - batch: &IggyMessagesBatchMut, - ) -> Message<PrepareHeader> { - let indexes = batch.indexes(); - let count = batch.count(); - let body_len = 4 + indexes.len() + batch.len(); - let total_size = std::mem::size_of::<PrepareHeader>() + body_len; - header.size = u32::try_from(total_size) - .expect("prepare_message_from_batch: batch size exceeds u32::MAX"); - - let message = Message::<PrepareHeader>::new(total_size).transmute_header(|_old, new| { - *new = header; - }); - - let mut bytes = message - .into_inner() - .try_into_mut() - .expect("prepare_message_from_batch: expected unique bytes buffer"); - let header_size = std::mem::size_of::<PrepareHeader>(); - bytes[header_size..header_size + 4].copy_from_slice(&count.to_le_bytes()); - let mut position = header_size + 4; - bytes[position..position + indexes.len()].copy_from_slice(indexes); - position += indexes.len(); - bytes[position..position + batch.len()].copy_from_slice(batch); - - Message::<PrepareHeader>::from_bytes(bytes.freeze()) - .expect("prepare_message_from_batch: invalid prepared message bytes") - } - pub fn new(stats: Arc<PartitionStats>) -> Self { Self { log: SegmentedLog::default(), @@ -111,29 +80,29 @@ impl Partition for IggyPartition { return Err(IggyError::CannotAppendMessage); } - let mut batch = decode_send_messages_batch(message.body_bytes()) - .ok_or(IggyError::CannotAppendMessage)?; - - if batch.count() == 0 { - return Ok(AppendResult::new(0, 0, 0)); - } - let dirty_offset = if self.should_increment_offset { self.dirty_offset.load(Ordering::Relaxed) + 1 } else { 0 }; - let segment = self.log.active_segment(); - let segment_start_offset = segment.start_offset; - let current_position = segment.current_position; - - batch - .prepare_for_persistence(segment_start_offset, dirty_offset, current_position, None) - .await; + let current_max_timestamp = self + .log + .journal() + .info + .end_timestamp + .max(self.log.active_segment().end_timestamp); + let batch_timestamp = IggyTimestamp::now().as_micros().max(current_max_timestamp); + let (message, batch) = + stamp_prepare_for_persistence(message, dirty_offset, batch_timestamp) + .map_err(|_| IggyError::CannotAppendMessage)?; + + if batch.record_count == 0 { + return Ok(AppendResult::new(0, 0, 0)); + } - let batch_messages_count = batch.count(); - let batch_messages_size = batch.size(); + let batch_messages_count = batch.record_count; + let batch_messages_size = batch.total_size() as u32; let last_dirty_offset = if batch_messages_count == 0 { dirty_offset @@ -154,16 +123,10 @@ impl Partition for IggyPartition { journal.info.messages_count += batch_messages_count; journal.info.size += IggyByteSize::from(u64::from(batch_messages_size)); journal.info.current_offset = last_dirty_offset; - if let Some(ts) = batch.first_timestamp() - && journal.info.first_timestamp == 0 - { - journal.info.first_timestamp = ts; + if journal.info.first_timestamp == 0 { + journal.info.first_timestamp = batch.base_timestamp; } - if let Some(ts) = batch.last_timestamp() { - journal.info.end_timestamp = ts; - } - - let message = Self::prepare_message_from_batch(header, &batch); + journal.info.end_timestamp = batch.base_timestamp; journal.inner.append(message).await; Ok(AppendResult::new( @@ -177,9 +140,9 @@ impl Partition for IggyPartition { &self, consumer: PollingConsumer, args: PollingArgs, - ) -> Result<IggyMessagesBatchSet, IggyError> { + ) -> Result<IggyMessages2, IggyError> { if !self.should_increment_offset || args.count == 0 { - return Ok(IggyMessagesBatchSet::empty()); + return Ok(IggyMessages2::empty()); } let committed_offset = self.offset.load(Ordering::Relaxed); @@ -198,15 +161,15 @@ impl Partition for IggyPartition { count: args.count, }) .await; - let batch_set = result.unwrap_or_else(IggyMessagesBatchSet::empty); - if let Some(first) = batch_set.first_offset() { + let messages = result.unwrap_or_else(IggyMessages2::empty); + if let Some(first) = messages.first_offset() { if first > committed_offset { - return Ok(IggyMessagesBatchSet::empty()); + return Ok(IggyMessages2::empty()); } let max_count = u32::try_from(committed_offset - first + 1).unwrap_or(u32::MAX); - return Ok(batch_set.get_by_offset(first, batch_set.count().min(max_count))); + return Ok(messages.limit(max_count)); } - return Ok(batch_set); + return Ok(messages); } PollingKind::Next => self .get_consumer_offset(consumer) @@ -214,7 +177,7 @@ impl Partition for IggyPartition { }; if start_offset > committed_offset { - return Ok(IggyMessagesBatchSet::empty()); + return Ok(IggyMessages2::empty()); } let max_count = u32::try_from(committed_offset - start_offset + 1).unwrap_or(u32::MAX); @@ -230,10 +193,12 @@ impl Partition for IggyPartition { }) .await; - let batch_set = result.unwrap_or_else(IggyMessagesBatchSet::empty); + let messages = result.unwrap_or_else(IggyMessages2::empty); - if args.auto_commit && !batch_set.is_empty() { - let last_offset = start_offset + u64::from(batch_set.count()) - 1; + if args.auto_commit && !messages.is_empty() { + let last_offset = messages + .last_offset() + .expect("non-empty poll result must have a last offset"); if let Err(err) = self.store_consumer_offset(consumer, last_offset) { // warning for now. warn!( @@ -245,7 +210,7 @@ impl Partition for IggyPartition { } } - Ok(batch_set) + Ok(messages) } #[allow(clippy::cast_possible_truncation)] diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index d219c905e..bc2cceb84 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -20,7 +20,9 @@ use crate::IggyPartition; use crate::Partition; use crate::PollingConsumer; +use crate::frozen_messages_writer::FrozenMessagesWriter; use crate::log::JournalInfo; +use crate::send_messages2::convert_request_message; use crate::types::PartitionsConfig; use consensus::PlaneIdentity; use consensus::{ @@ -38,6 +40,7 @@ use iggy_common::{ message::Message, sharding::{IggyNamespace, LocalIdx, ShardId}, }; +use iobuf::{Frozen, TryMerge}; use message_bus::MessageBus; use std::cell::UnsafeCell; use std::collections::{HashMap, HashSet}; @@ -350,6 +353,17 @@ where .expect("on_request: consensus not initialized"); debug!(?namespace, "handling partition request"); + let message = if message.header().operation == Operation::SendMessages { + match convert_request_message(namespace, message) { + Ok(message) => message, + Err(error) => { + warn!(?namespace, %error, "on_request: failed to convert SendMessages"); + return; + } + } + } else { + message + }; let prepare = message.project(consensus); pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; } @@ -563,8 +577,10 @@ where ); } Operation::StoreConsumerOffset => { - let body = message.body_bytes(); - let body = body.as_ref(); + let total_size = header.size() as usize; + let owned = unsafe { message.into_inner().try_merge() } + .expect("consumer offset prepare expects a unique message buffer"); + let body = &owned.as_slice()[std::mem::size_of::<PrepareHeader>()..total_size]; let consumer_kind = body[0]; let consumer_id = u32::from_le_bytes(body[1..5].try_into().unwrap()) as usize; let offset = u64::from_le_bytes(body[5..13].try_into().unwrap()); @@ -689,22 +705,53 @@ where if is_full || unsaved_messages_count_exceeded || unsaved_messages_size_exceeded { // Freeze journal batches. - let frozen_batches = { - let batches = partition.log.journal_mut().inner.commit(); + let (frozen_batches, batch_count) = { + let entries = partition.log.journal_mut().inner.commit(); + let segment = partition.log.active_segment(); + let segment_start_offset = segment.start_offset; + let segment_base_timestamp = if segment.start_timestamp == 0 { + journal_info.first_timestamp + } else { + segment.start_timestamp + }; + let mut file_position = segment.size.as_bytes_u64() as u32; partition.log.ensure_indexes(); - batches.append_indexes_to(partition.log.active_indexes_mut().unwrap()); - - let frozen: Vec<_> = batches - .into_inner() - .into_iter() - .map(|mut b| b.freeze()) - .collect(); - partition.log.set_in_flight(frozen.clone()); - frozen + let indexes = partition.log.active_indexes_mut().unwrap(); + let mut frozen = Vec::with_capacity(entries.len()); + let mut batch_count = 0u32; + + for entry in entries { + let owned = unsafe { entry.into_inner().try_merge() } + .expect("journal commit expects mergeable prepare buffers"); + let Ok(batch) = crate::send_messages2::decode_prepare_slice(owned.as_slice()) + else { + continue; + }; + if batch.header.record_count == 0 { + continue; + } + + let relative_offset = + u32::try_from(batch.header.base_offset - segment_start_offset) + .expect("relative batch offset exceeds u32::MAX"); + let relative_timestamp = + u32::try_from(batch.header.base_timestamp - segment_base_timestamp) + .expect("relative batch timestamp exceeds u32::MAX"); + indexes.insert( + relative_offset, + file_position, + u64::from(relative_timestamp), + ); + file_position += batch.header.total_size() as u32; + batch_count += batch.header.record_count; + frozen.push(owned.into()); + } + + (frozen, batch_count) }; // Persist to disk. - self.persist_frozen_batches_to_disk(namespace, frozen_batches) + self.persist_frozen_batches_to_disk(namespace, frozen_batches, batch_count) .await; if is_full { @@ -731,13 +778,9 @@ where async fn persist_frozen_batches_to_disk( &self, namespace: &IggyNamespace, - frozen_batches: Vec<iggy_common::IggyMessagesBatch>, + frozen_batches: Vec<Frozen<4096>>, + batch_count: u32, ) { - let batch_count: u32 = frozen_batches - .iter() - .map(iggy_common::IggyMessagesBatch::count) - .sum(); - if batch_count == 0 { return; } @@ -750,13 +793,13 @@ where return; } - let messages_writer = partition + let messages_path = partition .log .active_storage() .messages_writer .as_ref() .expect("Messages writer not initialized") - .clone(); + .path(); let index_writer = partition .log .active_storage() @@ -764,10 +807,15 @@ where .as_ref() .expect("Index writer not initialized") .clone(); + let position = partition.log.active_segment().size.as_bytes_u64(); + let messages_writer = FrozenMessagesWriter::new(messages_path, self.config.enforce_fsync); let saved = messages_writer - .as_ref() - .save_frozen_batches(&frozen_batches) + .save( + position, + &frozen_batches, + std::mem::size_of::<PrepareHeader>(), + ) .await .expect("persist: failed to save frozen batches"); diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs index d3b779fee..fa9450ee8 100644 --- a/core/partitions/src/journal.rs +++ b/core/partitions/src/journal.rs @@ -15,19 +15,28 @@ // specific language governing permissions and limitations // under the License. -use bytes::Bytes; use iggy_common::{ - IggyMessagesBatchMut, IggyMessagesBatchSet, header::{Operation, PrepareHeader}, message::Message, }; +use iobuf::{Frozen, Owned, TryMerge}; use journal::{Journal, Storage}; use std::{ cell::UnsafeCell, collections::{BTreeMap, HashMap}, }; +use crate::send_messages2::{IggyMessages2, PREPARE_SPLIT_POINT, decode_prepare_slice}; + const ZERO_LEN: usize = 0; +type JournalBuffer = Frozen<4096>; + +fn message_from_frozen( + inner: JournalBuffer, +) -> Result<Message<PrepareHeader>, iggy_common::header::ConsensusError> { + let split_at = PREPARE_SPLIT_POINT.min(inner.len()); + Message::try_from(inner.split_at(split_at)) +} /// Lookup key for querying messages from the journal. #[derive(Debug, Clone, Copy)] @@ -53,12 +62,12 @@ where { type Query; - fn get(&self, query: &Self::Query) -> impl Future<Output = Option<IggyMessagesBatchSet>>; + fn get(&self, query: &Self::Query) -> impl Future<Output = Option<IggyMessages2>>; } #[derive(Debug, Default)] pub struct PartitionJournalMemStorage { - entries: UnsafeCell<Vec<Bytes>>, + entries: UnsafeCell<Vec<JournalBuffer>>, /// Maps byte offset (as if disk-backed) to index in entries Vec offset_to_index: UnsafeCell<HashMap<usize, usize>>, /// Current write position (cumulative byte offset) @@ -66,7 +75,7 @@ pub struct PartitionJournalMemStorage { } impl Storage for PartitionJournalMemStorage { - type Buffer = Bytes; + type Buffer = JournalBuffer; async fn write(&self, buf: Self::Buffer) -> usize { let len = buf.len(); @@ -87,17 +96,20 @@ impl Storage for PartitionJournalMemStorage { async fn read(&self, offset: usize, _len: usize) -> Self::Buffer { let offset_to_index = unsafe { &*self.offset_to_index.get() }; let Some(&index) = offset_to_index.get(&offset) else { - return Bytes::new(); + return Owned::<4096>::zeroed(0).into(); }; let entries = unsafe { &*self.entries.get() }; - entries.get(index).cloned().unwrap_or_default() + entries + .get(index) + .cloned() + .unwrap_or_else(|| Owned::<4096>::zeroed(0).into()) } } pub struct PartitionJournal<S> where - S: Storage<Buffer = Bytes>, + S: Storage<Buffer = JournalBuffer>, { /// Maps op -> storage byte offset (for all entries) op_to_storage_offset: UnsafeCell<BTreeMap<u64, usize>>, @@ -111,7 +123,7 @@ where impl<S> Default for PartitionJournal<S> where - S: Storage<Buffer = Bytes> + Default, + S: Storage<Buffer = JournalBuffer> + Default, { fn default() -> Self { Self { @@ -128,7 +140,7 @@ where impl<S> std::fmt::Debug for PartitionJournal<S> where - S: Storage<Buffer = Bytes>, + S: Storage<Buffer = JournalBuffer>, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PartitionJournal2Impl").finish() @@ -137,13 +149,13 @@ where struct JournalInner<S> where - S: Storage<Buffer = Bytes>, + S: Storage<Buffer = JournalBuffer>, { storage: S, } impl PartitionJournalMemStorage { - fn drain(&self) -> Vec<Bytes> { + fn drain(&self) -> Vec<JournalBuffer> { let entries = unsafe { &mut *self.entries.get() }; let offset_to_index = unsafe { &mut *self.offset_to_index.get() }; let current_offset = unsafe { &mut *self.current_offset.get() }; @@ -162,19 +174,12 @@ impl PartitionJournalMemStorage { impl PartitionJournal<PartitionJournalMemStorage> { /// Drain all accumulated batches, matching the legacy `PartitionJournal` API. - pub fn commit(&self) -> IggyMessagesBatchSet { + pub fn commit(&self) -> Vec<Message<PrepareHeader>> { let entries = { let inner = unsafe { &*self.inner.get() }; inner.storage.drain() }; - let mut messages = Vec::with_capacity(entries.len()); - for bytes in entries { - if let Ok(message) = Message::from_bytes(bytes) { - messages.push(message); - } - } - let headers = unsafe { &mut *self.headers.get() }; headers.clear(); let op_to_storage_offset = unsafe { &mut *self.op_to_storage_offset.get() }; @@ -184,7 +189,13 @@ impl PartitionJournal<PartitionJournalMemStorage> { let timestamp_to_op = unsafe { &mut *self.timestamp_to_op.get() }; timestamp_to_op.clear(); - Self::messages_to_batch_set(&messages) + entries + .into_iter() + .map(|entry| { + message_from_frozen(entry) + .expect("partition journal storage must contain valid prepare batches") + }) + .collect() } pub fn is_empty(&self) -> bool { @@ -195,28 +206,8 @@ impl PartitionJournal<PartitionJournalMemStorage> { impl<S> PartitionJournal<S> where - S: Storage<Buffer = Bytes>, + S: Storage<Buffer = JournalBuffer>, { - fn message_to_batch(message: &Message<PrepareHeader>) -> Option<IggyMessagesBatchMut> { - if message.header().operation != Operation::SendMessages { - return None; - } - - crate::decode_send_messages_batch(message.body_bytes()) - } - - fn messages_to_batch_set(messages: &[Message<PrepareHeader>]) -> IggyMessagesBatchSet { - let mut batch_set = IggyMessagesBatchSet::empty(); - - for message in messages { - if let Some(batch) = Self::message_to_batch(message) { - batch_set.add_batch(batch); - } - } - - batch_set - } - #[allow(dead_code)] fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> { match query { @@ -255,8 +246,8 @@ where } Some( - Message::from_bytes(bytes) - .expect("partition.journal.storage.read: invalid bytes for message"), + message_from_frozen(bytes) + .expect("partition journal storage must contain a valid prepare batch"), ) } @@ -264,10 +255,14 @@ where async fn load_messages_from_storage( &self, start_op: u64, - count: u32, - ) -> Vec<Message<PrepareHeader>> { + query: MessageLookup, + ) -> IggyMessages2 { + let count = match query { + MessageLookup::Offset { count, .. } | MessageLookup::Timestamp { count, .. } => count, + }; + if count == 0 { - return Vec::new(); + return IggyMessages2::empty(); } // Get (op, storage_offset) pairs directly from the mapping @@ -280,11 +275,10 @@ where .collect() }; - let mut messages = Vec::new(); - let mut loaded_messages = 0u32; + let mut messages = IggyMessages2::with_capacity(count as usize); for (_, storage_offset) in op_offsets { - if loaded_messages >= count { + if messages.count() >= count { break; } @@ -297,12 +291,36 @@ where continue; } - let message = Message::from_bytes(bytes) - .expect("partition.journal.storage.read: invalid bytes for message"); + let header_bytes = &bytes[..std::mem::size_of::<PrepareHeader>()]; + let header = *bytemuck::checked::try_from_bytes::<PrepareHeader>(header_bytes) + .expect("partition journal storage must contain a valid prepare header"); + if header.operation != Operation::SendMessages { + continue; + } + let Ok(batch) = decode_prepare_slice(bytes.as_slice()) else { + continue; + }; - if let Some(batch) = Self::message_to_batch(&message) { - loaded_messages = loaded_messages.saturating_add(batch.count()); - messages.push(message); + for record in batch.iter() { + if messages.count() >= count { + break; + } + + let owned = record.to_owned(&batch.header); + match query { + MessageLookup::Offset { offset, .. } => { + if owned.header.offset < offset { + continue; + } + } + MessageLookup::Timestamp { timestamp, .. } => { + if owned.header.timestamp < timestamp { + continue; + } + } + } + + messages.push(owned); } } @@ -312,10 +330,10 @@ where impl<S> Journal<S> for PartitionJournal<S> where - S: Storage<Buffer = Bytes>, + S: Storage<Buffer = JournalBuffer>, { type Header = PrepareHeader; - type Entry = Message<Self::Header>; + type Entry = Message<PrepareHeader>; #[rustfmt::skip] // Scuffed formatter. type HeaderRef<'a> = &'a Self::Header where S: 'a; @@ -335,18 +353,27 @@ where } async fn append(&self, entry: Self::Entry) { - let first_offset_and_timestamp = Self::message_to_batch(&entry) - .and_then(|batch| Some((batch.first_offset()?, batch.first_timestamp()?))); - let header = *entry.header(); let op = header.op; + let owned = unsafe { entry.into_inner().try_merge() } + .expect("partition journal append expects a unique message buffer"); + let first_offset_and_timestamp = if header.operation == Operation::SendMessages { + decode_prepare_slice(owned.as_slice()) + .ok() + .and_then(|batch| { + (batch.header.record_count != 0) + .then_some((batch.header.base_offset, batch.header.base_timestamp)) + }) + } else { + None + }; { let headers = unsafe { &mut *self.headers.get() }; headers.push(header); }; - let bytes = entry.into_inner(); + let bytes: JournalBuffer = owned.into(); let storage_offset = { let inner = unsafe { &*self.inner.get() }; inner.storage.write(bytes).await @@ -373,26 +400,14 @@ where impl<S> QueryableJournal<S> for PartitionJournal<S> where - S: Storage<Buffer = Bytes>, + S: Storage<Buffer = JournalBuffer>, { type Query = MessageLookup; - async fn get(&self, query: &Self::Query) -> Option<IggyMessagesBatchSet> { + async fn get(&self, query: &Self::Query) -> Option<IggyMessages2> { let query = *query; let start_op = self.candidate_start_op(&query)?; - let count = match query { - MessageLookup::Offset { count, .. } | MessageLookup::Timestamp { count, .. } => count, - }; - - let messages = self.load_messages_from_storage(start_op, count).await; - - let batch_set = Self::messages_to_batch_set(&messages); - let result = match query { - MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count), - MessageLookup::Timestamp { timestamp, count } => { - batch_set.get_by_timestamp(timestamp, count) - } - }; + let result = self.load_messages_from_storage(start_op, query).await; if result.is_empty() { None diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs index 1f18e32ed..4747d0ed5 100644 --- a/core/partitions/src/lib.rs +++ b/core/partitions/src/lib.rs @@ -17,51 +17,23 @@ #![allow(clippy::future_not_send)] +mod frozen_messages_writer; mod iggy_partition; mod iggy_partitions; mod journal; mod log; +mod send_messages2; mod types; -use bytes::{Bytes, BytesMut}; -use iggy_common::{ - INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, - PooledBuffer, header::PrepareHeader, message::Message, -}; +use iggy_common::{IggyError, header::PrepareHeader, message::Message}; pub use iggy_partition::IggyPartition; pub use iggy_partitions::IggyPartitions; +pub use send_messages2::{IggyMessage2, IggyMessage2Header, IggyMessages2}; pub use types::{ AppendResult, PartitionOffsets, PartitionsConfig, PollingArgs, PollingConsumer, SendMessagesResult, }; -pub(crate) fn decode_send_messages_batch(body: Bytes) -> Option<IggyMessagesBatchMut> { - // TODO: This very is bad, IGGY-114 Fixes this. - let mut body = body - .try_into_mut() - .unwrap_or_else(|body| BytesMut::from(body.as_ref())); - - if body.len() < 4 { - return None; - } - - let count_bytes = body.split_to(4); - let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?); - let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?; - - if body.len() < indexes_len { - return None; - } - - let indexes_bytes = body.split_to(indexes_len); - let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0); - let messages = PooledBuffer::from(body); - - Some(IggyMessagesBatchMut::from_indexes_and_messages( - indexes, messages, - )) -} - /// Partition-level data plane operations. /// /// `send_messages` MUST only append to the partition journal (prepare phase), @@ -76,7 +48,7 @@ pub trait Partition { &self, consumer: PollingConsumer, args: PollingArgs, - ) -> impl Future<Output = Result<IggyMessagesBatchSet, IggyError>> { + ) -> impl Future<Output = Result<IggyMessages2, IggyError>> { let _ = (consumer, args); async { Err(IggyError::FeatureUnavailable) } } diff --git a/core/partitions/src/send_messages2.rs b/core/partitions/src/send_messages2.rs new file mode 100644 index 000000000..4e31c33d2 --- /dev/null +++ b/core/partitions/src/send_messages2.rs @@ -0,0 +1,552 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::{BufMut, Bytes, BytesMut}; +use iggy_common::{ + INDEX_SIZE, IggyError, calculate_checksum, + header::{PrepareHeader, RequestHeader}, + message::Message, + random_id, + sharding::IggyNamespace, +}; +use iobuf::{Owned, TryMerge}; + +const MESSAGE_ALIGN: usize = 4096; +pub const COMMAND_HEADER_SIZE: usize = 256; +pub const PREPARE_SPLIT_POINT: usize = 512; +const MESSAGE_HEADER_SIZE: usize = 48; +const LEGACY_MESSAGE_HEADER_SIZE: usize = 64; +const SEND_MESSAGES2_MAGIC: u32 = u32::from_le_bytes(*b"SMG2"); +const SEND_MESSAGES2_VERSION: u16 = 1; + +#[derive(Debug, Clone, Copy, Default)] +pub struct SendMessages2Header { + pub magic: u32, + pub version: u16, + pub flags: u16, + pub partition_id: u64, + pub base_offset: u64, + pub base_timestamp: u64, + pub origin_timestamp: u64, + pub record_count: u32, + pub body_len: u32, +} + +impl SendMessages2Header { + pub fn new(partition_id: u64, origin_timestamp: u64, record_count: u32, body_len: u32) -> Self { + Self { + magic: SEND_MESSAGES2_MAGIC, + version: SEND_MESSAGES2_VERSION, + flags: 0, + partition_id, + base_offset: 0, + base_timestamp: 0, + origin_timestamp, + record_count, + body_len, + } + } + + pub fn decode(bytes: &[u8]) -> Result<Self, IggyError> { + if bytes.len() < COMMAND_HEADER_SIZE { + return Err(IggyError::InvalidCommand); + } + + let magic = read_u32(bytes, 0)?; + let version = read_u16(bytes, 4)?; + if magic != SEND_MESSAGES2_MAGIC || version != SEND_MESSAGES2_VERSION { + return Err(IggyError::InvalidCommand); + } + + Ok(Self { + magic, + version, + flags: read_u16(bytes, 6)?, + partition_id: read_u64(bytes, 8)?, + base_offset: read_u64(bytes, 16)?, + base_timestamp: read_u64(bytes, 24)?, + origin_timestamp: read_u64(bytes, 32)?, + record_count: read_u32(bytes, 40)?, + body_len: read_u32(bytes, 44)?, + }) + } + + pub fn encode_into(&self, bytes: &mut [u8]) { + assert!(bytes.len() >= COMMAND_HEADER_SIZE); + bytes[..COMMAND_HEADER_SIZE].fill(0); + bytes[0..4].copy_from_slice(&self.magic.to_le_bytes()); + bytes[4..6].copy_from_slice(&self.version.to_le_bytes()); + bytes[6..8].copy_from_slice(&self.flags.to_le_bytes()); + bytes[8..16].copy_from_slice(&self.partition_id.to_le_bytes()); + bytes[16..24].copy_from_slice(&self.base_offset.to_le_bytes()); + bytes[24..32].copy_from_slice(&self.base_timestamp.to_le_bytes()); + bytes[32..40].copy_from_slice(&self.origin_timestamp.to_le_bytes()); + bytes[40..44].copy_from_slice(&self.record_count.to_le_bytes()); + bytes[44..48].copy_from_slice(&self.body_len.to_le_bytes()); + } + + pub fn total_size(&self) -> usize { + COMMAND_HEADER_SIZE + self.body_len as usize + } +} + +#[derive(Debug, Clone)] +pub struct SendMessages2Owned { + pub header: SendMessages2Header, + pub blob: Bytes, +} + +impl SendMessages2Owned { + pub fn from_legacy_request(namespace: IggyNamespace, body: &[u8]) -> Result<Self, IggyError> { + let (message_count, messages) = legacy_messages_slice(body)?; + let mut parsed = Vec::with_capacity(message_count as usize); + let mut origin_timestamp = u64::MAX; + let mut cursor = 0usize; + + while cursor < messages.len() && parsed.len() < message_count as usize { + let legacy = LegacyMessageRef::decode(&messages[cursor..])?; + origin_timestamp = origin_timestamp.min(legacy.origin_timestamp); + cursor += legacy.total_size; + parsed.push(legacy); + } + + if parsed.len() != message_count as usize || cursor != messages.len() { + return Err(IggyError::InvalidCommand); + } + + if origin_timestamp == u64::MAX { + origin_timestamp = 0; + } + + let mut blob = BytesMut::with_capacity(messages.len()); + for (index, legacy) in parsed.iter().enumerate() { + let id = if legacy.id == 0 { + random_id::get_uuid() + } else { + legacy.id + }; + let timestamp_delta = legacy + .origin_timestamp + .checked_sub(origin_timestamp) + .and_then(|delta| u32::try_from(delta).ok()) + .ok_or(IggyError::InvalidCommand)?; + + let mut header = [0u8; MESSAGE_HEADER_SIZE]; + header[8..24].copy_from_slice(&id.to_le_bytes()); + header[24..28].copy_from_slice(&(index as u32).to_le_bytes()); + header[28..32].copy_from_slice(×tamp_delta.to_le_bytes()); + header[32..36].copy_from_slice(&(legacy.user_headers.len() as u32).to_le_bytes()); + header[36..40].copy_from_slice(&(legacy.payload.len() as u32).to_le_bytes()); + + let checksum = + calculate_checksum_parts(&header[8..], legacy.user_headers, legacy.payload); + header[0..8].copy_from_slice(&checksum.to_le_bytes()); + + blob.extend_from_slice(&header); + blob.extend_from_slice(legacy.user_headers); + blob.extend_from_slice(legacy.payload); + } + + let blob = blob.freeze(); + let header = SendMessages2Header::new( + namespace.partition_id() as u64, + origin_timestamp, + message_count, + blob.len() as u32, + ); + + Ok(Self { header, blob }) + } + + pub fn encode_request( + self, + request_header: RequestHeader, + ) -> Result<Message<RequestHeader>, IggyError> { + let total_size = std::mem::size_of::<RequestHeader>() + self.header.total_size(); + let mut buffer = Owned::<MESSAGE_ALIGN>::zeroed(total_size); + let bytes = buffer.as_mut_slice(); + bytes[0..std::mem::size_of::<RequestHeader>()] + .copy_from_slice(bytemuck::bytes_of(&request_header)); + self.header.encode_into( + &mut bytes[std::mem::size_of::<RequestHeader>() + ..std::mem::size_of::<RequestHeader>() + COMMAND_HEADER_SIZE], + ); + bytes[PREPARE_SPLIT_POINT..PREPARE_SPLIT_POINT + self.blob.len()] + .copy_from_slice(&self.blob); + + Message::try_from(buffer.split_at(PREPARE_SPLIT_POINT)) + .map_err(|_| IggyError::InvalidCommand) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct IggyMessage2Header { + pub checksum: u64, + pub id: u128, + pub offset: u64, + pub timestamp: u64, + pub origin_timestamp: u64, + pub user_headers_length: u32, + pub payload_length: u32, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct IggyMessage2 { + pub header: IggyMessage2Header, + pub payload: Bytes, + pub user_headers: Option<Bytes>, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct IggyMessages2 { + messages: Vec<IggyMessage2>, +} + +impl IggyMessages2 { + pub fn empty() -> Self { + Self::default() + } + + pub fn with_capacity(capacity: usize) -> Self { + Self { + messages: Vec::with_capacity(capacity), + } + } + + pub fn push(&mut self, message: IggyMessage2) { + self.messages.push(message); + } + + pub fn count(&self) -> u32 { + self.messages.len() as u32 + } + + pub fn is_empty(&self) -> bool { + self.messages.is_empty() + } + + pub fn first_offset(&self) -> Option<u64> { + self.messages.first().map(|message| message.header.offset) + } + + pub fn last_offset(&self) -> Option<u64> { + self.messages.last().map(|message| message.header.offset) + } + + pub fn limit(self, count: u32) -> Self { + let mut messages = self.messages; + messages.truncate(count as usize); + Self { messages } + } + + pub fn iter(&self) -> std::slice::Iter<'_, IggyMessage2> { + self.messages.iter() + } +} + +impl IntoIterator for IggyMessages2 { + type Item = IggyMessage2; + type IntoIter = std::vec::IntoIter<IggyMessage2>; + + fn into_iter(self) -> Self::IntoIter { + self.messages.into_iter() + } +} + +impl<'a> IntoIterator for &'a IggyMessages2 { + type Item = &'a IggyMessage2; + type IntoIter = std::slice::Iter<'a, IggyMessage2>; + + fn into_iter(self) -> Self::IntoIter { + self.messages.iter() + } +} + +#[derive(Debug, Clone, Copy)] +pub struct SendMessages2Ref<'a> { + pub header: SendMessages2Header, + blob: &'a [u8], +} + +impl<'a> SendMessages2Ref<'a> { + pub fn iter(&self) -> SendMessages2Iterator<'a> { + SendMessages2Iterator { + blob: self.blob, + position: 0, + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct SendMessages2MessageHeader { + pub checksum: u64, + pub id: u128, + pub offset_delta: u32, + pub timestamp_delta: u32, + pub user_headers_length: u32, + pub payload_length: u32, +} + +impl SendMessages2MessageHeader { + fn decode(bytes: &[u8]) -> Result<Self, IggyError> { + if bytes.len() < MESSAGE_HEADER_SIZE { + return Err(IggyError::InvalidCommand); + } + + let reserved = read_u64(bytes, 40)?; + if reserved != 0 { + return Err(IggyError::InvalidCommand); + } + + Ok(Self { + checksum: read_u64(bytes, 0)?, + id: read_u128(bytes, 8)?, + offset_delta: read_u32(bytes, 24)?, + timestamp_delta: read_u32(bytes, 28)?, + user_headers_length: read_u32(bytes, 32)?, + payload_length: read_u32(bytes, 36)?, + }) + } + + fn total_size(&self) -> usize { + MESSAGE_HEADER_SIZE + self.user_headers_length as usize + self.payload_length as usize + } +} + +#[derive(Debug, Clone, Copy)] +pub struct SendMessages2MessageView<'a> { + pub header: SendMessages2MessageHeader, + pub user_headers: &'a [u8], + pub payload: &'a [u8], +} + +impl SendMessages2MessageView<'_> { + pub fn to_owned(&self, batch: &SendMessages2Header) -> IggyMessage2 { + IggyMessage2 { + header: IggyMessage2Header { + checksum: self.header.checksum, + id: self.header.id, + offset: batch.base_offset + u64::from(self.header.offset_delta), + timestamp: batch.base_timestamp, + origin_timestamp: batch.origin_timestamp + u64::from(self.header.timestamp_delta), + user_headers_length: self.header.user_headers_length, + payload_length: self.header.payload_length, + }, + payload: Bytes::copy_from_slice(self.payload), + user_headers: (!self.user_headers.is_empty()) + .then(|| Bytes::copy_from_slice(self.user_headers)), + } + } +} + +pub struct SendMessages2Iterator<'a> { + blob: &'a [u8], + position: usize, +} + +impl<'a> Iterator for SendMessages2Iterator<'a> { + type Item = SendMessages2MessageView<'a>; + + fn next(&mut self) -> Option<Self::Item> { + if self.position >= self.blob.len() { + return None; + } + + let header = SendMessages2MessageHeader::decode(&self.blob[self.position..]).ok()?; + let start = self.position + MESSAGE_HEADER_SIZE; + let headers_end = start + header.user_headers_length as usize; + let payload_end = headers_end + header.payload_length as usize; + let user_headers = self.blob.get(start..headers_end)?; + let payload = self.blob.get(headers_end..payload_end)?; + self.position += header.total_size(); + Some(SendMessages2MessageView { + header, + user_headers, + payload, + }) + } +} + +pub fn convert_request_message( + namespace: IggyNamespace, + message: Message<RequestHeader>, +) -> Result<Message<RequestHeader>, IggyError> { + let request_header = *message.header(); + let total_size = request_header.size as usize; + let owned = unsafe { message.into_inner().try_merge() } + .expect("request conversion expects a unique message buffer"); + let body = &owned.as_slice()[std::mem::size_of::<RequestHeader>()..total_size]; + SendMessages2Owned::from_legacy_request(namespace, body)?.encode_request(request_header) +} + +pub fn decode_prepare_slice(bytes: &[u8]) -> Result<SendMessages2Ref<'_>, IggyError> { + let header_size = std::mem::size_of::<PrepareHeader>(); + if bytes.len() < header_size { + return Err(IggyError::InvalidCommand); + } + + let prepare = bytemuck::checked::try_from_bytes::<PrepareHeader>(&bytes[..header_size]) + .map_err(|_| IggyError::InvalidCommand)?; + let total_size = prepare.size as usize; + if bytes.len() < total_size { + return Err(IggyError::InvalidCommand); + } + + let body = &bytes[header_size..total_size]; + if body.len() < COMMAND_HEADER_SIZE { + return Err(IggyError::InvalidCommand); + } + + let header = SendMessages2Header::decode(&body[..COMMAND_HEADER_SIZE])?; + let blob = &body[COMMAND_HEADER_SIZE..]; + if blob.len() < header.body_len as usize { + return Err(IggyError::InvalidCommand); + } + + Ok(SendMessages2Ref { + header, + blob: &blob[..header.body_len as usize], + }) +} + +pub fn stamp_prepare_for_persistence( + message: Message<PrepareHeader>, + base_offset: u64, + base_timestamp: u64, +) -> Result<(Message<PrepareHeader>, SendMessages2Header), IggyError> { + let (mut prefix, tail) = message.into_inner(); + if prefix.len() < PREPARE_SPLIT_POINT { + return Err(IggyError::InvalidCommand); + } + + let header_offset = std::mem::size_of::<PrepareHeader>(); + let header_bytes = &mut prefix[header_offset..header_offset + COMMAND_HEADER_SIZE]; + let mut command = SendMessages2Header::decode(header_bytes)?; + command.base_offset = base_offset; + command.base_timestamp = base_timestamp; + command.encode_into(header_bytes); + + let message = Message::from_inner((prefix, tail)).map_err(|_| IggyError::InvalidCommand)?; + Ok((message, command)) +} + +fn legacy_messages_slice(body: &[u8]) -> Result<(u32, &[u8]), IggyError> { + if body.len() < 4 { + return Err(IggyError::InvalidCommand); + } + + let metadata_length = read_u32(body, 0)? as usize; + let metadata_end = 4usize + .checked_add(metadata_length) + .ok_or(IggyError::InvalidCommand)?; + if metadata_end < 4 || body.len() < metadata_end { + return Err(IggyError::InvalidCommand); + } + + let message_count = read_u32(body, metadata_end - 4)?; + let indexes_len = usize::try_from(message_count) + .ok() + .and_then(|count| count.checked_mul(INDEX_SIZE)) + .ok_or(IggyError::InvalidCommand)?; + let messages_start = metadata_end + .checked_add(indexes_len) + .ok_or(IggyError::InvalidCommand)?; + if body.len() < messages_start { + return Err(IggyError::InvalidCommand); + } + + Ok((message_count, &body[messages_start..])) +} + +#[derive(Clone, Copy)] +struct LegacyMessageRef<'a> { + id: u128, + origin_timestamp: u64, + user_headers: &'a [u8], + payload: &'a [u8], + total_size: usize, +} + +impl<'a> LegacyMessageRef<'a> { + fn decode(bytes: &'a [u8]) -> Result<Self, IggyError> { + if bytes.len() < LEGACY_MESSAGE_HEADER_SIZE { + return Err(IggyError::InvalidCommand); + } + + let user_headers_length = read_u32(bytes, 48)? as usize; + let payload_length = read_u32(bytes, 52)? as usize; + let total_size = LEGACY_MESSAGE_HEADER_SIZE + .checked_add(payload_length) + .and_then(|size| size.checked_add(user_headers_length)) + .ok_or(IggyError::InvalidCommand)?; + if bytes.len() < total_size { + return Err(IggyError::InvalidCommand); + } + + let payload_start = LEGACY_MESSAGE_HEADER_SIZE; + let payload_end = payload_start + payload_length; + let headers_end = payload_end + user_headers_length; + + Ok(Self { + id: read_u128(bytes, 8)?, + origin_timestamp: read_u64(bytes, 40)?, + user_headers: &bytes[payload_end..headers_end], + payload: &bytes[payload_start..payload_end], + total_size, + }) + } +} + +fn calculate_checksum_parts(header_tail: &[u8], user_headers: &[u8], payload: &[u8]) -> u64 { + let mut bytes = BytesMut::with_capacity(header_tail.len() + user_headers.len() + payload.len()); + bytes.put_slice(header_tail); + bytes.put_slice(user_headers); + bytes.put_slice(payload); + calculate_checksum(&bytes.freeze()) +} + +fn read_u16(bytes: &[u8], offset: usize) -> Result<u16, IggyError> { + bytes + .get(offset..offset + 2) + .and_then(|slice| slice.try_into().ok()) + .map(u16::from_le_bytes) + .ok_or(IggyError::InvalidNumberEncoding) +} + +fn read_u32(bytes: &[u8], offset: usize) -> Result<u32, IggyError> { + bytes + .get(offset..offset + 4) + .and_then(|slice| slice.try_into().ok()) + .map(u32::from_le_bytes) + .ok_or(IggyError::InvalidNumberEncoding) +} + +fn read_u64(bytes: &[u8], offset: usize) -> Result<u64, IggyError> { + bytes + .get(offset..offset + 8) + .and_then(|slice| slice.try_into().ok()) + .map(u64::from_le_bytes) + .ok_or(IggyError::InvalidNumberEncoding) +} + +fn read_u128(bytes: &[u8], offset: usize) -> Result<u128, IggyError> { + bytes + .get(offset..offset + 16) + .and_then(|slice| slice.try_into().ok()) + .map(u128::from_le_bytes) + .ok_or(IggyError::InvalidNumberEncoding) +} diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml index 559f5fae9..5e4d36d42 100644 --- a/core/simulator/Cargo.toml +++ b/core/simulator/Cargo.toml @@ -27,6 +27,7 @@ consensus = { path = "../consensus" } enumset = { workspace = true } futures = { workspace = true } iggy_common = { path = "../common" } +iobuf = { workspace = true } journal = { path = "../journal" } message_bus = { path = "../message_bus" } metadata = { path = "../metadata" } diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs index 371258aef..adca4ca80 100644 --- a/core/simulator/src/bus.rs +++ b/core/simulator/src/bus.rs @@ -94,7 +94,7 @@ impl MessageBus for MemBus { &self, client_id: Self::Client, message: Self::Data, - ) -> Result<(), IggyError> { + ) -> Result<Self::Data, IggyError> { if !self.clients.lock().unwrap().contains(&client_id) { #[allow(clippy::cast_possible_truncation)] return Err(IggyError::ClientNotFound(client_id as u32)); @@ -104,17 +104,17 @@ impl MessageBus for MemBus { from_replica: None, to_replica: None, to_client: Some(client_id), - message, + message: message.clone(), }); - Ok(()) + Ok(message) } async fn send_to_replica( &self, replica: Self::Replica, message: Self::Data, - ) -> Result<(), IggyError> { + ) -> Result<Self::Data, IggyError> { if !self.replicas.lock().unwrap().contains(&replica) { return Err(IggyError::ResourceNotFound(format!("Replica {replica}"))); } @@ -123,10 +123,10 @@ impl MessageBus for MemBus { from_replica: None, to_replica: Some(replica), to_client: None, - message, + message: message.clone(), }); - Ok(()) + Ok(message) } } @@ -167,7 +167,7 @@ impl MessageBus for SharedMemBus { &self, client_id: Self::Client, message: Self::Data, - ) -> Result<(), IggyError> { + ) -> Result<Self::Data, IggyError> { self.0.send_to_client(client_id, message).await } @@ -175,7 +175,7 @@ impl MessageBus for SharedMemBus { &self, replica: Self::Replica, message: Self::Data, - ) -> Result<(), IggyError> { + ) -> Result<Self::Data, IggyError> { self.0.send_to_replica(replica, message).await } } diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs index 6dc9e1eab..23c57915d 100644 --- a/core/simulator/src/client.rs +++ b/core/simulator/src/client.rs @@ -23,6 +23,7 @@ use iggy_common::{ message::Message, sharding::IggyNamespace, }; +use iobuf::Owned; use std::cell::Cell; // TODO: Proper client which implements the full client SDK API @@ -130,8 +131,6 @@ impl SimClient { payload: &[u8], namespace: IggyNamespace, ) -> Message<RequestHeader> { - use bytes::Bytes; - let header_size = std::mem::size_of::<RequestHeader>(); let total_size = header_size + payload.len(); @@ -159,14 +158,12 @@ impl SimClient { buffer.extend_from_slice(header_bytes); buffer.extend_from_slice(payload); - Message::<RequestHeader>::from_bytes(Bytes::from(buffer)) - .expect("failed to build request message") + Message::try_from(Owned::<4096>::copy_from_slice(&buffer).split_at(header_size)) + .expect("request buffer must contain a valid request message") } #[allow(clippy::cast_possible_truncation)] fn build_request(&self, operation: Operation, payload: &[u8]) -> Message<RequestHeader> { - use bytes::Bytes; - let header_size = std::mem::size_of::<RequestHeader>(); let total_size = header_size + payload.len(); @@ -193,7 +190,7 @@ impl SimClient { buffer.extend_from_slice(header_bytes); buffer.extend_from_slice(payload); - Message::<RequestHeader>::from_bytes(Bytes::from(buffer)) - .expect("failed to build request message") + Message::try_from(Owned::<4096>::copy_from_slice(&buffer).split_at(header_size)) + .expect("request buffer must contain a valid request message") } } diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs index 9f4d64b79..a7f0a9630 100644 --- a/core/simulator/src/deps.rs +++ b/core/simulator/src/deps.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use bytes::Bytes; use iggy_common::header::PrepareHeader; use iggy_common::message::Message; use iggy_common::variadic; +use iobuf::{Owned, TryMerge}; use journal::{Journal, JournalHandle, Storage}; use metadata::MuxStateMachine; use metadata::stm::consumer_group::ConsumerGroups; @@ -108,8 +108,10 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> { let offset = *offsets.get(&header.op)?; let buffer = self.storage.read(offset, header.size as usize).await; - let message = - Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes should be valid"); + let message = Message::try_from( + Owned::<4096>::copy_from_slice(&buffer).split_at(std::mem::size_of::<PrepareHeader>()), + ) + .expect("prepare buffer must contain a valid prepare message"); Some(message) } @@ -122,9 +124,10 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> { async fn append(&self, entry: Self::Entry) { let header = *entry.header(); - let message_bytes = entry.as_bytes(); + let message_bytes = unsafe { entry.into_inner().try_merge() } + .expect("simulator journal append expects a unique message buffer"); - let bytes_written = self.storage.write(message_bytes.to_vec()).await; + let bytes_written = self.storage.write(message_bytes.as_slice().to_vec()).await; let offset = self.write_offset.get(); unsafe { &mut *self.headers.get() }.insert(header.op, header); diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs index 5e64aca81..093f11aae 100644 --- a/core/simulator/src/lib.rs +++ b/core/simulator/src/lib.rs @@ -25,12 +25,12 @@ pub mod replica; use bus::MemBus; use consensus::PartitionsHandle; +use iggy_common::IggyError; use iggy_common::header::ReplyHeader; use iggy_common::message::Message; use iggy_common::sharding::IggyNamespace; -use iggy_common::{IggyError, IggyMessagesBatchSet}; use message_bus::MessageBus; -use partitions::{Partition, PartitionOffsets, PollingArgs, PollingConsumer}; +use partitions::{IggyMessages2, Partition, PartitionOffsets, PollingArgs, PollingConsumer}; use replica::{Replica, new_replica}; use std::sync::Arc; @@ -155,7 +155,7 @@ impl Simulator { namespace: IggyNamespace, consumer: PollingConsumer, args: PollingArgs, - ) -> Result<IggyMessagesBatchSet, IggyError> { + ) -> Result<IggyMessages2, IggyError> { let replica = &self.replicas[replica_idx]; let partition = replica diff --git a/core/simulator/src/packet.rs b/core/simulator/src/packet.rs index 9aba6e17a..95e84d617 100644 --- a/core/simulator/src/packet.rs +++ b/core/simulator/src/packet.rs @@ -744,7 +744,11 @@ mod tests { let header: &mut GenericHeader = bytemuck::checked::try_from_bytes_mut(&mut buf).expect("zeroed bytes are valid"); header.command = command; - Message::<GenericHeader>::from_bytes(bytes::Bytes::from(buf)).unwrap() + Message::try_from( + iobuf::Owned::<4096>::copy_from_slice(&buf) + .split_at(std::mem::size_of::<GenericHeader>()), + ) + .expect("generic test buffer must contain a valid generic message") } /// Helper: disable all links to/from a given replica (isolate it).
