This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c586bd72505b94f24ccf6d699b10a47357794712
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Apr 24 14:20:25 2026 +0200

    fix(message_bus): tag-verify before nonce insert + review hardening
    
    verify_envelope inserted the nonce ring slot BEFORE verifying
    the BLAKE3 tag. An off-secret attacker with TCP reachability
    to the listener could flush the ring with forged-tag Pings
    and open a 30 s replay window for a captured legit Ping.
    Reorder to timestamp -> tag -> nonce-insert; add regression
    test. Triple-confirmed by independent multi-expert review.
    
    Same pass landed adjacent hardening: NonceRing per-peer +
    bus-resident with inline [u128; CAP] storage; LABEL_REPLICA /
    LABEL_CLIENT plane separation in AuthChallenge; UnsupportedKind
    -> Unauthenticated for mixed-version triage; max_batch
    boot-asserted in 1..=IOV_MAX/2; auth + TcpTransport* + dedup
    surfaces shrunk to pub(crate); Unpin dropped from transport
    traits; InvalidHex carries byte; Y2554 debug_assert in
    build_ping_message; notify_connection_lost reentrant-safe;
    framing.rs reserve_exact diagnostic log; dedup docstring
    clarified to "at-most-once per-shard-lifetime"; installer.rs
    race comments tightened. 10 new regression tests; 120 pass.
---
 core/message_bus/src/auth.rs                       | 274 +++++++++++++++++----
 core/message_bus/src/auth_config.rs                |  20 +-
 core/message_bus/src/config.rs                     |  11 +
 core/message_bus/src/connector.rs                  |  18 +-
 core/message_bus/src/framing.rs                    |  13 +-
 core/message_bus/src/installer.rs                  |   8 +-
 core/message_bus/src/lib.rs                        | 164 +++++++++++-
 core/message_bus/src/replica_io.rs                 |   2 +
 core/message_bus/src/replica_listener.rs           |  44 +++-
 core/message_bus/src/transports/mod.rs             |  23 +-
 core/message_bus/src/transports/tcp.rs             |  26 +-
 core/message_bus/src/writer_task.rs                |   4 +-
 core/message_bus/tests/backpressure.rs             |   2 +
 core/message_bus/tests/connection_lost_notify.rs   |   2 +
 core/message_bus/tests/directional_connection.rs   |   4 +
 core/message_bus/tests/handshake_auth.rs           |   4 +
 core/message_bus/tests/head_of_line.rs             |   2 +
 core/message_bus/tests/reconnect.rs                |   2 +
 core/message_bus/tests/reconnect_skip_connected.rs |   2 +
 core/message_bus/tests/replica_roundtrip.rs        |   2 +
 core/message_bus/tests/shard_zero_gating.rs        |   2 +
 core/message_bus/tests/vectored_batch.rs           |   2 +
 core/server-ng/src/dedup.rs                        |  52 +++-
 core/server-ng/src/lib.rs                          |   6 +-
 24 files changed, 586 insertions(+), 103 deletions(-)

diff --git a/core/message_bus/src/auth.rs b/core/message_bus/src/auth.rs
index 1205a4578..dd90b18d2 100644
--- a/core/message_bus/src/auth.rs
+++ b/core/message_bus/src/auth.rs
@@ -40,15 +40,39 @@ use thiserror::Error;
 pub const SECRET_SIZE: usize = 32;
 
 /// Byte range of the envelope within `GenericHeader.reserved_command`.
-pub const ENVELOPE_LEN: usize = 57;
+pub(crate) const ENVELOPE_LEN: usize = 57;
 
 /// Acceptance window around `auth_timestamp`, in nanoseconds. 30 s.
-pub const TIMESTAMP_WINDOW_NS: u128 = 30_000_000_000;
+pub(crate) const TIMESTAMP_WINDOW_NS: u128 = 30_000_000_000;
 
-/// Default nonce-dedup LRU capacity on the acceptor.
-pub const NONCE_LRU_CAPACITY: usize = 256;
-
-const LABEL: &[u8; 16] = b"iggy-bus-auth-v1";
+/// Per-peer nonce-dedup ring capacity on the acceptor.
+///
+/// 8 slots × 5-replica clusters × 16 B = 640 B inline; survives the
+/// 30 s acceptance window even at the worst-case dial cadence
+/// (`reconnect_period = 5 s`). Per-peer (rather than a single global
+/// ring) means a flapping or hostile peer cannot evict honest peers'
+/// nonce slots, so the verify-order guard in [`verify_envelope`] holds
+/// up against an attacker who can reach the listener but lacks the
+/// secret. See plan F2.
+#[doc(hidden)]
+pub const PER_PEER_NONCE_CAPACITY: usize = 8;
+
+/// Plane-tagged 16-byte label prefixed onto every challenge before the MAC.
+///
+/// Domain separation: same cluster secret produces distinct MACs across
+/// the replica plane and the (Phase 2) SDK-client plane, so a captured
+/// replica handshake cannot be replayed onto the client plane and vice
+/// versa even when both planes share a [`StaticSharedSecret`]. Bump the
+/// trailing version digits when the challenge layout itself changes;
+/// bump [`AuthKind`] when the MAC algorithm changes.
+pub(crate) const LABEL_REPLICA: &[u8; 16] = b"iggy-bus-rep-v01";
+
+/// Plane label for the SDK-client plane (reserved for Phase 2; not
+/// exercised on the wire today). Defined now so the cross-plane MAC
+/// guard test in this module can exercise both labels and the test
+/// catches a regression before Phase 2 wiring lands.
+#[allow(dead_code)]
+pub(crate) const LABEL_CLIENT: &[u8; 16] = b"iggy-bus-cli-v01";
 
 const KIND_OFFSET: usize = 56;
 const TAG_RANGE: std::ops::Range<usize> = 0..32;
@@ -59,7 +83,7 @@ const RESERVED_RANGE: std::ops::Range<usize> = 
ENVELOPE_LEN..128;
 /// Envelope version tag.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 #[repr(u8)]
-pub enum AuthKind {
+pub(crate) enum AuthKind {
     Blake3V1 = 1,
 }
 
@@ -72,9 +96,13 @@ impl AuthKind {
     }
 }
 
