This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch feat/message-bus-transports in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 61676604542396ce0056cae57ff51a8d1f294442 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Apr 24 14:20:25 2026 +0200 fix(message_bus): tag-verify before nonce insert + review hardening verify_envelope inserted the nonce ring slot BEFORE verifying the BLAKE3 tag. An off-secret attacker with TCP reachability to the listener could flush the ring with forged-tag Pings and open a 30 s replay window for a captured legit Ping. Reorder to timestamp -> tag -> nonce-insert; add regression test. Triple-confirmed by independent multi-expert review. Same pass landed adjacent hardening: NonceRing per-peer + bus-resident with inline [u128; CAP] storage; LABEL_REPLICA / LABEL_CLIENT plane separation in AuthChallenge; UnsupportedKind -> Unauthenticated for mixed-version triage; max_batch boot-asserted in 1..=IOV_MAX/2; auth + TcpTransport* + dedup surfaces shrunk to pub(crate); Unpin dropped from transport traits; InvalidHex carries byte; Y2554 debug_assert in build_ping_message; notify_connection_lost reentrant-safe; framing.rs reserve_exact diagnostic log; dedup docstring clarified to "at-most-once per-shard-lifetime"; installer.rs race comments tightened. 10 new regression tests; 120 pass. 
--- core/message_bus/src/auth.rs | 274 +++++++++++++++++---- core/message_bus/src/auth_config.rs | 20 +- core/message_bus/src/config.rs | 11 + core/message_bus/src/connector.rs | 18 +- core/message_bus/src/framing.rs | 13 +- core/message_bus/src/installer.rs | 8 +- core/message_bus/src/lib.rs | 164 +++++++++++- core/message_bus/src/replica_io.rs | 2 + core/message_bus/src/replica_listener.rs | 44 +++- core/message_bus/src/transports/mod.rs | 23 +- core/message_bus/src/transports/tcp.rs | 26 +- core/message_bus/src/writer_task.rs | 4 +- core/message_bus/tests/backpressure.rs | 2 + core/message_bus/tests/connection_lost_notify.rs | 2 + core/message_bus/tests/directional_connection.rs | 4 + core/message_bus/tests/handshake_auth.rs | 4 + core/message_bus/tests/head_of_line.rs | 2 + core/message_bus/tests/reconnect.rs | 2 + core/message_bus/tests/reconnect_skip_connected.rs | 2 + core/message_bus/tests/replica_roundtrip.rs | 2 + core/message_bus/tests/shard_zero_gating.rs | 2 + core/message_bus/tests/vectored_batch.rs | 2 + core/server-ng/src/dedup.rs | 52 +++- core/server-ng/src/lib.rs | 6 +- 24 files changed, 586 insertions(+), 103 deletions(-) diff --git a/core/message_bus/src/auth.rs b/core/message_bus/src/auth.rs index 1205a4578..dd90b18d2 100644 --- a/core/message_bus/src/auth.rs +++ b/core/message_bus/src/auth.rs @@ -40,15 +40,39 @@ use thiserror::Error; pub const SECRET_SIZE: usize = 32; /// Byte range of the envelope within `GenericHeader.reserved_command`. -pub const ENVELOPE_LEN: usize = 57; +pub(crate) const ENVELOPE_LEN: usize = 57; /// Acceptance window around `auth_timestamp`, in nanoseconds. 30 s. -pub const TIMESTAMP_WINDOW_NS: u128 = 30_000_000_000; +pub(crate) const TIMESTAMP_WINDOW_NS: u128 = 30_000_000_000; -/// Default nonce-dedup LRU capacity on the acceptor. -pub const NONCE_LRU_CAPACITY: usize = 256; - -const LABEL: &[u8; 16] = b"iggy-bus-auth-v1"; +/// Per-peer nonce-dedup ring capacity on the acceptor. 
+/// +/// 8 slots × 5-replica clusters × 16 B = 640 B inline; survives the +/// 30 s acceptance window even at the worst-case dial cadence +/// (`reconnect_period = 5 s`). Per-peer (rather than a single global +/// ring) means a flapping or hostile peer cannot evict honest peers' +/// nonce slots, so the verify-order guard in [`verify_envelope`] holds +/// up against an attacker who can reach the listener but lacks the +/// secret. See plan F2. +#[doc(hidden)] +pub const PER_PEER_NONCE_CAPACITY: usize = 8; + +/// Plane-tagged 16-byte label prefixed onto every challenge before the MAC. +/// +/// Domain separation: same cluster secret produces distinct MACs across +/// the replica plane and the (Phase 2) SDK-client plane, so a captured +/// replica handshake cannot be replayed onto the client plane and vice +/// versa even when both planes share a [`StaticSharedSecret`]. Bump the +/// trailing version digits when the challenge layout itself changes; +/// bump [`AuthKind`] when the MAC algorithm changes. +pub(crate) const LABEL_REPLICA: &[u8; 16] = b"iggy-bus-rep-v01"; + +/// Plane label for the SDK-client plane (reserved for Phase 2; not +/// exercised on the wire today). Defined now so the cross-plane MAC +/// guard test in this module can exercise both labels and the test +/// catches a regression before Phase 2 wiring lands. +#[allow(dead_code)] +pub(crate) const LABEL_CLIENT: &[u8; 16] = b"iggy-bus-cli-v01"; const KIND_OFFSET: usize = 56; const TAG_RANGE: std::ops::Range<usize> = 0..32; @@ -59,7 +83,7 @@ const RESERVED_RANGE: std::ops::Range<usize> = ENVELOPE_LEN..128; /// Envelope version tag. #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(u8)] -pub enum AuthKind { +pub(crate) enum AuthKind { Blake3V1 = 1, } @@ -72,9 +96,13 @@ impl AuthKind { } } -/// Fully reconstructed challenge input. All fields come from the header -/// (`GenericHeader.cluster`, `GenericHeader.replica`, `GenericHeader.release`) -/// plus the envelope timestamp and nonce. 
+/// Fully reconstructed challenge input. +/// +/// All fields come from the header (`GenericHeader.cluster`, +/// `GenericHeader.replica`, `GenericHeader.release`) plus the envelope +/// timestamp and nonce. `label` is the plane-tagged domain-separation +/// prefix; pick [`LABEL_REPLICA`] for the consensus plane and +/// [`LABEL_CLIENT`] for the SDK-client plane (Phase 2). #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct AuthChallenge { pub cluster: u128, @@ -82,15 +110,17 @@ pub struct AuthChallenge { pub timestamp_ns: u64, pub release: u32, pub nonce: u128, + pub label: &'static [u8; 16], } impl AuthChallenge { - /// Serialize the challenge into a fixed-size buffer. Domain-separated by - /// a constant label so the same secret cannot be misused to forge a - /// different protocol's MAC. + /// Serialize the challenge into a fixed-size buffer. The leading 16 B + /// are the plane-tagged label, so the same secret cannot be misused + /// to forge a different plane's MAC even with otherwise-identical + /// inputs. fn encode(&self) -> [u8; 16 + 16 + 16 + 8 + 4 + 16] { let mut out = [0u8; 76]; - out[0..16].copy_from_slice(LABEL); + out[0..16].copy_from_slice(self.label); out[16..32].copy_from_slice(&self.cluster.to_le_bytes()); out[32..48].copy_from_slice(&self.peer_id.to_le_bytes()); out[48..56].copy_from_slice(&self.timestamp_ns.to_le_bytes()); @@ -102,11 +132,15 @@ impl AuthChallenge { /// MAC over an `AuthChallenge`. Wraps `blake3::Hash` for its /// constant-time `PartialEq` implementation. +/// +/// The inner `blake3::Hash` is `pub(crate)` so downstream callers cannot +/// reach in for byte-level `==` comparisons that would bypass the +/// constant-time `PartialEq` on the wrapper. 
#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct AuthTag(pub blake3::Hash); +pub struct AuthTag(pub(crate) blake3::Hash); impl AuthTag { - const fn as_bytes(&self) -> &[u8; 32] { + pub(crate) const fn as_bytes(&self) -> &[u8; 32] { self.0.as_bytes() } } @@ -130,15 +164,25 @@ pub enum AuthError { /// Map a credential-layer `AuthError` to the public `IggyError` surface. /// +/// `UnsupportedKind` is routed to `Unauthenticated` rather than +/// `InvalidCommand` so that pre-auth peers in a mixed-version cluster +/// (whose all-zero `reserved_command` decodes to `AuthKind::from_byte(0) +/// -> UnsupportedKind`) surface in the operator's auth-bucket triage, +/// matching the actual deployment failure mode. `ReservedNonzero` keeps +/// the `InvalidCommand` mapping because the peer DID stamp a recognized +/// kind byte and then violated the envelope's reserved-zero contract: +/// that is a genuine protocol bug, not an auth gap. +/// /// Not implemented as `From<AuthError> for IggyError` because the impl /// would leak into every crate transitively linking `message_bus`, adding /// an extra `impl From<_> for IggyError` candidate to the orphan-rule set /// and tripping type inference (`?` ambiguity) in downstream crates. #[must_use] -pub const fn to_iggy_error(e: AuthError) -> IggyError { +pub(crate) const fn to_iggy_error(e: AuthError) -> IggyError { match e { - AuthError::UnsupportedKind | AuthError::ReservedNonzero => IggyError::InvalidCommand, - AuthError::TagMismatch + AuthError::ReservedNonzero => IggyError::InvalidCommand, + AuthError::UnsupportedKind + | AuthError::TagMismatch | AuthError::UnknownPeer | AuthError::TimestampOutOfWindow | AuthError::NonceReplay => IggyError::Unauthenticated, @@ -193,51 +237,65 @@ impl TokenSource for StaticSharedSecret { } } -/// Fixed-capacity ring-buffer nonce dedup. +/// Fixed-capacity ring-buffer nonce dedup. Inline `[u128; CAP]` storage — +/// no heap allocation, deterministic memory footprint. 
/// -/// Eviction is O(1) via a round-robin cursor; lookup is O(N) over -/// `NONCE_LRU_CAPACITY` entries, which is a 256-wide linear scan on the -/// hot path. Memory footprint is deterministic and bounded regardless of -/// peer behaviour. +/// Eviction is O(1) via a round-robin cursor; lookup is O(N) over `CAP` +/// entries (linear scan, but `CAP * 16 B` fits a single cache line for +/// `CAP <= 4` and stays L1-resident through `CAP = 256`). Memory bound +/// is `CAP * 16 B + 2 * sizeof(usize)` per ring; per-peer instances cap +/// the cluster-wide cost. +#[doc(hidden)] #[derive(Debug)] -pub struct NonceRing { - slots: Vec<u128>, +pub struct NonceRing<const CAP: usize> { + slots: [u128; CAP], + len: usize, cursor: usize, - capacity: usize, } -impl NonceRing { +impl<const CAP: usize> NonceRing<CAP> { #[must_use] - pub const fn new(capacity: usize) -> Self { + #[doc(hidden)] + pub const fn new() -> Self { Self { - slots: Vec::new(), + slots: [0u128; CAP], + len: 0, cursor: 0, - capacity, } } /// Returns `true` if `nonce` was not present (and has now been /// recorded); `false` if it was already seen. + /// + /// `slots[..self.len]` is the only range scanned for membership, so + /// the `[0; CAP]` initial state never collides with a real `0` + /// nonce: until the ring fills, untouched slots are out-of-range. 
+ #[doc(hidden)] pub fn insert(&mut self, nonce: u128) -> bool { - if self.slots.contains(&nonce) { + if self.slots[..self.len].contains(&nonce) { return false; } - if self.slots.len() < self.capacity { - self.slots.push(nonce); + if self.len < CAP { + self.slots[self.len] = nonce; + self.len += 1; } else { self.slots[self.cursor] = nonce; - self.cursor = (self.cursor + 1) % self.capacity; + self.cursor = (self.cursor + 1) % CAP; } true } } -impl Default for NonceRing { +impl<const CAP: usize> Default for NonceRing<CAP> { fn default() -> Self { - Self::new(NONCE_LRU_CAPACITY) + Self::new() } } +/// Per-peer nonce ring with the production [`PER_PEER_NONCE_CAPACITY`]. +#[doc(hidden)] +pub type ReplicaNonceRing = NonceRing<PER_PEER_NONCE_CAPACITY>; + /// Encode an auth envelope into `reserved_command`. /// /// Overwrites the first `ENVELOPE_LEN` bytes and zero-fills the trailing @@ -245,7 +303,7 @@ impl Default for NonceRing { /// and `nonce` are caller-controlled so the same routine can be used for /// deterministic tests (fake clock, fixed nonce) and production (system /// clock, CSPRNG). -pub fn encode_envelope( +pub(crate) fn encode_envelope( reserved: &mut [u8; 128], token_source: &dyn TokenSource, challenge: &AuthChallenge, @@ -262,10 +320,10 @@ pub fn encode_envelope( /// Parsed view of an envelope's raw bytes. No cryptographic checks. #[derive(Debug, Clone, Copy)] -pub struct DecodedEnvelope { - pub tag: AuthTag, - pub timestamp_ns: u64, - pub nonce: u128, +pub(crate) struct DecodedEnvelope { + pub(crate) tag: AuthTag, + pub(crate) timestamp_ns: u64, + pub(crate) nonce: u128, } /// Decode an envelope and reject malformed layout. @@ -278,7 +336,7 @@ pub struct DecodedEnvelope { /// /// Returns `AuthError::UnsupportedKind` for unknown envelope versions or /// `AuthError::ReservedNonzero` for non-zero reserved padding. 
-pub fn decode_envelope(reserved: &[u8; 128]) -> Result<DecodedEnvelope, AuthError> { +pub(crate) fn decode_envelope(reserved: &[u8; 128]) -> Result<DecodedEnvelope, AuthError> { AuthKind::from_byte(reserved[KIND_OFFSET])?; if reserved[RESERVED_RANGE].iter().any(|&b| b != 0) { return Err(AuthError::ReservedNonzero); @@ -296,27 +354,34 @@ pub fn decode_envelope(reserved: &[u8; 128]) -> Result<DecodedEnvelope, AuthErro }) } -/// Full credential check: timestamp window -> nonce dedup -> tag. +/// Full credential check: timestamp window -> tag verify -> nonce dedup. +/// +/// **Order is load-bearing.** A tag-failed envelope must NOT consume a +/// nonce-ring slot, otherwise an off-secret attacker with TCP reachability +/// to the listener could flush the ring with forged-tag Pings (valid +/// timestamps + random nonces) and open a replay window for any captured +/// legit Ping inside the remaining timestamp slack. /// /// # Errors /// /// Returns the first `AuthError` triggered by the check sequence. 
-pub fn verify_envelope( +pub(crate) fn verify_envelope<const CAP: usize>( token_source: &dyn TokenSource, challenge: &AuthChallenge, decoded: &DecodedEnvelope, now_ns: u128, - nonces: &mut NonceRing, + nonces: &mut NonceRing<CAP>, ) -> Result<(), AuthError> { let ts_u128 = u128::from(decoded.timestamp_ns); let delta = now_ns.abs_diff(ts_u128); if delta > TIMESTAMP_WINDOW_NS { return Err(AuthError::TimestampOutOfWindow); } + token_source.verify(challenge, &decoded.tag)?; if !nonces.insert(decoded.nonce) { return Err(AuthError::NonceReplay); } - token_source.verify(challenge, &decoded.tag) + Ok(()) } #[cfg(test)] @@ -330,6 +395,7 @@ mod tests { timestamp_ns: 1_700_000_000_000_000_000, release: 1, nonce: 0x11_2233_4455_6677_8899_aabb_ccdd_eeff, + label: LABEL_REPLICA, } } @@ -341,7 +407,7 @@ mod tests { encode_envelope(&mut reserved, &source, &challenge); let decoded = decode_envelope(&reserved).expect("decode"); - let mut ring = NonceRing::new(32); + let mut ring = NonceRing::<32>::new(); verify_envelope( &source, &challenge, @@ -361,7 +427,7 @@ mod tests { encode_envelope(&mut reserved, &signer, &challenge); let decoded = decode_envelope(&reserved).unwrap(); - let mut ring = NonceRing::new(32); + let mut ring = NonceRing::<32>::new(); let err = verify_envelope( &verifier, &challenge, @@ -381,7 +447,7 @@ mod tests { encode_envelope(&mut reserved, &source, &challenge); let decoded = decode_envelope(&reserved).unwrap(); - let mut ring = NonceRing::new(32); + let mut ring = NonceRing::<32>::new(); let now = u128::from(challenge.timestamp_ns); verify_envelope(&source, &challenge, &decoded, now, &mut ring).unwrap(); let err = verify_envelope(&source, &challenge, &decoded, now, &mut ring).unwrap_err(); @@ -396,7 +462,7 @@ mod tests { encode_envelope(&mut reserved, &source, &challenge); let decoded = decode_envelope(&reserved).unwrap(); - let mut ring = NonceRing::new(32); + let mut ring = NonceRing::<32>::new(); let now = u128::from(challenge.timestamp_ns) + 
TIMESTAMP_WINDOW_NS + 1; let err = verify_envelope(&source, &challenge, &decoded, now, &mut ring).unwrap_err(); assert_eq!(err, AuthError::TimestampOutOfWindow); @@ -410,7 +476,7 @@ mod tests { encode_envelope(&mut reserved, &source, &challenge); let decoded = decode_envelope(&reserved).unwrap(); - let mut ring = NonceRing::new(32); + let mut ring = NonceRing::<32>::new(); let now = u128::from(challenge.timestamp_ns).saturating_sub(TIMESTAMP_WINDOW_NS + 1); let err = verify_envelope(&source, &challenge, &decoded, now, &mut ring).unwrap_err(); assert_eq!(err, AuthError::TimestampOutOfWindow); @@ -438,9 +504,93 @@ mod tests { assert_eq!(err, AuthError::ReservedNonzero); } + #[test] + fn forged_tag_does_not_consume_ring_slot() { + // Sender signs with secret A; verifier holds secret B. Verifier's + // tag check fails. The ring MUST NOT have recorded the nonce - + // otherwise an attacker without the secret could flood forged + // Pings to evict honest entries (auth.rs verify_envelope ordering + // bug; review F1 critical). + let signer = StaticSharedSecret::new([1u8; SECRET_SIZE]); + let verifier = StaticSharedSecret::new([2u8; SECRET_SIZE]); + let challenge = sample_challenge(); + let mut reserved = [0u8; 128]; + encode_envelope(&mut reserved, &signer, &challenge); + + let decoded = decode_envelope(&reserved).unwrap(); + let mut ring = NonceRing::<4>::new(); + let err = verify_envelope( + &verifier, + &challenge, + &decoded, + u128::from(challenge.timestamp_ns), + &mut ring, + ) + .unwrap_err(); + assert_eq!(err, AuthError::TagMismatch); + + // Same nonce now signed correctly with secret B must succeed - + // proves the failed-tag attempt did NOT burn the ring slot. 
+ let mut reserved_legit = [0u8; 128]; + encode_envelope(&mut reserved_legit, &verifier, &challenge); + let decoded_legit = decode_envelope(&reserved_legit).unwrap(); + verify_envelope( + &verifier, + &challenge, + &decoded_legit, + u128::from(challenge.timestamp_ns), + &mut ring, + ) + .expect("legit verify must succeed; nonce slot must have been free"); + } + + #[test] + fn cross_plane_mac_does_not_verify() { + // Same secret, same numeric inputs - only label differs. Replica- + // plane signer must NOT pass client-plane verification (and vice + // versa). Otherwise a captured replica handshake could be replayed + // onto the client plane once Phase 2 ships. Review F4. + let secret = StaticSharedSecret::new([3u8; SECRET_SIZE]); + let replica_challenge = sample_challenge(); + let mut client_challenge = replica_challenge; + client_challenge.label = LABEL_CLIENT; + + let mut replica_envelope = [0u8; 128]; + encode_envelope(&mut replica_envelope, &secret, &replica_challenge); + let decoded = decode_envelope(&replica_envelope).unwrap(); + let mut ring = NonceRing::<4>::new(); + let err = verify_envelope( + &secret, + &client_challenge, + &decoded, + u128::from(replica_challenge.timestamp_ns), + &mut ring, + ) + .unwrap_err(); + assert_eq!(err, AuthError::TagMismatch); + } + + #[test] + fn unsupported_kind_maps_to_unauthenticated() { + // Mixed-version cluster: pre-auth peer sends zeroed + // reserved_command -> AuthKind::from_byte(0) -> UnsupportedKind. + // Operator log triage must hit the auth bucket, not the + // protocol-violation bucket. Review F3. + assert_eq!( + to_iggy_error(AuthError::UnsupportedKind), + IggyError::Unauthenticated + ); + // ReservedNonzero stays InvalidCommand: peer DID stamp a known + // kind, then violated the envelope's reserved-zero contract. 
+ assert_eq!( + to_iggy_error(AuthError::ReservedNonzero), + IggyError::InvalidCommand + ); + } + #[test] fn nonce_ring_fills_and_evicts_fifo() { - let mut ring = NonceRing::new(3); + let mut ring = NonceRing::<3>::new(); assert!(ring.insert(1)); assert!(ring.insert(2)); assert!(ring.insert(3)); @@ -449,4 +599,22 @@ mod tests { // Nonce 1 evicted; reinserting it should now succeed again. assert!(ring.insert(1)); } + + #[test] + fn nonce_ring_zero_initial_does_not_false_positive() { + // [0; CAP] initial state must NOT match a real `0` nonce until + // the slot has been written. Edge case for the const-generic + // refactor (review F2). + let mut ring = NonceRing::<4>::new(); + assert!(ring.insert(0)); + assert!(!ring.insert(0)); + assert!(ring.insert(1)); + assert!(ring.insert(2)); + } + + #[test] + fn nonce_ring_default_is_empty() { + let mut ring = NonceRing::<4>::default(); + assert!(ring.insert(42)); + } } diff --git a/core/message_bus/src/auth_config.rs b/core/message_bus/src/auth_config.rs index af715c6d3..dfba8bb89 100644 --- a/core/message_bus/src/auth_config.rs +++ b/core/message_bus/src/auth_config.rs @@ -66,8 +66,8 @@ pub enum SecretLoadError { BothSet, #[error("secret has {got} hex characters; expected {HEX_SECRET_LEN}")] WrongLength { got: usize }, - #[error("secret contains non-hex character at byte offset {0}")] - InvalidHex(usize), + #[error("secret contains non-hex character {byte:#04x} at byte offset {offset}")] + InvalidHex { offset: usize, byte: u8 }, #[error("failed to read secret file {path}")] FileRead { path: String, @@ -135,8 +135,14 @@ fn decode_hex_secret(input: &str) -> Result<[u8; SECRET_SIZE], SecretLoadError> let bytes = input.as_bytes(); let mut out = [0u8; SECRET_SIZE]; for (i, chunk) in bytes.chunks_exact(2).enumerate() { - let hi = decode_hex_nibble(chunk[0]).ok_or(SecretLoadError::InvalidHex(i * 2))?; - let lo = decode_hex_nibble(chunk[1]).ok_or(SecretLoadError::InvalidHex(i * 2 + 1))?; + let hi = 
decode_hex_nibble(chunk[0]).ok_or(SecretLoadError::InvalidHex { + offset: i * 2, + byte: chunk[0], + })?; + let lo = decode_hex_nibble(chunk[1]).ok_or(SecretLoadError::InvalidHex { + offset: i * 2 + 1, + byte: chunk[1], + })?; out[i] = (hi << 4) | lo; } Ok(out) @@ -211,7 +217,10 @@ mod tests { bad.replace_range(4..5, "Z"); let err = resolve(Some(&bad), None).unwrap_err(); match err { - SecretLoadError::InvalidHex(offset) => assert_eq!(offset, 4), + SecretLoadError::InvalidHex { offset, byte } => { + assert_eq!(offset, 4); + assert_eq!(byte, b'Z'); + } other => panic!("unexpected error: {other:?}"), } } @@ -260,6 +269,7 @@ mod tests { timestamp_ns: 3, release: 4, nonce: 5, + label: crate::auth::LABEL_REPLICA, }; let from_loader = source.sign(&challenge); let from_reference = reference.sign(&challenge); diff --git a/core/message_bus/src/config.rs b/core/message_bus/src/config.rs index f82114461..ea50a817d 100644 --- a/core/message_bus/src/config.rs +++ b/core/message_bus/src/config.rs @@ -35,6 +35,17 @@ use std::time::Duration; +/// Hard upper bound on `max_batch`, in iovecs. +/// +/// Linux's `IOV_MAX` is 1024 (`/usr/include/bits/uio_lim.h`). Future WS +/// transports emit one iovec for the header and one for the body, so a +/// batch of N messages costs `2 * N` iovecs; cap `max_batch` at +/// `IOV_MAX / 2 = 512` to keep that worst case below the syscall limit. +/// Bus construction asserts this in [`crate::IggyMessageBus::with_config`]; +/// breaching it at boot panics rather than silently delivering writev +/// `EMSGSIZE` errors on every batch. +pub const IOV_MAX_LIMIT: usize = 512; + /// Aggregated runtime configuration for an `IggyMessageBus` instance. /// /// All fields map onto constants that previously lived inline across diff --git a/core/message_bus/src/connector.rs b/core/message_bus/src/connector.rs index 01d18a255..a3b5a1c54 100644 --- a/core/message_bus/src/connector.rs +++ b/core/message_bus/src/connector.rs @@ -25,7 +25,7 @@ //! 
(see `shard::coordinator::ShardZeroCoordinator`). use crate::IggyMessageBus; -use crate::auth::{self, TokenSource}; +use crate::auth::{self, LABEL_REPLICA, TokenSource}; use crate::framing; use crate::lifecycle::ShutdownToken; use crate::socket_opts::apply_keepalive_for_connection; @@ -102,9 +102,11 @@ async fn connect_all( // connection lives on shard 0; `owning_shard` covers multi-shard // deployments where the fd was delegated to a peer shard but the // mapping broadcast reached shard 0. Either hit means a previous - // sweep (or the inbound listener) already installed this peer and - // we must not dial again - redialing would tear down the live - // socket via the `AlreadyRegistered` race and flap the mapping. + // sweep (or the inbound listener) already installed this peer. + // Redialing wastes a TCP round-trip; the live entry stays intact + // (the loser of the registry insert race in `install_replica_conn` + // gets `RejectedRegistration` back and tears DOWN ITS OWN orphan + // tasks via `drain_rejected_registration`, never the winner's). 
if bus.replicas().contains(peer_id) || bus.owning_shard(peer_id).is_some() { debug!( replica = peer_id, @@ -197,8 +199,13 @@ fn build_ping_message( replica_id: u8, token_source: &dyn TokenSource, ) -> Message<GenericHeader> { + let now_ns = IggyTimestamp::now().as_nanos(); + debug_assert!( + now_ns <= u128::from(u64::MAX), + "auth_timestamp overflows u64 ns (year ~2554) - envelope TIMESTAMP_RANGE is 8 bytes by design" + ); #[allow(clippy::cast_possible_truncation)] - let timestamp_ns = IggyTimestamp::now().as_nanos() as u64; + let timestamp_ns = now_ns as u64; let nonce = { let mut buf = [0u8; 16]; rand::rng().fill_bytes(&mut buf); @@ -218,6 +225,7 @@ fn build_ping_message( timestamp_ns, release: h.release, nonce, + label: LABEL_REPLICA, }; auth::encode_envelope(&mut h.reserved_command, token_source, &challenge); }, diff --git a/core/message_bus/src/framing.rs b/core/message_bus/src/framing.rs index 391a86aa2..0f3af8576 100644 --- a/core/message_bus/src/framing.rs +++ b/core/message_bus/src/framing.rs @@ -28,6 +28,7 @@ use iggy_binary_protocol::consensus::MESSAGE_ALIGN; use iggy_binary_protocol::consensus::iobuf::Owned; use iggy_binary_protocol::{GenericHeader, HEADER_SIZE, Message}; use iggy_common::IggyError; +use tracing::error; /// Default hard ceiling on a single wire frame. Frames above this are /// almost certainly a protocol violation or a malicious peer; we drop @@ -126,7 +127,17 @@ pub async fn read_message<S: AsyncReadExt + Unpin>( // (e.g. capacity overflow, unsupported buffer kind) and silently // ignoring it would leave `owned` at header-only capacity, causing the // subsequent `read_exact` to read into an ungrown buffer. - IoBufMut::reserve_exact(&mut owned, body_size).map_err(|_| IggyError::TcpError)?; + // + // No `IggyError::OutOfMemory` variant exists; `TcpError` is the closest + // bucket. The `error!` log gives operators the actionable diagnostic + // (size, OOM-vs-overflow signal) the error code itself loses. 
+ IoBufMut::reserve_exact(&mut owned, body_size).map_err(|_| { + error!( + body_size, + "framing reserve_exact failed (OOM or capacity overflow); dropping connection" + ); + IggyError::TcpError + })?; let BufResult(result, slice) = stream .read_exact(owned.slice(HEADER_SIZE..total_size)) .await; diff --git a/core/message_bus/src/installer.rs b/core/message_bus/src/installer.rs index 21d381296..4d274860c 100644 --- a/core/message_bus/src/installer.rs +++ b/core/message_bus/src/installer.rs @@ -157,9 +157,11 @@ pub fn install_replica_conn<C: TransportConn>( let notified = Rc::new(Cell::new(false)); // If the registry insert below races with a concurrent install for // the same peer id and loses, both spawned halves must skip their - // post-loop cleanup: calling `replicas().remove` / `close_peer` or - // `notify_connection_lost` would evict the winner's entry and - // clobber its mapping broadcast. `compio::runtime::JoinHandle::drop` + // post-loop cleanup: the loser's `replicas().remove` / + // `close_peer_if_token_matches` calls would no-op against the winner's + // generation token (so they can't evict the live entry), but + // `notify_connection_lost` has no token guard and would still broadcast + // a spurious mapping-clear round. `compio::runtime::JoinHandle::drop` // does not cancel the spawned task, so we have to tell the tasks to // stand down in-band. let install_aborted = Rc::new(Cell::new(false)); diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs index 96d87d961..811102e51 100644 --- a/core/message_bus/src/lib.rs +++ b/core/message_bus/src/lib.rs @@ -40,6 +40,13 @@ //! timestamp, 16 B nonce, 1 B kind — so the 256 B header layout is //! unchanged. //! +//! Each plane prepends a distinct 16-byte LABEL into the MAC input +//! (`auth::LABEL_REPLICA` for the consensus plane; +//! `auth::LABEL_CLIENT` reserved for Phase 2). The label is part of +//! the cryptographic challenge, so a captured handshake on one plane +//! 
cannot be replayed onto the other even if both share one +//! `StaticSharedSecret`. +//! //! # Invariants worth naming //! //! - [`send_to_client`](IggyMessageBus::send_to_client) and @@ -87,7 +94,7 @@ pub(crate) mod socket_opts; pub mod transports; pub mod writer_task; -pub use config::MessageBusConfig; +pub use config::{IOV_MAX_LIMIT, MessageBusConfig}; pub use error::SendError; pub use installer::ConnectionInstaller; pub use lifecycle::{ @@ -95,14 +102,31 @@ pub use lifecycle::{ ReplicaRegistry, Shutdown, ShutdownToken, }; +use auth::ReplicaNonceRing; use compio::runtime::JoinHandle; use iggy_binary_protocol::consensus::MESSAGE_ALIGN; use iggy_binary_protocol::consensus::iobuf::Frozen; use iggy_binary_protocol::{GenericHeader, Message}; use std::cell::RefCell; use std::collections::HashMap; +use std::rc::Rc; use std::time::Duration; +/// Per-replica nonce dedup store, owned by [`IggyMessageBus`] and +/// shared with the listener task via [`Rc`]. Keyed by `replica_id` so +/// each peer's ring is isolated; a flapping or hostile peer cannot +/// evict honest peers' nonce slots. Bus-resident (rather than +/// task-local on the listener) so the nonce window survives a listener +/// task respawn within one process. Process restart still drops the +/// store; cross-process persistence is out of scope. +/// +/// Type is `pub` (rather than `pub(crate)`) only because it appears in +/// the public signature of [`replica_listener::run`]; downstream code +/// should obtain a clone via [`IggyMessageBus::replica_nonces`] rather +/// than constructing one directly. +#[doc(hidden)] +pub type ReplicaNonceStore = RefCell<HashMap<u8, ReplicaNonceRing>>; + /// Callback for forwarding a consensus message to a remote shard. 
/// /// Provided by the shard layer at bus construction and installed via @@ -194,7 +218,10 @@ pub trait MessageBus { /// - a [`ConnectionRegistry<u128>`] for clients (`u128` id is shard-packed), /// - a [`ReplicaRegistry`] for replicas (`u8` id from the Ping handshake, /// backed by a fixed-size array to avoid hash lookup on every send), -/// - the `JoinHandle`s of background tasks (accept loops, reconnect periodic). +/// - the `JoinHandle`s of background tasks (accept loops, reconnect periodic), +/// - a per-peer [`ReplicaNonceStore`] used by the replica-plane handshake +/// to dedup nonces inside the 30 s timestamp window (see +/// [`Self::replica_nonces`]). /// /// Send semantics: /// - `send_to_*` clones the per-peer `Sender` out of the registry, calls @@ -229,6 +256,10 @@ pub struct IggyMessageBus { /// when they exit abnormally. `None` when running without a shard-0 /// coordinator (single-shard deployments and tests). connection_lost_fn: RefCell<Option<ConnectionLostFn>>, + /// Per-peer replica handshake nonce dedup store. Held in an `Rc` so + /// the listener task can take a clone across the spawn boundary + /// without stealing the bus's only handle. See [`ReplicaNonceStore`]. + replica_nonces: Rc<ReplicaNonceStore>, } impl IggyMessageBus { @@ -253,8 +284,20 @@ impl IggyMessageBus { } /// Construct a bus with the given [`MessageBusConfig`]. + /// + /// # Panics + /// + /// Panics if `config.max_batch == 0` or + /// `config.max_batch > IOV_MAX_LIMIT`. Boot-time validation; surfaces + /// operator misconfiguration loudly rather than letting every writev + /// fail silently with `EMSGSIZE` once traffic starts. 
#[must_use] pub fn with_config(shard_id: u16, config: MessageBusConfig) -> Self { + assert!( + config.max_batch > 0 && config.max_batch <= IOV_MAX_LIMIT, + "MessageBusConfig::max_batch must be in 1..={IOV_MAX_LIMIT} (writev IOV_MAX/2 cap); got {}", + config.max_batch, + ); let (shutdown, token) = Shutdown::new(); Self { shard_id, @@ -268,9 +311,24 @@ impl IggyMessageBus { replica_forward_fn: RefCell::new(None), client_forward_fn: RefCell::new(None), connection_lost_fn: RefCell::new(None), + replica_nonces: Rc::new(RefCell::new(HashMap::new())), } } + /// Cheap clone of the bus-resident replica-plane nonce store. + /// + /// The listener task takes one clone at boot and reaches into it + /// from `handshake_verify`; the `Rc` keeps the store alive across + /// the listener task's lifetime and survives a respawn within one + /// process. Tests that drive `replica_listener::run` directly use + /// this accessor to obtain the same store the production bootstrap + /// would. + #[doc(hidden)] + #[must_use] + pub fn replica_nonces(&self) -> Rc<ReplicaNonceStore> { + Rc::clone(&self.replica_nonces) + } + /// Install the notifier used by delegated replica connections to tell /// shard 0 that a connection died. Single place to inject in tests too. pub fn set_connection_lost_fn(&self, f: ConnectionLostFn) { @@ -278,8 +336,18 @@ impl IggyMessageBus { } /// Invoke the registered connection-lost notifier, if any. + /// + /// Clones the `Rc` out of the `RefCell` borrow before invoking the + /// closure so the closure body is free to call + /// [`Self::set_connection_lost_fn`] (which takes a `borrow_mut`) + /// without tripping the runtime borrow check. 
pub(crate) fn notify_connection_lost(&self, replica_id: u8) { - if let Some(f) = self.connection_lost_fn.borrow().as_ref() { + let cb = self + .connection_lost_fn + .borrow() + .as_ref() + .map(std::rc::Rc::clone); + if let Some(f) = cb { f(replica_id); } } @@ -688,6 +756,96 @@ mod tests { assert_eq!(bus.background_tasks.borrow().len(), 2); } + #[compio::test] + #[allow(clippy::future_not_send)] + async fn replica_nonce_store_isolates_peers() { + // Per-peer rings: two peers replaying the same nonce must both + // be admitted (different scopes), and a same-peer replay must + // be rejected. Review F2. + let bus = IggyMessageBus::new(0); + let nonces = bus.replica_nonces(); + let mut store = nonces.borrow_mut(); + assert!(store.entry(1).or_default().insert(0xdead_beef)); + assert!( + store.entry(2).or_default().insert(0xdead_beef), + "different peer with same nonce must not be a replay" + ); + assert!( + !store.entry(1).or_default().insert(0xdead_beef), + "same peer + same nonce is a replay" + ); + } + + #[compio::test] + #[allow(clippy::future_not_send)] + async fn replica_nonce_store_survives_handle_drop() { + // Listener task drop must NOT clear the nonce window; the store + // is `Rc`-shared and the bus retains a clone. Proves the + // restart-survival property the per-task `RefCell<NonceRing>` + // could not deliver. Review F2. 
+ let bus = IggyMessageBus::new(0); + let nonces1 = bus.replica_nonces(); + { + let mut store = nonces1.borrow_mut(); + assert!(store.entry(1).or_default().insert(0xfeed_face)); + } + drop(nonces1); // simulates listener task exit + + let nonces2 = bus.replica_nonces(); + let mut store = nonces2.borrow_mut(); + assert!( + !store.entry(1).or_default().insert(0xfeed_face), + "post-restart replay must be rejected" + ); + } + + #[test] + #[should_panic(expected = "MessageBusConfig::max_batch must be in")] + fn max_batch_oversize_rejected() { + let cfg = MessageBusConfig { + max_batch: 4096, + ..MessageBusConfig::default() + }; + let _ = IggyMessageBus::with_config(0, cfg); + } + + #[test] + #[should_panic(expected = "MessageBusConfig::max_batch must be in")] + fn max_batch_zero_rejected() { + let cfg = MessageBusConfig { + max_batch: 0, + ..MessageBusConfig::default() + }; + let _ = IggyMessageBus::with_config(0, cfg); + } + + #[compio::test] + #[allow(clippy::future_not_send)] + async fn notify_connection_lost_handles_reentrant_install() { + // Closure swaps itself out via `set_connection_lost_fn`, which + // calls `borrow_mut`. The pre-fix code held a `Ref` across the + // closure invocation and panicked here. + let bus = std::rc::Rc::new(IggyMessageBus::new(0)); + let bus_for_closure = bus.clone(); + let counter: std::rc::Rc<std::cell::Cell<u8>> = std::rc::Rc::new(std::cell::Cell::new(0)); + let counter_inner = counter.clone(); + let cb: ConnectionLostFn = std::rc::Rc::new(move |_replica: u8| { + counter_inner.set(counter_inner.get() + 1); + // Reentrant install: replace the closure mid-invoke. 
+ let counter_replacement = counter_inner.clone(); + bus_for_closure.set_connection_lost_fn(std::rc::Rc::new(move |_| { + counter_replacement.set(counter_replacement.get() + 10); + })); + }); + bus.set_connection_lost_fn(cb); + + bus.notify_connection_lost(1); // first closure runs (+1) and swaps + assert_eq!(counter.get(), 1); + + bus.notify_connection_lost(1); // second closure runs (+10) + assert_eq!(counter.get(), 11); + } + #[compio::test] #[allow(clippy::future_not_send)] async fn shutdown_loop_drains_tasks_added_during_shutdown() { diff --git a/core/message_bus/src/replica_io.rs b/core/message_bus/src/replica_io.rs index 3731cc6d8..cc2eca8c1 100644 --- a/core/message_bus/src/replica_io.rs +++ b/core/message_bus/src/replica_io.rs @@ -85,6 +85,7 @@ pub async fn start_on_shard_zero( let on_accepted_replica_for_listener = on_accepted_replica.clone(); let listener_max_message_size = bus.config().max_message_size; let token_source_for_listener = Rc::clone(&token_source); + let nonces_for_listener = bus.replica_nonces(); let replica_handle = compio::runtime::spawn(async move { run_replica_listener( replica_listener, @@ -95,6 +96,7 @@ pub async fn start_on_shard_zero( on_accepted_replica_for_listener, listener_max_message_size, token_source_for_listener, + nonces_for_listener, ) .await; }); diff --git a/core/message_bus/src/replica_listener.rs b/core/message_bus/src/replica_listener.rs index b056f30ea..6a0789369 100644 --- a/core/message_bus/src/replica_listener.rs +++ b/core/message_bus/src/replica_listener.rs @@ -50,16 +50,35 @@ //! NOT provided here: operators still need to deploy the replica port on a //! trusted network boundary (cluster-local VPC, private subnet, overlay) //! if wire confidentiality is required. +//! +//! # Trust model +//! +//! The cluster ships with one shared 32 B secret across all replicas +//! (see [`crate::auth_config`]). Any holder of the secret can mint a +//! valid MAC for ANY `peer_id` in either direction. The dialer-side +//! 
directional rule (`peer_id > self_id` only dials) plus the +//! acceptor-side check (`parsed.replica < self_id`) blocks UPWARD +//! impersonation - a dialing peer cannot announce an id at or above the +//! acceptor's own - but impersonation below that bound is open by +//! design under a single shared secret. A compromised replica B can +//! authenticate to any acceptor C (`C > B`) as any replica A with +//! `A < C`, including ids above B's own. Mitigation comes from +//! the trusted-LAN deployment assumption + the fact that the +//! authenticated identity is only used for VSR consensus framing on the +//! same plane. +//! +//! Per-peer secrets (each replica gets its own key, the acceptor looks +//! up the right one for the announced `peer_id`) is the documented +//! Phase 2+ hardening. Until then, the cluster operator must treat +//! "secret compromise" as equivalent to "any-replica compromise." -use crate::auth::{self, NonceRing, TokenSource}; +use crate::auth::{self, LABEL_REPLICA, TokenSource}; use crate::framing; use crate::lifecycle::ShutdownToken; -use crate::{AcceptedReplicaFn, GenericHeader, Message}; +use crate::{AcceptedReplicaFn, GenericHeader, Message, ReplicaNonceStore}; use compio::net::{SocketOpts, TcpListener, TcpStream}; use futures::FutureExt; use iggy_binary_protocol::Command2; use iggy_common::{IggyError, IggyTimestamp}; -use std::cell::RefCell; use std::net::SocketAddr; use std::rc::Rc; use tracing::{debug, error, info, warn}; @@ -94,6 +113,12 @@ pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyErr /// Run the inbound replica listener accept loop until the shutdown token /// fires. Every successful handshake fires the `on_accepted` callback; the /// callback owns the accepted stream from that point on. +/// +/// `nonces` is the bus-resident per-peer nonce dedup store.
The listener +/// reaches into it via `borrow_mut().entry(peer).or_default()` for each +/// handshake; the borrow is held only across the synchronous +/// [`handshake_verify`], never across an `await`, so the `RefCell` is +/// safe under the single-threaded compio runtime. #[allow(clippy::future_not_send)] #[allow(clippy::too_many_arguments)] pub async fn run( @@ -105,12 +130,12 @@ pub async fn run( on_accepted: AcceptedReplicaFn, max_message_size: usize, token_source: Rc<dyn TokenSource>, + nonces: Rc<ReplicaNonceStore>, ) { info!( "Replica listener accepting on {:?}", listener.local_addr().ok() ); - let nonces = RefCell::new(NonceRing::default()); loop { futures::select! { () = token.wait().fuse() => { @@ -124,11 +149,12 @@ pub async fn run( let outcome = match read { Ok(parsed) => { let now_ns = IggyTimestamp::now().as_nanos(); - let mut ring = nonces.borrow_mut(); + let mut store = nonces.borrow_mut(); + let ring = store.entry(parsed.replica).or_default(); handshake_verify( &parsed, token_source.as_ref(), - &mut ring, + ring, now_ns, self_id, replica_count, @@ -186,6 +212,7 @@ async fn handshake_read( timestamp_ns: decoded.timestamp_ns, release: header.release, nonce: decoded.nonce, + label: LABEL_REPLICA, }; Ok(ParsedHandshake { replica: header.replica, @@ -195,11 +222,12 @@ async fn handshake_read( } /// Synchronous verifier: tag + replay, then the directional tiebreak. -/// Runs outside any await so the `&mut NonceRing` borrow cannot span I/O. +/// Runs outside any await so the `&mut ReplicaNonceRing` borrow cannot +/// span I/O. 
fn handshake_verify( parsed: &ParsedHandshake, token_source: &dyn TokenSource, - nonces: &mut NonceRing, + nonces: &mut auth::ReplicaNonceRing, now_ns: u128, self_id: u8, replica_count: u8, diff --git a/core/message_bus/src/transports/mod.rs b/core/message_bus/src/transports/mod.rs index c98faf467..157a5780a 100644 --- a/core/message_bus/src/transports/mod.rs +++ b/core/message_bus/src/transports/mod.rs @@ -67,7 +67,14 @@ pub mod tcp; -pub use tcp::{TcpTransportConn, TcpTransportListener, TcpTransportReader, TcpTransportWriter}; +// Only `Conn` and `Writer` have crate-internal callers today +// (`installer.rs` wraps the dialed/accepted stream in a `TcpTransportConn`; +// `writer_task.rs` wraps the owned write half in a `TcpTransportWriter`). +// `TcpTransportListener` and `TcpTransportReader` stay reachable via the +// `tcp` submodule and exist primarily so the trait surface compiles +// end-to-end against a real impl; future WSS / QUIC listeners drop in +// alongside without touching call sites. +pub(crate) use tcp::{TcpTransportConn, TcpTransportWriter}; use iggy_binary_protocol::consensus::MESSAGE_ALIGN; use iggy_binary_protocol::consensus::iobuf::Frozen; @@ -118,8 +125,14 @@ pub trait TransportConn: 'static { } /// Read half. Produces one framed [`Message<GenericHeader>`] per call. +/// +/// `'static` so the half can be moved into a `compio::runtime::spawn`'d +/// task. `Unpin` is NOT required at the trait level: TCP impls satisfy it +/// trivially via `OwnedReadHalf<TcpStream>`, and a future +/// `compio-quic::RecvStream` impl that is not `Unpin` can pin internally +/// without forcing every other impl to `Box::pin`. #[allow(async_fn_in_trait)] -pub trait TransportReader: Unpin + 'static { +pub trait TransportReader: 'static { /// Read the next `GenericHeader`-framed message off the wire. /// /// Validates the 256 B header and bounds the total frame size to @@ -141,8 +154,12 @@ pub trait TransportReader: Unpin + 'static { } /// Write half. 
Atomic-or-error batched writer. +/// +/// `'static` so the half can be moved into a `compio::runtime::spawn`'d +/// task. `Unpin` is NOT required at the trait level (see +/// [`TransportReader`] for the rationale). #[allow(async_fn_in_trait)] -pub trait TransportWriter: Unpin + 'static { +pub trait TransportWriter: 'static { /// Send every buffer in `batch` atomically. /// /// On success the Vec is drained (empty on return); the same diff --git a/core/message_bus/src/transports/tcp.rs b/core/message_bus/src/transports/tcp.rs index 7478c99b4..79694b06d 100644 --- a/core/message_bus/src/transports/tcp.rs +++ b/core/message_bus/src/transports/tcp.rs @@ -49,13 +49,22 @@ use std::net::SocketAddr; /// `keepalive`) via `compio::net::SocketOpts`. See /// [`crate::replica_listener::bind`] and /// [`crate::client_listener::bind`] for the canonical construction. -pub struct TcpTransportListener { +/// +/// Currently only used by trait-conformance tests; production `accept` +/// loops in [`crate::replica_listener`] / [`crate::client_listener`] +/// drive a raw `TcpListener` directly. Kept `pub(crate)` so the +/// `TransportListener` impl below is actually exercisable; the +/// `dead_code` allow is intentional and will fall away when the listener +/// loops migrate behind the trait (Phase 2+). +#[allow(dead_code)] +pub(crate) struct TcpTransportListener { inner: TcpListener, } impl TcpTransportListener { #[must_use] - pub const fn new(inner: TcpListener) -> Self { + #[allow(dead_code)] + pub(crate) const fn new(inner: TcpListener) -> Self { Self { inner } } } @@ -76,13 +85,13 @@ impl TransportListener for TcpTransportListener { /// result of a `TcpStream::connect` on the dialer path. Takes ownership /// of the stream; [`Self::into_split`] transfers that ownership into /// the read and write halves bound to the per-connection tasks. 
-pub struct TcpTransportConn { +pub(crate) struct TcpTransportConn { stream: TcpStream, } impl TcpTransportConn { #[must_use] - pub const fn new(stream: TcpStream) -> Self { + pub(crate) const fn new(stream: TcpStream) -> Self { Self { stream } } } @@ -106,13 +115,14 @@ impl TransportConn for TcpTransportConn { /// [`framing::read_message`]; the two paths share the same header /// decode, bounds check, and zero-copy `Owned<MESSAGE_ALIGN>` /// allocation strategy. -pub struct TcpTransportReader { +pub(crate) struct TcpTransportReader { inner: OwnedReadHalf<TcpStream>, } impl TcpTransportReader { #[must_use] - pub const fn new(inner: OwnedReadHalf<TcpStream>) -> Self { + #[allow(dead_code)] + pub(crate) const fn new(inner: OwnedReadHalf<TcpStream>) -> Self { Self { inner } } } @@ -136,13 +146,13 @@ impl TransportReader for TcpTransportReader { /// `max_batch <= IOV_MAX / 2 = 512`; this impl does not enforce a cap /// because the Vec is already drained by the caller's admission /// control. -pub struct TcpTransportWriter { +pub(crate) struct TcpTransportWriter { inner: OwnedWriteHalf<TcpStream>, } impl TcpTransportWriter { #[must_use] - pub const fn new(inner: OwnedWriteHalf<TcpStream>) -> Self { + pub(crate) const fn new(inner: OwnedWriteHalf<TcpStream>) -> Self { Self { inner } } } diff --git a/core/message_bus/src/writer_task.rs b/core/message_bus/src/writer_task.rs index 75b09c827..14d292bab 100644 --- a/core/message_bus/src/writer_task.rs +++ b/core/message_bus/src/writer_task.rs @@ -43,8 +43,8 @@ use tracing::{debug, error, trace}; /// Larger batches reduce syscalls per N messages at the cost of memory /// per batch and worst-case latency for the head-of-batch message. /// -/// TCP entry point. Wraps the owned write half in a -/// [`TcpTransportWriter`] and delegates the drain loop to +/// TCP entry point. 
Wraps the owned write half in the crate-internal +/// `TcpTransportWriter` and delegates the drain loop to /// [`run_transport`]; every syscall still flows through /// [`TransportWriter::send_batch`] so future transports drop in behind /// the same drain logic. diff --git a/core/message_bus/tests/backpressure.rs b/core/message_bus/tests/backpressure.rs index 7e849a981..a92e94090 100644 --- a/core/message_bus/tests/backpressure.rs +++ b/core/message_bus/tests/backpressure.rs @@ -52,6 +52,7 @@ async fn try_send_returns_backpressure_when_queue_full() { }); let (l1, addr1) = bind(loopback()).await.unwrap(); let token_for_l1 = bus1.token(); + let nonces_for_l1 = bus1.replica_nonces(); let accept_1 = install_replicas_locally(bus1.clone(), on_message1); let l1_handle = compio::runtime::spawn(async move { run( @@ -63,6 +64,7 @@ async fn try_send_returns_backpressure_when_queue_full() { accept_1, message_bus::framing::MAX_MESSAGE_SIZE, test_token_source(), + nonces_for_l1, ) .await; }); diff --git a/core/message_bus/tests/connection_lost_notify.rs b/core/message_bus/tests/connection_lost_notify.rs index 00a05b5f7..fcb2cce65 100644 --- a/core/message_bus/tests/connection_lost_notify.rs +++ b/core/message_bus/tests/connection_lost_notify.rs @@ -54,6 +54,7 @@ async fn connection_lost_fires_exactly_once_per_peer_disconnect() { // bus1 listens; bus0 dials. 
let (l1, addr1) = bind(loopback()).await.unwrap(); let token_for_l1 = bus1.token(); + let nonces_for_l1 = bus1.replica_nonces(); let accept_1 = install_replicas_locally(bus1.clone(), on_message.clone()); let l1_handle = compio::runtime::spawn(async move { run( @@ -65,6 +66,7 @@ async fn connection_lost_fires_exactly_once_per_peer_disconnect() { accept_1, message_bus::framing::MAX_MESSAGE_SIZE, test_token_source(), + nonces_for_l1, ) .await; }); diff --git a/core/message_bus/tests/directional_connection.rs b/core/message_bus/tests/directional_connection.rs index 6251939ce..9f80f352c 100644 --- a/core/message_bus/tests/directional_connection.rs +++ b/core/message_bus/tests/directional_connection.rs @@ -43,6 +43,7 @@ async fn lower_id_dials_higher_id_accepts() { let (l1, addr1) = bind(loopback()).await.unwrap(); let token_for_l0 = bus0.token(); + let nonces_for_l0 = bus0.replica_nonces(); let accept_0 = install_replicas_locally(bus0.clone(), on_message.clone()); let l0_handle = compio::runtime::spawn(async move { run( @@ -54,12 +55,14 @@ async fn lower_id_dials_higher_id_accepts() { accept_0, message_bus::framing::MAX_MESSAGE_SIZE, test_token_source(), + nonces_for_l0, ) .await; }); bus0.track_background(l0_handle); let token_for_l1 = bus1.token(); + let nonces_for_l1 = bus1.replica_nonces(); let accept_1 = install_replicas_locally(bus1.clone(), on_message.clone()); let l1_handle = compio::runtime::spawn(async move { run( @@ -71,6 +74,7 @@ async fn lower_id_dials_higher_id_accepts() { accept_1, message_bus::framing::MAX_MESSAGE_SIZE, test_token_source(), + nonces_for_l1, ) .await; }); diff --git a/core/message_bus/tests/handshake_auth.rs b/core/message_bus/tests/handshake_auth.rs index b833354e7..6ed94257b 100644 --- a/core/message_bus/tests/handshake_auth.rs +++ b/core/message_bus/tests/handshake_auth.rs @@ -47,6 +47,7 @@ async fn handshake_with_matching_secret_registers_peer() { let token_for_listener = bus_listener.token(); let accept = 
install_replicas_locally(bus_listener.clone(), on_msg.clone()); let secret_for_listener = Rc::clone(&secret); + let nonces_for_listener = bus_listener.replica_nonces(); let listener_handle = compio::runtime::spawn(async move { run( listener, @@ -57,6 +58,7 @@ async fn handshake_with_matching_secret_registers_peer() { accept, message_bus::framing::MAX_MESSAGE_SIZE, secret_for_listener, + nonces_for_listener, ) .await; }); @@ -102,6 +104,7 @@ async fn handshake_with_mismatched_secret_rejects_peer() { let (listener, addr) = bind(loopback()).await.unwrap(); let token_for_listener = bus_listener.token(); let accept = install_replicas_locally(bus_listener.clone(), on_msg.clone()); + let nonces_for_listener = bus_listener.replica_nonces(); let listener_handle = compio::runtime::spawn(async move { run( listener, @@ -112,6 +115,7 @@ async fn handshake_with_mismatched_secret_rejects_peer() { accept, message_bus::framing::MAX_MESSAGE_SIZE, listener_secret, + nonces_for_listener, ) .await; }); diff --git a/core/message_bus/tests/head_of_line.rs b/core/message_bus/tests/head_of_line.rs index b756897ce..d046715a3 100644 --- a/core/message_bus/tests/head_of_line.rs +++ b/core/message_bus/tests/head_of_line.rs @@ -48,6 +48,7 @@ async fn slow_peer_does_not_block_other_peers() { }); let (la, addr_a) = bind(loopback()).await.unwrap(); let token_a = bus_a.token(); + let nonces_a = bus_a.replica_nonces(); let accept_a = install_replicas_locally(bus_a.clone(), on_a.clone()); let la_handle = compio::runtime::spawn(async move { run( @@ -59,6 +60,7 @@ async fn slow_peer_does_not_block_other_peers() { accept_a, message_bus::framing::MAX_MESSAGE_SIZE, test_token_source(), + nonces_a, ) .await; }); diff --git a/core/message_bus/tests/reconnect.rs b/core/message_bus/tests/reconnect.rs index 93ac64332..20add2649 100644 --- a/core/message_bus/tests/reconnect.rs +++ b/core/message_bus/tests/reconnect.rs @@ -59,6 +59,7 @@ async fn periodic_retry_picks_up_late_listener() { let bus1 = 
Rc::new(IggyMessageBus::new(0)); let (l1, _) = bind(addr).await.unwrap(); let token_for_l1 = bus1.token(); + let nonces_for_l1 = bus1.replica_nonces(); let accept_delegate = install_replicas_locally(bus1.clone(), on_message.clone()); let l1_handle = compio::runtime::spawn(async move { run( @@ -70,6 +71,7 @@ async fn periodic_retry_picks_up_late_listener() { accept_delegate, message_bus::framing::MAX_MESSAGE_SIZE, test_token_source(), + nonces_for_l1, ) .await; }); diff --git a/core/message_bus/tests/reconnect_skip_connected.rs b/core/message_bus/tests/reconnect_skip_connected.rs index 31159e1db..635250a28 100644 --- a/core/message_bus/tests/reconnect_skip_connected.rs +++ b/core/message_bus/tests/reconnect_skip_connected.rs @@ -50,6 +50,7 @@ async fn periodic_reconnect_skips_already_connected_peer() { accept_inner(stream, peer_id); }); let token_for_l1 = bus1.token(); + let nonces_for_l1 = bus1.replica_nonces(); let l1_handle = compio::runtime::spawn(async move { run( l1, @@ -60,6 +61,7 @@ async fn periodic_reconnect_skips_already_connected_peer() { accept_delegate, message_bus::framing::MAX_MESSAGE_SIZE, test_token_source(), + nonces_for_l1, ) .await; }); diff --git a/core/message_bus/tests/replica_roundtrip.rs b/core/message_bus/tests/replica_roundtrip.rs index 8d8fde80a..8b609c0a4 100644 --- a/core/message_bus/tests/replica_roundtrip.rs +++ b/core/message_bus/tests/replica_roundtrip.rs @@ -45,6 +45,7 @@ async fn two_replicas_exchange_prepare_and_ack() { let (l1, addr1) = bind(loopback()).await.expect("bind r1"); let token_for_l1 = bus1.token(); + let nonces_for_l1 = bus1.replica_nonces(); let accept_delegate_1 = install_replicas_locally(bus1.clone(), on_message1.clone()); let l1_handle = compio::runtime::spawn(async move { run( @@ -56,6 +57,7 @@ async fn two_replicas_exchange_prepare_and_ack() { accept_delegate_1, message_bus::framing::MAX_MESSAGE_SIZE, test_token_source(), + nonces_for_l1, ) .await; }); diff --git a/core/message_bus/tests/shard_zero_gating.rs 
b/core/message_bus/tests/shard_zero_gating.rs index 1dc0cc339..1b99d0a5b 100644 --- a/core/message_bus/tests/shard_zero_gating.rs +++ b/core/message_bus/tests/shard_zero_gating.rs @@ -41,6 +41,7 @@ async fn shard_zero_binds_listener_and_starts_connector() { let peer_handler: MessageHandler = Rc::new(|_, _| {}); let (peer_listener, peer_addr) = bind(loopback()).await.expect("bind peer listener"); let peer_token = peer_bus.token(); + let peer_nonces = peer_bus.replica_nonces(); let peer_accept = install_replicas_locally(peer_bus.clone(), peer_handler.clone()); let peer_listen_handle = compio::runtime::spawn(async move { run( @@ -52,6 +53,7 @@ async fn shard_zero_binds_listener_and_starts_connector() { peer_accept, message_bus::framing::MAX_MESSAGE_SIZE, test_token_source(), + peer_nonces, ) .await; }); diff --git a/core/message_bus/tests/vectored_batch.rs b/core/message_bus/tests/vectored_batch.rs index b079725a5..95deac326 100644 --- a/core/message_bus/tests/vectored_batch.rs +++ b/core/message_bus/tests/vectored_batch.rs @@ -47,6 +47,7 @@ async fn writer_batches_pipelined_sends_in_order() { }); let (l1, addr1) = bind(loopback()).await.unwrap(); let token_for_l1 = bus1.token(); + let nonces_for_l1 = bus1.replica_nonces(); let accept_1 = install_replicas_locally(bus1.clone(), on_message.clone()); let l1_handle = compio::runtime::spawn(async move { run( @@ -58,6 +59,7 @@ async fn writer_batches_pipelined_sends_in_order() { accept_1, message_bus::framing::MAX_MESSAGE_SIZE, test_token_source(), + nonces_for_l1, ) .await; }); diff --git a/core/server-ng/src/dedup.rs b/core/server-ng/src/dedup.rs index 0022e9d85..cbc235129 100644 --- a/core/server-ng/src/dedup.rs +++ b/core/server-ng/src/dedup.rs @@ -17,25 +17,47 @@ * under the License. */ -//! At-most-once request dedup window (IGGY-112, P0-T3). +// `Dedup` is consumed only by the `#[cfg(test)]` tests below today; the +// request dispatcher loop in server-ng is not yet wired up. 
Once the +// dispatcher lands and calls `lookup` / `mark_in_flight` / `complete` / +// `evict_client`, the `dead_code` warnings disappear without needing +// this allow. See plan F6. +#![allow(dead_code)] + +//! At-most-once **per-shard-lifetime** request dedup window (IGGY-112, P0-T3). //! //! Indexed by `(client_id, request)` pairs carried in the existing //! `RequestHeader.client` (u128) and `RequestHeader.request` (u64) fields. //! No wire-format change on the server-ng client plane was needed: the //! P0-T1 design note found the two fields already in place. //! +//! # Lifetime scope (load-bearing) +//! +//! "At-most-once" applies for the lifetime of THIS shard's `Dedup` +//! instance. Process restart drops every entry: a client that retries a +//! request after a server restart re-applies side effects unless the +//! request is idempotent at the handler level. Cross-process persistence +//! would need a checkpoint file backed by the consensus WAL and is +//! deferred. Operators sizing client-side retry policy must treat the +//! dedup window as a *crash-window* mitigation only. +//! +//! # Per-client ring +//! //! Per-client state is a fixed-capacity ring holding the last N -//! `(request, Entry)` pairs. +//! `(request, Entry)` pairs. A ring avoids unbounded growth under a +//! chatty client without requiring TTL-driven purge ticks. Done entries +//! also carry an `inserted_at_ns` timestamp; `lookup` treats entries +//! older than `ttl_ns` as absent so a replay from far in the past sees +//! `Fresh` rather than a stale cache hit. //! -//! A ring avoids unbounded growth under a chatty client without -//! requiring TTL-driven purge ticks. Done entries also carry an -//! `inserted_at_ns` timestamp; `lookup` treats entries older than -//! `ttl_ns` as absent so a replay from far in the past sees `Fresh` -//! rather than a stale cache hit. +//! # Threading //! -//! The store is NOT thread-safe. Each server-ng shard owns a `Dedup` -//! 
on its single-threaded compio runtime. Cross-shard routing happens -//! above this layer. +//! The store is NOT thread-safe. Each server-ng shard owns a `Dedup` on +//! its single-threaded compio runtime. Cross-shard routing happens above +//! this layer. Caller MUST invoke [`Dedup::evict_client`] on session +//! disconnect / bind eviction so the per-client `AHashMap` entry is +//! released; without that hook the outer map grows unbounded across the +//! shard's session history. use ahash::AHashMap; use bytes::Bytes; @@ -69,6 +91,11 @@ pub enum LookupResult<'a> { /// the cached reply once handler completes), or buffer. InFlight, /// Handler already ran; return the cached reply. + /// + /// The borrow ties to `&Dedup`. Callers MUST `clone()` the `Bytes` + /// (cheap, ref-counted) before invoking any `&mut Dedup` method on + /// the same instance; the borrow checker enforces this at compile + /// time but the contract is worth naming explicitly. Cached(&'a Bytes), } @@ -93,7 +120,10 @@ struct PerClient { impl PerClient { fn with_capacity(capacity: usize) -> Self { - assert!(capacity > 0, "dedup ring capacity must be > 0"); + // Capacity > 0 is validated at the public entry + // (`Dedup::with_config`); every callsite here goes through that + // path, so no second assert is needed. + debug_assert!(capacity > 0, "dedup ring capacity must be > 0"); Self { slots: Vec::with_capacity(capacity), cursor: 0, diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/lib.rs index 0aa424bba..d3849ab02 100644 --- a/core/server-ng/src/lib.rs +++ b/core/server-ng/src/lib.rs @@ -17,6 +17,10 @@ * under the License. */ -pub mod dedup; +// `dedup` is gated `pub(crate)` until the request dispatcher loop wires +// up the lookup / mark / complete / evict_client API end-to-end. Marking +// it crate-private now blocks downstream crates from coupling to a shape +// the dispatcher PR will likely refine (see plan F6). +pub(crate) mod dedup; pub mod login_register; pub mod session_manager;
