This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch feat/message-bus-transports in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 2c40d3c2caf3589228f2307d8b3d371e191f0cc5 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Apr 24 11:37:20 2026 +0200 feat(message_bus): authenticate the replica Ping handshake (IGGY-112) Replace the cluster_id-only handshake with a BLAKE3-keyed MAC carried in GenericHeader.reserved_command[0..57]. Encoding is versioned (1 = Blake3V1) with 32 B tag, 8 B sender-ns timestamp, 16 B random nonce, 1 B kind, and 71 B zero-checked reserved padding. The listener verifies the envelope before the directional tiebreak so unauthenticated peers never drive the nonce ring's eviction cursor. Replay protection is layered: a 30 s timestamp window uses IggyTimestamp::now().as_nanos(), and a 256-entry NonceRing catches in-window replays. Credential failures collapse to IggyError::Unauthenticated (reusing the existing variant rather than adding a new one and dragging another From<_, IggyError> candidate into every downstream crate's orphan-rule set). auth.rs exposes the TokenSource trait, a StaticSharedSecret impl for all-replicas-share-one-secret deployments (and the integration-test harness via StaticSharedSecret::zero()), and pure encode/decode/verify helpers so transport-specific listeners can wire the envelope into other planes (client-plane Phase 2+) without re-implementing the framing. Compio is single-threaded, so TokenSource carries no Send+Sync bounds and the bus holds it as Rc<dyn TokenSource>. GenericHeader layout is unchanged: the envelope occupies previously zero-filled bytes in reserved_command, so invariant I3 (256 B wire frame, size offset == 48) is preserved. Rebuilds on top of #3134's async fire-and-forget transport without altering writer_task batching (I4), zero-copy Frozen ownership (I8), or the replica-plane FIFO byte-stream model (I1). BREAKING: a replica upgraded to this commit rejects any peer running the pre-auth handshake: the zero-filled reserved_command fails BLAKE3 verification. 
All replicas in a cluster must be upgraded together and configured with matching cluster secrets; operators without a secret channel can bootstrap with StaticSharedSecret::zero() to preserve the old trust-the-network posture, acknowledging that it is no better than the previous state. Authentication policy (per-client secrets, SSO callouts, revocation) layers on top via additional TokenSource impls; this commit only lands the replica-plane static-secret baseline. Refs: Documents/silverhand/iggy/message_bus/transport-plan/ (P0-T2, designs/message-bus-auth.md). --- Cargo.lock | 1 + core/message_bus/Cargo.toml | 1 + core/message_bus/src/auth.rs | 450 +++++++++++++++++++++ core/message_bus/src/connector.rs | 53 ++- core/message_bus/src/lib.rs | 1 + core/message_bus/src/replica_io.rs | 7 + core/message_bus/src/replica_listener.rs | 111 +++-- core/message_bus/tests/backpressure.rs | 4 +- core/message_bus/tests/common/mod.rs | 8 + core/message_bus/tests/connection_lost_notify.rs | 4 +- core/message_bus/tests/directional_connection.rs | 16 +- core/message_bus/tests/handshake_auth.rs | 144 +++++++ core/message_bus/tests/head_of_line.rs | 4 +- core/message_bus/tests/reconnect.rs | 14 +- core/message_bus/tests/reconnect_skip_connected.rs | 14 +- core/message_bus/tests/replica_roundtrip.rs | 4 +- core/message_bus/tests/shard_zero_gating.rs | 5 +- core/message_bus/tests/vectored_batch.rs | 4 +- 18 files changed, 794 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe7d8cafc..2c66fb79c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7009,6 +7009,7 @@ name = "message_bus" version = "0.1.0" dependencies = [ "async-channel", + "blake3", "compio", "futures", "iggy_binary_protocol", diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml index 4d3569482..d0487db0f 100644 --- a/core/message_bus/Cargo.toml +++ b/core/message_bus/Cargo.toml @@ -30,6 +30,7 @@ publish = false [dependencies] async-channel = { workspace = true } +blake3 = { workspace = 
true } compio = { workspace = true } futures = { workspace = true } iggy_binary_protocol = { workspace = true } diff --git a/core/message_bus/src/auth.rs b/core/message_bus/src/auth.rs new file mode 100644 index 000000000..6c2353a6b --- /dev/null +++ b/core/message_bus/src/auth.rs @@ -0,0 +1,450 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Bus-layer authenticated handshake (IGGY-112). +//! +//! The replica-plane handshake (`replica_listener::handshake`) and the +//! forthcoming SDK-client plane (Phase 2+) share one envelope, carried in +//! `GenericHeader.reserved_command[0..57]`: +//! +//! ```text +//! [ 0 .. 32) auth_tag BLAKE3 keyed_hash(secret, challenge) full output +//! [ 32 .. 40) auth_timestamp sender ns since UNIX_EPOCH, little-endian u64 +//! [ 40 .. 56) auth_nonce random u128, little-endian +//! [ 56 .. 57) auth_kind envelope version tag; 1 = Blake3V1 +//! [ 57 ..128) reserved zero on the wire; zero-checked on receive +//! ``` +//! +//! Kept out of `GenericHeader` itself so the 256 B wire layout is +//! preserved (invariant **I3**). Encode / decode operate on a +//! `&mut [u8; 128]` / `&[u8; 128]` view of `GenericHeader.reserved_command`. 
+ +use iggy_common::IggyError; +use thiserror::Error; + +/// BLAKE3 `keyed_hash` key size, in bytes. +pub const SECRET_SIZE: usize = 32; + +/// Length in bytes of the envelope prefix of `GenericHeader.reserved_command`. +pub const ENVELOPE_LEN: usize = 57; + +/// Max `|now - auth_timestamp|` accepted, in nanoseconds (30 s, past or future). +pub const TIMESTAMP_WINDOW_NS: u128 = 30_000_000_000; + +/// Default capacity of the acceptor's FIFO nonce-dedup ring (`NonceRing`). +pub const NONCE_LRU_CAPACITY: usize = 256; + +const LABEL: &[u8; 16] = b"iggy-bus-auth-v1"; + +const KIND_OFFSET: usize = 56; +const TAG_RANGE: std::ops::Range<usize> = 0..32; +const TIMESTAMP_RANGE: std::ops::Range<usize> = 32..40; +const NONCE_RANGE: std::ops::Range<usize> = 40..56; +const RESERVED_RANGE: std::ops::Range<usize> = ENVELOPE_LEN..128; + +/// Envelope version tag. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum AuthKind { + Blake3V1 = 1, +} + +impl AuthKind { + const fn from_byte(b: u8) -> Result<Self, AuthError> { + match b { + 1 => Ok(Self::Blake3V1), + _ => Err(AuthError::UnsupportedKind), + } + } +} + +/// Fully reconstructed challenge input. All fields come from the header +/// (`GenericHeader.cluster`, `GenericHeader.replica`, `GenericHeader.release`) +/// plus the envelope timestamp and nonce. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct AuthChallenge { + pub cluster: u128, + pub peer_id: u128, + pub timestamp_ns: u64, + pub release: u32, + pub nonce: u128, +} + +impl AuthChallenge { + /// Serialize the challenge into a fixed-size buffer. Domain-separated by + /// a constant label so the same secret cannot be misused to forge a + /// different protocol's MAC.
+ fn encode(&self) -> [u8; 16 + 16 + 16 + 8 + 4 + 16] { + let mut out = [0u8; 76]; + out[0..16].copy_from_slice(LABEL); + out[16..32].copy_from_slice(&self.cluster.to_le_bytes()); + out[32..48].copy_from_slice(&self.peer_id.to_le_bytes()); + out[48..56].copy_from_slice(&self.timestamp_ns.to_le_bytes()); + out[56..60].copy_from_slice(&self.release.to_le_bytes()); + out[60..76].copy_from_slice(&self.nonce.to_le_bytes()); + out + } +} + +/// MAC over an `AuthChallenge`. Wraps `blake3::Hash` for its +/// constant-time `PartialEq` implementation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct AuthTag(pub blake3::Hash); + +impl AuthTag { + const fn as_bytes(&self) -> &[u8; 32] { + self.0.as_bytes() + } +} + +/// Credential-layer errors surfaced by the verify path. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)] +pub enum AuthError { + #[error("timestamp outside acceptance window")] + TimestampOutOfWindow, + #[error("nonce replay")] + NonceReplay, + #[error("tag mismatch")] + TagMismatch, + #[error("unknown peer")] + UnknownPeer, + #[error("unsupported envelope version")] + UnsupportedKind, + #[error("reserved bytes nonzero")] + ReservedNonzero, +} + +/// Map a credential-layer `AuthError` to the public `IggyError` surface. +/// +/// Not implemented as `From<AuthError> for IggyError` because the impl +/// would leak into every crate transitively linking `message_bus`, adding +/// an extra `impl From<_> for IggyError` candidate to the orphan-rule set +/// and tripping type inference (`?` ambiguity) in downstream crates. +#[must_use] +pub const fn to_iggy_error(e: AuthError) -> IggyError { + match e { + AuthError::UnsupportedKind | AuthError::ReservedNonzero => IggyError::InvalidCommand, + AuthError::TagMismatch + | AuthError::UnknownPeer + | AuthError::TimestampOutOfWindow + | AuthError::NonceReplay => IggyError::Unauthenticated, + } +} + +/// Signer + verifier of auth envelopes. 
The bus runs on a single-threaded +/// compio runtime, so no `Send + Sync` bounds. +pub trait TokenSource { + fn sign(&self, challenge: &AuthChallenge) -> AuthTag; + /// # Errors + /// Returns `AuthError::TagMismatch` when `tag` does not match the + /// MAC produced for `challenge`; may return other variants in + /// token-source impls that perform additional checks. + fn verify(&self, challenge: &AuthChallenge, tag: &AuthTag) -> Result<(), AuthError>; +} + +/// `TokenSource` that holds a single 32-byte cluster-wide secret. All +/// replicas share one secret on the replica plane; the integration-test +/// harness instantiates this with a fixed zero-byte secret. +pub struct StaticSharedSecret { + secret: [u8; SECRET_SIZE], +} + +impl StaticSharedSecret { + #[must_use] + pub const fn new(secret: [u8; SECRET_SIZE]) -> Self { + Self { secret } + } + + #[must_use] + pub const fn zero() -> Self { + Self::new([0u8; SECRET_SIZE]) + } +} + +impl TokenSource for StaticSharedSecret { + fn sign(&self, challenge: &AuthChallenge) -> AuthTag { + let input = challenge.encode(); + AuthTag(blake3::keyed_hash(&self.secret, &input)) + } + + fn verify(&self, challenge: &AuthChallenge, tag: &AuthTag) -> Result<(), AuthError> { + let expected = self.sign(challenge); + if expected.0 == tag.0 { + Ok(()) + } else { + Err(AuthError::TagMismatch) + } + } +} + +/// Fixed-capacity ring-buffer nonce dedup. +/// +/// Eviction is O(1) via a round-robin cursor; lookup is O(N) over +/// `NONCE_LRU_CAPACITY` entries, which is a 256-wide linear scan on the +/// hot path. Memory footprint is deterministic and bounded regardless of +/// peer behaviour. 
+#[derive(Debug)] +pub struct NonceRing { + slots: Vec<u128>, + cursor: usize, + capacity: usize, +} + +impl NonceRing { + #[must_use] + pub const fn new(capacity: usize) -> Self { + Self { + slots: Vec::new(), + cursor: 0, + capacity: if capacity == 0 { 1 } else { capacity }, // 0 would panic in insert() + } + } + + /// Returns `true` if `nonce` was not present (and has now been + /// recorded); `false` if it was already seen. + pub fn insert(&mut self, nonce: u128) -> bool { + if self.slots.contains(&nonce) { + return false; + } + if self.slots.len() < self.capacity { + self.slots.push(nonce); + } else { + self.slots[self.cursor] = nonce; + self.cursor = (self.cursor + 1) % self.capacity; + } + true + } +} + +impl Default for NonceRing { + fn default() -> Self { + Self::new(NONCE_LRU_CAPACITY) + } +} + +/// Encode an auth envelope into `reserved_command`. +/// +/// Overwrites the first `ENVELOPE_LEN` bytes and zero-fills the trailing +/// reserved range. Source of the tag is `token_source`; `timestamp_ns` +/// and `nonce` are caller-controlled so the same routine can be used for +/// deterministic tests (fake clock, fixed nonce) and production (system +/// clock, CSPRNG). +pub fn encode_envelope( + reserved: &mut [u8; 128], + token_source: &dyn TokenSource, + challenge: &AuthChallenge, +) { + let tag = token_source.sign(challenge); + reserved[TAG_RANGE].copy_from_slice(tag.as_bytes()); + reserved[TIMESTAMP_RANGE].copy_from_slice(&challenge.timestamp_ns.to_le_bytes()); + reserved[NONCE_RANGE].copy_from_slice(&challenge.nonce.to_le_bytes()); + reserved[KIND_OFFSET] = AuthKind::Blake3V1 as u8; + for b in &mut reserved[RESERVED_RANGE] { + *b = 0; + } +} + +/// Parsed view of an envelope's raw bytes. No cryptographic checks. +#[derive(Debug, Clone, Copy)] +pub struct DecodedEnvelope { + pub tag: AuthTag, + pub timestamp_ns: u64, + pub nonce: u128, +} + +/// Decode an envelope and reject malformed layout. +/// +/// Rejects unknown envelope kinds and nonzero reserved bytes. Does not
Does not +/// check the tag; caller passes the returned `DecodedEnvelope` to +/// [`verify_envelope`] with the full challenge context. +/// +/// # Errors +/// +/// Returns `AuthError::UnsupportedKind` for unknown envelope versions or +/// `AuthError::ReservedNonzero` for non-zero reserved padding. +pub fn decode_envelope(reserved: &[u8; 128]) -> Result<DecodedEnvelope, AuthError> { + AuthKind::from_byte(reserved[KIND_OFFSET])?; + if reserved[RESERVED_RANGE].iter().any(|&b| b != 0) { + return Err(AuthError::ReservedNonzero); + } + let mut tag_bytes = [0u8; 32]; + tag_bytes.copy_from_slice(&reserved[TAG_RANGE]); + let mut ts_bytes = [0u8; 8]; + ts_bytes.copy_from_slice(&reserved[TIMESTAMP_RANGE]); + let mut nonce_bytes = [0u8; 16]; + nonce_bytes.copy_from_slice(&reserved[NONCE_RANGE]); + Ok(DecodedEnvelope { + tag: AuthTag(blake3::Hash::from_bytes(tag_bytes)), + timestamp_ns: u64::from_le_bytes(ts_bytes), + nonce: u128::from_le_bytes(nonce_bytes), + }) +} + +/// Full credential check: timestamp window -> nonce dedup -> tag. +/// +/// # Errors +/// +/// Returns the first `AuthError` triggered by the check sequence. 
+pub fn verify_envelope( + token_source: &dyn TokenSource, + challenge: &AuthChallenge, + decoded: &DecodedEnvelope, + now_ns: u128, + nonces: &mut NonceRing, +) -> Result<(), AuthError> { + let ts_u128 = u128::from(decoded.timestamp_ns); + let delta = now_ns.abs_diff(ts_u128); + if delta > TIMESTAMP_WINDOW_NS { + return Err(AuthError::TimestampOutOfWindow); + } + if !nonces.insert(decoded.nonce) { + return Err(AuthError::NonceReplay); + } + token_source.verify(challenge, &decoded.tag) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample_challenge() -> AuthChallenge { + AuthChallenge { + cluster: 0xdead_beef_1234_5678_9abc_def0_1122_3344, + peer_id: 7, + timestamp_ns: 1_700_000_000_000_000_000, + release: 1, + nonce: 0x11_2233_4455_6677_8899_aabb_ccdd_eeff, + } + } + + #[test] + fn roundtrip_ok() { + let source = StaticSharedSecret::new([1u8; SECRET_SIZE]); + let challenge = sample_challenge(); + let mut reserved = [0u8; 128]; + encode_envelope(&mut reserved, &source, &challenge); + + let decoded = decode_envelope(&reserved).expect("decode"); + let mut ring = NonceRing::new(32); + verify_envelope( + &source, + &challenge, + &decoded, + u128::from(challenge.timestamp_ns), + &mut ring, + ) + .expect("verify"); + } + + #[test] + fn wrong_secret_rejected() { + let signer = StaticSharedSecret::new([1u8; SECRET_SIZE]); + let verifier = StaticSharedSecret::new([2u8; SECRET_SIZE]); + let challenge = sample_challenge(); + let mut reserved = [0u8; 128]; + encode_envelope(&mut reserved, &signer, &challenge); + + let decoded = decode_envelope(&reserved).unwrap(); + let mut ring = NonceRing::new(32); + let err = verify_envelope( + &verifier, + &challenge, + &decoded, + u128::from(challenge.timestamp_ns), + &mut ring, + ) + .unwrap_err(); + assert_eq!(err, AuthError::TagMismatch); + } + + #[test] + fn replay_rejected() { + let source = StaticSharedSecret::zero(); + let challenge = sample_challenge(); + let mut reserved = [0u8; 128]; + encode_envelope(&mut reserved, 
&source, &challenge); + + let decoded = decode_envelope(&reserved).unwrap(); + let mut ring = NonceRing::new(32); + let now = u128::from(challenge.timestamp_ns); + verify_envelope(&source, &challenge, &decoded, now, &mut ring).unwrap(); + let err = verify_envelope(&source, &challenge, &decoded, now, &mut ring).unwrap_err(); + assert_eq!(err, AuthError::NonceReplay); + } + + #[test] + fn stale_timestamp_rejected() { + let source = StaticSharedSecret::zero(); + let challenge = sample_challenge(); + let mut reserved = [0u8; 128]; + encode_envelope(&mut reserved, &source, &challenge); + + let decoded = decode_envelope(&reserved).unwrap(); + let mut ring = NonceRing::new(32); + let now = u128::from(challenge.timestamp_ns) + TIMESTAMP_WINDOW_NS + 1; + let err = verify_envelope(&source, &challenge, &decoded, now, &mut ring).unwrap_err(); + assert_eq!(err, AuthError::TimestampOutOfWindow); + } + + #[test] + fn future_timestamp_rejected() { + let source = StaticSharedSecret::zero(); + let challenge = sample_challenge(); + let mut reserved = [0u8; 128]; + encode_envelope(&mut reserved, &source, &challenge); + + let decoded = decode_envelope(&reserved).unwrap(); + let mut ring = NonceRing::new(32); + let now = u128::from(challenge.timestamp_ns).saturating_sub(TIMESTAMP_WINDOW_NS + 1); + let err = verify_envelope(&source, &challenge, &decoded, now, &mut ring).unwrap_err(); + assert_eq!(err, AuthError::TimestampOutOfWindow); + } + + #[test] + fn unsupported_kind_rejected() { + let source = StaticSharedSecret::zero(); + let challenge = sample_challenge(); + let mut reserved = [0u8; 128]; + encode_envelope(&mut reserved, &source, &challenge); + reserved[KIND_OFFSET] = 99; + let err = decode_envelope(&reserved).unwrap_err(); + assert_eq!(err, AuthError::UnsupportedKind); + } + + #[test] + fn reserved_nonzero_rejected() { + let source = StaticSharedSecret::zero(); + let challenge = sample_challenge(); + let mut reserved = [0u8; 128]; + encode_envelope(&mut reserved, &source, 
&challenge); + reserved[100] = 0xff; + let err = decode_envelope(&reserved).unwrap_err(); + assert_eq!(err, AuthError::ReservedNonzero); + } + + #[test] + fn nonce_ring_fills_and_evicts_fifo() { + let mut ring = NonceRing::new(3); + assert!(ring.insert(1)); + assert!(ring.insert(2)); + assert!(ring.insert(3)); + assert!(!ring.insert(1)); + assert!(ring.insert(4)); + // Nonce 1 evicted; reinserting it should now succeed again. + assert!(ring.insert(1)); + } +} diff --git a/core/message_bus/src/connector.rs b/core/message_bus/src/connector.rs index 641d50683..01d18a255 100644 --- a/core/message_bus/src/connector.rs +++ b/core/message_bus/src/connector.rs @@ -25,12 +25,15 @@ //! (see `shard::coordinator::ShardZeroCoordinator`). use crate::IggyMessageBus; +use crate::auth::{self, TokenSource}; use crate::framing; use crate::lifecycle::ShutdownToken; use crate::socket_opts::apply_keepalive_for_connection; use crate::{AcceptedReplicaFn, GenericHeader, Message}; use compio::net::TcpStream; use iggy_binary_protocol::{Command2, HEADER_SIZE}; +use iggy_common::IggyTimestamp; +use rand::Rng; use std::mem::size_of; use std::net::SocketAddr; use std::rc::Rc; @@ -57,12 +60,14 @@ pub async fn start( peers: Vec<(u8, SocketAddr)>, on_dialed: AcceptedReplicaFn, reconnect_period: Duration, + token_source: Rc<dyn TokenSource>, ) { - connect_all(bus, cluster_id, self_id, &peers, &on_dialed).await; + connect_all(bus, cluster_id, self_id, &peers, &on_dialed, &token_source).await; let handler = on_dialed.clone(); let token = bus.token(); let bus_for_task = Rc::clone(bus); + let token_source_for_task = Rc::clone(&token_source); let handle = compio::runtime::spawn(async move { periodic_reconnect( &bus_for_task, @@ -72,6 +77,7 @@ pub async fn start( handler, reconnect_period, token, + token_source_for_task, ) .await; }); @@ -85,6 +91,7 @@ async fn connect_all( self_id: u8, peers: &[(u8, SocketAddr)], on_dialed: &AcceptedReplicaFn, + token_source: &Rc<dyn TokenSource>, ) { for &(peer_id, 
addr) in peers { if peer_id <= self_id { @@ -105,11 +112,21 @@ async fn connect_all( ); continue; } - connect_one(bus, cluster_id, self_id, peer_id, addr, on_dialed).await; + connect_one( + bus, + cluster_id, + self_id, + peer_id, + addr, + on_dialed, + token_source, + ) + .await; } } #[allow(clippy::future_not_send)] +#[allow(clippy::too_many_arguments)] async fn periodic_reconnect( bus: &Rc<IggyMessageBus>, cluster_id: u128, @@ -118,9 +135,10 @@ async fn periodic_reconnect( on_dialed: AcceptedReplicaFn, period: Duration, token: ShutdownToken, + token_source: Rc<dyn TokenSource>, ) { while token.sleep_or_shutdown(period).await { - connect_all(bus, cluster_id, self_id, &peers, &on_dialed).await; + connect_all(bus, cluster_id, self_id, &peers, &on_dialed, &token_source).await; } debug!("replica reconnect periodic task exiting"); } @@ -129,6 +147,7 @@ async fn periodic_reconnect( /// `on_dialed` on success. Dial / handshake failures are logged and /// swallowed; VSR tolerates missing peers and the periodic sweep retries. #[allow(clippy::future_not_send)] +#[allow(clippy::too_many_arguments)] async fn connect_one( bus: &Rc<IggyMessageBus>, cluster_id: u128, @@ -136,6 +155,7 @@ async fn connect_one( peer_id: u8, addr: SocketAddr, on_dialed: &AcceptedReplicaFn, + token_source: &Rc<dyn TokenSource>, ) { let mut stream = match TcpStream::connect(addr).await { Ok(s) => s, @@ -160,7 +180,7 @@ async fn connect_one( ); } - let ping = build_ping_message(cluster_id, self_id); + let ping = build_ping_message(cluster_id, self_id, token_source.as_ref()); if let Err(e) = framing::write_message(&mut stream, ping).await { warn!(replica = peer_id, %addr, "handshake write failed: {e}"); return; @@ -170,8 +190,21 @@ async fn connect_one( on_dialed(stream, peer_id); } -/// Build a Ping handshake message identifying our replica id. 
-fn build_ping_message(cluster_id: u128, replica_id: u8) -> Message<GenericHeader> { +/// Build a Ping handshake message identifying our replica id and carrying +/// a fresh auth envelope stamped by `token_source`. +fn build_ping_message( + cluster_id: u128, + replica_id: u8, + token_source: &dyn TokenSource, +) -> Message<GenericHeader> { + #[allow(clippy::cast_possible_truncation)] + let timestamp_ns = IggyTimestamp::now().as_nanos() as u64; + let nonce = { + let mut buf = [0u8; 16]; + rand::rng().fill_bytes(&mut buf); + u128::from_le_bytes(buf) + }; + #[allow(clippy::cast_possible_truncation)] Message::<GenericHeader>::new(size_of::<GenericHeader>()).transmute_header( |_, h: &mut GenericHeader| { @@ -179,6 +212,14 @@ fn build_ping_message(cluster_id: u128, replica_id: u8) -> Message<GenericHeader h.cluster = cluster_id; h.replica = replica_id; h.size = HEADER_SIZE as u32; + let challenge = auth::AuthChallenge { + cluster: cluster_id, + peer_id: u128::from(replica_id), + timestamp_ns, + release: h.release, + nonce, + }; + auth::encode_envelope(&mut h.reserved_command, token_source, &challenge); }, ) } diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs index bb993fc8c..99c76633a 100644 --- a/core/message_bus/src/lib.rs +++ b/core/message_bus/src/lib.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+pub mod auth; pub mod cache; pub mod client_listener; pub mod config; diff --git a/core/message_bus/src/replica_io.rs b/core/message_bus/src/replica_io.rs index 5ef866ff3..3731cc6d8 100644 --- a/core/message_bus/src/replica_io.rs +++ b/core/message_bus/src/replica_io.rs @@ -36,6 +36,7 @@ use std::time::Duration; use iggy_common::IggyError; +use crate::auth::TokenSource; use crate::connector::start as start_connector; use crate::replica_listener::{bind as bind_replica_listener, run as run_replica_listener}; use crate::{AcceptedClientFn, AcceptedReplicaFn, IggyMessageBus, client_listener}; @@ -71,6 +72,7 @@ pub async fn start_on_shard_zero( on_accepted_replica: AcceptedReplicaFn, on_accepted_client: AcceptedClientFn, reconnect_period: Duration, + token_source: Rc<dyn TokenSource>, ) -> Result<Option<BoundPlanes>, IggyError> { if bus.shard_id() != 0 { return Ok(None); @@ -82,6 +84,7 @@ pub async fn start_on_shard_zero( let token_for_replica = bus.token(); let on_accepted_replica_for_listener = on_accepted_replica.clone(); let listener_max_message_size = bus.config().max_message_size; + let token_source_for_listener = Rc::clone(&token_source); let replica_handle = compio::runtime::spawn(async move { run_replica_listener( replica_listener, @@ -91,6 +94,7 @@ pub async fn start_on_shard_zero( replica_count, on_accepted_replica_for_listener, listener_max_message_size, + token_source_for_listener, ) .await; }); @@ -109,6 +113,7 @@ pub async fn start_on_shard_zero( peers, on_accepted_replica, reconnect_period, + token_source, ) .await; @@ -136,6 +141,7 @@ pub async fn start_on_shard_zero_default( peers: Vec<(u8, SocketAddr)>, on_accepted_replica: AcceptedReplicaFn, on_accepted_client: AcceptedClientFn, + token_source: Rc<dyn TokenSource>, ) -> Result<Option<BoundPlanes>, IggyError> { let reconnect_period = bus.config().reconnect_period; start_on_shard_zero( @@ -149,6 +155,7 @@ pub async fn start_on_shard_zero_default( on_accepted_replica, on_accepted_client, 
reconnect_period, + token_source, ) .await } diff --git a/core/message_bus/src/replica_listener.rs b/core/message_bus/src/replica_listener.rs index 9d9db9192..d2d82d586 100644 --- a/core/message_bus/src/replica_listener.rs +++ b/core/message_bus/src/replica_listener.rs @@ -29,20 +29,23 @@ //! //! # Security //! -//! The `Ping` handshake validates `cluster_id`, the directional bound, and -//! `replica_count`. There is NO shared secret, no mTLS, no version -//! negotiation. Deploy the replica listener on a trusted network boundary -//! (cluster-local VPC, private subnet, overlay, or equivalent) - NEVER -//! expose the replica port directly to the public internet. Follow-up work -//! on an authenticated handshake is tracked under IGGY-112. +//! The `Ping` handshake validates `cluster_id`, the directional bound, +//! `replica_count`, and a BLAKE3-keyed MAC carried in +//! `GenericHeader.reserved_command[0..57]` (see [`crate::auth`]). Peers +//! that cannot produce a valid tag are rejected. Even so, TLS/encryption is +//! NOT provided here: operators still need to deploy the replica port on a +//! trusted network boundary (cluster-local VPC, private subnet, overlay) +//! if wire confidentiality is required. +use crate::auth::{self, NonceRing, TokenSource}; use crate::framing; use crate::lifecycle::ShutdownToken; use crate::{AcceptedReplicaFn, GenericHeader, Message}; use compio::net::{SocketOpts, TcpListener, TcpStream}; use futures::FutureExt; use iggy_binary_protocol::Command2; -use iggy_common::IggyError; +use iggy_common::{IggyError, IggyTimestamp}; +use std::cell::RefCell; use std::net::SocketAddr; use std::rc::Rc; use tracing::{debug, error, info, warn}; @@ -87,11 +90,13 @@ pub async fn run( replica_count: u8, on_accepted: AcceptedReplicaFn, max_message_size: usize, + token_source: Rc<dyn TokenSource>, ) { info!( "Replica listener accepting on {:?}", listener.local_addr().ok() ); + let nonces = RefCell::new(NonceRing::default()); loop { futures::select! 
{ () = token.wait().fuse() => { @@ -101,14 +106,23 @@ pub async fn run( result = listener.accept().fuse() => { match result { Ok((mut stream, peer_addr)) => { - match handshake( - &mut stream, - cluster_id, - self_id, - replica_count, - max_message_size, - ) - .await { + let read = handshake_read(&mut stream, cluster_id, max_message_size).await; + let outcome = match read { + Ok(parsed) => { + let now_ns = IggyTimestamp::now().as_nanos(); + let mut ring = nonces.borrow_mut(); + handshake_verify( + &parsed, + token_source.as_ref(), + &mut ring, + now_ns, + self_id, + replica_count, + ) + } + Err(e) => Err(e), + }; + match outcome { Ok(peer_id) => { on_accepted(stream, peer_id); } @@ -126,26 +140,23 @@ pub async fn run( } } -/// Read and validate the `Ping` handshake from an inbound peer connection. -/// -/// Returns the peer replica id on success. -// -// TODO(IGGY-112): authenticate the replica handshake. -// -// Current validation covers `cluster_id` and the directional id bound -// only: no shared secret, no mTLS, no protocol version negotiation. -// Any peer that reaches the replica port and knows the `cluster_id` -// can pose as a replica. Acceptable on the assumed trusted network -// boundary; hardening is tracked under IGGY-112 (authenticated -// handshake + version negotiation). +/// Parsed-but-unverified handshake captured from the wire. Held across +/// the await boundary between I/O and the sync verify step. +struct ParsedHandshake { + replica: u8, + decoded: auth::DecodedEnvelope, + challenge: auth::AuthChallenge, +} + +/// I/O-only portion of the handshake: read the 256 B `Ping` frame, enforce +/// command + cluster match, and parse the auth envelope's bytes. Does NOT +/// touch the nonce ring or verify the tag. 
#[allow(clippy::future_not_send)] -async fn handshake( +async fn handshake_read( stream: &mut TcpStream, our_cluster: u128, - self_id: u8, - replica_count: u8, max_message_size: usize, -) -> Result<u8, IggyError> { +) -> Result<ParsedHandshake, IggyError> { let msg = framing::read_message(stream, max_message_size).await?; let header = msg.header(); if header.command != Command2::Ping { @@ -154,11 +165,45 @@ async fn handshake( if header.cluster != our_cluster { return Err(IggyError::InvalidCommand); } + let decoded = auth::decode_envelope(&header.reserved_command).map_err(auth::to_iggy_error)?; + let challenge = auth::AuthChallenge { + cluster: header.cluster, + peer_id: u128::from(header.replica), + timestamp_ns: decoded.timestamp_ns, + release: header.release, + nonce: decoded.nonce, + }; + Ok(ParsedHandshake { + replica: header.replica, + decoded, + challenge, + }) +} + +/// Synchronous verifier: tag + replay, then the directional tiebreak. +/// Runs outside any await so the `&mut NonceRing` borrow cannot span I/O. +fn handshake_verify( + parsed: &ParsedHandshake, + token_source: &dyn TokenSource, + nonces: &mut NonceRing, + now_ns: u128, + self_id: u8, + replica_count: u8, +) -> Result<u8, IggyError> { + auth::verify_envelope( + token_source, + &parsed.challenge, + &parsed.decoded, + now_ns, + nonces, + ) + .map_err(auth::to_iggy_error)?; + // Directional rule: a replica only accepts inbound from peers with // strictly lower ids. The peer is responsible for not dialing us if // it has the higher id; this is just defensive. 
- if header.replica >= replica_count || header.replica >= self_id { + if parsed.replica >= replica_count || parsed.replica >= self_id { return Err(IggyError::InvalidCommand); } - Ok(header.replica) + Ok(parsed.replica) } diff --git a/core/message_bus/tests/backpressure.rs b/core/message_bus/tests/backpressure.rs index 1a399aa04..7e849a981 100644 --- a/core/message_bus/tests/backpressure.rs +++ b/core/message_bus/tests/backpressure.rs @@ -24,7 +24,7 @@ mod common; -use common::{header_only, install_replicas_locally, loopback}; +use common::{header_only, install_replicas_locally, loopback, test_token_source}; use iggy_binary_protocol::Command2; use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as start_connector}; use message_bus::replica_listener::{MessageHandler, bind, run}; @@ -62,6 +62,7 @@ async fn try_send_returns_backpressure_when_queue_full() { 2, accept_1, message_bus::framing::MAX_MESSAGE_SIZE, + test_token_source(), ) .await; }); @@ -77,6 +78,7 @@ async fn try_send_returns_backpressure_when_queue_full() { vec![(1, addr1)], dial_0, DEFAULT_RECONNECT_PERIOD, + test_token_source(), ) .await; diff --git a/core/message_bus/tests/common/mod.rs b/core/message_bus/tests/common/mod.rs index b00fbb310..81cdf1fa5 100644 --- a/core/message_bus/tests/common/mod.rs +++ b/core/message_bus/tests/common/mod.rs @@ -27,6 +27,7 @@ #![allow(dead_code)] // each test binary uses a subset use iggy_binary_protocol::{Command2, GenericHeader, HEADER_SIZE, Message}; +use message_bus::auth::{StaticSharedSecret, TokenSource}; use message_bus::client_listener::RequestHandler; use message_bus::replica_listener::MessageHandler; use message_bus::{AcceptedClientFn, AcceptedReplicaFn, IggyMessageBus, installer}; @@ -34,6 +35,13 @@ use std::cell::Cell; use std::net::SocketAddr; use std::rc::Rc; +/// Default zero-byte shared secret for integration tests. All replicas +/// authenticate against the same dummy source. 
+#[must_use] +pub fn test_token_source() -> Rc<dyn TokenSource> { + Rc::new(StaticSharedSecret::zero()) +} + /// Loopback address with OS-chosen port. #[must_use] pub fn loopback() -> SocketAddr { diff --git a/core/message_bus/tests/connection_lost_notify.rs b/core/message_bus/tests/connection_lost_notify.rs index 8babea3ad..00a05b5f7 100644 --- a/core/message_bus/tests/connection_lost_notify.rs +++ b/core/message_bus/tests/connection_lost_notify.rs @@ -22,7 +22,7 @@ mod common; -use common::{install_replicas_locally, loopback}; +use common::{install_replicas_locally, loopback, test_token_source}; use message_bus::IggyMessageBus; use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as start_connector}; use message_bus::replica_listener::{MessageHandler, bind, run}; @@ -64,6 +64,7 @@ async fn connection_lost_fires_exactly_once_per_peer_disconnect() { 2, accept_1, message_bus::framing::MAX_MESSAGE_SIZE, + test_token_source(), ) .await; }); @@ -77,6 +78,7 @@ async fn connection_lost_fires_exactly_once_per_peer_disconnect() { vec![(1u8, addr1)], dial_0, DEFAULT_RECONNECT_PERIOD, + test_token_source(), ) .await; diff --git a/core/message_bus/tests/directional_connection.rs b/core/message_bus/tests/directional_connection.rs index ac8a3a7d1..6251939ce 100644 --- a/core/message_bus/tests/directional_connection.rs +++ b/core/message_bus/tests/directional_connection.rs @@ -22,7 +22,7 @@ mod common; -use common::{install_replicas_locally, loopback}; +use common::{install_replicas_locally, loopback, test_token_source}; use message_bus::IggyMessageBus; use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as start_connector}; use message_bus::replica_listener::{MessageHandler, bind, run}; @@ -53,6 +53,7 @@ async fn lower_id_dials_higher_id_accepts() { 2, accept_0, message_bus::framing::MAX_MESSAGE_SIZE, + test_token_source(), ) .await; }); @@ -69,6 +70,7 @@ async fn lower_id_dials_higher_id_accepts() { 2, accept_1, message_bus::framing::MAX_MESSAGE_SIZE, + 
test_token_source(), ) .await; }); @@ -85,10 +87,20 @@ async fn lower_id_dials_higher_id_accepts() { peers.clone(), dial_0, DEFAULT_RECONNECT_PERIOD, + test_token_source(), ) .await; let dial_1 = install_replicas_locally(bus1.clone(), on_message.clone()); - start_connector(&bus1, CLUSTER, 1, peers, dial_1, DEFAULT_RECONNECT_PERIOD).await; + start_connector( + &bus1, + CLUSTER, + 1, + peers, + dial_1, + DEFAULT_RECONNECT_PERIOD, + test_token_source(), + ) + .await; // Wait for the directional connection to settle. let deadline = std::time::Instant::now() + Duration::from_secs(2); diff --git a/core/message_bus/tests/handshake_auth.rs b/core/message_bus/tests/handshake_auth.rs new file mode 100644 index 000000000..b833354e7 --- /dev/null +++ b/core/message_bus/tests/handshake_auth.rs @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration coverage for the authenticated replica handshake. +//! +//! Complements the in-crate unit tests in `core/message_bus/src/auth.rs` +//! by exercising the full end-to-end path (dial -> Ping frame -> listener +//! verify) through `replica_listener::run` + `connector::start`. 
+ +mod common; + +use common::{install_replicas_locally, loopback}; +use message_bus::IggyMessageBus; +use message_bus::auth::{StaticSharedSecret, TokenSource}; +use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as start_connector}; +use message_bus::replica_listener::{MessageHandler, bind, run}; +use std::rc::Rc; +use std::time::Duration; + +const CLUSTER: u128 = 0xCAFE; + +/// Matching secrets on both sides: handshake succeeds, peer registers. +#[compio::test] +async fn handshake_with_matching_secret_registers_peer() { + let bus_listener = Rc::new(IggyMessageBus::new(0)); + let bus_dialer = Rc::new(IggyMessageBus::new(0)); + + let secret: Rc<dyn TokenSource> = Rc::new(StaticSharedSecret::new([7u8; 32])); + + let on_msg: MessageHandler = Rc::new(|_, _| {}); + + let (listener, addr) = bind(loopback()).await.unwrap(); + let token_for_listener = bus_listener.token(); + let accept = install_replicas_locally(bus_listener.clone(), on_msg.clone()); + let secret_for_listener = Rc::clone(&secret); + let listener_handle = compio::runtime::spawn(async move { + run( + listener, + token_for_listener, + CLUSTER, + 1, + 2, + accept, + message_bus::framing::MAX_MESSAGE_SIZE, + secret_for_listener, + ) + .await; + }); + bus_listener.track_background(listener_handle); + + let dial = install_replicas_locally(bus_dialer.clone(), on_msg); + start_connector( + &bus_dialer, + CLUSTER, + 0, + vec![(1u8, addr)], + dial, + DEFAULT_RECONNECT_PERIOD, + Rc::clone(&secret), + ) + .await; + + let deadline = std::time::Instant::now() + Duration::from_secs(2); + while !bus_dialer.replicas().contains(1) { + assert!( + std::time::Instant::now() < deadline, + "matching-secret handshake did not register peer" + ); + compio::time::sleep(Duration::from_millis(5)).await; + } + + bus_dialer.shutdown(Duration::from_secs(2)).await; + bus_listener.shutdown(Duration::from_secs(2)).await; +} + +/// Mismatched secrets: the listener rejects the dialed Ping, so neither +/// side ends up with a 
registered peer within the test window. +#[compio::test] +async fn handshake_with_mismatched_secret_rejects_peer() { + let bus_listener = Rc::new(IggyMessageBus::new(0)); + let bus_dialer = Rc::new(IggyMessageBus::new(0)); + + let listener_secret: Rc<dyn TokenSource> = Rc::new(StaticSharedSecret::new([1u8; 32])); + let dialer_secret: Rc<dyn TokenSource> = Rc::new(StaticSharedSecret::new([2u8; 32])); + + let on_msg: MessageHandler = Rc::new(|_, _| {}); + + let (listener, addr) = bind(loopback()).await.unwrap(); + let token_for_listener = bus_listener.token(); + let accept = install_replicas_locally(bus_listener.clone(), on_msg.clone()); + let listener_handle = compio::runtime::spawn(async move { + run( + listener, + token_for_listener, + CLUSTER, + 1, + 2, + accept, + message_bus::framing::MAX_MESSAGE_SIZE, + listener_secret, + ) + .await; + }); + bus_listener.track_background(listener_handle); + + let dial = install_replicas_locally(bus_dialer.clone(), on_msg); + start_connector( + &bus_dialer, + CLUSTER, + 0, + vec![(1u8, addr)], + dial, + Duration::from_millis(50), + dialer_secret, + ) + .await; + + compio::time::sleep(Duration::from_millis(400)).await; + assert!( + !bus_dialer.replicas().contains(1), + "mismatched-secret handshake must not register the peer" + ); + assert!( + !bus_listener.replicas().contains(0), + "mismatched-secret handshake must not register the peer on the listener either" + ); + + bus_dialer.shutdown(Duration::from_secs(2)).await; + bus_listener.shutdown(Duration::from_secs(2)).await; +} diff --git a/core/message_bus/tests/head_of_line.rs b/core/message_bus/tests/head_of_line.rs index 57c644971..b756897ce 100644 --- a/core/message_bus/tests/head_of_line.rs +++ b/core/message_bus/tests/head_of_line.rs @@ -25,7 +25,7 @@ mod common; -use common::{header_only, install_replicas_locally, loopback}; +use common::{header_only, install_replicas_locally, loopback, test_token_source}; use compio::net::TcpListener; use iggy_binary_protocol::Command2; 
use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as start_connector}; @@ -58,6 +58,7 @@ async fn slow_peer_does_not_block_other_peers() { 3, accept_a, message_bus::framing::MAX_MESSAGE_SIZE, + test_token_source(), ) .await; }); @@ -91,6 +92,7 @@ async fn slow_peer_does_not_block_other_peers() { vec![(1, addr_a), (2, addr_b)], dial_delegate, DEFAULT_RECONNECT_PERIOD, + test_token_source(), ) .await; diff --git a/core/message_bus/tests/reconnect.rs b/core/message_bus/tests/reconnect.rs index 0eef98e94..93ac64332 100644 --- a/core/message_bus/tests/reconnect.rs +++ b/core/message_bus/tests/reconnect.rs @@ -21,7 +21,7 @@ mod common; -use common::{install_replicas_locally, loopback}; +use common::{install_replicas_locally, loopback, test_token_source}; use message_bus::IggyMessageBus; use message_bus::connector::start as start_connector; use message_bus::replica_listener::{MessageHandler, bind, run}; @@ -43,7 +43,16 @@ async fn periodic_retry_picks_up_late_listener() { let on_message: MessageHandler = Rc::new(|_, _| {}); let period = Duration::from_millis(100); let dial_delegate = install_replicas_locally(bus0.clone(), on_message.clone()); - start_connector(&bus0, CLUSTER, 0, vec![(1, addr)], dial_delegate, period).await; + start_connector( + &bus0, + CLUSTER, + 0, + vec![(1, addr)], + dial_delegate, + period, + test_token_source(), + ) + .await; assert!(!bus0.replicas().contains(1), "first connect should fail"); // Bring bus 1 online on the same address. 
@@ -60,6 +69,7 @@ async fn periodic_retry_picks_up_late_listener() { 2, accept_delegate, message_bus::framing::MAX_MESSAGE_SIZE, + test_token_source(), ) .await; }); diff --git a/core/message_bus/tests/reconnect_skip_connected.rs b/core/message_bus/tests/reconnect_skip_connected.rs index 5d9a77b3b..31159e1db 100644 --- a/core/message_bus/tests/reconnect_skip_connected.rs +++ b/core/message_bus/tests/reconnect_skip_connected.rs @@ -22,7 +22,7 @@ mod common; -use common::{install_replicas_locally, loopback}; +use common::{install_replicas_locally, loopback, test_token_source}; use message_bus::IggyMessageBus; use message_bus::connector::start as start_connector; use message_bus::replica_listener::{MessageHandler, bind, run}; @@ -59,6 +59,7 @@ async fn periodic_reconnect_skips_already_connected_peer() { 2, accept_delegate, message_bus::framing::MAX_MESSAGE_SIZE, + test_token_source(), ) .await; }); @@ -69,7 +70,16 @@ async fn periodic_reconnect_skips_already_connected_peer() { // missing, bus1's listener will see N extra accepts. let period = Duration::from_millis(50); let dial_delegate = install_replicas_locally(bus0.clone(), on_message.clone()); - start_connector(&bus0, CLUSTER, 0, vec![(1u8, addr1)], dial_delegate, period).await; + start_connector( + &bus0, + CLUSTER, + 0, + vec![(1u8, addr1)], + dial_delegate, + period, + test_token_source(), + ) + .await; // Wait for the initial connection to settle on both sides. 
let deadline = std::time::Instant::now() + Duration::from_secs(2); diff --git a/core/message_bus/tests/replica_roundtrip.rs b/core/message_bus/tests/replica_roundtrip.rs index e557b501a..8d8fde80a 100644 --- a/core/message_bus/tests/replica_roundtrip.rs +++ b/core/message_bus/tests/replica_roundtrip.rs @@ -23,7 +23,7 @@ mod common; use async_channel::Receiver; -use common::{header_only, install_replicas_locally, loopback}; +use common::{header_only, install_replicas_locally, loopback, test_token_source}; use iggy_binary_protocol::Command2; use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as start_connector}; use message_bus::replica_listener::{MessageHandler, bind, run}; @@ -55,6 +55,7 @@ async fn two_replicas_exchange_prepare_and_ack() { 2, accept_delegate_1, message_bus::framing::MAX_MESSAGE_SIZE, + test_token_source(), ) .await; }); @@ -76,6 +77,7 @@ async fn two_replicas_exchange_prepare_and_ack() { vec![(1, addr1)], dial_delegate_0, DEFAULT_RECONNECT_PERIOD, + test_token_source(), ) .await; diff --git a/core/message_bus/tests/shard_zero_gating.rs b/core/message_bus/tests/shard_zero_gating.rs index a65fd84d7..1dc0cc339 100644 --- a/core/message_bus/tests/shard_zero_gating.rs +++ b/core/message_bus/tests/shard_zero_gating.rs @@ -21,7 +21,7 @@ mod common; -use common::{install_clients_locally, install_replicas_locally, loopback}; +use common::{install_clients_locally, install_replicas_locally, loopback, test_token_source}; use message_bus::IggyMessageBus; use message_bus::client_listener::RequestHandler; use message_bus::connector::DEFAULT_RECONNECT_PERIOD; @@ -51,6 +51,7 @@ async fn shard_zero_binds_listener_and_starts_connector() { 2, peer_accept, message_bus::framing::MAX_MESSAGE_SIZE, + test_token_source(), ) .await; }); @@ -76,6 +77,7 @@ async fn shard_zero_binds_listener_and_starts_connector() { accepted_replica, accepted_client, DEFAULT_RECONNECT_PERIOD, + test_token_source(), ) .await .expect("start_on_shard_zero must succeed on shard 0"); @@ 
-127,6 +129,7 @@ async fn non_zero_shard_skips_io() { accepted_replica, accepted_client, DEFAULT_RECONNECT_PERIOD, + test_token_source(), ) .await .expect("start_on_shard_zero must succeed on non-zero shard (no-op)"); diff --git a/core/message_bus/tests/vectored_batch.rs b/core/message_bus/tests/vectored_batch.rs index c8fc10c6b..b079725a5 100644 --- a/core/message_bus/tests/vectored_batch.rs +++ b/core/message_bus/tests/vectored_batch.rs @@ -20,7 +20,7 @@ mod common; -use common::{header_only, install_replicas_locally, loopback}; +use common::{header_only, install_replicas_locally, loopback, test_token_source}; use iggy_binary_protocol::Command2; use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as start_connector}; use message_bus::replica_listener::{MessageHandler, bind, run}; @@ -57,6 +57,7 @@ async fn writer_batches_pipelined_sends_in_order() { 2, accept_1, message_bus::framing::MAX_MESSAGE_SIZE, + test_token_source(), ) .await; }); @@ -72,6 +73,7 @@ async fn writer_batches_pipelined_sends_in_order() { vec![(1, addr1)], dial_0, DEFAULT_RECONNECT_PERIOD, + test_token_source(), ) .await;