-/// Fully reconstructed challenge input. All fields come from the header
-/// (`GenericHeader.cluster`, `GenericHeader.replica`, `GenericHeader.release`)
-/// plus the envelope timestamp and nonce.
+/// Fully reconstructed challenge input.
+///
+/// All fields come from the header (`GenericHeader.cluster`,
+/// `GenericHeader.replica`, `GenericHeader.release`) plus the envelope
+/// timestamp and nonce. `label` is the plane-tagged domain-separation
+/// prefix; pick [`LABEL_REPLICA`] for the consensus plane and
+/// [`LABEL_CLIENT`] for the SDK-client plane (Phase 2).
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub struct AuthChallenge {
     pub cluster: u128,
@@ -82,15 +110,17 @@ pub struct AuthChallenge {
     pub timestamp_ns: u64,
     pub release: u32,
     pub nonce: u128,
+    pub label: &'static [u8; 16],
 }
 
 impl AuthChallenge {
-    /// Serialize the challenge into a fixed-size buffer. Domain-separated by
-    /// a constant label so the same secret cannot be misused to forge a
-    /// different protocol's MAC.
+    /// Serialize the challenge into a fixed-size buffer. The leading 16 B
+    /// are the plane-tagged label, so the same secret cannot be misused
+    /// to forge a different plane's MAC even with otherwise-identical
+    /// inputs.
     fn encode(&self) -> [u8; 16 + 16 + 16 + 8 + 4 + 16] {
         let mut out = [0u8; 76];
-        out[0..16].copy_from_slice(LABEL);
+        out[0..16].copy_from_slice(self.label);
         out[16..32].copy_from_slice(&self.cluster.to_le_bytes());
         out[32..48].copy_from_slice(&self.peer_id.to_le_bytes());
         out[48..56].copy_from_slice(&self.timestamp_ns.to_le_bytes());
@@ -102,11 +132,15 @@ impl AuthChallenge {
 
 /// MAC over an `AuthChallenge`. Wraps `blake3::Hash` for its
 /// constant-time `PartialEq` implementation.
+///
+/// The inner `blake3::Hash` is `pub(crate)` so downstream callers cannot
+/// reach in for byte-level `==` comparisons that would bypass the
+/// constant-time `PartialEq` on the wrapper.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub struct AuthTag(pub blake3::Hash);
+pub struct AuthTag(pub(crate) blake3::Hash);
 
 impl AuthTag {
-    const fn as_bytes(&self) -> &[u8; 32] {
+    pub(crate) const fn as_bytes(&self) -> &[u8; 32] {
         self.0.as_bytes()
     }
 }
@@ -130,15 +164,25 @@ pub enum AuthError {
 
 /// Map a credential-layer `AuthError` to the public `IggyError` surface.
 ///
+/// `UnsupportedKind` is routed to `Unauthenticated` rather than
+/// `InvalidCommand` so that pre-auth peers in a mixed-version cluster
+/// (whose all-zero `reserved_command` decodes to `AuthKind::from_byte(0)
+/// -> UnsupportedKind`) surface in the operator's auth-bucket triage,
+/// matching the actual deployment failure mode. `ReservedNonzero` keeps
+/// the `InvalidCommand` mapping because the peer DID stamp a recognized
+/// kind byte and then violated the envelope's reserved-zero contract:
+/// that is a genuine protocol bug, not an auth gap.
+///
 /// Not implemented as `From<AuthError> for IggyError` because the impl
 /// would leak into every crate transitively linking `message_bus`, adding
 /// an extra `impl From<_> for IggyError` candidate to the orphan-rule set
 /// and tripping type inference (`?` ambiguity) in downstream crates.
 #[must_use]
-pub const fn to_iggy_error(e: AuthError) -> IggyError {
+pub(crate) const fn to_iggy_error(e: AuthError) -> IggyError {
     match e {
-        AuthError::UnsupportedKind | AuthError::ReservedNonzero => 
IggyError::InvalidCommand,
-        AuthError::TagMismatch
+        AuthError::ReservedNonzero => IggyError::InvalidCommand,
+        AuthError::UnsupportedKind
+        | AuthError::TagMismatch
         | AuthError::UnknownPeer
         | AuthError::TimestampOutOfWindow
         | AuthError::NonceReplay => IggyError::Unauthenticated,
@@ -193,51 +237,65 @@ impl TokenSource for StaticSharedSecret {
     }
 }
 
-/// Fixed-capacity ring-buffer nonce dedup.
+/// Fixed-capacity ring-buffer nonce dedup. Inline `[u128; CAP]` storage —
+/// no heap allocation, deterministic memory footprint.
 ///
-/// Eviction is O(1) via a round-robin cursor; lookup is O(N) over
-/// `NONCE_LRU_CAPACITY` entries, which is a 256-wide linear scan on the
-/// hot path. Memory footprint is deterministic and bounded regardless of
-/// peer behaviour.
+/// Eviction is O(1) via a round-robin cursor; lookup is O(N) over `CAP`
+/// entries (linear scan, but `CAP * 16 B` fits a single cache line for
+/// `CAP <= 4` and stays L1-resident through `CAP = 256`). Memory bound
+/// is `CAP * 16 B + 2 * sizeof(usize)` per ring; per-peer instances cap
+/// the cluster-wide cost.
+#[doc(hidden)]
 #[derive(Debug)]
-pub struct NonceRing {
-    slots: Vec<u128>,
+pub struct NonceRing<const CAP: usize> {
+    slots: [u128; CAP],
+    len: usize,
     cursor: usize,
-    capacity: usize,
 }
 
-impl NonceRing {
+impl<const CAP: usize> NonceRing<CAP> {
     #[must_use]
-    pub const fn new(capacity: usize) -> Self {
+    #[doc(hidden)]
+    pub const fn new() -> Self {
         Self {
-            slots: Vec::new(),
+            slots: [0u128; CAP],
+            len: 0,
             cursor: 0,
-            capacity,
         }
     }
 
     /// Returns `true` if `nonce` was not present (and has now been
     /// recorded); `false` if it was already seen.
+    ///
+    /// `slots[..self.len]` is the only range scanned for membership, so
+    /// the `[0; CAP]` initial state never collides with a real `0`
+    /// nonce: until the ring fills, untouched slots are out-of-range.
+    #[doc(hidden)]
     pub fn insert(&mut self, nonce: u128) -> bool {
-        if self.slots.contains(&nonce) {
+        if self.slots[..self.len].contains(&nonce) {
             return false;
         }
-        if self.slots.len() < self.capacity {
-            self.slots.push(nonce);
+        if self.len < CAP {
+            self.slots[self.len] = nonce;
+            self.len += 1;
         } else {
             self.slots[self.cursor] = nonce;
-            self.cursor = (self.cursor + 1) % self.capacity;
+            self.cursor = (self.cursor + 1) % CAP;
         }
         true
     }
 }
 
-impl Default for NonceRing {
+impl<const CAP: usize> Default for NonceRing<CAP> {
     fn default() -> Self {
-        Self::new(NONCE_LRU_CAPACITY)
+        Self::new()
     }
 }
 
+/// Per-peer nonce ring with the production [`PER_PEER_NONCE_CAPACITY`].
+#[doc(hidden)]
+pub type ReplicaNonceRing = NonceRing<PER_PEER_NONCE_CAPACITY>;
+
 /// Encode an auth envelope into `reserved_command`.
 ///
 /// Overwrites the first `ENVELOPE_LEN` bytes and zero-fills the trailing
@@ -245,7 +303,7 @@ impl Default for NonceRing {
 /// and `nonce` are caller-controlled so the same routine can be used for
 /// deterministic tests (fake clock, fixed nonce) and production (system
 /// clock, CSPRNG).
-pub fn encode_envelope(
+pub(crate) fn encode_envelope(
     reserved: &mut [u8; 128],
     token_source: &dyn TokenSource,
     challenge: &AuthChallenge,
@@ -262,10 +320,10 @@ pub fn encode_envelope(
 
 /// Parsed view of an envelope's raw bytes. No cryptographic checks.
 #[derive(Debug, Clone, Copy)]
-pub struct DecodedEnvelope {
-    pub tag: AuthTag,
-    pub timestamp_ns: u64,
-    pub nonce: u128,
+pub(crate) struct DecodedEnvelope {
+    pub(crate) tag: AuthTag,
+    pub(crate) timestamp_ns: u64,
+    pub(crate) nonce: u128,
 }
 
 /// Decode an envelope and reject malformed layout.
@@ -278,7 +336,7 @@ pub struct DecodedEnvelope {
 ///
 /// Returns `AuthError::UnsupportedKind` for unknown envelope versions or
 /// `AuthError::ReservedNonzero` for non-zero reserved padding.
-pub fn decode_envelope(reserved: &[u8; 128]) -> Result<DecodedEnvelope, 
AuthError> {
+pub(crate) fn decode_envelope(reserved: &[u8; 128]) -> Result<DecodedEnvelope, 
AuthError> {
     AuthKind::from_byte(reserved[KIND_OFFSET])?;
     if reserved[RESERVED_RANGE].iter().any(|&b| b != 0) {
         return Err(AuthError::ReservedNonzero);
@@ -296,27 +354,34 @@ pub fn decode_envelope(reserved: &[u8; 128]) -> 
Result<DecodedEnvelope, AuthErro
     })
 }
 
-/// Full credential check: timestamp window -> nonce dedup -> tag.
+/// Full credential check: timestamp window -> tag verify -> nonce dedup.
+///
+/// **Order is load-bearing.** A tag-failed envelope must NOT consume a
+/// nonce-ring slot, otherwise an off-secret attacker with TCP reachability
+/// to the listener could flush the ring with forged-tag Pings (valid
+/// timestamps + random nonces) and open a replay window for any captured
+/// legit Ping inside the remaining timestamp slack.
 ///
 /// # Errors
 ///
 /// Returns the first `AuthError` triggered by the check sequence.
-pub fn verify_envelope(
+pub(crate) fn verify_envelope<const CAP: usize>(
     token_source: &dyn TokenSource,
     challenge: &AuthChallenge,
     decoded: &DecodedEnvelope,
     now_ns: u128,
-    nonces: &mut NonceRing,
+    nonces: &mut NonceRing<CAP>,
 ) -> Result<(), AuthError> {
     let ts_u128 = u128::from(decoded.timestamp_ns);
     let delta = now_ns.abs_diff(ts_u128);
     if delta > TIMESTAMP_WINDOW_NS {
         return Err(AuthError::TimestampOutOfWindow);
     }
+    token_source.verify(challenge, &decoded.tag)?;
     if !nonces.insert(decoded.nonce) {
         return Err(AuthError::NonceReplay);
     }
-    token_source.verify(challenge, &decoded.tag)
+    Ok(())
 }
 
 #[cfg(test)]
@@ -330,6 +395,7 @@ mod tests {
             timestamp_ns: 1_700_000_000_000_000_000,
             release: 1,
             nonce: 0x11_2233_4455_6677_8899_aabb_ccdd_eeff,
+            label: LABEL_REPLICA,
         }
     }
 
@@ -341,7 +407,7 @@ mod tests {
         encode_envelope(&mut reserved, &source, &challenge);
 
         let decoded = decode_envelope(&reserved).expect("decode");
-        let mut ring = NonceRing::new(32);
+        let mut ring = NonceRing::<32>::new();
         verify_envelope(
             &source,
             &challenge,
@@ -361,7 +427,7 @@ mod tests {
         encode_envelope(&mut reserved, &signer, &challenge);
 
         let decoded = decode_envelope(&reserved).unwrap();
-        let mut ring = NonceRing::new(32);
+        let mut ring = NonceRing::<32>::new();
         let err = verify_envelope(
             &verifier,
             &challenge,
@@ -381,7 +447,7 @@ mod tests {
         encode_envelope(&mut reserved, &source, &challenge);
 
         let decoded = decode_envelope(&reserved).unwrap();
-        let mut ring = NonceRing::new(32);
+        let mut ring = NonceRing::<32>::new();
         let now = u128::from(challenge.timestamp_ns);
         verify_envelope(&source, &challenge, &decoded, now, &mut 
ring).unwrap();
         let err = verify_envelope(&source, &challenge, &decoded, now, &mut 
ring).unwrap_err();
@@ -396,7 +462,7 @@ mod tests {
         encode_envelope(&mut reserved, &source, &challenge);
 
         let decoded = decode_envelope(&reserved).unwrap();
-        let mut ring = NonceRing::new(32);
+        let mut ring = NonceRing::<32>::new();
         let now = u128::from(challenge.timestamp_ns) + TIMESTAMP_WINDOW_NS + 1;
         let err = verify_envelope(&source, &challenge, &decoded, now, &mut 
ring).unwrap_err();
         assert_eq!(err, AuthError::TimestampOutOfWindow);
@@ -410,7 +476,7 @@ mod tests {
         encode_envelope(&mut reserved, &source, &challenge);
 
         let decoded = decode_envelope(&reserved).unwrap();
-        let mut ring = NonceRing::new(32);
+        let mut ring = NonceRing::<32>::new();
         let now = 
u128::from(challenge.timestamp_ns).saturating_sub(TIMESTAMP_WINDOW_NS + 1);
         let err = verify_envelope(&source, &challenge, &decoded, now, &mut 
ring).unwrap_err();
         assert_eq!(err, AuthError::TimestampOutOfWindow);
@@ -438,9 +504,93 @@ mod tests {
         assert_eq!(err, AuthError::ReservedNonzero);
     }
 
+    #[test]
+    fn forged_tag_does_not_consume_ring_slot() {
+        // Sender signs with secret A; verifier holds secret B. Verifier's
+        // tag check fails. The ring MUST NOT have recorded the nonce -
+        // otherwise an attacker without the secret could flood forged
+        // Pings to evict honest entries (auth.rs verify_envelope ordering
+        // bug; review F1 critical).
+        let signer = StaticSharedSecret::new([1u8; SECRET_SIZE]);
+        let verifier = StaticSharedSecret::new([2u8; SECRET_SIZE]);
+        let challenge = sample_challenge();
+        let mut reserved = [0u8; 128];
+        encode_envelope(&mut reserved, &signer, &challenge);
+
+        let decoded = decode_envelope(&reserved).unwrap();
+        let mut ring = NonceRing::<4>::new();
+        let err = verify_envelope(
+            &verifier,
+            &challenge,
+            &decoded,
+            u128::from(challenge.timestamp_ns),
+            &mut ring,
+        )
+        .unwrap_err();
+        assert_eq!(err, AuthError::TagMismatch);
+
+        // Same nonce now signed correctly with secret B must succeed -
+        // proves the failed-tag attempt did NOT burn the ring slot.
+        let mut reserved_legit = [0u8; 128];
+        encode_envelope(&mut reserved_legit, &verifier, &challenge);
+        let decoded_legit = decode_envelope(&reserved_legit).unwrap();
+        verify_envelope(
+            &verifier,
+            &challenge,
+            &decoded_legit,
+            u128::from(challenge.timestamp_ns),
+            &mut ring,
+        )
+        .expect("legit verify must succeed; nonce slot must have been free");
+    }
+
+    #[test]
+    fn cross_plane_mac_does_not_verify() {
+        // Same secret, same numeric inputs - only label differs. Replica-
+        // plane signer must NOT pass client-plane verification (and vice
+        // versa). Otherwise a captured replica handshake could be replayed
+        // onto the client plane once Phase 2 ships. Review F4.
+        let secret = StaticSharedSecret::new([3u8; SECRET_SIZE]);
+        let replica_challenge = sample_challenge();
+        let mut client_challenge = replica_challenge;
+        client_challenge.label = LABEL_CLIENT;
+
+        let mut replica_envelope = [0u8; 128];
+        encode_envelope(&mut replica_envelope, &secret, &replica_challenge);
+        let decoded = decode_envelope(&replica_envelope).unwrap();
+        let mut ring = NonceRing::<4>::new();
+        let err = verify_envelope(
+            &secret,
+            &client_challenge,
+            &decoded,
+            u128::from(replica_challenge.timestamp_ns),
+            &mut ring,
+        )
+        .unwrap_err();
+        assert_eq!(err, AuthError::TagMismatch);
+    }
+
+    #[test]
+    fn unsupported_kind_maps_to_unauthenticated() {
+        // Mixed-version cluster: pre-auth peer sends zeroed
+        // reserved_command -> AuthKind::from_byte(0) -> UnsupportedKind.
+        // Operator log triage must hit the auth bucket, not the
+        // protocol-violation bucket. Review F3.
+        assert_eq!(
+            to_iggy_error(AuthError::UnsupportedKind),
+            IggyError::Unauthenticated
+        );
+        // ReservedNonzero stays InvalidCommand: peer DID stamp a known
+        // kind, then violated the envelope's reserved-zero contract.
+        assert_eq!(
+            to_iggy_error(AuthError::ReservedNonzero),
+            IggyError::InvalidCommand
+        );
+    }
+
     #[test]
     fn nonce_ring_fills_and_evicts_fifo() {
-        let mut ring = NonceRing::new(3);
+        let mut ring = NonceRing::<3>::new();
         assert!(ring.insert(1));
         assert!(ring.insert(2));
         assert!(ring.insert(3));
@@ -449,4 +599,22 @@ mod tests {
         // Nonce 1 evicted; reinserting it should now succeed again.
         assert!(ring.insert(1));
     }
+
+    #[test]
+    fn nonce_ring_zero_initial_does_not_false_positive() {
+        // [0; CAP] initial state must NOT match a real `0` nonce until
+        // the slot has been written. Edge case for the const-generic
+        // refactor (review F2).
+        let mut ring = NonceRing::<4>::new();
+        assert!(ring.insert(0));
+        assert!(!ring.insert(0));
+        assert!(ring.insert(1));
+        assert!(ring.insert(2));
+    }
+
+    #[test]
+    fn nonce_ring_default_is_empty() {
+        let mut ring = NonceRing::<4>::default();
+        assert!(ring.insert(42));
+    }
 }
diff --git a/core/message_bus/src/auth_config.rs 
b/core/message_bus/src/auth_config.rs
index af715c6d3..dfba8bb89 100644
--- a/core/message_bus/src/auth_config.rs
+++ b/core/message_bus/src/auth_config.rs
@@ -66,8 +66,8 @@ pub enum SecretLoadError {
     BothSet,
     #[error("secret has {got} hex characters; expected {HEX_SECRET_LEN}")]
     WrongLength { got: usize },
-    #[error("secret contains non-hex character at byte offset {0}")]
-    InvalidHex(usize),
+    #[error("secret contains non-hex character {byte:#04x} at byte offset 
{offset}")]
+    InvalidHex { offset: usize, byte: u8 },
     #[error("failed to read secret file {path}")]
     FileRead {
         path: String,
@@ -135,8 +135,14 @@ fn decode_hex_secret(input: &str) -> Result<[u8; 
SECRET_SIZE], SecretLoadError>
     let bytes = input.as_bytes();
     let mut out = [0u8; SECRET_SIZE];
     for (i, chunk) in bytes.chunks_exact(2).enumerate() {
-        let hi = 
decode_hex_nibble(chunk[0]).ok_or(SecretLoadError::InvalidHex(i * 2))?;
-        let lo = 
decode_hex_nibble(chunk[1]).ok_or(SecretLoadError::InvalidHex(i * 2 + 1))?;
+        let hi = decode_hex_nibble(chunk[0]).ok_or(SecretLoadError::InvalidHex 
{
+            offset: i * 2,
+            byte: chunk[0],
+        })?;
+        let lo = decode_hex_nibble(chunk[1]).ok_or(SecretLoadError::InvalidHex 
{
+            offset: i * 2 + 1,
+            byte: chunk[1],
+        })?;
         out[i] = (hi << 4) | lo;
     }
     Ok(out)
@@ -211,7 +217,10 @@ mod tests {
         bad.replace_range(4..5, "Z");
         let err = resolve(Some(&bad), None).unwrap_err();
         match err {
-            SecretLoadError::InvalidHex(offset) => assert_eq!(offset, 4),
+            SecretLoadError::InvalidHex { offset, byte } => {
+                assert_eq!(offset, 4);
+                assert_eq!(byte, b'Z');
+            }
             other => panic!("unexpected error: {other:?}"),
         }
     }
@@ -260,6 +269,7 @@ mod tests {
             timestamp_ns: 3,
             release: 4,
             nonce: 5,
+            label: crate::auth::LABEL_REPLICA,
         };
         let from_loader = source.sign(&challenge);
         let from_reference = reference.sign(&challenge);
diff --git a/core/message_bus/src/config.rs b/core/message_bus/src/config.rs
index f82114461..ea50a817d 100644
--- a/core/message_bus/src/config.rs
+++ b/core/message_bus/src/config.rs
@@ -35,6 +35,17 @@
 
 use std::time::Duration;
 
+/// Hard upper bound on `max_batch`, in iovecs.
+///
+/// Linux's `IOV_MAX` is 1024 (`/usr/include/bits/uio_lim.h`). Future WS
+/// transports emit one iovec for the header and one for the body, so a
+/// batch of N messages costs `2 * N` iovecs; cap `max_batch` at
+/// `IOV_MAX / 2 = 512` to keep that worst case below the syscall limit.
+/// Bus construction asserts this in [`crate::IggyMessageBus::with_config`];
+/// breaching it at boot panics rather than silently delivering writev
+/// `EMSGSIZE` errors on every batch.
+pub const IOV_MAX_LIMIT: usize = 512;
+
 /// Aggregated runtime configuration for an `IggyMessageBus` instance.
 ///
 /// All fields map onto constants that previously lived inline across
diff --git a/core/message_bus/src/connector.rs 
b/core/message_bus/src/connector.rs
index 01d18a255..a3b5a1c54 100644
--- a/core/message_bus/src/connector.rs
+++ b/core/message_bus/src/connector.rs
@@ -25,7 +25,7 @@
 //! (see `shard::coordinator::ShardZeroCoordinator`).
 
 use crate::IggyMessageBus;
-use crate::auth::{self, TokenSource};
+use crate::auth::{self, LABEL_REPLICA, TokenSource};
 use crate::framing;
 use crate::lifecycle::ShutdownToken;
 use crate::socket_opts::apply_keepalive_for_connection;
@@ -102,9 +102,11 @@ async fn connect_all(
         // connection lives on shard 0; `owning_shard` covers multi-shard
         // deployments where the fd was delegated to a peer shard but the
         // mapping broadcast reached shard 0. Either hit means a previous
-        // sweep (or the inbound listener) already installed this peer and
-        // we must not dial again - redialing would tear down the live
-        // socket via the `AlreadyRegistered` race and flap the mapping.
+        // sweep (or the inbound listener) already installed this peer.
+        // Redialing wastes a TCP round-trip; the live entry stays intact
+        // (the loser of the registry insert race in `install_replica_conn`
+        // gets `RejectedRegistration` back and tears DOWN ITS OWN orphan
+        // tasks via `drain_rejected_registration`, never the winner's).
         if bus.replicas().contains(peer_id) || 
bus.owning_shard(peer_id).is_some() {
             debug!(
                 replica = peer_id,
@@ -197,8 +199,13 @@ fn build_ping_message(
     replica_id: u8,
     token_source: &dyn TokenSource,
 ) -> Message<GenericHeader> {
+    let now_ns = IggyTimestamp::now().as_nanos();
+    debug_assert!(
+        now_ns <= u128::from(u64::MAX),
+        "auth_timestamp overflows u64 ns (year ~2554) - envelope 
TIMESTAMP_RANGE is 8 bytes by design"
+    );
     #[allow(clippy::cast_possible_truncation)]
-    let timestamp_ns = IggyTimestamp::now().as_nanos() as u64;
+    let timestamp_ns = now_ns as u64;
     let nonce = {
         let mut buf = [0u8; 16];
         rand::rng().fill_bytes(&mut buf);
@@ -218,6 +225,7 @@ fn build_ping_message(
                 timestamp_ns,
                 release: h.release,
                 nonce,
+                label: LABEL_REPLICA,
             };
             auth::encode_envelope(&mut h.reserved_command, token_source, 
&challenge);
         },
diff --git a/core/message_bus/src/framing.rs b/core/message_bus/src/framing.rs
index 391a86aa2..0f3af8576 100644
--- a/core/message_bus/src/framing.rs
+++ b/core/message_bus/src/framing.rs
@@ -28,6 +28,7 @@ use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
 use iggy_binary_protocol::consensus::iobuf::Owned;
 use iggy_binary_protocol::{GenericHeader, HEADER_SIZE, Message};
 use iggy_common::IggyError;
+use tracing::error;
 
 /// Default hard ceiling on a single wire frame. Frames above this are
 /// almost certainly a protocol violation or a malicious peer; we drop
@@ -126,7 +127,17 @@ pub async fn read_message<S: AsyncReadExt + Unpin>(
     // (e.g. capacity overflow, unsupported buffer kind) and silently
     // ignoring it would leave `owned` at header-only capacity, causing the
     // subsequent `read_exact` to read into an ungrown buffer.
-    IoBufMut::reserve_exact(&mut owned, body_size).map_err(|_| 
IggyError::TcpError)?;
+    //
+    // No `IggyError::OutOfMemory` variant exists; `TcpError` is the closest
+    // bucket. The `error!` log gives operators the actionable diagnostic
+    // (size, OOM-vs-overflow signal) the error code itself loses.
+    IoBufMut::reserve_exact(&mut owned, body_size).map_err(|_| {
+        error!(
+            body_size,
+            "framing reserve_exact failed (OOM or capacity overflow); dropping 
connection"
+        );
+        IggyError::TcpError
+    })?;
     let BufResult(result, slice) = stream
         .read_exact(owned.slice(HEADER_SIZE..total_size))
         .await;
diff --git a/core/message_bus/src/installer.rs 
b/core/message_bus/src/installer.rs
index 21d381296..4d274860c 100644
--- a/core/message_bus/src/installer.rs
+++ b/core/message_bus/src/installer.rs
@@ -157,9 +157,11 @@ pub fn install_replica_conn<C: TransportConn>(
     let notified = Rc::new(Cell::new(false));
     // If the registry insert below races with a concurrent install for
     // the same peer id and loses, both spawned halves must skip their
-    // post-loop cleanup: calling `replicas().remove` / `close_peer` or
-    // `notify_connection_lost` would evict the winner's entry and
-    // clobber its mapping broadcast. `compio::runtime::JoinHandle::drop`
+    // post-loop cleanup: the loser's `replicas().remove` /
+    // `close_peer_if_token_matches` calls would no-op against the winner's
+    // generation token (so they can't evict the live entry), but
+    // `notify_connection_lost` has no token guard and would still broadcast
+    // a spurious mapping-clear round. `compio::runtime::JoinHandle::drop`
     // does not cancel the spawned task, so we have to tell the tasks to
     // stand down in-band.
     let install_aborted = Rc::new(Cell::new(false));
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 96d87d961..811102e51 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -40,6 +40,13 @@
 //! timestamp, 16 B nonce, 1 B kind — so the 256 B header layout is
 //! unchanged.
 //!
+//! Each plane prepends a distinct 16-byte LABEL into the MAC input
+//! (`auth::LABEL_REPLICA` for the consensus plane;
+//! `auth::LABEL_CLIENT` reserved for Phase 2). The label is part of
+//! the cryptographic challenge, so a captured handshake on one plane
+//! cannot be replayed onto the other even if both share one
+//! `StaticSharedSecret`.
+//!
 //! # Invariants worth naming
 //!
 //! - [`send_to_client`](IggyMessageBus::send_to_client) and
@@ -87,7 +94,7 @@ pub(crate) mod socket_opts;
 pub mod transports;
 pub mod writer_task;
 
-pub use config::MessageBusConfig;
+pub use config::{IOV_MAX_LIMIT, MessageBusConfig};
 pub use error::SendError;
 pub use installer::ConnectionInstaller;
 pub use lifecycle::{
@@ -95,14 +102,31 @@ pub use lifecycle::{
     ReplicaRegistry, Shutdown, ShutdownToken,
 };
 
+use auth::ReplicaNonceRing;
 use compio::runtime::JoinHandle;
 use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
 use iggy_binary_protocol::consensus::iobuf::Frozen;
 use iggy_binary_protocol::{GenericHeader, Message};
 use std::cell::RefCell;
 use std::collections::HashMap;
+use std::rc::Rc;
 use std::time::Duration;
 
+/// Per-replica nonce dedup store, owned by [`IggyMessageBus`] and
+/// shared with the listener task via [`Rc`]. Keyed by `replica_id` so
+/// each peer's ring is isolated; a flapping or hostile peer cannot
+/// evict honest peers' nonce slots. Bus-resident (rather than
+/// task-local on the listener) so the nonce window survives a listener
+/// task respawn within one process. Process restart still drops the
+/// store; cross-process persistence is out of scope.
+///
+/// Type is `pub` (rather than `pub(crate)`) only because it appears in
+/// the public signature of [`replica_listener::run`]; downstream code
+/// should obtain a clone via [`IggyMessageBus::replica_nonces`] rather
+/// than constructing one directly.
+#[doc(hidden)]
+pub type ReplicaNonceStore = RefCell<HashMap<u8, ReplicaNonceRing>>;
+
 /// Callback for forwarding a consensus message to a remote shard.
 ///
 /// Provided by the shard layer at bus construction and installed via
@@ -194,7 +218,10 @@ pub trait MessageBus {
 /// - a [`ConnectionRegistry<u128>`] for clients (`u128` id is shard-packed),
 /// - a [`ReplicaRegistry`] for replicas (`u8` id from the Ping handshake,
 ///   backed by a fixed-size array to avoid hash lookup on every send),
-/// - the `JoinHandle`s of background tasks (accept loops, reconnect periodic).
+/// - the `JoinHandle`s of background tasks (accept loops, reconnect periodic),
+/// - a per-peer [`ReplicaNonceStore`] used by the replica-plane handshake
+///   to dedup nonces inside the 30 s timestamp window (see
+///   [`Self::replica_nonces`]).
 ///
 /// Send semantics:
 /// - `send_to_*` clones the per-peer `Sender` out of the registry, calls
@@ -229,6 +256,10 @@ pub struct IggyMessageBus {
     /// when they exit abnormally. `None` when running without a shard-0
     /// coordinator (single-shard deployments and tests).
     connection_lost_fn: RefCell<Option<ConnectionLostFn>>,
+    /// Per-peer replica handshake nonce dedup store. Held in an `Rc` so
+    /// the listener task can take a clone across the spawn boundary
+    /// without stealing the bus's only handle. See [`ReplicaNonceStore`].
+    replica_nonces: Rc<ReplicaNonceStore>,
 }
 
 impl IggyMessageBus {
@@ -253,8 +284,20 @@ impl IggyMessageBus {
     }
 
     /// Construct a bus with the given [`MessageBusConfig`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if `config.max_batch == 0` or
+    /// `config.max_batch > IOV_MAX_LIMIT`. Boot-time validation; surfaces
+    /// operator misconfiguration loudly rather than letting every writev
+    /// fail silently with `EMSGSIZE` once traffic starts.
     #[must_use]
     pub fn with_config(shard_id: u16, config: MessageBusConfig) -> Self {
+        assert!(
+            config.max_batch > 0 && config.max_batch <= IOV_MAX_LIMIT,
+            "MessageBusConfig::max_batch must be in 1..={IOV_MAX_LIMIT} (writev IOV_MAX/2 cap); got {}",
+            config.max_batch,
+        );
         let (shutdown, token) = Shutdown::new();
         Self {
             shard_id,
@@ -268,9 +311,24 @@ impl IggyMessageBus {
             replica_forward_fn: RefCell::new(None),
             client_forward_fn: RefCell::new(None),
             connection_lost_fn: RefCell::new(None),
+            replica_nonces: Rc::new(RefCell::new(HashMap::new())),
         }
     }
 
+    /// Cheap clone of the bus-resident replica-plane nonce store.
+    ///
+    /// The listener task takes one clone at boot and reaches into it
+    /// from `handshake_verify`; the `Rc` keeps the store alive across
+    /// the listener task's lifetime and survives a respawn within one
+    /// process. Tests that drive `replica_listener::run` directly use
+    /// this accessor to obtain the same store the production bootstrap
+    /// would.
+    #[doc(hidden)]
+    #[must_use]
+    pub fn replica_nonces(&self) -> Rc<ReplicaNonceStore> {
+        Rc::clone(&self.replica_nonces)
+    }
+
     /// Install the notifier used by delegated replica connections to tell
     /// shard 0 that a connection died. Single place to inject in tests too.
     pub fn set_connection_lost_fn(&self, f: ConnectionLostFn) {
@@ -278,8 +336,18 @@ impl IggyMessageBus {
     }
 
     /// Invoke the registered connection-lost notifier, if any.
+    ///
+    /// Clones the `Rc` out of the `RefCell` borrow before invoking the
+    /// closure so the closure body is free to call
+    /// [`Self::set_connection_lost_fn`] (which takes a `borrow_mut`)
+    /// without tripping the runtime borrow check.
     pub(crate) fn notify_connection_lost(&self, replica_id: u8) {
-        if let Some(f) = self.connection_lost_fn.borrow().as_ref() {
+        let cb = self
+            .connection_lost_fn
+            .borrow()
+            .as_ref()
+            .map(std::rc::Rc::clone);
+        if let Some(f) = cb {
             f(replica_id);
         }
     }
@@ -688,6 +756,96 @@ mod tests {
         assert_eq!(bus.background_tasks.borrow().len(), 2);
     }
 
+    #[compio::test]
+    #[allow(clippy::future_not_send)]
+    async fn replica_nonce_store_isolates_peers() {
+        // Per-peer rings: two peers replaying the same nonce must both
+        // be admitted (different scopes), and a same-peer replay must
+        // be rejected. Review F2.
+        let bus = IggyMessageBus::new(0);
+        let nonces = bus.replica_nonces();
+        let mut store = nonces.borrow_mut();
+        assert!(store.entry(1).or_default().insert(0xdead_beef));
+        assert!(
+            store.entry(2).or_default().insert(0xdead_beef),
+            "different peer with same nonce must not be a replay"
+        );
+        assert!(
+            !store.entry(1).or_default().insert(0xdead_beef),
+            "same peer + same nonce is a replay"
+        );
+    }
+
+    #[compio::test]
+    #[allow(clippy::future_not_send)]
+    async fn replica_nonce_store_survives_handle_drop() {
+        // Listener task drop must NOT clear the nonce window; the store
+        // is `Rc`-shared and the bus retains a clone. Proves the
+        // restart-survival property the per-task `RefCell<NonceRing>`
+        // could not deliver. Review F2.
+        let bus = IggyMessageBus::new(0);
+        let nonces1 = bus.replica_nonces();
+        {
+            let mut store = nonces1.borrow_mut();
+            assert!(store.entry(1).or_default().insert(0xfeed_face));
+        }
+        drop(nonces1); // simulates listener task exit
+
+        let nonces2 = bus.replica_nonces();
+        let mut store = nonces2.borrow_mut();
+        assert!(
+            !store.entry(1).or_default().insert(0xfeed_face),
+            "post-restart replay must be rejected"
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "MessageBusConfig::max_batch must be in")]
+    fn max_batch_oversize_rejected() {
+        let cfg = MessageBusConfig {
+            max_batch: 4096,
+            ..MessageBusConfig::default()
+        };
+        let _ = IggyMessageBus::with_config(0, cfg);
+    }
+
+    #[test]
+    #[should_panic(expected = "MessageBusConfig::max_batch must be in")]
+    fn max_batch_zero_rejected() {
+        let cfg = MessageBusConfig {
+            max_batch: 0,
+            ..MessageBusConfig::default()
+        };
+        let _ = IggyMessageBus::with_config(0, cfg);
+    }
+
+    #[compio::test]
+    #[allow(clippy::future_not_send)]
+    async fn notify_connection_lost_handles_reentrant_install() {
+        // Closure swaps itself out via `set_connection_lost_fn`, which
+        // calls `borrow_mut`. The pre-fix code held a `Ref` across the
+        // closure invocation and panicked here.
+        let bus = std::rc::Rc::new(IggyMessageBus::new(0));
+        let bus_for_closure = bus.clone();
+        let counter: std::rc::Rc<std::cell::Cell<u8>> = std::rc::Rc::new(std::cell::Cell::new(0));
+        let counter_inner = counter.clone();
+        let cb: ConnectionLostFn = std::rc::Rc::new(move |_replica: u8| {
+            counter_inner.set(counter_inner.get() + 1);
+            // Reentrant install: replace the closure mid-invoke.
+            let counter_replacement = counter_inner.clone();
+            bus_for_closure.set_connection_lost_fn(std::rc::Rc::new(move |_| {
+                counter_replacement.set(counter_replacement.get() + 10);
+            }));
+        });
+        bus.set_connection_lost_fn(cb);
+
+        bus.notify_connection_lost(1); // first closure runs (+1) and swaps
+        assert_eq!(counter.get(), 1);
+
+        bus.notify_connection_lost(1); // second closure runs (+10)
+        assert_eq!(counter.get(), 11);
+    }
+
     #[compio::test]
     #[allow(clippy::future_not_send)]
     async fn shutdown_loop_drains_tasks_added_during_shutdown() {
diff --git a/core/message_bus/src/replica_io.rs b/core/message_bus/src/replica_io.rs
index 3731cc6d8..cc2eca8c1 100644
--- a/core/message_bus/src/replica_io.rs
+++ b/core/message_bus/src/replica_io.rs
@@ -85,6 +85,7 @@ pub async fn start_on_shard_zero(
     let on_accepted_replica_for_listener = on_accepted_replica.clone();
     let listener_max_message_size = bus.config().max_message_size;
     let token_source_for_listener = Rc::clone(&token_source);
+    let nonces_for_listener = bus.replica_nonces();
     let replica_handle = compio::runtime::spawn(async move {
         run_replica_listener(
             replica_listener,
@@ -95,6 +96,7 @@ pub async fn start_on_shard_zero(
             on_accepted_replica_for_listener,
             listener_max_message_size,
             token_source_for_listener,
+            nonces_for_listener,
         )
         .await;
     });
diff --git a/core/message_bus/src/replica_listener.rs b/core/message_bus/src/replica_listener.rs
index b056f30ea..6a0789369 100644
--- a/core/message_bus/src/replica_listener.rs
+++ b/core/message_bus/src/replica_listener.rs
@@ -50,16 +50,35 @@
 //! NOT provided here: operators still need to deploy the replica port on a
 //! trusted network boundary (cluster-local VPC, private subnet, overlay)
 //! if wire confidentiality is required.
+//!
+//! # Trust model
+//!
+//! The cluster ships with one shared 32 B secret across all replicas
+//! (see [`crate::auth_config`]). Any holder of the secret can mint a
+//! valid MAC for ANY `peer_id` in either direction. The dialer-side
+//! directional rule (`peer_id > self_id` only dials) plus the
+//! acceptor-side check (`parsed.replica < self_id`) blocks UPWARD
+//! impersonation - a low-id peer cannot pretend to be a high-id peer
+//! during dial - but cross-impersonation in the OTHER direction is open
+//! by design under a single shared secret. A compromised replica B can
+//! authenticate as any replica A with `A < B`. Mitigation comes from
+//! the trusted-LAN deployment assumption + the fact that the
+//! authenticated identity is only used for VSR consensus framing on the
+//! same plane.
+//!
+//! Per-peer secrets (each replica gets its own key, the acceptor looks
+//! up the right one for the announced `peer_id`) is the documented
+//! Phase 2+ hardening. Until then, the cluster operator must treat
+//! "secret compromise" as equivalent to "any-replica compromise."
 
-use crate::auth::{self, NonceRing, TokenSource};
+use crate::auth::{self, LABEL_REPLICA, TokenSource};
 use crate::framing;
 use crate::lifecycle::ShutdownToken;
-use crate::{AcceptedReplicaFn, GenericHeader, Message};
+use crate::{AcceptedReplicaFn, GenericHeader, Message, ReplicaNonceStore};
 use compio::net::{SocketOpts, TcpListener, TcpStream};
 use futures::FutureExt;
 use iggy_binary_protocol::Command2;
 use iggy_common::{IggyError, IggyTimestamp};
-use std::cell::RefCell;
 use std::net::SocketAddr;
 use std::rc::Rc;
 use tracing::{debug, error, info, warn};
@@ -94,6 +113,12 @@ pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyErr
 /// Run the inbound replica listener accept loop until the shutdown token
 /// fires. Every successful handshake fires the `on_accepted` callback; the
 /// callback owns the accepted stream from that point on.
+///
+/// `nonces` is the bus-resident per-peer nonce dedup store. The listener
+/// reaches into it via `borrow_mut().entry(peer).or_default()` for each
+/// handshake; the borrow is held only across the synchronous
+/// [`handshake_verify`], never across an `await`, so the `RefCell` is
+/// safe under the single-threaded compio runtime.
 #[allow(clippy::future_not_send)]
 #[allow(clippy::too_many_arguments)]
 pub async fn run(
@@ -105,12 +130,12 @@ pub async fn run(
     on_accepted: AcceptedReplicaFn,
     max_message_size: usize,
     token_source: Rc<dyn TokenSource>,
+    nonces: Rc<ReplicaNonceStore>,
 ) {
     info!(
         "Replica listener accepting on {:?}",
         listener.local_addr().ok()
     );
-    let nonces = RefCell::new(NonceRing::default());
     loop {
         futures::select! {
             () = token.wait().fuse() => {
@@ -124,11 +149,12 @@ pub async fn run(
                         let outcome = match read {
                             Ok(parsed) => {
                                 let now_ns = IggyTimestamp::now().as_nanos();
-                                let mut ring = nonces.borrow_mut();
+                                let mut store = nonces.borrow_mut();
+                                let ring = store.entry(parsed.replica).or_default();
                                 handshake_verify(
                                     &parsed,
                                     token_source.as_ref(),
-                                    &mut ring,
+                                    ring,
                                     now_ns,
                                     self_id,
                                     replica_count,
@@ -186,6 +212,7 @@ async fn handshake_read(
         timestamp_ns: decoded.timestamp_ns,
         release: header.release,
         nonce: decoded.nonce,
+        label: LABEL_REPLICA,
     };
     Ok(ParsedHandshake {
         replica: header.replica,
@@ -195,11 +222,12 @@ async fn handshake_read(
 }
 
 /// Synchronous verifier: tag + replay, then the directional tiebreak.
-/// Runs outside any await so the `&mut NonceRing` borrow cannot span I/O.
+/// Runs outside any await so the `&mut ReplicaNonceRing` borrow cannot
+/// span I/O.
 fn handshake_verify(
     parsed: &ParsedHandshake,
     token_source: &dyn TokenSource,
-    nonces: &mut NonceRing,
+    nonces: &mut auth::ReplicaNonceRing,
     now_ns: u128,
     self_id: u8,
     replica_count: u8,
diff --git a/core/message_bus/src/transports/mod.rs b/core/message_bus/src/transports/mod.rs
index c98faf467..157a5780a 100644
--- a/core/message_bus/src/transports/mod.rs
+++ b/core/message_bus/src/transports/mod.rs
@@ -67,7 +67,14 @@
 
 pub mod tcp;
 
-pub use tcp::{TcpTransportConn, TcpTransportListener, TcpTransportReader, TcpTransportWriter};
+// Only `Conn` and `Writer` have crate-internal callers today
+// (`installer.rs` wraps the dialed/accepted stream in a `TcpTransportConn`;
+// `writer_task.rs` wraps the owned write half in a `TcpTransportWriter`).
+// `TcpTransportListener` and `TcpTransportReader` stay reachable via the
+// `tcp` submodule and exist primarily so the trait surface compiles
+// end-to-end against a real impl; future WSS / QUIC listeners drop in
+// alongside without touching call sites.
+pub(crate) use tcp::{TcpTransportConn, TcpTransportWriter};
 
 use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
 use iggy_binary_protocol::consensus::iobuf::Frozen;
@@ -118,8 +125,14 @@ pub trait TransportConn: 'static {
 }
 
 /// Read half. Produces one framed [`Message<GenericHeader>`] per call.
+///
+/// `'static` so the half can be moved into a `compio::runtime::spawn`'d
+/// task. `Unpin` is NOT required at the trait level: TCP impls satisfy it
+/// trivially via `OwnedReadHalf<TcpStream>`, and a future
+/// `compio-quic::RecvStream` impl that is not `Unpin` can pin internally
+/// without forcing every other impl to `Box::pin`.
 #[allow(async_fn_in_trait)]
-pub trait TransportReader: Unpin + 'static {
+pub trait TransportReader: 'static {
     /// Read the next `GenericHeader`-framed message off the wire.
     ///
     /// Validates the 256 B header and bounds the total frame size to
@@ -141,8 +154,12 @@ pub trait TransportReader: Unpin + 'static {
 }
 
 /// Write half. Atomic-or-error batched writer.
+///
+/// `'static` so the half can be moved into a `compio::runtime::spawn`'d
+/// task. `Unpin` is NOT required at the trait level (see
+/// [`TransportReader`] for the rationale).
 #[allow(async_fn_in_trait)]
-pub trait TransportWriter: Unpin + 'static {
+pub trait TransportWriter: 'static {
     /// Send every buffer in `batch` atomically.
     ///
     /// On success the Vec is drained (empty on return); the same
diff --git a/core/message_bus/src/transports/tcp.rs b/core/message_bus/src/transports/tcp.rs
index 7478c99b4..79694b06d 100644
--- a/core/message_bus/src/transports/tcp.rs
+++ b/core/message_bus/src/transports/tcp.rs
@@ -49,13 +49,22 @@ use std::net::SocketAddr;
 /// `keepalive`) via `compio::net::SocketOpts`. See
 /// [`crate::replica_listener::bind`] and
 /// [`crate::client_listener::bind`] for the canonical construction.
-pub struct TcpTransportListener {
+///
+/// Currently only used by trait-conformance tests; production `accept`
+/// loops in [`crate::replica_listener`] / [`crate::client_listener`]
+/// drive a raw `TcpListener` directly. Kept `pub(crate)` so the
+/// `TransportListener` impl below is actually exercisable; the
+/// `dead_code` allow is intentional and will fall away when the listener
+/// loops migrate behind the trait (Phase 2+).
+#[allow(dead_code)]
+pub(crate) struct TcpTransportListener {
     inner: TcpListener,
 }
 
 impl TcpTransportListener {
     #[must_use]
-    pub const fn new(inner: TcpListener) -> Self {
+    #[allow(dead_code)]
+    pub(crate) const fn new(inner: TcpListener) -> Self {
         Self { inner }
     }
 }
@@ -76,13 +85,13 @@ impl TransportListener for TcpTransportListener {
 /// result of a `TcpStream::connect` on the dialer path. Takes ownership
 /// of the stream; [`Self::into_split`] transfers that ownership into
 /// the read and write halves bound to the per-connection tasks.
-pub struct TcpTransportConn {
+pub(crate) struct TcpTransportConn {
     stream: TcpStream,
 }
 
 impl TcpTransportConn {
     #[must_use]
-    pub const fn new(stream: TcpStream) -> Self {
+    pub(crate) const fn new(stream: TcpStream) -> Self {
         Self { stream }
     }
 }
@@ -106,13 +115,14 @@ impl TransportConn for TcpTransportConn {
 /// [`framing::read_message`]; the two paths share the same header
 /// decode, bounds check, and zero-copy `Owned<MESSAGE_ALIGN>`
 /// allocation strategy.
-pub struct TcpTransportReader {
+pub(crate) struct TcpTransportReader {
     inner: OwnedReadHalf<TcpStream>,
 }
 
 impl TcpTransportReader {
     #[must_use]
-    pub const fn new(inner: OwnedReadHalf<TcpStream>) -> Self {
+    #[allow(dead_code)]
+    pub(crate) const fn new(inner: OwnedReadHalf<TcpStream>) -> Self {
         Self { inner }
     }
 }
@@ -136,13 +146,13 @@ impl TransportReader for TcpTransportReader {
 /// `max_batch <= IOV_MAX / 2 = 512`; this impl does not enforce a cap
 /// because the Vec is already drained by the caller's admission
 /// control.
-pub struct TcpTransportWriter {
+pub(crate) struct TcpTransportWriter {
     inner: OwnedWriteHalf<TcpStream>,
 }
 
 impl TcpTransportWriter {
     #[must_use]
-    pub const fn new(inner: OwnedWriteHalf<TcpStream>) -> Self {
+    pub(crate) const fn new(inner: OwnedWriteHalf<TcpStream>) -> Self {
         Self { inner }
     }
 }
diff --git a/core/message_bus/src/writer_task.rs b/core/message_bus/src/writer_task.rs
index 75b09c827..14d292bab 100644
--- a/core/message_bus/src/writer_task.rs
+++ b/core/message_bus/src/writer_task.rs
@@ -43,8 +43,8 @@ use tracing::{debug, error, trace};
 /// Larger batches reduce syscalls per N messages at the cost of memory
 /// per batch and worst-case latency for the head-of-batch message.
 ///
-/// TCP entry point. Wraps the owned write half in a
-/// [`TcpTransportWriter`] and delegates the drain loop to
+/// TCP entry point. Wraps the owned write half in the crate-internal
+/// `TcpTransportWriter` and delegates the drain loop to
 /// [`run_transport`]; every syscall still flows through
 /// [`TransportWriter::send_batch`] so future transports drop in behind
 /// the same drain logic.
diff --git a/core/message_bus/tests/backpressure.rs b/core/message_bus/tests/backpressure.rs
index 7e849a981..a92e94090 100644
--- a/core/message_bus/tests/backpressure.rs
+++ b/core/message_bus/tests/backpressure.rs
@@ -52,6 +52,7 @@ async fn try_send_returns_backpressure_when_queue_full() {
     });
     let (l1, addr1) = bind(loopback()).await.unwrap();
     let token_for_l1 = bus1.token();
+    let nonces_for_l1 = bus1.replica_nonces();
     let accept_1 = install_replicas_locally(bus1.clone(), on_message1);
     let l1_handle = compio::runtime::spawn(async move {
         run(
@@ -63,6 +64,7 @@ async fn try_send_returns_backpressure_when_queue_full() {
             accept_1,
             message_bus::framing::MAX_MESSAGE_SIZE,
             test_token_source(),
+            nonces_for_l1,
         )
         .await;
     });
diff --git a/core/message_bus/tests/connection_lost_notify.rs b/core/message_bus/tests/connection_lost_notify.rs
index 00a05b5f7..fcb2cce65 100644
--- a/core/message_bus/tests/connection_lost_notify.rs
+++ b/core/message_bus/tests/connection_lost_notify.rs
@@ -54,6 +54,7 @@ async fn connection_lost_fires_exactly_once_per_peer_disconnect() {
     // bus1 listens; bus0 dials.
     let (l1, addr1) = bind(loopback()).await.unwrap();
     let token_for_l1 = bus1.token();
+    let nonces_for_l1 = bus1.replica_nonces();
     let accept_1 = install_replicas_locally(bus1.clone(), on_message.clone());
     let l1_handle = compio::runtime::spawn(async move {
         run(
@@ -65,6 +66,7 @@ async fn connection_lost_fires_exactly_once_per_peer_disconnect() {
             accept_1,
             message_bus::framing::MAX_MESSAGE_SIZE,
             test_token_source(),
+            nonces_for_l1,
         )
         .await;
     });
diff --git a/core/message_bus/tests/directional_connection.rs b/core/message_bus/tests/directional_connection.rs
index 6251939ce..9f80f352c 100644
--- a/core/message_bus/tests/directional_connection.rs
+++ b/core/message_bus/tests/directional_connection.rs
@@ -43,6 +43,7 @@ async fn lower_id_dials_higher_id_accepts() {
     let (l1, addr1) = bind(loopback()).await.unwrap();
 
     let token_for_l0 = bus0.token();
+    let nonces_for_l0 = bus0.replica_nonces();
     let accept_0 = install_replicas_locally(bus0.clone(), on_message.clone());
     let l0_handle = compio::runtime::spawn(async move {
         run(
@@ -54,12 +55,14 @@ async fn lower_id_dials_higher_id_accepts() {
             accept_0,
             message_bus::framing::MAX_MESSAGE_SIZE,
             test_token_source(),
+            nonces_for_l0,
         )
         .await;
     });
     bus0.track_background(l0_handle);
 
     let token_for_l1 = bus1.token();
+    let nonces_for_l1 = bus1.replica_nonces();
     let accept_1 = install_replicas_locally(bus1.clone(), on_message.clone());
     let l1_handle = compio::runtime::spawn(async move {
         run(
@@ -71,6 +74,7 @@ async fn lower_id_dials_higher_id_accepts() {
             accept_1,
             message_bus::framing::MAX_MESSAGE_SIZE,
             test_token_source(),
+            nonces_for_l1,
         )
         .await;
     });
diff --git a/core/message_bus/tests/handshake_auth.rs b/core/message_bus/tests/handshake_auth.rs
index b833354e7..6ed94257b 100644
--- a/core/message_bus/tests/handshake_auth.rs
+++ b/core/message_bus/tests/handshake_auth.rs
@@ -47,6 +47,7 @@ async fn handshake_with_matching_secret_registers_peer() {
     let token_for_listener = bus_listener.token();
     let accept = install_replicas_locally(bus_listener.clone(), on_msg.clone());
     let secret_for_listener = Rc::clone(&secret);
+    let nonces_for_listener = bus_listener.replica_nonces();
     let listener_handle = compio::runtime::spawn(async move {
         run(
             listener,
@@ -57,6 +58,7 @@ async fn handshake_with_matching_secret_registers_peer() {
             accept,
             message_bus::framing::MAX_MESSAGE_SIZE,
             secret_for_listener,
+            nonces_for_listener,
         )
         .await;
     });
@@ -102,6 +104,7 @@ async fn handshake_with_mismatched_secret_rejects_peer() {
     let (listener, addr) = bind(loopback()).await.unwrap();
     let token_for_listener = bus_listener.token();
     let accept = install_replicas_locally(bus_listener.clone(), on_msg.clone());
+    let nonces_for_listener = bus_listener.replica_nonces();
     let listener_handle = compio::runtime::spawn(async move {
         run(
             listener,
@@ -112,6 +115,7 @@ async fn handshake_with_mismatched_secret_rejects_peer() {
             accept,
             message_bus::framing::MAX_MESSAGE_SIZE,
             listener_secret,
+            nonces_for_listener,
         )
         .await;
     });
diff --git a/core/message_bus/tests/head_of_line.rs b/core/message_bus/tests/head_of_line.rs
index b756897ce..d046715a3 100644
--- a/core/message_bus/tests/head_of_line.rs
+++ b/core/message_bus/tests/head_of_line.rs
@@ -48,6 +48,7 @@ async fn slow_peer_does_not_block_other_peers() {
     });
     let (la, addr_a) = bind(loopback()).await.unwrap();
     let token_a = bus_a.token();
+    let nonces_a = bus_a.replica_nonces();
     let accept_a = install_replicas_locally(bus_a.clone(), on_a.clone());
     let la_handle = compio::runtime::spawn(async move {
         run(
@@ -59,6 +60,7 @@ async fn slow_peer_does_not_block_other_peers() {
             accept_a,
             message_bus::framing::MAX_MESSAGE_SIZE,
             test_token_source(),
+            nonces_a,
         )
         .await;
     });
diff --git a/core/message_bus/tests/reconnect.rs b/core/message_bus/tests/reconnect.rs
index 93ac64332..20add2649 100644
--- a/core/message_bus/tests/reconnect.rs
+++ b/core/message_bus/tests/reconnect.rs
@@ -59,6 +59,7 @@ async fn periodic_retry_picks_up_late_listener() {
     let bus1 = Rc::new(IggyMessageBus::new(0));
     let (l1, _) = bind(addr).await.unwrap();
     let token_for_l1 = bus1.token();
+    let nonces_for_l1 = bus1.replica_nonces();
     let accept_delegate = install_replicas_locally(bus1.clone(), on_message.clone());
     let l1_handle = compio::runtime::spawn(async move {
         run(
@@ -70,6 +71,7 @@ async fn periodic_retry_picks_up_late_listener() {
             accept_delegate,
             message_bus::framing::MAX_MESSAGE_SIZE,
             test_token_source(),
+            nonces_for_l1,
         )
         .await;
     });
diff --git a/core/message_bus/tests/reconnect_skip_connected.rs b/core/message_bus/tests/reconnect_skip_connected.rs
index 31159e1db..635250a28 100644
--- a/core/message_bus/tests/reconnect_skip_connected.rs
+++ b/core/message_bus/tests/reconnect_skip_connected.rs
@@ -50,6 +50,7 @@ async fn periodic_reconnect_skips_already_connected_peer() {
         accept_inner(stream, peer_id);
     });
     let token_for_l1 = bus1.token();
+    let nonces_for_l1 = bus1.replica_nonces();
     let l1_handle = compio::runtime::spawn(async move {
         run(
             l1,
@@ -60,6 +61,7 @@ async fn periodic_reconnect_skips_already_connected_peer() {
             accept_delegate,
             message_bus::framing::MAX_MESSAGE_SIZE,
             test_token_source(),
+            nonces_for_l1,
         )
         .await;
     });
diff --git a/core/message_bus/tests/replica_roundtrip.rs b/core/message_bus/tests/replica_roundtrip.rs
index 8d8fde80a..8b609c0a4 100644
--- a/core/message_bus/tests/replica_roundtrip.rs
+++ b/core/message_bus/tests/replica_roundtrip.rs
@@ -45,6 +45,7 @@ async fn two_replicas_exchange_prepare_and_ack() {
     let (l1, addr1) = bind(loopback()).await.expect("bind r1");
 
     let token_for_l1 = bus1.token();
+    let nonces_for_l1 = bus1.replica_nonces();
     let accept_delegate_1 = install_replicas_locally(bus1.clone(), on_message1.clone());
     let l1_handle = compio::runtime::spawn(async move {
         run(
@@ -56,6 +57,7 @@ async fn two_replicas_exchange_prepare_and_ack() {
             accept_delegate_1,
             message_bus::framing::MAX_MESSAGE_SIZE,
             test_token_source(),
+            nonces_for_l1,
         )
         .await;
     });
diff --git a/core/message_bus/tests/shard_zero_gating.rs b/core/message_bus/tests/shard_zero_gating.rs
index 1dc0cc339..1b99d0a5b 100644
--- a/core/message_bus/tests/shard_zero_gating.rs
+++ b/core/message_bus/tests/shard_zero_gating.rs
@@ -41,6 +41,7 @@ async fn shard_zero_binds_listener_and_starts_connector() {
     let peer_handler: MessageHandler = Rc::new(|_, _| {});
     let (peer_listener, peer_addr) = bind(loopback()).await.expect("bind peer listener");
     let peer_token = peer_bus.token();
+    let peer_nonces = peer_bus.replica_nonces();
     let peer_accept = install_replicas_locally(peer_bus.clone(), peer_handler.clone());
     let peer_listen_handle = compio::runtime::spawn(async move {
         run(
@@ -52,6 +53,7 @@ async fn shard_zero_binds_listener_and_starts_connector() {
             peer_accept,
             message_bus::framing::MAX_MESSAGE_SIZE,
             test_token_source(),
+            peer_nonces,
         )
         .await;
     });
diff --git a/core/message_bus/tests/vectored_batch.rs b/core/message_bus/tests/vectored_batch.rs
index b079725a5..95deac326 100644
--- a/core/message_bus/tests/vectored_batch.rs
+++ b/core/message_bus/tests/vectored_batch.rs
@@ -47,6 +47,7 @@ async fn writer_batches_pipelined_sends_in_order() {
     });
     let (l1, addr1) = bind(loopback()).await.unwrap();
     let token_for_l1 = bus1.token();
+    let nonces_for_l1 = bus1.replica_nonces();
     let accept_1 = install_replicas_locally(bus1.clone(), on_message.clone());
     let l1_handle = compio::runtime::spawn(async move {
         run(
@@ -58,6 +59,7 @@ async fn writer_batches_pipelined_sends_in_order() {
             accept_1,
             message_bus::framing::MAX_MESSAGE_SIZE,
             test_token_source(),
+            nonces_for_l1,
         )
         .await;
     });
diff --git a/core/server-ng/src/dedup.rs b/core/server-ng/src/dedup.rs
index 0022e9d85..cbc235129 100644
--- a/core/server-ng/src/dedup.rs
+++ b/core/server-ng/src/dedup.rs
@@ -17,25 +17,47 @@
  * under the License.
  */
 
-//! At-most-once request dedup window (IGGY-112, P0-T3).
+// `Dedup` is consumed only by the `#[cfg(test)]` tests below today; the
+// request dispatcher loop in server-ng is not yet wired up. Once the
+// dispatcher lands and calls `lookup` / `mark_in_flight` / `complete` /
+// `evict_client`, the `dead_code` warnings disappear without needing
+// this allow. See plan F6.
+#![allow(dead_code)]
+
+//! At-most-once **per-shard-lifetime** request dedup window (IGGY-112, P0-T3).
 //!
 //! Indexed by `(client_id, request)` pairs carried in the existing
 //! `RequestHeader.client` (u128) and `RequestHeader.request` (u64) fields.
 //! No wire-format change on the server-ng client plane was needed: the
 //! P0-T1 design note found the two fields already in place.
 //!
+//! # Lifetime scope (load-bearing)
+//!
+//! "At-most-once" applies for the lifetime of THIS shard's `Dedup`
+//! instance. Process restart drops every entry: a client that retries a
+//! request after a server restart re-applies side effects unless the
+//! request is idempotent at the handler level. Cross-process persistence
+//! would need a checkpoint file backed by the consensus WAL and is
+//! deferred. Operators sizing client-side retry policy must treat the
+//! dedup window as a *crash-window* mitigation only.
+//!
+//! # Per-client ring
+//!
 //! Per-client state is a fixed-capacity ring holding the last N
-//! `(request, Entry)` pairs.
+//! `(request, Entry)` pairs. A ring avoids unbounded growth under a
+//! chatty client without requiring TTL-driven purge ticks. Done entries
+//! also carry an `inserted_at_ns` timestamp; `lookup` treats entries
+//! older than `ttl_ns` as absent so a replay from far in the past sees
+//! `Fresh` rather than a stale cache hit.
 //!
-//! A ring avoids unbounded growth under a chatty client without
-//! requiring TTL-driven purge ticks. Done entries also carry an
-//! `inserted_at_ns` timestamp; `lookup` treats entries older than
-//! `ttl_ns` as absent so a replay from far in the past sees `Fresh`
-//! rather than a stale cache hit.
+//! # Threading
 //!
-//! The store is NOT thread-safe. Each server-ng shard owns a `Dedup`
-//! on its single-threaded compio runtime. Cross-shard routing happens
-//! above this layer.
+//! The store is NOT thread-safe. Each server-ng shard owns a `Dedup` on
+//! its single-threaded compio runtime. Cross-shard routing happens above
+//! this layer. Caller MUST invoke [`Dedup::evict_client`] on session
+//! disconnect / bind eviction so the per-client `AHashMap` entry is
+//! released; without that hook the outer map grows unbounded across the
+//! shard's session history.
 
 use ahash::AHashMap;
 use bytes::Bytes;
@@ -69,6 +91,11 @@ pub enum LookupResult<'a> {
     /// the cached reply once handler completes), or buffer.
     InFlight,
     /// Handler already ran; return the cached reply.
+    ///
+    /// The borrow ties to `&Dedup`. Callers MUST `clone()` the `Bytes`
+    /// (cheap, ref-counted) before invoking any `&mut Dedup` method on
+    /// the same instance; the borrow checker enforces this at compile
+    /// time but the contract is worth naming explicitly.
     Cached(&'a Bytes),
 }
 
@@ -93,7 +120,10 @@ struct PerClient {
 
 impl PerClient {
     fn with_capacity(capacity: usize) -> Self {
-        assert!(capacity > 0, "dedup ring capacity must be > 0");
+        // Capacity > 0 is validated at the public entry
+        // (`Dedup::with_config`); every callsite here goes through that
+        // path, so no second assert is needed.
+        debug_assert!(capacity > 0, "dedup ring capacity must be > 0");
         Self {
             slots: Vec::with_capacity(capacity),
             cursor: 0,
diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/lib.rs
index 0aa424bba..d3849ab02 100644
--- a/core/server-ng/src/lib.rs
+++ b/core/server-ng/src/lib.rs
@@ -17,6 +17,10 @@
  * under the License.
  */
 
-pub mod dedup;
+// `dedup` is gated `pub(crate)` until the request dispatcher loop wires
+// up the lookup / mark / complete / evict_client API end-to-end. Marking
+// it crate-private now blocks downstream crates from coupling to a shape
+// the dispatcher PR will likely refine (see plan F6).
+pub(crate) mod dedup;
 pub mod login_register;
 pub mod session_manager;

Reply via email to