This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new af01a3479 ci: add Miri UB detector for binary_protocol and consensus 
(#3284)
af01a3479 is described below

commit af01a3479a6954fe9f9c68dd118dbc37b667368d
Author: Krishna Vishal <[email protected]>
AuthorDate: Wed May 20 19:53:38 2026 +0530

    ci: add Miri UB detector for binary_protocol and consensus (#3284)
    
    - Adds a `miri` task to the pre-merge action, scoped to
    `iggy_binary_protocol` (unsafe-heavy) and `consensus` (no unsafe of its
    own, but exercises binary_protocol's unsafe code); both crates avoid
    linking compio's io_uring driver, which Miri cannot emulate. Pins
    `nightly-2026-04-21`, runs with `-Zmiri-tree-borrows
    -Zmiri-strict-provenance`, and uses a separate cache key.
    - New `rust-miri` component triggers on `core/binary_protocol/**` and
    `core/consensus/**`. Mirrored in `justfile` as `just miri` for local
    runs.
---
 .github/actions/rust/pre-merge/action.yml     |  29 +-
 .github/config/components.yml                 |  18 ++
 core/binary_protocol/src/consensus/command.rs |   5 +-
 core/binary_protocol/src/consensus/header.rs  |  20 +-
 core/binary_protocol/src/consensus/iobuf.rs   | 448 +++++++++++++++++++++++++-
 core/binary_protocol/src/consensus/message.rs | 303 +++++++++++++++--
 core/common/src/alloc/buffer.rs               | 165 ++++++++++
 core/common/src/types/send_messages2.rs       |  70 +++-
 core/journal/src/prepare_journal.rs           |  57 +++-
 justfile                                      |  12 +
 10 files changed, 1083 insertions(+), 44 deletions(-)

diff --git a/.github/actions/rust/pre-merge/action.yml 
b/.github/actions/rust/pre-merge/action.yml
index 11a429dc5..b09d036e6 100644
--- a/.github/actions/rust/pre-merge/action.yml
+++ b/.github/actions/rust/pre-merge/action.yml
@@ -20,7 +20,7 @@ description: Rust pre-merge testing and linting github iggy 
actions
 
 inputs:
   task:
-    description: "Task to run (check, fmt, clippy, sort, machete, doctest, 
verify-publish, test-1, test-2, compat)"
+    description: "Task to run (check, fmt, clippy, sort, machete, doctest, 
verify-publish, test-1, test-2, compat, miri)"
     required: true
   component:
     description: "Component name (for context)"
@@ -45,6 +45,10 @@ runs:
     - name: Setup Rust with cache
       uses: ./.github/actions/utils/setup-rust-with-cache
       with:
+        # Miri builds against a nightly toolchain with a separate `target/miri`
+        # subtree; isolate its cache from the stable `dev` namespace so the
+        # two don't evict each other.
+        shared-key: ${{ inputs.task == 'miri' && 'miri' || 'dev' }}
         print-cache-status: ${{ startsWith(inputs.task, 'test-') }}
 
     - name: Install tools for specific tasks
@@ -303,6 +307,29 @@ runs:
           --port 8090 --wait-secs 180
       shell: bash
 
+    # Miri (UB detector) on the unsafe-heavy crates that don't pull tokio /
+    # compio. Pinned nightly so MIRIFLAGS behavior is stable across CI runs;
+    # bump the date quarterly. Tree-borrows is the future-default aliasing
+    # model and accepts the legitimate `NonNull::add` arithmetic in
+    # `iobuf.rs` that stacked-borrows would reject. Strict-provenance
+    # rejects int-to-pointer round trips, which we don't use.
+    - name: Install nightly Rust + miri component
+      if: inputs.task == 'miri'
+      run: |
+        rustup toolchain install nightly-2026-04-21 --component miri --profile 
minimal --no-self-update
+        cargo +nightly-2026-04-21 miri setup
+      shell: bash
+
+    - name: Run miri
+      if: inputs.task == 'miri'
+      env:
+        MIRIFLAGS: "-Zmiri-tree-borrows -Zmiri-strict-provenance"
+      run: |
+        cargo +nightly-2026-04-21 miri test --locked \
+          -p iggy_binary_protocol \
+          -p consensus
+      shell: bash
+
     # aarch64 builds (run on ARM64 runners)
     - name: Build aarch64-gnu
       if: inputs.task == 'build-aarch64-gnu'
diff --git a/.github/config/components.yml b/.github/config/components.yml
index e7b6dbbbc..2dd9ee3e5 100644
--- a/.github/config/components.yml
+++ b/.github/config/components.yml
@@ -78,6 +78,24 @@ components:
     paths:
       - "core/binary_protocol/**"
 
+  # Miri (UB detector) on unsafe-heavy crates that don't pull tokio/compio.
+  # Scoped to `iggy_binary_protocol` (heavy unsafe in `consensus/iobuf.rs`:
+  # ref-counted CoW buffer, manual atomics, pointer arithmetic) plus the
+  # `consensus` crate (zero unsafe but exercises binary_protocol's unsafe
+  # via integration). Other crates (journal, metadata, partitions, common)
+  # instantiate compio, which uses io_uring syscalls Miri cannot emulate.
+  # See the `# Alignment` doc on the `ConsensusHeader` trait for the
+  # alignment contract that motivates this check.
+  rust-miri:
+    depends_on:
+      - "rust-workspace"
+      - "ci-infrastructure"
+    paths:
+      - "core/binary_protocol/**"
+      - "core/consensus/**"
+    tasks:
+      - "miri"
+
   rust-server:
     depends_on:
       - "rust-workspace"
diff --git a/core/binary_protocol/src/consensus/command.rs 
b/core/binary_protocol/src/consensus/command.rs
index 660bb61cd..384bf696a 100644
--- a/core/binary_protocol/src/consensus/command.rs
+++ b/core/binary_protocol/src/consensus/command.rs
@@ -57,10 +57,13 @@ unsafe impl CheckedBitPattern for Command2 {
 #[cfg(test)]
 mod tests {
     use crate::consensus::GenericHeader;
+    use aligned_vec::{AVec, ConstAlign};
 
     #[test]
     fn invalid_bit_pattern_rejected() {
-        let mut buf = bytes::BytesMut::zeroed(256);
+        // 16-byte aligned (see `ConsensusHeader` doc); `BytesMut` fails Miri.
+        let mut buf: AVec<u8, ConstAlign<16>> = AVec::new(16);
+        buf.resize(256, 0);
         buf[60] = 99;
         let result = bytemuck::checked::try_from_bytes::<GenericHeader>(&buf);
         assert!(result.is_err());
diff --git a/core/binary_protocol/src/consensus/header.rs 
b/core/binary_protocol/src/consensus/header.rs
index a7dbd6800..650c30772 100644
--- a/core/binary_protocol/src/consensus/header.rs
+++ b/core/binary_protocol/src/consensus/header.rs
@@ -54,6 +54,15 @@ pub fn read_size_field(header: &[u8]) -> Option<u32> {
 ///
 /// Every header is exactly [`HEADER_SIZE`] bytes, `#[repr(C)]`, and supports
 /// zero-copy deserialization via `bytemuck`.
+///
+/// # Alignment
+///
+/// All headers contain `u128` fields → 16-byte alignment required by
+/// `bytemuck::checked::try_from_bytes`. Production uses `Owned<MESSAGE_ALIGN>`
+/// / `Frozen<MESSAGE_ALIGN>` (4096-aligned). `Vec<u8>` / `bytes::BytesMut`
+/// request align=1; glibc happens to over-align them, but jemalloc, arenas and
+/// Miri give no such guarantee, so `try_from_bytes` can fail there. Use
+/// `aligned_vec::AVec<u8, ConstAlign<16>>` for explicit alignment.
 pub trait ConsensusHeader: Sized + CheckedBitPattern + NoUninit {
     const COMMAND: Command2;
 
@@ -917,9 +926,14 @@ mod tests {
         EvictionReason, GenericHeader, Operation, PrepareHeader, 
PrepareOkHeader, ReplyHeader,
         RequestHeader, StartViewChangeHeader, StartViewHeader,
     };
-
-    fn aligned_zeroed(size: usize) -> bytes::BytesMut {
-        bytes::BytesMut::zeroed(size)
+    use aligned_vec::{AVec, ConstAlign};
+
+    // bytemuck requires 16-byte alignment (see `ConsensusHeader` trait doc).
+    // `BytesMut::zeroed` works on glibc by accident, fails under Miri.
+    fn aligned_zeroed(size: usize) -> AVec<u8, ConstAlign<16>> {
+        let mut v: AVec<u8, ConstAlign<16>> = AVec::new(16);
+        v.resize(size, 0);
+        v
     }
 
     #[test]
diff --git a/core/binary_protocol/src/consensus/iobuf.rs 
b/core/binary_protocol/src/consensus/iobuf.rs
index 340a49f06..7175ef1f6 100644
--- a/core/binary_protocol/src/consensus/iobuf.rs
+++ b/core/binary_protocol/src/consensus/iobuf.rs
@@ -119,9 +119,13 @@ impl<const ALIGN: usize> Owned<ALIGN> {
         assert!(split_at <= self.inner.len());
 
         let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+        let guard = AVecRawGuard::<ALIGN>::new(ptr, len, capacity);
         let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
         let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
         let ctrlb = ControlBlock::new(base, len, capacity);
+        // ControlBlock::new succeeded; the buffer is now owned by the
+        // control block. Skip the guard's destructor.
+        guard.defuse();
         unsafe { ctrlb.as_ref().ref_count.fetch_add(1, Ordering::Relaxed) };
 
         let prefix = Prefix {
@@ -252,9 +256,11 @@ impl<const ALIGN: usize> From<Owned<ALIGN>> for 
Frozen<ALIGN> {
     fn from(value: Owned<ALIGN>) -> Self {
         let inner = value.inner;
         let (ptr, _, len, capacity) = inner.into_raw_parts();
+        let guard = AVecRawGuard::<ALIGN>::new(ptr, len, capacity);
 
         let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
         let ctrlb = ControlBlock::new(base, len, capacity);
+        guard.defuse();
         Self {
             inner: Extent {
                 ptr: base,
@@ -363,6 +369,46 @@ impl ControlBlock {
     }
 }
 
+/// Drop-on-unwind guard for raw `AVec` parts. Plugs the leak window between
+/// `AVec::into_raw_parts` (surrenders the buffer) and `ControlBlock::new`
+/// (calls `Box::new`; OOM aborts by default but unwinds if `handle_alloc_error` panics,
+/// e.g. `-Zoom=panic`). On unwind, `Drop` reconstitutes the `AVec`. Success paths must
+/// call [`AVecRawGuard::defuse`] before building the `Extent` (no double-free).
+struct AVecRawGuard<const ALIGN: usize> {
+    ptr: *mut u8,
+    len: usize,
+    capacity: usize,
+}
+
+impl<const ALIGN: usize> AVecRawGuard<ALIGN> {
+    fn new(ptr: *mut u8, len: usize, capacity: usize) -> Self {
+        Self { ptr, len, capacity }
+    }
+
+    /// Ownership transferred to new `Extent`; skip destructor.
+    fn defuse(self) {
+        std::mem::forget(self);
+    }
+}
+
+impl<const ALIGN: usize> Drop for AVecRawGuard<ALIGN> {
+    fn drop(&mut self) {
+        // SAFETY: `ptr/len/capacity` come from `AVec::into_raw_parts` of an
+        // `AVec<u8, ConstAlign<ALIGN>>` whose ownership the caller has just
+        // surrendered. While the guard is alive no other handle to the
+        // buffer exists, so reconstituting and dropping the `AVec` is the
+        // unique deallocation path.
+        unsafe {
+            drop(AVec::<u8, ConstAlign<ALIGN>>::from_raw_parts(
+                self.ptr,
+                ALIGN,
+                self.len,
+                self.capacity,
+            ));
+        }
+    }
+}
+
 struct Extent<const ALIGN: usize> {
     pub(crate) ptr: NonNull<u8>,
     pub(crate) len: usize,
@@ -389,9 +435,11 @@ impl<const ALIGN: usize> Extent<ALIGN> {
         v.extend_from_slice(src);
 
         let (ptr, _, len, capacity) = v.into_raw_parts();
+        let guard = AVecRawGuard::<ALIGN>::new(ptr, len, capacity);
         let data = unsafe { NonNull::new_unchecked(ptr) };
 
         let ctrlb = ControlBlock::new(data, len, capacity);
+        guard.defuse();
 
         Extent {
             ptr: data,
@@ -425,8 +473,10 @@ pub(crate) fn extent_from_aligned_vec<const ALIGN: usize>(
     vec: AVec<u8, ConstAlign<ALIGN>>,
 ) -> Extent<ALIGN> {
     let (ptr, _, len, capacity) = vec.into_raw_parts();
+    let guard = AVecRawGuard::<ALIGN>::new(ptr, len, capacity);
     let data = unsafe { NonNull::new_unchecked(ptr) };
     let ctrlb = ControlBlock::new(data, len, capacity);
+    guard.defuse();
 
     Extent {
         ptr: data,
@@ -450,11 +500,22 @@ fn extent_ref_count<const ALIGN: usize>(extent: 
&Extent<ALIGN>) -> usize {
     unsafe { extent.ctrlb.as_ref().ref_count.load(Ordering::Acquire) }
 }
 
+/// Decrement refcount; on last reference, drop the box and free the `AVec`.
+///
+/// # Safety
+///
+/// * `ctrlb`: live `ControlBlock`; caller owns one refcount share.
+/// * Caller must not use `ctrlb` after this call.
+/// * `ALIGN` must match the original `AVec<u8, ConstAlign<ALIGN>>` allocation.
 unsafe fn release_control_block_w_allocation<const ALIGN: usize>(ctrlb: 
NonNull<ControlBlock>) {
     let old = unsafe { ctrlb.as_ref() }
         .ref_count
         .fetch_sub(1, Ordering::Release);
-    debug_assert!(old > 0, "control block refcount underflow");
+    // Underflow = use-after-free. The `fetch_sub` has already wrapped by the
+    // time we observe `old == 0`, so detection is post-corruption, but
+    // panicking here (an abort if already unwinding or under panic=abort)
+    // still beats letting the wrapped count drive a double-free.
+    assert!(old > 0, "control block refcount underflow");
 
     if old != 1 {
         return;
@@ -474,6 +535,14 @@ unsafe fn release_control_block_w_allocation<const ALIGN: 
usize>(ctrlb: NonNull<
     };
 }
 
+/// Take ownership of a refcount==1 control block (consumes the heap box).
+/// Used by the merge path to recycle `(base, len, capacity)`.
+///
+/// # Safety
+///
+/// * `ctrlb`: live `ControlBlock` with refcount exactly 1 (asserted). Caller
+///   must guarantee no concurrent `clone` can race the load.
+/// * Caller must not use `ctrlb` after this call.
 unsafe fn reclaim_unique_control_block(ctrlb: NonNull<ControlBlock>) -> 
ControlBlock {
     assert_eq!(
         unsafe { ctrlb.as_ref() }.ref_count.load(Ordering::Acquire),
@@ -482,6 +551,18 @@ unsafe fn reclaim_unique_control_block(ctrlb: 
NonNull<ControlBlock>) -> ControlB
     unsafe { *Box::from_raw(ctrlb.as_ptr()) }
 }
 
+/// Merge two extents into a single `Owned`. Succeeds when no handle besides `prefix`
+/// and `tail` references tail's ctrlb (refcount == 2 shared-ctrlb, == 1 separate-ctrlb).
+///
+/// # Safety
+///
+/// * `prefix` + `tail` passed by value (consumed); no external clone may race.
+/// * Compatible allocations:
+///   - Shared ctrlb (from `Owned::split_at`): `prefix.ptr == tail.ctrlb.base`,
+///     `prefix.len + tail.len <= tail.ctrlb.len`.
+///   - Separate ctrlb: `prefix.len == extent_offset_from_base(tail)`, prefix
+///     fits within tail's allocation.
+/// * `ALIGN` matches both backing `AVec`s.
 unsafe fn try_merge_extents<const ALIGN: usize>(
     prefix: Extent<ALIGN>,
     tail: Extent<ALIGN>,
@@ -547,3 +628,368 @@ fn can_coalesce_prefix_with_tail<const ALIGN: usize>(
 
     tail_ref_count == 1 && extent_offset_from_base(&tail.inner) == split_at
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Smaller alignment keeps the per-test Miri allocation footprint low while
+    // exercising the same control-block + atomic-refcount + pointer-arithmetic
+    // logic as the production `MESSAGE_ALIGN = 4096`.
+    const A: usize = 64;
+
+    fn owned(data: &[u8]) -> Owned<A> {
+        Owned::<A>::copy_from_slice(data)
+    }
+
+    #[test]
+    fn prefix_from_aligned_vec_roundtrip() {
+        let mut v: AVec<u8, ConstAlign<A>> = AVec::new(A);
+        v.extend_from_slice(b"abc");
+        let prefix = Prefix::<A>::from_aligned_vec(v);
+        assert_eq!(prefix.as_slice(), b"abc");
+    }
+
+    //  Owned::split_at: shared control block / disjoint views
+
+    #[test]
+    fn owned_split_at_shares_ctrlb() {
+        let o = owned(b"abcdefgh");
+        let (prefix, tail) = o.split_at(4);
+        assert!(
+            prefix.aliases(&tail),
+            "Owned::split_at must produce halves sharing one control block",
+        );
+    }
+
+    #[test]
+    fn owned_split_drop_prefix_first_keeps_tail_valid() {
+        let o = owned(b"hello world");
+        let (prefix, tail) = o.split_at(5);
+        drop(prefix);
+        // refcount went 2 -> 1; tail's ctrlb / allocation must still be live.
+        assert_eq!(tail.as_slice(), b" world");
+    }
+
+    #[test]
+    fn owned_split_drop_tail_first_keeps_prefix_mutable() {
+        let o = owned(b"hello world");
+        let (mut prefix, tail) = o.split_at(5);
+        drop(tail);
+        assert_eq!(prefix.as_slice(), b"hello");
+        prefix.as_mut_slice()[0] = b'H';
+        assert_eq!(prefix.as_slice(), b"Hello");
+    }
+
+    #[test]
+    fn owned_split_disjoint_mut_does_not_corrupt_alias() {
+        // Mutate the prefix in-place while a frozen tail still aliases the
+        // same allocation. Tree-borrows must accept disjoint child accesses.
+        let o = owned(b"abcdefgh");
+        let (mut prefix, tail) = o.split_at(4);
+        prefix.as_mut_slice().copy_from_slice(b"WXYZ");
+        assert_eq!(prefix.as_slice(), b"WXYZ");
+        assert_eq!(tail.as_slice(), b"efgh");
+    }
+
+    //  Frozen::clone: refcount round-trip
+
+    #[test]
+    fn frozen_clone_independent_drop() {
+        let f1: Frozen<A> = owned(b"hello").into();
+        let f2 = f1.clone();
+        let f3 = f1.clone();
+        drop(f1);
+        assert_eq!(f2.as_slice(), b"hello");
+        drop(f2);
+        assert_eq!(f3.as_slice(), b"hello");
+    }
+
+    //  Frozen::slice: aliased view, parent-independent lifetime
+
+    fn fslice<R: std::ops::RangeBounds<usize>>(f: &Frozen<A>, range: R) -> 
Frozen<A> {
+        f.slice(range)
+    }
+
+    #[test]
+    fn frozen_slice_survives_parent_drop() {
+        let f: Frozen<A> = owned(b"abcdefgh").into();
+        let s = fslice(&f, 2..6);
+        drop(f);
+        assert_eq!(s.as_slice(), b"cdef");
+    }
+
+    #[test]
+    fn frozen_slice_chain_outlives_intermediates() {
+        let f: Frozen<A> = owned(b"abcdefgh").into();
+        let s1 = fslice(&f, 2..7); // "cdefg"
+        let s2 = fslice(&s1, 1..4); // "def"
+        drop(f);
+        drop(s1);
+        assert_eq!(s2.as_slice(), b"def");
+    }
+
+    //  coalesce: shared-ctrlb path (refcount == 2)
+
+    #[test]
+    fn coalesce_shared_ctrlb_succeeds() {
+        let o = owned(b"hello world");
+        let (prefix, tail) = o.split_at(5);
+        assert!(prefix.can_coalesce_with(&tail));
+        let merged = prefix.try_coalesce_with(tail).expect("coalesce");
+        assert_eq!(merged.as_slice(), b"hello world");
+    }
+
+    #[test]
+    fn coalesce_shared_ctrlb_blocked_by_extra_clone() {
+        let o = owned(b"hello world");
+        let (prefix, tail) = o.split_at(5);
+        let extra = tail.clone(); // refcount 2 -> 3 (prefix + tail + extra)
+        let result = prefix.try_coalesce_with(tail);
+        assert!(
+            result.is_err(),
+            "shared-ctrlb coalesce must require refcount == 2",
+        );
+        let _ = extra;
+    }
+
+    //  coalesce: separate-ctrlb path (refcount == 1, offset matches)
+
+    #[test]
+    fn coalesce_separate_ctrlb_succeeds_and_overwrites_prefix_region() {
+        let f: Frozen<A> = owned(b"abcdefgh").into();
+        let (mut prefix, tail) = f.split_at(3);
+        // prefix lives in its own allocation; mutate it before coalescing.
+        prefix.as_mut_slice().copy_from_slice(b"XYZ");
+        assert!(prefix.can_coalesce_with(&tail));
+        let merged = prefix.try_coalesce_with(tail).expect("coalesce");
+        assert_eq!(merged.as_slice(), b"XYZdefgh");
+    }
+
+    #[test]
+    fn coalesce_separate_ctrlb_blocked_by_extra_tail_clone() {
+        let f: Frozen<A> = owned(b"abcdefgh").into();
+        let (prefix, tail) = f.split_at(3);
+        let extra = tail.clone(); // tail refcount 1 -> 2; separate-ctrlb path 
requires 1
+        let result = prefix.try_coalesce_with(tail);
+        assert!(result.is_err());
+        let _ = extra;
+    }
+
+    #[test]
+    fn coalesce_separate_ctrlb_blocked_by_offset_mismatch() {
+        // Re-slice the tail forward so its offset_from_base no longer matches
+        // the prefix length. The separate-ctrlb path must reject this.
+        let f: Frozen<A> = owned(b"abcdefgh").into();
+        let (prefix, tail) = f.split_at(3);
+        let resliced = fslice(&tail, 2..); // offset_from_base = 5, prefix.len 
= 3
+        drop(tail); // bring resliced refcount back to 1
+        let result = prefix.try_coalesce_with(resliced);
+        assert!(result.is_err());
+    }
+
+    //  IoBufMut / SetLen plumbing through MaybeUninit
+
+    #[test]
+    fn owned_set_len_after_uninit_writes() {
+        let mut o: Owned<A> = Owned::with_capacity(64);
+        let cap = o.buf_capacity();
+        assert!(cap >= 64);
+        {
+            let uninit = o.as_uninit();
+            for (i, slot) in uninit.iter_mut().take(8).enumerate() {
+                slot.write(u8::try_from(i).unwrap());
+            }
+        }
+        // SAFETY: 8 bytes were just initialized via the uninit slice above.
+        unsafe { o.set_len(8) };
+        assert_eq!(o.as_slice(), &[0, 1, 2, 3, 4, 5, 6, 7]);
+    }
+
+    //  Concurrent clone/drop: validates atomic ordering on refcount
+
+    #[test]
+    fn frozen_clone_drop_concurrently() {
+        let f: Frozen<A> = owned(b"shared payload").into();
+        let mut handles = Vec::new();
+        for _ in 0..4 {
+            let f_thread = f.clone();
+            handles.push(std::thread::spawn(move || {
+                let a = f_thread.clone();
+                let b = f_thread.clone();
+                assert_eq!(a.as_slice(), b"shared payload");
+                assert_eq!(b.as_slice(), b"shared payload");
+                drop(a);
+                drop(b);
+                drop(f_thread);
+            }));
+        }
+        for h in handles {
+            h.join().unwrap();
+        }
+        assert_eq!(f.as_slice(), b"shared payload");
+    }
+
+    //  Concurrent atomics on a shared control block
+    //
+    // Three handles share one control block: `prefix` and `tail` (returned
+    // by `Owned::split_at`) plus a clone `extra` held by another thread.
+    // Refcount starts at 3. The other thread drops `extra` (Release
+    // fetch_sub: 3 -> 2); the calling thread races a `try_coalesce_with`
+    // (Acquire load of the same refcount).
+    //
+    // `prefix` and `tail` are passed to `try_coalesce_with` by value, so
+    // the only refcount transition the calling thread does NOT cause is
+    // `extra`'s drop. The two values the calling thread can observe in
+    // `can_coalesce_prefix_with_tail` are:
+    //   - refcount = 3 → shared-ctrlb gate fails → Err((prefix, tail))
+    //   - refcount = 2 → gate passes → merge fires → Ok(merged)
+    // Both outcomes must be sound and produce the correct bytes; Miri's
+    // tree-borrows + race detector validates the Release/Acquire pairing.
+    #[test]
+    fn concurrent_coalesce_with_dropping_clone() {
+        use std::sync::{Arc, Barrier};
+
+        // Few iterations: Miri scales poorly with thread count and loop
+        // depth, but its scheduler explores some interleavings each run.
+        for _ in 0..4 {
+            let o = owned(b"hello world");
+            let (prefix, tail) = o.split_at(5);
+            let extra = tail.clone(); // refcount: 2 -> 3
+
+            let barrier = Arc::new(Barrier::new(2));
+            let b_thread = Arc::clone(&barrier);
+
+            let handle = std::thread::spawn(move || {
+                b_thread.wait();
+                drop(extra); // refcount: 3 -> 2 (interleaving varies)
+            });
+
+            barrier.wait();
+            let result = prefix.try_coalesce_with(tail);
+            handle.join().unwrap();
+            match result {
+                Ok(merged) => {
+                    // `extra` dropped before our `can_coalesce` load
+                    // (observed refcount = 2): shared-ctrlb merge fired
+                    // and recovered the original 11-byte buffer.
+                    assert_eq!(merged.as_slice(), b"hello world");
+                }
+                Err((prefix, tail)) => {
+                    // `extra` still alive at our `can_coalesce` load
+                    // (observed refcount = 3): merge declined, both
+                    // halves returned with their original contents.
+                    assert_eq!(prefix.as_slice(), b"hello");
+                    assert_eq!(tail.as_slice(), b" world");
+                    // The racing drop has completed by now (we joined
+                    // above), so refcount is 2 and a retry from the
+                    // same caller must succeed.
+                    let merged = prefix
+                        .try_coalesce_with(tail)
+                        .expect("retry after racing drop must succeed");
+                    assert_eq!(merged.as_slice(), b"hello world");
+                }
+            }
+        }
+    }
+
+    //  Drop-on-panic: destructors must clean up under unwinding
+    //
+    // Miri's leak detector enforces this implicitly: any allocation not
+    // freed when the test binary exits fails the Miri run. The companion case
+    // where `Box::new(ControlBlock)` panics between `into_raw_parts` and
+    // the new `Extent` is covered by the `AVecRawGuard` tests below.
+    #[test]
+    fn drop_on_panic_with_active_iobuf_does_not_leak() {
+        let result = std::panic::catch_unwind(|| {
+            let o = owned(b"abcd");
+            let (prefix, tail) = o.split_at(2);
+            let frozen: Frozen<A> = owned(b"xyz").into();
+            let clone = frozen.clone();
+            // Anchor each binding with a positive content check so the
+            // test would not pass for the wrong reason if a future
+            // refactor silently elided one of the four values: the
+            // corresponding `assert_eq!` would fail to compile (the
+            // identifier disappears) and surface the regression. Without
+            // these anchors, an elided value also elides its allocation
+            // and Miri's leak detector stays satisfied.
+            assert_eq!(prefix.as_slice(), b"ab");
+            assert_eq!(tail.as_slice(), b"cd");
+            assert_eq!(frozen.as_slice(), b"xyz");
+            assert_eq!(clone.as_slice(), b"xyz");
+            // Bind everything in a tuple so all four destructors run
+            // during unwinding; each decrements one of two control
+            // blocks and Miri verifies the buffers are released.
+            let _keep = (prefix, tail, frozen, clone);
+            panic!("simulated user-code panic");
+        });
+        assert!(result.is_err());
+    }
+
+    //  Boundary splits: Owned::split_at(0) and Owned::split_at(len)
+    //
+    // At split_at(0): prefix.len == 0, tail.ptr aliases base.
+    // At split_at(len): prefix.len == len, tail.ptr is one-past-end with len 
0.
+    // Both must round-trip through try_coalesce_with cleanly.
+
+    #[test]
+    fn coalesce_after_owned_split_at_zero_round_trips() {
+        let o = owned(b"hello");
+        let (prefix, tail) = o.split_at(0);
+        assert!(prefix.is_empty());
+        assert_eq!(tail.as_slice(), b"hello");
+        let merged = prefix.try_coalesce_with(tail).expect("coalesce");
+        assert_eq!(merged.as_slice(), b"hello");
+    }
+
+    #[test]
+    fn coalesce_after_owned_split_at_full_len_round_trips() {
+        let o = owned(b"hello");
+        let (prefix, tail) = o.split_at(5);
+        assert_eq!(prefix.as_slice(), b"hello");
+        assert!(tail.is_empty());
+        let merged = prefix.try_coalesce_with(tail).expect("coalesce");
+        assert_eq!(merged.as_slice(), b"hello");
+    }
+
+    //  AVecRawGuard: panic between into_raw_parts and ControlBlock::new
+    //
+    // The window between `AVec::into_raw_parts` (which surrenders the
+    // backing buffer's destructor) and `ControlBlock::new` (which calls
+    // `Box::new`, fallible under allocator pressure) used to leak the
+    // buffer if `Box::new` panicked. `AVecRawGuard` reconstitutes the
+    // `AVec` on unwind so the buffer is freed.
+    //
+    // We can't readily inject a panic into `Box::new` itself, so this
+    // exercises the guard primitive directly: a `panic!` while the guard
+    // is alive must release the buffer. Miri's leak detector enforces it.
+
+    #[test]
+    fn avec_raw_guard_releases_buffer_on_unwind() {
+        let result = std::panic::catch_unwind(|| {
+            let mut v: AVec<u8, ConstAlign<A>> = AVec::new(A);
+            v.extend_from_slice(b"buffer that must be freed when the guard 
drops");
+            let (ptr, _, len, capacity) = v.into_raw_parts();
+            let _guard = AVecRawGuard::<A>::new(ptr, len, capacity);
+            panic!("simulated allocator failure inside ControlBlock::new");
+        });
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn avec_raw_guard_defuse_skips_destructor() {
+        // Success path: `defuse` must skip the destructor so the buffer is
+        // not double-freed when the new owner (Extent / control block)
+        // releases it. Reconstitute the AVec manually after defusing and
+        // confirm the bytes are intact: proves the guard did NOT free.
+        let mut v: AVec<u8, ConstAlign<A>> = AVec::new(A);
+        v.extend_from_slice(b"contents");
+        let (ptr, _, len, capacity) = v.into_raw_parts();
+        let guard = AVecRawGuard::<A>::new(ptr, len, capacity);
+        guard.defuse();
+        // SAFETY: defuse forgot the guard, so the raw parts are still
+        // ours to reconstitute exactly once.
+        let v = unsafe { AVec::<u8, ConstAlign<A>>::from_raw_parts(ptr, A, 
len, capacity) };
+        assert_eq!(v.as_slice(), b"contents");
+    }
+}
diff --git a/core/binary_protocol/src/consensus/message.rs 
b/core/binary_protocol/src/consensus/message.rs
index de19029c0..76bed247b 100644
--- a/core/binary_protocol/src/consensus/message.rs
+++ b/core/binary_protocol/src/consensus/message.rs
@@ -278,6 +278,17 @@ where
         Ok(typed_message)
     }
 
+    /// Construct a typed `Message<H, B>` without re-validating the header.
+    ///
+    /// # Safety
+    ///
+    /// Caller must guarantee:
+    /// * `backing.total_len() >= size_of::<H>()`.
+    /// * Header bytes are a valid `H` bit pattern (`try_from_bytes` would 
succeed).
+    /// * `H::validate` would return `Ok`.
+    ///
+    /// Prefer `try_into_typed::<H>()`. Only use when bytes already validated
+    /// via another route (e.g. enclosing `MessageBag::try_from` dispatch).
     const unsafe fn from_backing_unchecked(backing: B) -> Self {
         Self {
             backing,
@@ -518,40 +529,25 @@ where
 {
     type Error = ConsensusError;
 
+    // Dispatch via `try_into_typed::<H>()`: re-runs per-typed `validate()`.
+    // `from_backing_unchecked` trusts the command byte alone, letting
+    // invariant violations (Commit size != 256, DoViewChange log_view > view)
+    // reach the router.
     fn try_from(value: Message<T>) -> Result<Self, Self::Error> {
         let command = value.as_generic().header().command;
-        let backing = value.into_inner();
 
         match command {
-            Command2::Prepare => {
-                let msg = unsafe { 
Message::<PrepareHeader>::from_backing_unchecked(backing) };
-                Ok(Self::Prepare(msg))
-            }
-            Command2::Request => {
-                let msg = unsafe { 
Message::<RequestHeader>::from_backing_unchecked(backing) };
-                Ok(Self::Request(msg))
-            }
-            Command2::PrepareOk => {
-                let msg = unsafe { 
Message::<PrepareOkHeader>::from_backing_unchecked(backing) };
-                Ok(Self::PrepareOk(msg))
-            }
-            Command2::StartViewChange => {
-                let msg =
-                    unsafe { 
Message::<StartViewChangeHeader>::from_backing_unchecked(backing) };
-                Ok(Self::StartViewChange(msg))
-            }
-            Command2::DoViewChange => {
-                let msg = unsafe { 
Message::<DoViewChangeHeader>::from_backing_unchecked(backing) };
-                Ok(Self::DoViewChange(msg))
-            }
-            Command2::StartView => {
-                let msg = unsafe { 
Message::<StartViewHeader>::from_backing_unchecked(backing) };
-                Ok(Self::StartView(msg))
-            }
-            Command2::Commit => {
-                let msg = unsafe { 
Message::<CommitHeader>::from_backing_unchecked(backing) };
-                Ok(Self::Commit(msg))
-            }
+            Command2::Prepare => 
Ok(Self::Prepare(value.try_into_typed::<PrepareHeader>()?)),
+            Command2::Request => 
Ok(Self::Request(value.try_into_typed::<RequestHeader>()?)),
+            Command2::PrepareOk => 
Ok(Self::PrepareOk(value.try_into_typed::<PrepareOkHeader>()?)),
+            Command2::StartViewChange => Ok(Self::StartViewChange(
+                value.try_into_typed::<StartViewChangeHeader>()?,
+            )),
+            Command2::DoViewChange => Ok(Self::DoViewChange(
+                value.try_into_typed::<DoViewChangeHeader>()?,
+            )),
+            Command2::StartView => 
Ok(Self::StartView(value.try_into_typed::<StartViewHeader>()?)),
+            Command2::Commit => 
Ok(Self::Commit(value.try_into_typed::<CommitHeader>()?)),
             // Reply / Eviction are server-to-client frames; they do not
             // appear on the inbound dispatch path.
             Command2::Reply | Command2::Eviction => {
@@ -564,3 +560,250 @@ where
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::consensus::{Operation, ReplyHeader};
+    use smallvec::smallvec;
+
+    // Field offsets via `offset_of!`: a field reorder fails to compile here
+    // rather than silently corrupting test bytes.
+    const SIZE_OFF: usize = std::mem::offset_of!(RequestHeader, size);
+    const COMMAND_OFF: usize = std::mem::offset_of!(RequestHeader, command);
+    const REQUEST_CLIENT_OFF: usize = std::mem::offset_of!(RequestHeader, 
client);
+    const REQUEST_OPERATION_OFF: usize = std::mem::offset_of!(RequestHeader, 
operation);
+    const REQUEST_SESSION_OFF: usize = std::mem::offset_of!(RequestHeader, 
session);
+
+    fn header_bytes(command: Command2, size: u32) -> Owned<MESSAGE_ALIGN> {
+        let mut o = Owned::<MESSAGE_ALIGN>::zeroed(256);
+        {
+            let buf = o.as_mut_slice();
+            buf[SIZE_OFF..SIZE_OFF + 4].copy_from_slice(&size.to_le_bytes());
+            buf[COMMAND_OFF] = command as u8;
+            // Typed headers reject client == 0. `#[repr(C)]` preamble layout
+            // is shared, so this offset works across header types.
+            buf[REQUEST_CLIENT_OFF..REQUEST_CLIENT_OFF + 16]
+                .copy_from_slice(&0xCAFE_u128.to_le_bytes());
+        }
+        o
+    }
+
+    // Construction via Message::new (zeroed)
+
+    #[test]
+    #[should_panic(expected = "size must be at least header size")]
+    fn message_new_smaller_than_header_panics() {
+        let _ = Message::<RequestHeader>::new(100);
+    }
+
+    // try_from(Owned): validation gates the unsafe construction
+
+    #[test]
+    fn try_from_owned_too_short_returns_err() {
+        let owned = Owned::<MESSAGE_ALIGN>::zeroed(100);
+        let result = Message::<RequestHeader>::try_from(owned);
+        assert!(matches!(result, Err(ConsensusError::InvalidCommand { .. })));
+    }
+
+    #[test]
+    fn try_from_owned_invalid_bit_pattern_returns_err() {
+        let mut owned = Owned::<MESSAGE_ALIGN>::zeroed(256);
+        owned.as_mut_slice()[COMMAND_OFF] = 99; // outside Command2's 
discriminant range
+        let result = Message::<RequestHeader>::try_from(owned);
+        assert!(matches!(result, Err(ConsensusError::InvalidBitPattern)));
+    }
+
+    #[test]
+    fn try_from_owned_buffer_shorter_than_claimed_size_returns_err() {
+        // Header parses cleanly (RequestHeader::validate doesn't gate on
+        // size), but the encoded `size` field claims more bytes than the
+        // backing buffer holds. The buffer-bounds check at the bottom of
+        // `Message::try_from` must reject. (Both this case and the
+        // "buffer shorter than `size_of::<H>`" case currently surface as
+        // the same `InvalidCommand` variant; promoting them to distinct
+        // `ConsensusError` variants is a separate hardening pass.)
+        let owned = header_bytes(Command2::Request, 999);
+        // header_bytes already produces a 256-byte buffer; size=999 > 256,
+        // so try_from rejects via `bytes.len() < header.size()`.
+        let result = Message::<RequestHeader>::try_from(owned);
+        assert!(matches!(result, Err(ConsensusError::InvalidCommand { .. })));
+    }
+
+    // as_generic: const unsafe pointer cast (#[repr(C)] equivalence)
+
+    #[test]
+    fn as_generic_view_reads_command_byte() {
+        let owned = header_bytes(Command2::Request, 256);
+        let typed = Message::<RequestHeader>::try_from(owned).expect("valid");
+        let generic = typed.as_generic();
+        assert_eq!(generic.header().command, Command2::Request);
+        assert_eq!(generic.total_len(), 256);
+    }
+
+    // try_as_typed: validation gates the unsafe ptr-cast reborrow
+
+    #[test]
+    fn try_as_typed_command_mismatch_returns_err_without_unsafe_cast() {
+        // bytes are a valid Prepare; asking for RequestHeader must fail
+        // *before* the unsafe ptr-cast inside try_as_typed.
+        let owned = header_bytes(Command2::Prepare, 256);
+        let generic = 
Message::<GenericHeader>::try_from(owned).expect("valid");
+        let result = generic.try_as_typed::<RequestHeader>();
+        assert!(matches!(
+            result,
+            Err(ConsensusError::InvalidCommand {
+                expected: Command2::Request,
+                found: Command2::Prepare,
+            })
+        ));
+    }
+
+    #[test]
+    fn try_as_typed_invalid_validation_returns_err() {
+        // RequestHeader::validate rejects operation=Register with non-zero 
session.
+        let mut owned = header_bytes(Command2::Request, 256);
+        {
+            let buf = owned.as_mut_slice();
+            buf[REQUEST_OPERATION_OFF] = Operation::Register as u8;
+            buf[REQUEST_SESSION_OFF..REQUEST_SESSION_OFF + 
8].copy_from_slice(&5u64.to_le_bytes());
+        }
+        let generic = Message::<GenericHeader>::try_from(owned).expect("valid 
generic");
+        let result = generic.try_as_typed::<RequestHeader>();
+        assert!(matches!(result, Err(ConsensusError::InvalidField(_))));
+    }
+
+    // try_into_typed: consuming variant of try_as_typed
+
+    #[test]
+    fn try_into_typed_command_mismatch_returns_err() {
+        let owned = header_bytes(Command2::Prepare, 256);
+        let generic = 
Message::<GenericHeader>::try_from(owned).expect("valid");
+        let result = generic.try_into_typed::<RequestHeader>();
+        assert!(matches!(
+            result,
+            Err(ConsensusError::InvalidCommand {
+                expected: Command2::Request,
+                found: Command2::Prepare,
+            })
+        ));
+    }
+
+    // MessageBag dispatch: 7 arms, each via `try_into_typed` (validated typed cast)
+
+    fn dispatch(command: Command2, size: u32) -> Result<MessageBag, 
ConsensusError> {
+        let owned = header_bytes(command, size);
+        let generic = Message::<GenericHeader>::try_from(owned).expect("valid 
generic");
+        MessageBag::try_from(generic)
+    }
+
+    #[test]
+    fn messagebag_dispatch_unsupported_command_returns_err() {
+        // Ping is a valid Command2 bit pattern but is not a MessageBag 
variant.
+        let owned = header_bytes(Command2::Ping, 256);
+        let generic = Message::<GenericHeader>::try_from(owned).expect("valid 
generic");
+        let result = MessageBag::try_from(generic);
+        assert!(matches!(result, Err(ConsensusError::InvalidCommand { .. })));
+    }
+
+    #[test]
+    fn messagebag_command_method_round_trips() {
+        for cmd in [
+            Command2::Request,
+            Command2::Prepare,
+            Command2::PrepareOk,
+            Command2::StartViewChange,
+            Command2::DoViewChange,
+            Command2::StartView,
+            Command2::Commit,
+        ] {
+            let bag = dispatch(cmd, 256).expect("dispatch");
+            assert_eq!(bag.command(), cmd, "round-trip for {cmd:?}");
+            assert_eq!(bag.size(), 256, "size for {cmd:?}");
+        }
+    }
+
+    // MessageBag dispatch must enforce per-typed validate()
+    // (Commit size != 256, Register with session != 0, etc.)
+
+    #[test]
+    fn messagebag_dispatch_commit_with_invalid_size_returns_err() {
+        // `CommitHeader::validate` rejects size != 256.
+        let owned = header_bytes(Command2::Commit, 128);
+        let generic = Message::<GenericHeader>::try_from(owned).expect("valid 
generic");
+        let result = MessageBag::try_from(generic);
+        assert!(matches!(
+            result,
+            Err(ConsensusError::CommitInvalidSize(128))
+        ));
+    }
+
+    #[test]
+    fn messagebag_dispatch_request_with_invalid_register_session_returns_err() 
{
+        // `RequestHeader::validate` rejects Register with non-zero session.
+        let mut owned = header_bytes(Command2::Request, 256);
+        {
+            let buf = owned.as_mut_slice();
+            buf[REQUEST_OPERATION_OFF] = Operation::Register as u8;
+            buf[REQUEST_SESSION_OFF..REQUEST_SESSION_OFF + 
8].copy_from_slice(&5u64.to_le_bytes());
+        }
+        let generic = Message::<GenericHeader>::try_from(owned).expect("valid 
generic");
+        let result = MessageBag::try_from(generic);
+        assert!(matches!(result, Err(ConsensusError::InvalidField(_))));
+    }
+
+    // deep_copy: byte-level independence after clone-like API
+
+    #[test]
+    fn request_message_deep_copy_independent() {
+        let owned = header_bytes(Command2::Request, 256);
+        let mut msg = 
Message::<RequestHeader>::try_from(owned).expect("valid");
+        let copy = msg.deep_copy();
+        // Mutate the original's bytes; the deep copy must be untouched.
+        msg.as_mut_slice()[200] = 0xab;
+        assert_eq!(copy.as_slice()[200], 0);
+        assert_eq!(msg.as_slice()[200], 0xab);
+    }
+
+    // transmute_header: rewrites the typed header in place
+
+    #[test]
+    fn transmute_header_request_to_prepare() {
+        let owned = header_bytes(Command2::Request, 256);
+        let msg = Message::<RequestHeader>::try_from(owned).expect("valid");
+        let prepared: Message<PrepareHeader> =
+            msg.transmute_header::<PrepareHeader>(|_old, new| {
+                new.command = Command2::Prepare;
+                new.size = 256;
+            });
+        assert_eq!(prepared.header().command, Command2::Prepare);
+    }
+
+    // ResponseBacking via SmallVec<Frozen>
+
+    #[test]
+    fn response_backing_single_fragment_roundtrip() {
+        let owned = header_bytes(Command2::Reply, 256);
+        let frozen: Frozen<MESSAGE_ALIGN> = owned.into();
+        let fragments: smallvec::SmallVec<[Frozen<MESSAGE_ALIGN>; 4]> = 
smallvec![frozen];
+        let msg = Message::<ReplyHeader, 
ResponseBacking>::try_from(fragments).expect("valid");
+        assert_eq!(msg.header().command, Command2::Reply);
+        assert_eq!(msg.fragments().len(), 1);
+    }
+
+    #[test]
+    fn response_backing_empty_fragments_returns_err() {
+        let fragments: smallvec::SmallVec<[Frozen<MESSAGE_ALIGN>; 4]> = 
smallvec![];
+        let result = Message::<ReplyHeader, 
ResponseBacking>::try_from(fragments);
+        assert!(matches!(result, Err(ConsensusError::InvalidCommand { .. })));
+    }
+
+    #[test]
+    fn response_backing_first_fragment_too_short_returns_err() {
+        let owned = Owned::<MESSAGE_ALIGN>::zeroed(100);
+        let frozen: Frozen<MESSAGE_ALIGN> = owned.into();
+        let fragments: smallvec::SmallVec<[Frozen<MESSAGE_ALIGN>; 4]> = 
smallvec![frozen];
+        let result = Message::<ReplyHeader, 
ResponseBacking>::try_from(fragments);
+        assert!(matches!(result, Err(ConsensusError::InvalidCommand { .. })));
+    }
+}
diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs
index 1291febb4..0d81b9cc9 100644
--- a/core/common/src/alloc/buffer.rs
+++ b/core/common/src/alloc/buffer.rs
@@ -346,3 +346,168 @@ impl IoBufMut for PooledBuffer {
         unsafe { std::slice::from_raw_parts_mut(ptr, cap) }
     }
 }
+
+#[cfg(test)]
+mod miri_tests {
+    //! Covers the 3 unsafe sites: `IoBufMut::as_uninit` (`from_raw_parts_mut`
+    //! ptr cast), `SetLen::set_len`, and `split_to` (`ptr::copy` forward-overlap
+    //! + `set_len`). Only the first two run under Miri; see below.
+    //!
+    //! Pool-free helper avoids the global `MEMORY_POOL` (a `OnceCell` whose
+    //! `ArrayQueue` buckets leak under Miri) and `serial_test` (pulls
+    //! `sdd`/`scc` with int→ptr casts rejected by `-Zmiri-strict-provenance`).
+    //! `split_to` tests need the pool internally; gated `#[cfg(not(miri))]`.
+
+    use super::*;
+    use aligned_vec::{AVec, ConstAlign};
+
+    /// Build `PooledBuffer` with `from_pool == false` to skip the global pool.
+    fn pool_free_with_capacity(cap: usize) -> PooledBuffer {
+        let v: AVec<u8, ConstAlign<ALIGNMENT>> = 
AVec::with_capacity(ALIGNMENT, cap);
+        PooledBuffer::from_existing(v)
+    }
+
+    // IoBufMut::as_uninit
+
+    #[test]
+    fn as_uninit_returns_slice_of_full_capacity() {
+        let mut buf = pool_free_with_capacity(256);
+        let cap_before = buf.capacity();
+        let uninit = buf.as_uninit();
+        assert_eq!(
+            uninit.len(),
+            cap_before,
+            "as_uninit must expose the full capacity, not just initialized 
len",
+        );
+    }
+
+    #[test]
+    fn as_uninit_pointer_is_4096_aligned() {
+        let mut buf = pool_free_with_capacity(8192);
+        let addr = buf.as_uninit().as_mut_ptr() as usize;
+        assert_eq!(addr % ALIGNMENT, 0);
+    }
+
+    #[test]
+    fn as_uninit_write_then_set_len_observes_writes() {
+        let mut buf = pool_free_with_capacity(128);
+        {
+            let uninit = buf.as_uninit();
+            for (i, slot) in uninit.iter_mut().take(16).enumerate() {
+                slot.write(u8::try_from(i).unwrap());
+            }
+        }
+        // SAFETY: 16 bytes initialized above.
+        unsafe { <PooledBuffer as SetLen>::set_len(&mut buf, 16) };
+        assert_eq!(
+            buf.as_init(),
+            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
+        );
+    }
+
+    // SetLen::set_len
+
+    #[test]
+    fn set_len_to_full_capacity_after_uninit_fill() {
+        // Fill entire capacity then `set_len(cap)`: Miri verifies every
+        // byte initialized before read via `as_init()`.
+        let mut buf = pool_free_with_capacity(256);
+        let cap = buf.capacity();
+        {
+            let uninit = buf.as_uninit();
+            assert_eq!(uninit.len(), cap);
+            for (i, slot) in uninit.iter_mut().enumerate() {
+                slot.write(u8::try_from(i & 0xff).unwrap());
+            }
+        }
+        // SAFETY: 0..cap initialized above.
+        unsafe { <PooledBuffer as SetLen>::set_len(&mut buf, cap) };
+        assert_eq!(buf.len(), cap);
+        assert_eq!(buf.as_init()[0], 0);
+        assert_eq!(
+            buf.as_init()[cap - 1],
+            u8::try_from((cap - 1) & 0xff).unwrap()
+        );
+    }
+
+    // split_to: `ptr::copy` (forward-overlap capable) + `set_len`.
+    // Needs global pool; skip Miri (ArrayQueue retention reads as leak,
+    // serial_test fails strict-provenance).
+
+    #[cfg(not(miri))]
+    mod split_to {
+        use super::*;
+        use crate::IggyByteSize;
+        use crate::alloc::memory_pool::{MemoryPool, MemoryPoolConfigOther};
+        use serial_test::serial;
+        use std::str::FromStr;
+        use std::sync::Once;
+
+        static MIRI_POOL_INIT: Once = Once::new();
+
+        fn init_pool_for_split_to_tests() {
+            MIRI_POOL_INIT.call_once(|| {
+                let config = MemoryPoolConfigOther {
+                    enabled: true,
+                    size: IggyByteSize::from_str("64MiB").unwrap(),
+                    bucket_capacity: 16,
+                };
+                MemoryPool::init_pool(&config);
+            });
+        }
+
+        #[test]
+        #[serial(memory_pool)]
+        fn basic_split() {
+            init_pool_for_split_to_tests();
+            let mut buf = pool_free_with_capacity(64);
+            buf.extend_from_slice(b"abcdefghij");
+            let prefix = buf.split_to(4);
+            assert_eq!(prefix.as_ref(), b"abcd");
+            assert_eq!(buf.as_ref(), b"efghij");
+        }
+
+        #[test]
+        #[serial(memory_pool)]
+        fn at_zero_yields_empty_prefix_and_unchanged_self() {
+            init_pool_for_split_to_tests();
+            let mut buf = pool_free_with_capacity(32);
+            buf.extend_from_slice(b"abcd");
+            let prefix = buf.split_to(0);
+            assert!(prefix.is_empty());
+            assert_eq!(buf.as_ref(), b"abcd");
+        }
+
+        #[test]
+        #[serial(memory_pool)]
+        fn at_len_yields_full_prefix_and_empty_self() {
+            init_pool_for_split_to_tests();
+            let mut buf = pool_free_with_capacity(32);
+            buf.extend_from_slice(b"abcd");
+            let prefix = buf.split_to(4);
+            assert_eq!(prefix.as_ref(), b"abcd");
+            assert!(buf.is_empty());
+        }
+
+        #[test]
+        #[serial(memory_pool)]
+        fn forward_overlap_preserves_bytes() {
+            // at=2, len=8: source [2..8) overlaps destination [0..6).
+            // `ptr::copy` handles overlap; `copy_nonoverlapping` would be UB.
+            init_pool_for_split_to_tests();
+            let mut buf = pool_free_with_capacity(32);
+            buf.extend_from_slice(b"ABCDEFGH");
+            let prefix = buf.split_to(2);
+            assert_eq!(prefix.as_ref(), b"AB");
+            assert_eq!(buf.as_ref(), b"CDEFGH");
+        }
+    }
+
+    // Drop takes the short-circuit path for `from_pool == false`; under Miri,
+    // the leak detector fails this test if the allocation is not freed.
+    #[test]
+    fn pool_free_buffer_drop_does_not_leak() {
+        let mut buf = pool_free_with_capacity(256);
+        buf.extend_from_slice(&[0xaa; 200]);
+        drop(buf);
+    }
+}
diff --git a/core/common/src/types/send_messages2.rs 
b/core/common/src/types/send_messages2.rs
index 58390db04..73d07126f 100644
--- a/core/common/src/types/send_messages2.rs
+++ b/core/common/src/types/send_messages2.rs
@@ -583,16 +583,37 @@ fn decode_request_slice(body: &[u8]) -> 
Result<SendMessages2Ref<'_>, IggyError>
     Ok(SendMessages2Ref { header, blob })
 }
 
+/// Decode a `Prepare` message from a slice of bytes.
+///
+/// `bytes` must be 16-byte aligned (`PrepareHeader` has `u128` fields); take it
+/// from a `Frozen<MESSAGE_ALIGN>` / `Owned<MESSAGE_ALIGN>` / `Message<H>` buffer.
+/// On misalignment, debug builds panic via `debug_assert!`; release returns `InvalidCommand`.
+///
+/// # Errors
+///
+/// `IggyError::InvalidCommand` on: short buffer, bad bit pattern, `size`
+/// outside `[header_size, bytes.len()]`, short/checksum-mismatched body.
 pub fn decode_prepare_slice(bytes: &[u8]) -> Result<SendMessages2Ref<'_>, 
IggyError> {
     let header_size = std::mem::size_of::<PrepareHeader>();
     if bytes.len() < header_size {
         return Err(IggyError::InvalidCommand);
     }
 
+    // bytemuck rejects misaligned input in every build (mapped to `InvalidCommand`
+    // below); the debug_assert just surfaces the contract violation earlier in dev.
+    debug_assert_eq!(
+        bytes
+            .as_ptr()
+            .align_offset(std::mem::align_of::<PrepareHeader>()),
+        0,
+        "decode_prepare_slice: bytes must be at least 16-byte aligned",
+    );
+
     let prepare = 
bytemuck::checked::try_from_bytes::<PrepareHeader>(&bytes[..header_size])
         .map_err(|_| IggyError::InvalidCommand)?;
     let total_size = prepare.size as usize;
-    if bytes.len() < total_size {
+    // Wire-controllable `size`: reject < header_size to avoid slice OOB below.
+    if total_size < header_size || bytes.len() < total_size {
         return Err(IggyError::InvalidCommand);
     }
 
@@ -753,3 +774,50 @@ fn read_u128(bytes: &[u8], offset: usize) -> Result<u128, 
IggyError> {
         .map(u128::from_le_bytes)
         .ok_or(IggyError::InvalidNumberEncoding)
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use iggy_binary_protocol::Command2;
+
+    fn aligned_prepare_bytes(size: u32) -> Owned<MESSAGE_ALIGN> {
+        let mut owned = 
Owned::<MESSAGE_ALIGN>::zeroed(std::mem::size_of::<PrepareHeader>());
+        let header: &mut PrepareHeader =
+            bytemuck::checked::try_from_bytes_mut(owned.as_mut_slice())
+                .expect("zeroed bytes form a valid PrepareHeader");
+        header.command = Command2::Prepare;
+        header.size = size;
+        owned
+    }
+
+    #[test]
+    fn decode_prepare_slice_size_below_header_size_does_not_panic() {
+        // Regression: without the `total_size < header_size` guard,
+        // `&bytes[256..size]` panics for any size < 256.
+        for adversarial_size in [0u32, 255] {
+            let owned = aligned_prepare_bytes(adversarial_size);
+            let result = decode_prepare_slice(owned.as_slice());
+            assert!(
+                matches!(result, Err(IggyError::InvalidCommand)),
+                "size={adversarial_size} must be rejected, got {result:?}",
+            );
+        }
+    }
+
+    #[cfg(debug_assertions)]
+    #[test]
+    #[should_panic(expected = "must be at least 16-byte aligned")]
+    fn decode_prepare_slice_debug_asserts_on_misaligned_input() {
+        // `Vec<u8>` requests align=1, but the system allocator (e.g. glibc)
+        // returns a 16-aligned base in practice, so `&buf[1..]` sits at 1 mod 16
+        // and is misaligned; the setup assert below checks that assumption.
+        let buf: Vec<u8> = vec![0u8; std::mem::size_of::<PrepareHeader>() + 1];
+        let misaligned = &buf[1..];
+        assert_ne!(
+            misaligned.as_ptr().align_offset(16),
+            0,
+            "test setup: allocator returned a base that is not 16-byte aligned",
+        );
+        let _ = decode_prepare_slice(misaligned);
+    }
+}
diff --git a/core/journal/src/prepare_journal.rs 
b/core/journal/src/prepare_journal.rs
index 283db5b4d..2c5b564bc 100644
--- a/core/journal/src/prepare_journal.rs
+++ b/core/journal/src/prepare_journal.rs
@@ -19,7 +19,7 @@ use crate::file_storage::FileStorage;
 use crate::{Journal, JournalHandle};
 use compio::io::AsyncWriteAtExt;
 use iggy_binary_protocol::consensus::iobuf::Owned;
-use iggy_binary_protocol::consensus::message::Message;
+use iggy_binary_protocol::consensus::message::{MESSAGE_ALIGN, Message};
 use iggy_binary_protocol::consensus::{Command2, PrepareHeader};
 use std::cell::{Cell, Ref, RefCell};
 use std::fmt;
@@ -139,12 +139,23 @@ impl PrepareJournal {
         let mut last_op: Option<u64> = None;
         let mut pos: u64 = 0;
         let mut header_buf = vec![0u8; HEADER_SIZE];
+        // Reused 16-aligned scratch (PrepareHeader has u128 fields). Avoids
+        // per-iteration 4 KiB-aligned alloc; bytes never become a `Message`.
+        let mut aligned = Owned::<16>::zeroed(HEADER_SIZE);
 
         while pos + HEADER_SIZE as u64 <= file_len {
             // Read the 256-byte header
             header_buf = storage.read_at(pos, header_buf).await?;
-            let header: PrepareHeader =
-                *bytemuck::checked::from_bytes::<PrepareHeader>(&header_buf);
+            aligned.as_mut_slice().copy_from_slice(&header_buf);
+            // `try_from_bytes`: corrupt discriminant on disk must NOT panic;
+            // route through the same truncate-here branch as command/size 
below.
+            let Ok(header_ref) =
+                
bytemuck::checked::try_from_bytes::<PrepareHeader>(aligned.as_slice())
+            else {
+                storage.truncate(pos).await?;
+                break;
+            };
+            let header: PrepareHeader = *header_ref;
 
             // Validate: must be a Prepare command with sane size
             if header.command != Command2::Prepare
@@ -259,7 +270,7 @@ impl PrepareJournal {
         };
         let buf = vec![0u8; size];
         let buf = self.storage.read_at(offset, buf).await?;
-        let msg = 
Message::try_from(Owned::<4096>::copy_from_slice(&buf)).map_err(
+        let msg = 
Message::try_from(Owned::<MESSAGE_ALIGN>::copy_from_slice(&buf)).map_err(
             |e: iggy_binary_protocol::consensus::ConsensusError| {
                 io::Error::new(io::ErrorKind::InvalidData, e.to_string())
             },
@@ -350,7 +361,7 @@ impl Journal<FileStorage> for PrepareJournal {
         for (header, offset) in &to_drain {
             let buf = vec![0u8; header.size as usize];
             let buf = self.storage.read_at(*offset, buf).await?;
-            let msg = 
Message::try_from(Owned::<4096>::copy_from_slice(&buf)).map_err(
+            let msg = 
Message::try_from(Owned::<MESSAGE_ALIGN>::copy_from_slice(&buf)).map_err(
                 |e: iggy_binary_protocol::consensus::ConsensusError| {
                     io::Error::new(io::ErrorKind::InvalidData, e.to_string())
                 },
@@ -456,7 +467,7 @@ impl Journal<FileStorage> for PrepareJournal {
 
         let buffer = vec![0u8; size];
         let buffer = self.storage.read_at(offset, buffer).await.ok()?;
-        Message::try_from(Owned::<4096>::copy_from_slice(&buffer)).ok()
+        
Message::try_from(Owned::<MESSAGE_ALIGN>::copy_from_slice(&buffer)).ok()
     }
 }
 
@@ -478,7 +489,7 @@ mod tests {
 
     fn make_prepare(op: u64, body_size: usize) -> Message<PrepareHeader> {
         let total_size = HEADER_SIZE + body_size;
-        let mut buffer = Owned::<4096>::zeroed(total_size);
+        let mut buffer = Owned::<MESSAGE_ALIGN>::zeroed(total_size);
 
         let header = bytemuck::checked::from_bytes_mut::<PrepareHeader>(
             &mut buffer.as_mut_slice()[..HEADER_SIZE],
@@ -557,6 +568,38 @@ mod tests {
         }
     }
 
+    #[compio::test]
+    async fn corrupt_command_byte_truncates_on_reopen() {
+        // Bit-flipped `Command2` discriminant: must truncate, not panic.
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("journal.wal");
+
+        {
+            let journal = PrepareJournal::open(&path, 0).await.unwrap();
+            journal.append(make_prepare(1, 64)).await.unwrap();
+            journal.append(make_prepare(2, 128)).await.unwrap();
+            journal.storage.fsync().await.unwrap();
+        }
+
+        // Entry 2 at offset HEADER_SIZE+64=320; `offset_of!` guards against
+        // future field reorders silently corrupting an unrelated byte.
+        let entry_2_offset = (HEADER_SIZE + 64) as u64;
+        let command_byte_offset =
+            entry_2_offset + std::mem::offset_of!(PrepareHeader, command) as 
u64;
+        {
+            use std::io::{Seek, SeekFrom, Write};
+            let mut file = 
std::fs::OpenOptions::new().write(true).open(&path).unwrap();
+            file.seek(SeekFrom::Start(command_byte_offset)).unwrap();
+            file.write_all(&[99u8]).unwrap(); // out of range for Command2
+            file.sync_all().unwrap();
+        }
+
+        let journal = PrepareJournal::open(&path, 0).await.unwrap();
+        assert_eq!(journal.last_op(), Some(1));
+        assert!(journal.header(2).is_none());
+        assert_eq!(journal.storage.file_len(), entry_2_offset);
+    }
+
     #[compio::test]
     async fn truncated_entry_on_reopen() {
         let dir = tempdir().unwrap();
diff --git a/justfile b/justfile
index 6f37a5e81..7b68d0c4d 100644
--- a/justfile
+++ b/justfile
@@ -48,6 +48,18 @@ nextest: build
 nextests TEST: build
   cargo nextest run --nocapture -- {{TEST}}
 
+# Run Miri (UB detector) on the unsafe-heavy crates that don't pull
+# tokio/compio. Mirrors the `miri` task in CI. Pinned to the same nightly
+# as `.github/actions/rust/pre-merge/action.yml` so local runs don't drift
+# from CI on the next nightly bump — keep these two dates in sync.
+# Requires:
+#
+#   rustup toolchain install nightly-2026-04-21 --component miri
+#
+miri:
+  MIRIFLAGS="-Zmiri-tree-borrows -Zmiri-strict-provenance" \
+    cargo +nightly-2026-04-21 miri test -p iggy_binary_protocol -p consensus
+
 server *ARGS:
   cargo run --bin iggy-server {{ARGS}}
 

Reply via email to