This is an automated email from the ASF dual-hosted git repository.
numinnex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new af01a3479 ci: add Miri UB detector for binary_protocol and consensus (#3284)
af01a3479 is described below
commit af01a3479a6954fe9f9c68dd118dbc37b667368d
Author: Krishna Vishal <[email protected]>
AuthorDate: Wed May 20 19:53:38 2026 +0530
ci: add Miri UB detector for binary_protocol and consensus (#3284)
- Adds a `miri` task to the pre-merge action, scoped to
`iggy_binary_protocol` and `consensus` (the unsafe-heavy crates that
don't link compio's io_uring driver). Pinned
`nightly-2026-04-21`, `-Zmiri-tree-borrows -Zmiri-strict-provenance`,
separate cache key.
- New `rust-miri` component triggers on `core/binary_protocol/**` and
`core/consensus/**`. Mirrored in `justfile` as `just miri` for local
runs.
---
.github/actions/rust/pre-merge/action.yml | 29 +-
.github/config/components.yml | 18 ++
core/binary_protocol/src/consensus/command.rs | 5 +-
core/binary_protocol/src/consensus/header.rs | 20 +-
core/binary_protocol/src/consensus/iobuf.rs | 448 +++++++++++++++++++++++++-
core/binary_protocol/src/consensus/message.rs | 303 +++++++++++++++--
core/common/src/alloc/buffer.rs | 165 ++++++++++
core/common/src/types/send_messages2.rs | 70 +++-
core/journal/src/prepare_journal.rs | 57 +++-
justfile | 12 +
10 files changed, 1083 insertions(+), 44 deletions(-)
diff --git a/.github/actions/rust/pre-merge/action.yml
b/.github/actions/rust/pre-merge/action.yml
index 11a429dc5..b09d036e6 100644
--- a/.github/actions/rust/pre-merge/action.yml
+++ b/.github/actions/rust/pre-merge/action.yml
@@ -20,7 +20,7 @@ description: Rust pre-merge testing and linting github iggy
actions
inputs:
task:
- description: "Task to run (check, fmt, clippy, sort, machete, doctest,
verify-publish, test-1, test-2, compat)"
+ description: "Task to run (check, fmt, clippy, sort, machete, doctest,
verify-publish, test-1, test-2, compat, miri)"
required: true
component:
description: "Component name (for context)"
@@ -45,6 +45,10 @@ runs:
- name: Setup Rust with cache
uses: ./.github/actions/utils/setup-rust-with-cache
with:
+ # Miri builds against a nightly toolchain with a separate `target/miri`
+ # subtree; isolate its cache from the stable `dev` namespace so the
+ # two don't evict each other.
+ shared-key: ${{ inputs.task == 'miri' && 'miri' || 'dev' }}
print-cache-status: ${{ startsWith(inputs.task, 'test-') }}
- name: Install tools for specific tasks
@@ -303,6 +307,29 @@ runs:
--port 8090 --wait-secs 180
shell: bash
+ # Miri (UB detector) on the unsafe-heavy crates that don't pull tokio /
+ # compio. Pinned nightly so MIRIFLAGS behavior is stable across CI runs;
+ # bump the date quarterly. Tree-borrows is the future-default aliasing
+ # model and accepts the legitimate `NonNull::add` arithmetic in
+ # `iobuf.rs` that stacked-borrows would reject. Strict-provenance
+ # rejects int-to-pointer round trips, which we don't use.
+ - name: Install nightly Rust + miri component
+ if: inputs.task == 'miri'
+ run: |
+ rustup toolchain install nightly-2026-04-21 --component miri --profile minimal --no-self-update
+ cargo +nightly-2026-04-21 miri setup
+ shell: bash
+
+ - name: Run miri
+ if: inputs.task == 'miri'
+ env:
+ MIRIFLAGS: "-Zmiri-tree-borrows -Zmiri-strict-provenance"
+ run: |
+ cargo +nightly-2026-04-21 miri test --locked \
+ -p iggy_binary_protocol \
+ -p consensus
+ shell: bash
+
# aarch64 builds (run on ARM64 runners)
- name: Build aarch64-gnu
if: inputs.task == 'build-aarch64-gnu'
diff --git a/.github/config/components.yml b/.github/config/components.yml
index e7b6dbbbc..2dd9ee3e5 100644
--- a/.github/config/components.yml
+++ b/.github/config/components.yml
@@ -78,6 +78,24 @@ components:
paths:
- "core/binary_protocol/**"
+ # Miri (UB detector) on unsafe-heavy crates that don't pull tokio/compio.
+ # Scoped to `iggy_binary_protocol` (heavy unsafe in `consensus/iobuf.rs`:
+ # ref-counted CoW buffer, manual atomics, pointer arithmetic) plus the
+ # `consensus` crate (zero unsafe but exercises binary_protocol's unsafe
+ # via integration). Other crates (journal, metadata, partitions, common)
+ # instantiate compio, which uses io_uring syscalls Miri cannot emulate.
+ # See the `# Alignment` doc on the `ConsensusHeader` trait for the
+ # alignment contract that motivates this check.
+ rust-miri:
+ depends_on:
+ - "rust-workspace"
+ - "ci-infrastructure"
+ paths:
+ - "core/binary_protocol/**"
+ - "core/consensus/**"
+ tasks:
+ - "miri"
+
rust-server:
depends_on:
- "rust-workspace"
diff --git a/core/binary_protocol/src/consensus/command.rs
b/core/binary_protocol/src/consensus/command.rs
index 660bb61cd..384bf696a 100644
--- a/core/binary_protocol/src/consensus/command.rs
+++ b/core/binary_protocol/src/consensus/command.rs
@@ -57,10 +57,13 @@ unsafe impl CheckedBitPattern for Command2 {
#[cfg(test)]
mod tests {
use crate::consensus::GenericHeader;
+ use aligned_vec::{AVec, ConstAlign};
#[test]
fn invalid_bit_pattern_rejected() {
- let mut buf = bytes::BytesMut::zeroed(256);
+ // 16-byte aligned (see `ConsensusHeader` doc); `BytesMut` fails Miri.
+ let mut buf: AVec<u8, ConstAlign<16>> = AVec::new(16);
+ buf.resize(256, 0);
buf[60] = 99;
let result = bytemuck::checked::try_from_bytes::<GenericHeader>(&buf);
assert!(result.is_err());
diff --git a/core/binary_protocol/src/consensus/header.rs
b/core/binary_protocol/src/consensus/header.rs
index a7dbd6800..650c30772 100644
--- a/core/binary_protocol/src/consensus/header.rs
+++ b/core/binary_protocol/src/consensus/header.rs
@@ -54,6 +54,15 @@ pub fn read_size_field(header: &[u8]) -> Option<u32> {
///
/// Every header is exactly [`HEADER_SIZE`] bytes, `#[repr(C)]`, and supports
/// zero-copy deserialization via `bytemuck`.
+///
+/// # Alignment
+///
+/// All headers contain `u128` fields → 16-byte alignment required by
+/// `bytemuck::checked::try_from_bytes`. Production uses `Owned<MESSAGE_ALIGN>`
+/// / `Frozen<MESSAGE_ALIGN>` (4096-aligned). `Vec<u8>` / `bytes::BytesMut`
+/// request align=1; they over-align under glibc by accident but fail under
+/// strict allocators (Miri, jemalloc, arenas). Use
+/// `aligned_vec::AVec<u8, ConstAlign<16>>` for explicit alignment.
pub trait ConsensusHeader: Sized + CheckedBitPattern + NoUninit {
const COMMAND: Command2;
@@ -917,9 +926,14 @@ mod tests {
EvictionReason, GenericHeader, Operation, PrepareHeader,
PrepareOkHeader, ReplyHeader,
RequestHeader, StartViewChangeHeader, StartViewHeader,
};
-
- fn aligned_zeroed(size: usize) -> bytes::BytesMut {
- bytes::BytesMut::zeroed(size)
+ use aligned_vec::{AVec, ConstAlign};
+
+ // bytemuck requires 16-byte alignment (see `ConsensusHeader` trait doc).
+ // `BytesMut::zeroed` works on glibc by accident, fails under Miri.
+ fn aligned_zeroed(size: usize) -> AVec<u8, ConstAlign<16>> {
+ let mut v: AVec<u8, ConstAlign<16>> = AVec::new(16);
+ v.resize(size, 0);
+ v
}
#[test]
diff --git a/core/binary_protocol/src/consensus/iobuf.rs
b/core/binary_protocol/src/consensus/iobuf.rs
index 340a49f06..7175ef1f6 100644
--- a/core/binary_protocol/src/consensus/iobuf.rs
+++ b/core/binary_protocol/src/consensus/iobuf.rs
@@ -119,9 +119,13 @@ impl<const ALIGN: usize> Owned<ALIGN> {
assert!(split_at <= self.inner.len());
let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+ let guard = AVecRawGuard::<ALIGN>::new(ptr, len, capacity);
let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
let ctrlb = ControlBlock::new(base, len, capacity);
+ // ControlBlock::new succeeded; the buffer is now owned by the
+ // control block. Skip the guard's destructor.
+ guard.defuse();
unsafe { ctrlb.as_ref().ref_count.fetch_add(1, Ordering::Relaxed) };
let prefix = Prefix {
@@ -252,9 +256,11 @@ impl<const ALIGN: usize> From<Owned<ALIGN>> for
Frozen<ALIGN> {
fn from(value: Owned<ALIGN>) -> Self {
let inner = value.inner;
let (ptr, _, len, capacity) = inner.into_raw_parts();
+ let guard = AVecRawGuard::<ALIGN>::new(ptr, len, capacity);
let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
let ctrlb = ControlBlock::new(base, len, capacity);
+ guard.defuse();
Self {
inner: Extent {
ptr: base,
@@ -363,6 +369,46 @@ impl ControlBlock {
}
}
+/// Drop-on-unwind guard for raw `AVec` parts. Plugs the leak window between
+/// `AVec::into_raw_parts` (surrenders the buffer) and `ControlBlock::new`
+/// (calls `Box::new`, can panic via `handle_alloc_error`). On unwind, `Drop`
+/// reconstitutes the `AVec`. Success paths must call [`AVecRawGuard::defuse`]
+/// before constructing the `Extent` (ownership transferred, no double-free).
+struct AVecRawGuard<const ALIGN: usize> {
+ ptr: *mut u8,
+ len: usize,
+ capacity: usize,
+}
+
+impl<const ALIGN: usize> AVecRawGuard<ALIGN> {
+ fn new(ptr: *mut u8, len: usize, capacity: usize) -> Self {
+ Self { ptr, len, capacity }
+ }
+
+ /// Ownership transferred to new `Extent`; skip destructor.
+ fn defuse(self) {
+ std::mem::forget(self);
+ }
+}
+
+impl<const ALIGN: usize> Drop for AVecRawGuard<ALIGN> {
+ fn drop(&mut self) {
+ // SAFETY: `ptr/len/capacity` come from `AVec::into_raw_parts` of an
+ // `AVec<u8, ConstAlign<ALIGN>>` whose ownership the caller has just
+ // surrendered. While the guard is alive no other handle to the
+ // buffer exists, so reconstituting and dropping the `AVec` is the
+ // unique deallocation path.
+ unsafe {
+ drop(AVec::<u8, ConstAlign<ALIGN>>::from_raw_parts(
+ self.ptr,
+ ALIGN,
+ self.len,
+ self.capacity,
+ ));
+ }
+ }
+}
+
struct Extent<const ALIGN: usize> {
pub(crate) ptr: NonNull<u8>,
pub(crate) len: usize,
@@ -389,9 +435,11 @@ impl<const ALIGN: usize> Extent<ALIGN> {
v.extend_from_slice(src);
let (ptr, _, len, capacity) = v.into_raw_parts();
+ let guard = AVecRawGuard::<ALIGN>::new(ptr, len, capacity);
let data = unsafe { NonNull::new_unchecked(ptr) };
let ctrlb = ControlBlock::new(data, len, capacity);
+ guard.defuse();
Extent {
ptr: data,
@@ -425,8 +473,10 @@ pub(crate) fn extent_from_aligned_vec<const ALIGN: usize>(
vec: AVec<u8, ConstAlign<ALIGN>>,
) -> Extent<ALIGN> {
let (ptr, _, len, capacity) = vec.into_raw_parts();
+ let guard = AVecRawGuard::<ALIGN>::new(ptr, len, capacity);
let data = unsafe { NonNull::new_unchecked(ptr) };
let ctrlb = ControlBlock::new(data, len, capacity);
+ guard.defuse();
Extent {
ptr: data,
@@ -450,11 +500,22 @@ fn extent_ref_count<const ALIGN: usize>(extent:
&Extent<ALIGN>) -> usize {
unsafe { extent.ctrlb.as_ref().ref_count.load(Ordering::Acquire) }
}
+/// Decrement refcount; on last reference, drop the box and free the `AVec`.
+///
+/// # Safety
+///
+/// * `ctrlb`: live `ControlBlock`; caller owns one refcount share.
+/// * Caller must not use `ctrlb` after this call.
+/// * `ALIGN` must match the original `AVec<u8, ConstAlign<ALIGN>>` allocation.
unsafe fn release_control_block_w_allocation<const ALIGN: usize>(ctrlb:
NonNull<ControlBlock>) {
let old = unsafe { ctrlb.as_ref() }
.ref_count
.fetch_sub(1, Ordering::Release);
- debug_assert!(old > 0, "control block refcount underflow");
+ // Underflow = use-after-free. The `fetch_sub` has already wrapped by the
+ // time we observe `old == 0`, so detection is post-corruption, but
+ // aborting here still beats letting the wrapped count drive a
+ // double-free.
+ assert!(old > 0, "control block refcount underflow");
if old != 1 {
return;
@@ -474,6 +535,14 @@ unsafe fn release_control_block_w_allocation<const ALIGN:
usize>(ctrlb: NonNull<
};
}
+/// Take ownership of a refcount==1 control block (consumes the heap box).
+/// Used by the merge path to recycle `(base, len, capacity)`.
+///
+/// # Safety
+///
+/// * `ctrlb`: live `ControlBlock` with refcount exactly 1 (asserted). Caller
+/// must guarantee no concurrent `clone` can race the load.
+/// * Caller must not use `ctrlb` after this call.
unsafe fn reclaim_unique_control_block(ctrlb: NonNull<ControlBlock>) ->
ControlBlock {
assert_eq!(
unsafe { ctrlb.as_ref() }.ref_count.load(Ordering::Acquire),
@@ -482,6 +551,18 @@ unsafe fn reclaim_unique_control_block(ctrlb:
NonNull<ControlBlock>) -> ControlB
unsafe { *Box::from_raw(ctrlb.as_ptr()) }
}
+/// Merge two extents into a single `Owned`. Succeeds when tail's ctrlb is the
+/// only reference (refcount == 2 shared-ctrlb, == 1 separate-ctrlb).
+///
+/// # Safety
+///
+/// * `prefix` + `tail` passed by value (consumed); no external clone may race.
+/// * Compatible allocations:
+/// - Shared ctrlb (from `Owned::split_at`): `prefix.ptr == tail.ctrlb.base`,
+/// `prefix.len + tail.len <= tail.ctrlb.len`.
+/// - Separate ctrlb: `prefix.len == extent_offset_from_base(tail)`, prefix
+/// fits within tail's allocation.
+/// * `ALIGN` matches both backing `AVec`s.
unsafe fn try_merge_extents<const ALIGN: usize>(
prefix: Extent<ALIGN>,
tail: Extent<ALIGN>,
@@ -547,3 +628,368 @@ fn can_coalesce_prefix_with_tail<const ALIGN: usize>(
tail_ref_count == 1 && extent_offset_from_base(&tail.inner) == split_at
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ // Smaller alignment keeps the per-test Miri allocation footprint low while
+ // exercising the same control-block + atomic-refcount + pointer-arithmetic
+ // logic as the production `MESSAGE_ALIGN = 4096`.
+ const A: usize = 64;
+
+ fn owned(data: &[u8]) -> Owned<A> {
+ Owned::<A>::copy_from_slice(data)
+ }
+
+ #[test]
+ fn prefix_from_aligned_vec_roundtrip() {
+ let mut v: AVec<u8, ConstAlign<A>> = AVec::new(A);
+ v.extend_from_slice(b"abc");
+ let prefix = Prefix::<A>::from_aligned_vec(v);
+ assert_eq!(prefix.as_slice(), b"abc");
+ }
+
+ // Owned::split_at: shared control block / disjoint views
+
+ #[test]
+ fn owned_split_at_shares_ctrlb() {
+ let o = owned(b"abcdefgh");
+ let (prefix, tail) = o.split_at(4);
+ assert!(
+ prefix.aliases(&tail),
+ "Owned::split_at must produce halves sharing one control block",
+ );
+ }
+
+ #[test]
+ fn owned_split_drop_prefix_first_keeps_tail_valid() {
+ let o = owned(b"hello world");
+ let (prefix, tail) = o.split_at(5);
+ drop(prefix);
+ // refcount went 2 -> 1; tail's ctrlb / allocation must still be live.
+ assert_eq!(tail.as_slice(), b" world");
+ }
+
+ #[test]
+ fn owned_split_drop_tail_first_keeps_prefix_mutable() {
+ let o = owned(b"hello world");
+ let (mut prefix, tail) = o.split_at(5);
+ drop(tail);
+ assert_eq!(prefix.as_slice(), b"hello");
+ prefix.as_mut_slice()[0] = b'H';
+ assert_eq!(prefix.as_slice(), b"Hello");
+ }
+
+ #[test]
+ fn owned_split_disjoint_mut_does_not_corrupt_alias() {
+ // Mutate the prefix in-place while a frozen tail still aliases the
+ // same allocation. Tree-borrows must accept disjoint child accesses.
+ let o = owned(b"abcdefgh");
+ let (mut prefix, tail) = o.split_at(4);
+ prefix.as_mut_slice().copy_from_slice(b"WXYZ");
+ assert_eq!(prefix.as_slice(), b"WXYZ");
+ assert_eq!(tail.as_slice(), b"efgh");
+ }
+
+ // Frozen::clone: refcount round-trip
+
+ #[test]
+ fn frozen_clone_independent_drop() {
+ let f1: Frozen<A> = owned(b"hello").into();
+ let f2 = f1.clone();
+ let f3 = f1.clone();
+ drop(f1);
+ assert_eq!(f2.as_slice(), b"hello");
+ drop(f2);
+ assert_eq!(f3.as_slice(), b"hello");
+ }
+
+ // Frozen::slice: aliased view, parent-independent lifetime
+
+ fn fslice<R: std::ops::RangeBounds<usize>>(f: &Frozen<A>, range: R) -> Frozen<A> {
+ f.slice(range)
+ }
+
+ #[test]
+ fn frozen_slice_survives_parent_drop() {
+ let f: Frozen<A> = owned(b"abcdefgh").into();
+ let s = fslice(&f, 2..6);
+ drop(f);
+ assert_eq!(s.as_slice(), b"cdef");
+ }
+
+ #[test]
+ fn frozen_slice_chain_outlives_intermediates() {
+ let f: Frozen<A> = owned(b"abcdefgh").into();
+ let s1 = fslice(&f, 2..7); // "cdefg"
+ let s2 = fslice(&s1, 1..4); // "def"
+ drop(f);
+ drop(s1);
+ assert_eq!(s2.as_slice(), b"def");
+ }
+
+ // coalesce: shared-ctrlb path (refcount == 2)
+
+ #[test]
+ fn coalesce_shared_ctrlb_succeeds() {
+ let o = owned(b"hello world");
+ let (prefix, tail) = o.split_at(5);
+ assert!(prefix.can_coalesce_with(&tail));
+ let merged = prefix.try_coalesce_with(tail).expect("coalesce");
+ assert_eq!(merged.as_slice(), b"hello world");
+ }
+
+ #[test]
+ fn coalesce_shared_ctrlb_blocked_by_extra_clone() {
+ let o = owned(b"hello world");
+ let (prefix, tail) = o.split_at(5);
+ let extra = tail.clone(); // refcount 2 -> 3 (prefix + tail + extra)
+ let result = prefix.try_coalesce_with(tail);
+ assert!(
+ result.is_err(),
+ "shared-ctrlb coalesce must require refcount == 2",
+ );
+ let _ = extra;
+ }
+
+ // coalesce: separate-ctrlb path (refcount == 1, offset matches)
+
+ #[test]
+ fn coalesce_separate_ctrlb_succeeds_and_overwrites_prefix_region() {
+ let f: Frozen<A> = owned(b"abcdefgh").into();
+ let (mut prefix, tail) = f.split_at(3);
+ // prefix lives in its own allocation; mutate it before coalescing.
+ prefix.as_mut_slice().copy_from_slice(b"XYZ");
+ assert!(prefix.can_coalesce_with(&tail));
+ let merged = prefix.try_coalesce_with(tail).expect("coalesce");
+ assert_eq!(merged.as_slice(), b"XYZdefgh");
+ }
+
+ #[test]
+ fn coalesce_separate_ctrlb_blocked_by_extra_tail_clone() {
+ let f: Frozen<A> = owned(b"abcdefgh").into();
+ let (prefix, tail) = f.split_at(3);
+ let extra = tail.clone(); // tail refcount 1 -> 2; separate-ctrlb path requires 1
+ let result = prefix.try_coalesce_with(tail);
+ assert!(result.is_err());
+ let _ = extra;
+ }
+
+ #[test]
+ fn coalesce_separate_ctrlb_blocked_by_offset_mismatch() {
+ // Re-slice the tail forward so its offset_from_base no longer matches
+ // the prefix length. The separate-ctrlb path must reject this.
+ let f: Frozen<A> = owned(b"abcdefgh").into();
+ let (prefix, tail) = f.split_at(3);
+ let resliced = fslice(&tail, 2..); // offset_from_base = 5, prefix.len = 3
+ drop(tail); // bring resliced refcount back to 1
+ let result = prefix.try_coalesce_with(resliced);
+ assert!(result.is_err());
+ }
+
+ // IoBufMut / SetLen plumbing through MaybeUninit
+
+ #[test]
+ fn owned_set_len_after_uninit_writes() {
+ let mut o: Owned<A> = Owned::with_capacity(64);
+ let cap = o.buf_capacity();
+ assert!(cap >= 64);
+ {
+ let uninit = o.as_uninit();
+ for (i, slot) in uninit.iter_mut().take(8).enumerate() {
+ slot.write(u8::try_from(i).unwrap());
+ }
+ }
+ // SAFETY: 8 bytes were just initialized via the uninit slice above.
+ unsafe { o.set_len(8) };
+ assert_eq!(o.as_slice(), &[0, 1, 2, 3, 4, 5, 6, 7]);
+ }
+
+ // Concurrent clone/drop: validates atomic ordering on refcount
+
+ #[test]
+ fn frozen_clone_drop_concurrently() {
+ let f: Frozen<A> = owned(b"shared payload").into();
+ let mut handles = Vec::new();
+ for _ in 0..4 {
+ let f_thread = f.clone();
+ handles.push(std::thread::spawn(move || {
+ let a = f_thread.clone();
+ let b = f_thread.clone();
+ assert_eq!(a.as_slice(), b"shared payload");
+ assert_eq!(b.as_slice(), b"shared payload");
+ drop(a);
+ drop(b);
+ drop(f_thread);
+ }));
+ }
+ for h in handles {
+ h.join().unwrap();
+ }
+ assert_eq!(f.as_slice(), b"shared payload");
+ }
+
+ // Concurrent atomics on a shared control block
+ //
+ // Three handles share one control block: `prefix` and `tail` (returned
+ // by `Owned::split_at`) plus a clone `extra` held by another thread.
+ // Refcount starts at 3. The other thread drops `extra` (Release
+ // fetch_sub: 3 -> 2); the calling thread races a `try_coalesce_with`
+ // (Acquire load of the same refcount).
+ //
+ // `prefix` and `tail` are passed to `try_coalesce_with` by value, so
+ // the only refcount transition the calling thread does NOT cause is
+ // `extra`'s drop. The two values the calling thread can observe in
+ // `can_coalesce_prefix_with_tail` are:
+ // - refcount = 3 → shared-ctrlb gate fails → Err((prefix, tail))
+ // - refcount = 2 → gate passes → merge fires → Ok(merged)
+ // Both outcomes must be sound and produce the correct bytes; Miri's
+ // tree-borrows + race detector validates the Release/Acquire pairing.
+ #[test]
+ fn concurrent_coalesce_with_dropping_clone() {
+ use std::sync::{Arc, Barrier};
+
+ // Few iterations: Miri scales poorly with thread count and loop
+ // depth, but its scheduler explores some interleavings each run.
+ for _ in 0..4 {
+ let o = owned(b"hello world");
+ let (prefix, tail) = o.split_at(5);
+ let extra = tail.clone(); // refcount: 2 -> 3
+
+ let barrier = Arc::new(Barrier::new(2));
+ let b_thread = Arc::clone(&barrier);
+
+ let handle = std::thread::spawn(move || {
+ b_thread.wait();
+ drop(extra); // refcount: 3 -> 2 (interleaving varies)
+ });
+
+ barrier.wait();
+ let result = prefix.try_coalesce_with(tail);
+ handle.join().unwrap();
+ match result {
+ Ok(merged) => {
+ // `extra` dropped before our `can_coalesce` load
+ // (observed refcount = 2): shared-ctrlb merge fired
+ // and recovered the original 11-byte buffer.
+ assert_eq!(merged.as_slice(), b"hello world");
+ }
+ Err((prefix, tail)) => {
+ // `extra` still alive at our `can_coalesce` load
+ // (observed refcount = 3): merge declined, both
+ // halves returned with their original contents.
+ assert_eq!(prefix.as_slice(), b"hello");
+ assert_eq!(tail.as_slice(), b" world");
+ // The racing drop has completed by now (we joined
+ // above), so refcount is 2 and a retry from the
+ // same caller must succeed.
+ let merged = prefix
+ .try_coalesce_with(tail)
+ .expect("retry after racing drop must succeed");
+ assert_eq!(merged.as_slice(), b"hello world");
+ }
+ }
+ }
+ }
+
+ // Drop-on-panic: destructors must clean up under unwinding
+ //
+ // Miri's leak detector enforces this implicitly: any allocation not
+ // freed by the time the test exits fails the run. The companion case
+ // where `Box::new(ControlBlock)` panics between `into_raw_parts` and
+ // the new `Extent` is covered by the `AVecRawGuard` tests below.
+ #[test]
+ fn drop_on_panic_with_active_iobuf_does_not_leak() {
+ let result = std::panic::catch_unwind(|| {
+ let o = owned(b"abcd");
+ let (prefix, tail) = o.split_at(2);
+ let frozen: Frozen<A> = owned(b"xyz").into();
+ let clone = frozen.clone();
+ // Anchor each binding with a positive content check so the
+ // test would not pass for the wrong reason if a future
+ // refactor silently elided one of the four values: the
+ // corresponding `assert_eq!` would fail to compile (the
+ // identifier disappears) and surface the regression. Without
+ // these anchors, an elided value also elides its allocation
+ // and Miri's leak detector stays satisfied.
+ assert_eq!(prefix.as_slice(), b"ab");
+ assert_eq!(tail.as_slice(), b"cd");
+ assert_eq!(frozen.as_slice(), b"xyz");
+ assert_eq!(clone.as_slice(), b"xyz");
+ // Bind everything in a tuple so all four destructors run
+ // during unwinding; each decrements one of two control
+ // blocks and Miri verifies the buffers are released.
+ let _keep = (prefix, tail, frozen, clone);
+ panic!("simulated user-code panic");
+ });
+ assert!(result.is_err());
+ }
+
+ // Boundary splits: Owned::split_at(0) and Owned::split_at(len)
+ //
+ // At split_at(0): prefix.len == 0, tail.ptr aliases base.
+ // At split_at(len): prefix.len == len, tail.ptr is one-past-end with len 0.
+ // Both must round-trip through try_coalesce_with cleanly.
+
+ #[test]
+ fn coalesce_after_owned_split_at_zero_round_trips() {
+ let o = owned(b"hello");
+ let (prefix, tail) = o.split_at(0);
+ assert!(prefix.is_empty());
+ assert_eq!(tail.as_slice(), b"hello");
+ let merged = prefix.try_coalesce_with(tail).expect("coalesce");
+ assert_eq!(merged.as_slice(), b"hello");
+ }
+
+ #[test]
+ fn coalesce_after_owned_split_at_full_len_round_trips() {
+ let o = owned(b"hello");
+ let (prefix, tail) = o.split_at(5);
+ assert_eq!(prefix.as_slice(), b"hello");
+ assert!(tail.is_empty());
+ let merged = prefix.try_coalesce_with(tail).expect("coalesce");
+ assert_eq!(merged.as_slice(), b"hello");
+ }
+
+ // AVecRawGuard: panic between into_raw_parts and ControlBlock::new
+ //
+ // The window between `AVec::into_raw_parts` (which surrenders the
+ // backing buffer's destructor) and `ControlBlock::new` (which calls
+ // `Box::new`, fallible under allocator pressure) used to leak the
+ // buffer if `Box::new` panicked. `AVecRawGuard` reconstitutes the
+ // `AVec` on unwind so the buffer is freed.
+ //
+ // We can't readily inject a panic into `Box::new` itself, so this
+ // exercises the guard primitive directly: a `panic!` while the guard
+ // is alive must release the buffer. Miri's leak detector enforces it.
+
+ #[test]
+ fn avec_raw_guard_releases_buffer_on_unwind() {
+ let result = std::panic::catch_unwind(|| {
+ let mut v: AVec<u8, ConstAlign<A>> = AVec::new(A);
+ v.extend_from_slice(b"buffer that must be freed when the guard drops");
+ let (ptr, _, len, capacity) = v.into_raw_parts();
+ let _guard = AVecRawGuard::<A>::new(ptr, len, capacity);
+ panic!("simulated allocator failure inside ControlBlock::new");
+ });
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn avec_raw_guard_defuse_skips_destructor() {
+ // Success path: `defuse` must skip the destructor so the buffer is
+ // not double-freed when the new owner (Extent / control block)
+ // releases it. Reconstitute the AVec manually after defusing and
+ // confirm the bytes are intact: proves the guard did NOT free.
+ let mut v: AVec<u8, ConstAlign<A>> = AVec::new(A);
+ v.extend_from_slice(b"contents");
+ let (ptr, _, len, capacity) = v.into_raw_parts();
+ let guard = AVecRawGuard::<A>::new(ptr, len, capacity);
+ guard.defuse();
+ // SAFETY: defuse forgot the guard, so the raw parts are still
+ // ours to reconstitute exactly once.
+ let v = unsafe { AVec::<u8, ConstAlign<A>>::from_raw_parts(ptr, A, len, capacity) };
+ assert_eq!(v.as_slice(), b"contents");
+ }
+}
diff --git a/core/binary_protocol/src/consensus/message.rs
b/core/binary_protocol/src/consensus/message.rs
index de19029c0..76bed247b 100644
--- a/core/binary_protocol/src/consensus/message.rs
+++ b/core/binary_protocol/src/consensus/message.rs
@@ -278,6 +278,17 @@ where
Ok(typed_message)
}
+ /// Construct a typed `Message<H, B>` without re-validating the header.
+ ///
+ /// # Safety
+ ///
+ /// Caller must guarantee:
+ /// * `backing.total_len() >= size_of::<H>()`.
+ /// * Header bytes are a valid `H` bit pattern (`try_from_bytes` would succeed).
+ /// * `H::validate` would return `Ok`.
+ ///
+ /// Prefer `try_into_typed::<H>()`. Only use when bytes already validated
+ /// via another route (e.g. enclosing `MessageBag::try_from` dispatch).
const unsafe fn from_backing_unchecked(backing: B) -> Self {
Self {
backing,
@@ -518,40 +529,25 @@ where
{
type Error = ConsensusError;
+ // Dispatch via `try_into_typed::<H>()`: re-runs per-typed `validate()`.
+ // `from_backing_unchecked` trusts the command byte alone, letting
+ // invariant violations (Commit size != 256, DoViewChange log_view > view)
+ // reach the router.
fn try_from(value: Message<T>) -> Result<Self, Self::Error> {
let command = value.as_generic().header().command;
- let backing = value.into_inner();
match command {
- Command2::Prepare => {
- let msg = unsafe {
Message::<PrepareHeader>::from_backing_unchecked(backing) };
- Ok(Self::Prepare(msg))
- }
- Command2::Request => {
- let msg = unsafe {
Message::<RequestHeader>::from_backing_unchecked(backing) };
- Ok(Self::Request(msg))
- }
- Command2::PrepareOk => {
- let msg = unsafe {
Message::<PrepareOkHeader>::from_backing_unchecked(backing) };
- Ok(Self::PrepareOk(msg))
- }
- Command2::StartViewChange => {
- let msg =
- unsafe {
Message::<StartViewChangeHeader>::from_backing_unchecked(backing) };
- Ok(Self::StartViewChange(msg))
- }
- Command2::DoViewChange => {
- let msg = unsafe {
Message::<DoViewChangeHeader>::from_backing_unchecked(backing) };
- Ok(Self::DoViewChange(msg))
- }
- Command2::StartView => {
- let msg = unsafe {
Message::<StartViewHeader>::from_backing_unchecked(backing) };
- Ok(Self::StartView(msg))
- }
- Command2::Commit => {
- let msg = unsafe {
Message::<CommitHeader>::from_backing_unchecked(backing) };
- Ok(Self::Commit(msg))
- }
+ Command2::Prepare =>
Ok(Self::Prepare(value.try_into_typed::<PrepareHeader>()?)),
+ Command2::Request =>
Ok(Self::Request(value.try_into_typed::<RequestHeader>()?)),
+ Command2::PrepareOk =>
Ok(Self::PrepareOk(value.try_into_typed::<PrepareOkHeader>()?)),
+ Command2::StartViewChange => Ok(Self::StartViewChange(
+ value.try_into_typed::<StartViewChangeHeader>()?,
+ )),
+ Command2::DoViewChange => Ok(Self::DoViewChange(
+ value.try_into_typed::<DoViewChangeHeader>()?,
+ )),
+ Command2::StartView =>
Ok(Self::StartView(value.try_into_typed::<StartViewHeader>()?)),
+ Command2::Commit =>
Ok(Self::Commit(value.try_into_typed::<CommitHeader>()?)),
// Reply / Eviction are server-to-client frames; they do not
// appear on the inbound dispatch path.
Command2::Reply | Command2::Eviction => {
@@ -564,3 +560,250 @@ where
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::consensus::{Operation, ReplyHeader};
+ use smallvec::smallvec;
+
+ // Field offsets via `offset_of!`: a field reorder fails to compile here
+ // rather than silently corrupting test bytes.
+ const SIZE_OFF: usize = std::mem::offset_of!(RequestHeader, size);
+ const COMMAND_OFF: usize = std::mem::offset_of!(RequestHeader, command);
+ const REQUEST_CLIENT_OFF: usize = std::mem::offset_of!(RequestHeader,
client);
+ const REQUEST_OPERATION_OFF: usize = std::mem::offset_of!(RequestHeader,
operation);
+ const REQUEST_SESSION_OFF: usize = std::mem::offset_of!(RequestHeader,
session);
+
+ fn header_bytes(command: Command2, size: u32) -> Owned<MESSAGE_ALIGN> {
+ let mut o = Owned::<MESSAGE_ALIGN>::zeroed(256);
+ {
+ let buf = o.as_mut_slice();
+ buf[SIZE_OFF..SIZE_OFF + 4].copy_from_slice(&size.to_le_bytes());
+ buf[COMMAND_OFF] = command as u8;
+ // Typed headers reject client == 0. `#[repr(C)]` preamble layout
+ // is shared, so this offset works across header types.
+ buf[REQUEST_CLIENT_OFF..REQUEST_CLIENT_OFF + 16]
+ .copy_from_slice(&0xCAFE_u128.to_le_bytes());
+ }
+ o
+ }
+
+ // Construction via Message::new (zeroed)
+
+ #[test]
+ #[should_panic(expected = "size must be at least header size")]
+ fn message_new_smaller_than_header_panics() {
+ let _ = Message::<RequestHeader>::new(100);
+ }
+
+ // try_from(Owned): validation gates the unsafe construction
+
+ #[test]
+ fn try_from_owned_too_short_returns_err() {
+ let owned = Owned::<MESSAGE_ALIGN>::zeroed(100);
+ let result = Message::<RequestHeader>::try_from(owned);
+ assert!(matches!(result, Err(ConsensusError::InvalidCommand { .. })));
+ }
+
+ #[test]
+ fn try_from_owned_invalid_bit_pattern_returns_err() {
+ let mut owned = Owned::<MESSAGE_ALIGN>::zeroed(256);
+ owned.as_mut_slice()[COMMAND_OFF] = 99; // outside Command2's
discriminant range
+ let result = Message::<RequestHeader>::try_from(owned);
+ assert!(matches!(result, Err(ConsensusError::InvalidBitPattern)));
+ }
+
+ #[test]
+ fn try_from_owned_buffer_shorter_than_claimed_size_returns_err() {
+ // Header parses cleanly (RequestHeader::validate doesn't gate on
+ // size), but the encoded `size` field claims more bytes than the
+ // backing buffer holds. The buffer-bounds check at the bottom of
+ // `Message::try_from` must reject. (Both this case and the
+ // "buffer shorter than `size_of::<H>`" case currently surface as
+ // the same `InvalidCommand` variant; promoting them to distinct
+ // `ConsensusError` variants is a separate hardening pass.)
+ let owned = header_bytes(Command2::Request, 999);
+ // header_bytes already produces a 256-byte buffer; size=999 > 256,
+ // so try_from rejects via `bytes.len() < header.size()`.
+ let result = Message::<RequestHeader>::try_from(owned);
+ assert!(matches!(result, Err(ConsensusError::InvalidCommand { .. })));
+ }
+
+ // as_generic: const unsafe pointer cast (#[repr(C)] equivalence)
+
+ #[test]
+ fn as_generic_view_reads_command_byte() {
+ let owned = header_bytes(Command2::Request, 256);
+ let typed = Message::<RequestHeader>::try_from(owned).expect("valid");
+ let generic = typed.as_generic();
+ assert_eq!(generic.header().command, Command2::Request);
+ assert_eq!(generic.total_len(), 256);
+ }
+
+ // try_as_typed: validation gates the unsafe ptr-cast reborrow
+
+ #[test]
+ fn try_as_typed_command_mismatch_returns_err_without_unsafe_cast() {
+ // bytes are a valid Prepare; asking for RequestHeader must fail
+ // *before* the unsafe ptr-cast inside try_as_typed.
+ let owned = header_bytes(Command2::Prepare, 256);
+ let generic =
Message::<GenericHeader>::try_from(owned).expect("valid");
+ let result = generic.try_as_typed::<RequestHeader>();
+ assert!(matches!(
+ result,
+ Err(ConsensusError::InvalidCommand {
+ expected: Command2::Request,
+ found: Command2::Prepare,
+ })
+ ));
+ }
+
+ #[test]
+ fn try_as_typed_invalid_validation_returns_err() {
+ // RequestHeader::validate rejects operation=Register with non-zero
session.
+ let mut owned = header_bytes(Command2::Request, 256);
+ {
+ let buf = owned.as_mut_slice();
+ buf[REQUEST_OPERATION_OFF] = Operation::Register as u8;
+ buf[REQUEST_SESSION_OFF..REQUEST_SESSION_OFF +
8].copy_from_slice(&5u64.to_le_bytes());
+ }
+ let generic = Message::<GenericHeader>::try_from(owned).expect("valid
generic");
+ let result = generic.try_as_typed::<RequestHeader>();
+ assert!(matches!(result, Err(ConsensusError::InvalidField(_))));
+ }
+
+ // try_into_typed: consuming variant of try_as_typed
+
+ #[test]
+ fn try_into_typed_command_mismatch_returns_err() {
+ let owned = header_bytes(Command2::Prepare, 256);
+ let generic =
Message::<GenericHeader>::try_from(owned).expect("valid");
+ let result = generic.try_into_typed::<RequestHeader>();
+ assert!(matches!(
+ result,
+ Err(ConsensusError::InvalidCommand {
+ expected: Command2::Request,
+ found: Command2::Prepare,
+ })
+ ));
+ }
+
+ // MessageBag dispatch: 7 arms via `try_into_typed` (formerly unsafe `from_backing_unchecked`)
+
+ fn dispatch(command: Command2, size: u32) -> Result<MessageBag,
ConsensusError> {
+ let owned = header_bytes(command, size);
+ let generic = Message::<GenericHeader>::try_from(owned).expect("valid
generic");
+ MessageBag::try_from(generic)
+ }
+
+ #[test]
+ fn messagebag_dispatch_unsupported_command_returns_err() {
+ // Ping is a valid Command2 bit pattern but is not a MessageBag
variant.
+ let owned = header_bytes(Command2::Ping, 256);
+ let generic = Message::<GenericHeader>::try_from(owned).expect("valid
generic");
+ let result = MessageBag::try_from(generic);
+ assert!(matches!(result, Err(ConsensusError::InvalidCommand { .. })));
+ }
+
+ #[test]
+ fn messagebag_command_method_round_trips() {
+ for cmd in [
+ Command2::Request,
+ Command2::Prepare,
+ Command2::PrepareOk,
+ Command2::StartViewChange,
+ Command2::DoViewChange,
+ Command2::StartView,
+ Command2::Commit,
+ ] {
+ let bag = dispatch(cmd, 256).expect("dispatch");
+ assert_eq!(bag.command(), cmd, "round-trip for {cmd:?}");
+ assert_eq!(bag.size(), 256, "size for {cmd:?}");
+ }
+ }
+
+ // MessageBag dispatch must enforce per-typed validate()
+ // (Commit size != 256, Register with session != 0, etc.)
+
+ #[test]
+ fn messagebag_dispatch_commit_with_invalid_size_returns_err() {
+ // `CommitHeader::validate` rejects size != 256.
+ let owned = header_bytes(Command2::Commit, 128);
+ let generic = Message::<GenericHeader>::try_from(owned).expect("valid
generic");
+ let result = MessageBag::try_from(generic);
+ assert!(matches!(
+ result,
+ Err(ConsensusError::CommitInvalidSize(128))
+ ));
+ }
+
+ #[test]
+ fn messagebag_dispatch_request_with_invalid_register_session_returns_err()
{
+ // `RequestHeader::validate` rejects Register with non-zero session.
+ let mut owned = header_bytes(Command2::Request, 256);
+ {
+ let buf = owned.as_mut_slice();
+ buf[REQUEST_OPERATION_OFF] = Operation::Register as u8;
+ buf[REQUEST_SESSION_OFF..REQUEST_SESSION_OFF +
8].copy_from_slice(&5u64.to_le_bytes());
+ }
+ let generic = Message::<GenericHeader>::try_from(owned).expect("valid
generic");
+ let result = MessageBag::try_from(generic);
+ assert!(matches!(result, Err(ConsensusError::InvalidField(_))));
+ }
+
+ // deep_copy: byte-level independence after clone-like API
+
+ #[test]
+ fn request_message_deep_copy_independent() {
+ let owned = header_bytes(Command2::Request, 256);
+ let mut msg =
Message::<RequestHeader>::try_from(owned).expect("valid");
+ let copy = msg.deep_copy();
+ // Mutate the original's bytes; the deep copy must be untouched.
+ msg.as_mut_slice()[200] = 0xab;
+ assert_eq!(copy.as_slice()[200], 0);
+ assert_eq!(msg.as_slice()[200], 0xab);
+ }
+
+ // transmute_header: rewrites the typed header in place
+
+ #[test]
+ fn transmute_header_request_to_prepare() {
+ let owned = header_bytes(Command2::Request, 256);
+ let msg = Message::<RequestHeader>::try_from(owned).expect("valid");
+ let prepared: Message<PrepareHeader> =
+ msg.transmute_header::<PrepareHeader>(|_old, new| {
+ new.command = Command2::Prepare;
+ new.size = 256;
+ });
+ assert_eq!(prepared.header().command, Command2::Prepare);
+ }
+
+ // ResponseBacking via SmallVec<Frozen>
+
+ #[test]
+ fn response_backing_single_fragment_roundtrip() {
+ let owned = header_bytes(Command2::Reply, 256);
+ let frozen: Frozen<MESSAGE_ALIGN> = owned.into();
+ let fragments: smallvec::SmallVec<[Frozen<MESSAGE_ALIGN>; 4]> =
smallvec![frozen];
+ let msg = Message::<ReplyHeader,
ResponseBacking>::try_from(fragments).expect("valid");
+ assert_eq!(msg.header().command, Command2::Reply);
+ assert_eq!(msg.fragments().len(), 1);
+ }
+
+ #[test]
+ fn response_backing_empty_fragments_returns_err() {
+ let fragments: smallvec::SmallVec<[Frozen<MESSAGE_ALIGN>; 4]> =
smallvec![];
+ let result = Message::<ReplyHeader,
ResponseBacking>::try_from(fragments);
+ assert!(matches!(result, Err(ConsensusError::InvalidCommand { .. })));
+ }
+
+ #[test]
+ fn response_backing_first_fragment_too_short_returns_err() {
+ let owned = Owned::<MESSAGE_ALIGN>::zeroed(100);
+ let frozen: Frozen<MESSAGE_ALIGN> = owned.into();
+ let fragments: smallvec::SmallVec<[Frozen<MESSAGE_ALIGN>; 4]> =
smallvec![frozen];
+ let result = Message::<ReplyHeader,
ResponseBacking>::try_from(fragments);
+ assert!(matches!(result, Err(ConsensusError::InvalidCommand { .. })));
+ }
+}
diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs
index 1291febb4..0d81b9cc9 100644
--- a/core/common/src/alloc/buffer.rs
+++ b/core/common/src/alloc/buffer.rs
@@ -346,3 +346,168 @@ impl IoBufMut for PooledBuffer {
unsafe { std::slice::from_raw_parts_mut(ptr, cap) }
}
}
+
+#[cfg(test)]
+mod miri_tests {
+ //! Miri targets the 3 unsafe sites: `IoBufMut::as_uninit`
+ //! (`from_raw_parts_mut` ptr cast), `SetLen::set_len`, `split_to`
+ //! (`ptr::copy` forward-overlap + `set_len`).
+ //!
+ //! Pool-free helper avoids the global `MEMORY_POOL` (a `OnceCell` whose
+ //! `ArrayQueue` buckets leak under Miri) and `serial_test` (pulls
+ //! `sdd`/`scc` with int→ptr casts rejected by `-Zmiri-strict-provenance`).
+ //! `split_to` tests need the pool internally; gated `#[cfg(not(miri))]`.
+
+ use super::*;
+ use aligned_vec::{AVec, ConstAlign};
+
+ /// Build `PooledBuffer` with `from_pool == false` to skip the global pool.
+ fn pool_free_with_capacity(cap: usize) -> PooledBuffer {
+ let v: AVec<u8, ConstAlign<ALIGNMENT>> =
AVec::with_capacity(ALIGNMENT, cap);
+ PooledBuffer::from_existing(v)
+ }
+
+ // IoBufMut::as_uninit
+
+ #[test]
+ fn as_uninit_returns_slice_of_full_capacity() {
+ let mut buf = pool_free_with_capacity(256);
+ let cap_before = buf.capacity();
+ let uninit = buf.as_uninit();
+ assert_eq!(
+ uninit.len(),
+ cap_before,
+ "as_uninit must expose the full capacity, not just initialized
len",
+ );
+ }
+
+ #[test]
+ fn as_uninit_pointer_is_4096_aligned() {
+ let mut buf = pool_free_with_capacity(8192);
+ let addr = buf.as_uninit().as_mut_ptr() as usize;
+ assert_eq!(addr % ALIGNMENT, 0);
+ }
+
+ #[test]
+ fn as_uninit_write_then_set_len_observes_writes() {
+ let mut buf = pool_free_with_capacity(128);
+ {
+ let uninit = buf.as_uninit();
+ for (i, slot) in uninit.iter_mut().take(16).enumerate() {
+ slot.write(u8::try_from(i).unwrap());
+ }
+ }
+ // SAFETY: 16 bytes initialized above.
+ unsafe { <PooledBuffer as SetLen>::set_len(&mut buf, 16) };
+ assert_eq!(
+ buf.as_init(),
+ &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
+ );
+ }
+
+ // SetLen::set_len
+
+ #[test]
+ fn set_len_to_full_capacity_after_uninit_fill() {
+ // Fill entire capacity then `set_len(cap)`: Miri verifies every
+ // byte initialized before read via `as_init()`.
+ let mut buf = pool_free_with_capacity(256);
+ let cap = buf.capacity();
+ {
+ let uninit = buf.as_uninit();
+ assert_eq!(uninit.len(), cap);
+ for (i, slot) in uninit.iter_mut().enumerate() {
+ slot.write(u8::try_from(i & 0xff).unwrap());
+ }
+ }
+ // SAFETY: 0..cap initialized above.
+ unsafe { <PooledBuffer as SetLen>::set_len(&mut buf, cap) };
+ assert_eq!(buf.len(), cap);
+ assert_eq!(buf.as_init()[0], 0);
+ assert_eq!(
+ buf.as_init()[cap - 1],
+ u8::try_from((cap - 1) & 0xff).unwrap()
+ );
+ }
+
+ // split_to: `ptr::copy` (forward-overlap capable) + `set_len`.
+ // Needs global pool; skip Miri (ArrayQueue retention reads as leak,
+ // serial_test fails strict-provenance).
+
+ #[cfg(not(miri))]
+ mod split_to {
+ use super::*;
+ use crate::IggyByteSize;
+ use crate::alloc::memory_pool::{MemoryPool, MemoryPoolConfigOther};
+ use serial_test::serial;
+ use std::str::FromStr;
+ use std::sync::Once;
+
+ static MIRI_POOL_INIT: Once = Once::new();
+
+ fn init_pool_for_split_to_tests() {
+ MIRI_POOL_INIT.call_once(|| {
+ let config = MemoryPoolConfigOther {
+ enabled: true,
+ size: IggyByteSize::from_str("64MiB").unwrap(),
+ bucket_capacity: 16,
+ };
+ MemoryPool::init_pool(&config);
+ });
+ }
+
+ #[test]
+ #[serial(memory_pool)]
+ fn basic_split() {
+ init_pool_for_split_to_tests();
+ let mut buf = pool_free_with_capacity(64);
+ buf.extend_from_slice(b"abcdefghij");
+ let prefix = buf.split_to(4);
+ assert_eq!(prefix.as_ref(), b"abcd");
+ assert_eq!(buf.as_ref(), b"efghij");
+ }
+
+ #[test]
+ #[serial(memory_pool)]
+ fn at_zero_yields_empty_prefix_and_unchanged_self() {
+ init_pool_for_split_to_tests();
+ let mut buf = pool_free_with_capacity(32);
+ buf.extend_from_slice(b"abcd");
+ let prefix = buf.split_to(0);
+ assert!(prefix.is_empty());
+ assert_eq!(buf.as_ref(), b"abcd");
+ }
+
+ #[test]
+ #[serial(memory_pool)]
+ fn at_len_yields_full_prefix_and_empty_self() {
+ init_pool_for_split_to_tests();
+ let mut buf = pool_free_with_capacity(32);
+ buf.extend_from_slice(b"abcd");
+ let prefix = buf.split_to(4);
+ assert_eq!(prefix.as_ref(), b"abcd");
+ assert!(buf.is_empty());
+ }
+
+ #[test]
+ #[serial(memory_pool)]
+ fn forward_overlap_preserves_bytes() {
+ // at=2, len=8: source [2..8) overlaps destination [0..6).
+ // `ptr::copy` handles overlap; `copy_nonoverlapping` would be UB.
+ init_pool_for_split_to_tests();
+ let mut buf = pool_free_with_capacity(32);
+ buf.extend_from_slice(b"ABCDEFGH");
+ let prefix = buf.split_to(2);
+ assert_eq!(prefix.as_ref(), b"AB");
+ assert_eq!(buf.as_ref(), b"CDEFGH");
+ }
+ }
+
+ // Drop short-circuit for `from_pool == false`. Miri leak detector
enforces.
+ #[test]
+ fn pool_free_buffer_drop_does_not_leak() {
+ let mut buf = pool_free_with_capacity(256);
+ buf.extend_from_slice(&[0xaa; 200]);
+ drop(buf);
+ }
+}
diff --git a/core/common/src/types/send_messages2.rs
b/core/common/src/types/send_messages2.rs
index 58390db04..73d07126f 100644
--- a/core/common/src/types/send_messages2.rs
+++ b/core/common/src/types/send_messages2.rs
@@ -583,16 +583,37 @@ fn decode_request_slice(body: &[u8]) ->
Result<SendMessages2Ref<'_>, IggyError>
Ok(SendMessages2Ref { header, blob })
}
+/// Decode a `Prepare` message from a slice of bytes.
+///
+/// `bytes` must be 16-byte aligned (`PrepareHeader` has `u128` fields). Source
+/// from `Frozen<MESSAGE_ALIGN>` / `Owned<MESSAGE_ALIGN>` / `Message<H>`.
+/// Misalignment: `debug_assert!` in debug; `InvalidCommand` in release.
+///
+/// # Errors
+///
+/// `IggyError::InvalidCommand` on: short buffer, bad bit pattern, `size`
+/// outside `[header_size, bytes.len()]`, short/checksum-mismatched body.
pub fn decode_prepare_slice(bytes: &[u8]) -> Result<SendMessages2Ref<'_>,
IggyError> {
let header_size = std::mem::size_of::<PrepareHeader>();
if bytes.len() < header_size {
return Err(IggyError::InvalidCommand);
}
+ // Bytemuck enforces alignment in release (maps to InvalidCommand below);
+ // debug_assert surfaces the contract violation early in dev.
+ debug_assert_eq!(
+ bytes
+ .as_ptr()
+ .align_offset(std::mem::align_of::<PrepareHeader>()),
+ 0,
+ "decode_prepare_slice: bytes must be at least 16-byte aligned",
+ );
+
let prepare =
bytemuck::checked::try_from_bytes::<PrepareHeader>(&bytes[..header_size])
.map_err(|_| IggyError::InvalidCommand)?;
let total_size = prepare.size as usize;
- if bytes.len() < total_size {
+ // Wire-controllable `size`: reject < header_size to avoid slice OOB below.
+ if total_size < header_size || bytes.len() < total_size {
return Err(IggyError::InvalidCommand);
}
@@ -753,3 +774,50 @@ fn read_u128(bytes: &[u8], offset: usize) -> Result<u128,
IggyError> {
.map(u128::from_le_bytes)
.ok_or(IggyError::InvalidNumberEncoding)
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use iggy_binary_protocol::Command2;
+
+ fn aligned_prepare_bytes(size: u32) -> Owned<MESSAGE_ALIGN> {
+ let mut owned =
Owned::<MESSAGE_ALIGN>::zeroed(std::mem::size_of::<PrepareHeader>());
+ let header: &mut PrepareHeader =
+ bytemuck::checked::try_from_bytes_mut(owned.as_mut_slice())
+ .expect("zeroed bytes form a valid PrepareHeader");
+ header.command = Command2::Prepare;
+ header.size = size;
+ owned
+ }
+
+ #[test]
+ fn decode_prepare_slice_size_below_header_size_does_not_panic() {
+ // Regression: without the `total_size < header_size` guard,
+ // `&bytes[256..size]` panics for any size < 256.
+ for adversarial_size in [0u32, 255] {
+ let owned = aligned_prepare_bytes(adversarial_size);
+ let result = decode_prepare_slice(owned.as_slice());
+ assert!(
+ matches!(result, Err(IggyError::InvalidCommand)),
+ "size={adversarial_size} must be rejected, got {result:?}",
+ );
+ }
+ }
+
+ #[cfg(debug_assertions)]
+ #[test]
+ #[should_panic(expected = "must be at least 16-byte aligned")]
+ fn decode_prepare_slice_debug_asserts_on_misaligned_input() {
+ // `Vec<u8>` requests align=1; glibc returns a base that is a
+ // multiple of 16, so `&buf[1..]` has offset 1 mod 16, reliably
+ // misaligned.
+ let buf: Vec<u8> = vec![0u8; std::mem::size_of::<PrepareHeader>() + 1];
+ let misaligned = &buf[1..];
+ assert_ne!(
+ misaligned.as_ptr().align_offset(16),
+ 0,
+ "test setup: allocator returned non-16k base",
+ );
+ let _ = decode_prepare_slice(misaligned);
+ }
+}
diff --git a/core/journal/src/prepare_journal.rs
b/core/journal/src/prepare_journal.rs
index 283db5b4d..2c5b564bc 100644
--- a/core/journal/src/prepare_journal.rs
+++ b/core/journal/src/prepare_journal.rs
@@ -19,7 +19,7 @@ use crate::file_storage::FileStorage;
use crate::{Journal, JournalHandle};
use compio::io::AsyncWriteAtExt;
use iggy_binary_protocol::consensus::iobuf::Owned;
-use iggy_binary_protocol::consensus::message::Message;
+use iggy_binary_protocol::consensus::message::{MESSAGE_ALIGN, Message};
use iggy_binary_protocol::consensus::{Command2, PrepareHeader};
use std::cell::{Cell, Ref, RefCell};
use std::fmt;
@@ -139,12 +139,23 @@ impl PrepareJournal {
let mut last_op: Option<u64> = None;
let mut pos: u64 = 0;
let mut header_buf = vec![0u8; HEADER_SIZE];
+ // Reused 16-aligned scratch (PrepareHeader has u128 fields). Avoids
+ // per-iteration 4 KiB-aligned alloc; bytes never become a `Message`.
+ let mut aligned = Owned::<16>::zeroed(HEADER_SIZE);
while pos + HEADER_SIZE as u64 <= file_len {
// Read the 256-byte header
header_buf = storage.read_at(pos, header_buf).await?;
- let header: PrepareHeader =
- *bytemuck::checked::from_bytes::<PrepareHeader>(&header_buf);
+ aligned.as_mut_slice().copy_from_slice(&header_buf);
+ // `try_from_bytes`: corrupt discriminant on disk must NOT panic;
+ // route through the same truncate-here branch as command/size
below.
+ let Ok(header_ref) =
+
bytemuck::checked::try_from_bytes::<PrepareHeader>(aligned.as_slice())
+ else {
+ storage.truncate(pos).await?;
+ break;
+ };
+ let header: PrepareHeader = *header_ref;
// Validate: must be a Prepare command with sane size
if header.command != Command2::Prepare
@@ -259,7 +270,7 @@ impl PrepareJournal {
};
let buf = vec![0u8; size];
let buf = self.storage.read_at(offset, buf).await?;
- let msg =
Message::try_from(Owned::<4096>::copy_from_slice(&buf)).map_err(
+ let msg =
Message::try_from(Owned::<MESSAGE_ALIGN>::copy_from_slice(&buf)).map_err(
|e: iggy_binary_protocol::consensus::ConsensusError| {
io::Error::new(io::ErrorKind::InvalidData, e.to_string())
},
@@ -350,7 +361,7 @@ impl Journal<FileStorage> for PrepareJournal {
for (header, offset) in &to_drain {
let buf = vec![0u8; header.size as usize];
let buf = self.storage.read_at(*offset, buf).await?;
- let msg =
Message::try_from(Owned::<4096>::copy_from_slice(&buf)).map_err(
+ let msg =
Message::try_from(Owned::<MESSAGE_ALIGN>::copy_from_slice(&buf)).map_err(
|e: iggy_binary_protocol::consensus::ConsensusError| {
io::Error::new(io::ErrorKind::InvalidData, e.to_string())
},
@@ -456,7 +467,7 @@ impl Journal<FileStorage> for PrepareJournal {
let buffer = vec![0u8; size];
let buffer = self.storage.read_at(offset, buffer).await.ok()?;
- Message::try_from(Owned::<4096>::copy_from_slice(&buffer)).ok()
+
Message::try_from(Owned::<MESSAGE_ALIGN>::copy_from_slice(&buffer)).ok()
}
}
@@ -478,7 +489,7 @@ mod tests {
fn make_prepare(op: u64, body_size: usize) -> Message<PrepareHeader> {
let total_size = HEADER_SIZE + body_size;
- let mut buffer = Owned::<4096>::zeroed(total_size);
+ let mut buffer = Owned::<MESSAGE_ALIGN>::zeroed(total_size);
let header = bytemuck::checked::from_bytes_mut::<PrepareHeader>(
&mut buffer.as_mut_slice()[..HEADER_SIZE],
@@ -557,6 +568,38 @@ mod tests {
}
}
+ #[compio::test]
+ async fn corrupt_command_byte_truncates_on_reopen() {
+ // Bit-flipped `Command2` discriminant: must truncate, not panic.
+ let dir = tempdir().unwrap();
+ let path = dir.path().join("journal.wal");
+
+ {
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
+ journal.append(make_prepare(1, 64)).await.unwrap();
+ journal.append(make_prepare(2, 128)).await.unwrap();
+ journal.storage.fsync().await.unwrap();
+ }
+
+ // Entry 2 at offset HEADER_SIZE+64=320; `offset_of!` guards against
+ // future field reorders silently corrupting an unrelated byte.
+ let entry_2_offset = (HEADER_SIZE + 64) as u64;
+ let command_byte_offset =
+ entry_2_offset + std::mem::offset_of!(PrepareHeader, command) as
u64;
+ {
+ use std::io::{Seek, SeekFrom, Write};
+ let mut file =
std::fs::OpenOptions::new().write(true).open(&path).unwrap();
+ file.seek(SeekFrom::Start(command_byte_offset)).unwrap();
+ file.write_all(&[99u8]).unwrap(); // out of range for Command2
+ file.sync_all().unwrap();
+ }
+
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
+ assert_eq!(journal.last_op(), Some(1));
+ assert!(journal.header(2).is_none());
+ assert_eq!(journal.storage.file_len(), entry_2_offset);
+ }
+
#[compio::test]
async fn truncated_entry_on_reopen() {
let dir = tempdir().unwrap();
diff --git a/justfile b/justfile
index 6f37a5e81..7b68d0c4d 100644
--- a/justfile
+++ b/justfile
@@ -48,6 +48,18 @@ nextest: build
nextests TEST: build
cargo nextest run --nocapture -- {{TEST}}
+# Run Miri (UB detector) on the unsafe-heavy crates that don't pull
+# tokio/compio. Mirrors the `miri` task in CI. Pinned to the same nightly
+# as `.github/actions/rust/pre-merge/action.yml` so local runs don't drift
+# from CI on the next nightly bump — keep these two dates in sync.
+# Requires:
+#
+# rustup toolchain install nightly-2026-04-21 --component miri
+#
+miri:
+ MIRIFLAGS="-Zmiri-tree-borrows -Zmiri-strict-provenance" \
+ cargo +nightly-2026-04-21 miri test -p iggy_binary_protocol -p consensus
+
server *ARGS:
cargo run --bin iggy-server {{ARGS}}