This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch partition_redesign in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 11bc1c9e8a1fd81e5dd5810bcea2d3b4e5988b0f Author: numinex <[email protected]> AuthorDate: Fri Mar 20 07:15:26 2026 +0100 restructure the file --- core/buf/src/lib.rs | 328 ++++++++++++++++++++++++++++------------------------ 1 file changed, 176 insertions(+), 152 deletions(-) diff --git a/core/buf/src/lib.rs b/core/buf/src/lib.rs index d24d6e470..5d4075d74 100644 --- a/core/buf/src/lib.rs +++ b/core/buf/src/lib.rs @@ -5,136 +5,6 @@ use std::sync::atomic::{AtomicUsize, Ordering, fence}; use aligned_vec::{AVec, ConstAlign}; -#[repr(C, align(64))] -struct ControlBlock { - ref_count: AtomicUsize, - base: NonNull<u8>, - len: usize, - capacity: usize, - _pad: [u8; 32], -} - -impl ControlBlock { - fn new(base: NonNull<u8>, len: usize, capacity: usize) -> NonNull<Self> { - let ctrl = Box::new(ControlBlock { - ref_count: AtomicUsize::new(1), - base, - len, - capacity, - _pad: [0; 32], - }); - // SAFETY: Box::into_raw returns a valid pointer - unsafe { NonNull::new_unchecked(Box::into_raw(ctrl)) } - } -} - -#[derive(Copy)] -struct Extent { - ptr: NonNull<u8>, - len: usize, - ctrlb: NonNull<ControlBlock>, - _pad: usize, -} - -impl Extent { - fn as_slice(&self) -> &[u8] { - // SAFETY: ptr and len describe a valid allocation - unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) } - } - - unsafe fn as_mut_slice(&mut self) -> &mut [u8] { - // SAFETY: caller guarantees exclusive access - unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } - } - - fn copy_from_slice<const ALIGN: usize>(src: &[u8]) -> Self { - let mut v: AVec<u8, ConstAlign<ALIGN>> = AVec::new(ALIGN); - v.extend_from_slice(src); - - let (ptr, _, len, capacity) = v.into_raw_parts(); - let data = unsafe { NonNull::new_unchecked(ptr) }; - - let ctrlb = ControlBlock::new(data, len, capacity); - - Extent { - ptr: data, - len, - ctrlb, - _pad: 0, - } - } -} - -impl Clone for Extent { - fn clone(&self) -> Self { - // SAFETY: `self.ctrlb` points to a live control block while `self` is alive. - unsafe { - self.ctrlb - .as_ref() - .ref_count - .fetch_add(1, Ordering::Relaxed); - } - *self - } -} - -unsafe fn release_control_block_w_allocation<const ALIGN: usize>(ctrlb: NonNull<ControlBlock>) { - // SAFETY: ctrlb is valid per function preconditions - let old = unsafe { ctrlb.as_ref() } - .ref_count - .fetch_sub(1, Ordering::Release); - debug_assert!(old > 0, "control block refcount underflow"); - - if old != 1 { - return; - } - - // This fence is needed to prevent reordering of use of the data and - // deletion of the data. Because it is marked `Release`, the decreasing - // of the reference count synchronizes with this `Acquire` fence. This - // means that use of the data happens before decreasing the reference - // count, which happens before this fence, which happens before the - // deletion of the data. - // - // As explained in the [Boost documentation][1], - // - // > It is important to enforce any possible access to the object in one - // > thread (through an existing reference) to *happen before* deleting - // > the object in a different thread. This is achieved by a "release" - // > operation after dropping a reference (any access to the object - // > through this reference must obviously happened before), and an - // > "acquire" operation before deleting the object. - // - // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - // - fence(Ordering::Acquire); - - // SAFETY: refcount is zero, we have exclusive ownership - let ctrlb = unsafe { Box::from_raw(ctrlb.as_ptr()) }; - - // SAFETY: `ctrlb.base`, `ctrlb.len` and `ctrlb.capacity` were captured from an `AVec` - // allocation. We reconstruct the AVec and let it deallocate properly. - let _ = unsafe { - AVec::<u8, ConstAlign<ALIGN>>::from_raw_parts( - ctrlb.base.as_ptr(), - ALIGN, - ctrlb.len, - ctrlb.capacity, - ) - }; -} - -unsafe fn reclaim_unique_control_block(ctrlb: NonNull<ControlBlock>) -> ControlBlock { - debug_assert_eq!( - // SAFETY: caller guarantees `ctrlb` points to a live control block. - unsafe { ctrlb.as_ref() }.ref_count.load(Ordering::Acquire), - 1 - ); - - // SAFETY: caller guarantees uniqueness, so ownership of the control block can be reclaimed directly. - unsafe { *Box::from_raw(ctrlb.as_ptr()) } -} - #[derive(Debug)] pub struct Owned<const ALIGN: usize = 4096> { inner: AVec<u8, ConstAlign<ALIGN>>, @@ -180,7 +50,7 @@ impl<const ALIGN: usize> Owned<ALIGN> { let ctrlb = ControlBlock::new(base, len, capacity); TwoHalves { - buf: ( + inner: ( Extent { ptr: base, len: split_at, @@ -199,36 +69,44 @@ impl<const ALIGN: usize> Owned<ALIGN> { } pub struct TwoHalves<const ALIGN: usize> { - buf: (Extent, Extent), + inner: (Extent, Extent), } impl<const ALIGN: usize> TwoHalves<ALIGN> { pub fn head(&self) -> &[u8] { - self.buf.0.as_slice() + self.inner.0.as_slice() } pub fn head_mut(&mut self) -> &mut [u8] { // SAFETY: We are accessing the head half mutably, this is the only correct operation, as the head is not shared between clones, // instead it gets copied. - unsafe { self.buf.0.as_mut_slice() } + unsafe { self.inner.0.as_mut_slice() } } pub fn tail(&self) -> &[u8] { - self.buf.1.as_slice() + self.inner.1.as_slice() } pub fn split_at(&self) -> usize { - self.buf.0.len + self.inner.0.len } pub fn total_len(&self) -> usize { - self.buf.0.len + self.buf.1.len + self.inner.0.len + self.inner.1.len } pub fn is_unique(&self) -> bool { - // `buf.1` is the authoritative owner of the original frame allocation. - // SAFETY: `buf.1.ctrlb` points to a live control block while `self` is alive. - unsafe { self.buf.1.ctrlb.as_ref().ref_count.load(Ordering::Acquire) == 1 } + // `inner.1` is the authoritative owner of the original frame allocation. + // SAFETY: `inner.1.ctrlb` points to a live control block while `self` is alive. + unsafe { + self.inner + .1 + .ctrlb + .as_ref() + .ref_count + .load(Ordering::Acquire) + == 1 + } } pub fn try_merge(self) -> Result<Owned<ALIGN>, Self> { @@ -238,8 +116,8 @@ impl<const ALIGN: usize> TwoHalves<ALIGN> { // Transfer ownership to prevent double-free let this = ManuallyDrop::new(self); - let head = this.buf.0; - let tail = this.buf.1; + let head = this.inner.0; + let tail = this.inner.1; let split_at = head.len; // SAFETY: `tail.ctrlb` is unique at this point, @@ -269,9 +147,9 @@ impl<const ALIGN: usize> TwoHalves<ALIGN> { impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> { fn clone(&self) -> Self { Self { - buf: ( + inner: ( Extent::copy_from_slice::<ALIGN>(self.head()), - self.buf.1.clone(), + self.inner.1.clone(), ), } } @@ -279,15 +157,15 @@ impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> { impl<const ALIGN: usize> Drop for TwoHalves<ALIGN> { fn drop(&mut self) { - // SAFETY: `buf.0.ctrlb` / `buf.1.ctrlb` point to live control blocks while `self` is alive. - let ctrlb_eq = std::ptr::addr_eq(self.buf.0.ctrlb.as_ptr(), self.buf.1.ctrlb.as_ptr()); + // SAFETY: `inner.0.ctrlb` / `inner.1.ctrlb` point to live control blocks while `self` is alive. + let ctrlb_eq = std::ptr::addr_eq(self.inner.0.ctrlb.as_ptr(), self.inner.1.ctrlb.as_ptr()); unsafe { if ctrlb_eq { - release_control_block_w_allocation::<ALIGN>(self.buf.1.ctrlb); + release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb); } else { // Different control blocks, release both - release_control_block_w_allocation::<ALIGN>(self.buf.0.ctrlb); - release_control_block_w_allocation::<ALIGN>(self.buf.1.ctrlb); + release_control_block_w_allocation::<ALIGN>(self.inner.0.ctrlb); + release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb); } } } @@ -297,13 +175,159 @@ impl<const ALIGN: usize> std::fmt::Debug for TwoHalves<ALIGN> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("TwoHalves") .field("split_at", &self.split_at()) - .field("head_len", &self.buf.0.len) - .field("tail_len", &self.buf.1.len) - .field("halves_alias", &(self.buf.0.ctrlb == self.buf.1.ctrlb)) + .field("head_len", &self.inner.0.len) + .field("tail_len", &self.inner.1.len) + .field("halves_alias", &(self.inner.0.ctrlb == self.inner.1.ctrlb)) .finish() } } +#[derive(Clone)] +pub struct Frozen<const ALIGN: usize> { + inner: Extent, +} + +impl<const ALIGN: usize> Frozen<ALIGN> { + pub fn as_slice(&self) -> &[u8] { + self.inner.as_slice() + } +} + +#[repr(C, align(64))] +struct ControlBlock { + ref_count: AtomicUsize, + base: NonNull<u8>, + len: usize, + capacity: usize, + _pad: [u8; 32], +} + +impl ControlBlock { + fn new(base: NonNull<u8>, len: usize, capacity: usize) -> NonNull<Self> { + let ctrl = Box::new(ControlBlock { + ref_count: AtomicUsize::new(1), + base, + len, + capacity, + _pad: [0; 32], + }); + // SAFETY: Box::into_raw returns a valid pointer + unsafe { NonNull::new_unchecked(Box::into_raw(ctrl)) } + } +} + +#[derive(Copy)] +struct Extent { + ptr: NonNull<u8>, + len: usize, + ctrlb: NonNull<ControlBlock>, + _pad: usize, +} + +impl Extent { + fn as_slice(&self) -> &[u8] { + // SAFETY: ptr and len describe a valid allocation + unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) } + } + + unsafe fn as_mut_slice(&mut self) -> &mut [u8] { + // SAFETY: caller guarantees exclusive access + unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } + } + + fn copy_from_slice<const ALIGN: usize>(src: &[u8]) -> Self { + let mut v: AVec<u8, ConstAlign<ALIGN>> = AVec::new(ALIGN); + v.extend_from_slice(src); + + let (ptr, _, len, capacity) = v.into_raw_parts(); + let data = unsafe { NonNull::new_unchecked(ptr) }; + + let ctrlb = ControlBlock::new(data, len, capacity); + + Extent { + ptr: data, + len, + ctrlb, + _pad: 0, + } + } +} + +impl Clone for Extent { + fn clone(&self) -> Self { + // SAFETY: `self.ctrlb` points to a live control block while `self` is alive. + unsafe { + self.ctrlb + .as_ref() + .ref_count + .fetch_add(1, Ordering::Relaxed); + } + *self + } +} + +unsafe fn release_control_block_w_allocation<const ALIGN: usize>(ctrlb: NonNull<ControlBlock>) { + // SAFETY: ctrlb is valid per function preconditions + let old = unsafe { ctrlb.as_ref() } + .ref_count + .fetch_sub(1, Ordering::Release); + debug_assert!(old > 0, "control block refcount underflow"); + + if old != 1 { + return; + } + + // This fence is needed to prevent reordering of use of the data and + // deletion of the data. Because it is marked `Release`, the decreasing + // of the reference count synchronizes with this `Acquire` fence. This + // means that use of the data happens before decreasing the reference + // count, which happens before this fence, which happens before the + // deletion of the data. + // + // As explained in the [Boost documentation][1], + // + // > It is important to enforce any possible access to the object in one + // > thread (through an existing reference) to *happen before* deleting + // > the object in a different thread. This is achieved by a "release" + // > operation after dropping a reference (any access to the object + // > through this reference must obviously happened before), and an + // > "acquire" operation before deleting the object. + // + // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) + // + fence(Ordering::Acquire); + + // SAFETY: refcount is zero, we have exclusive ownership + let ctrlb = unsafe { Box::from_raw(ctrlb.as_ptr()) }; + + // SAFETY: `ctrlb.base`, `ctrlb.len` and `ctrlb.capacity` were captured from an `AVec` + // allocation. We reconstruct the AVec and let it deallocate properly. + let _ = unsafe { + AVec::<u8, ConstAlign<ALIGN>>::from_raw_parts( + ctrlb.base.as_ptr(), + ALIGN, + ctrlb.len, + ctrlb.capacity, + ) + }; +} + +unsafe fn reclaim_unique_control_block(ctrlb: NonNull<ControlBlock>) -> ControlBlock { + debug_assert_eq!( + // SAFETY: caller guarantees `ctrlb` points to a live control block. + unsafe { ctrlb.as_ref() }.ref_count.load(Ordering::Acquire), + 1 + ); + + // SAFETY: caller guarantees uniqueness, so ownership of the control block can be reclaimed directly. + unsafe { *Box::from_raw(ctrlb.as_ptr()) } +} + +// ============================================================================= +// Tests +// ============================================================================= + +// TODO: Better tests & miri. #[cfg(test)] mod tests { use super::Owned;
