This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit de42708c96dfaf24de4130b91093a43b68e25ac8
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Apr 24 11:37:20 2026 +0200

    feat(message_bus): authenticate the replica Ping handshake (IGGY-112)
    
    Replace the cluster_id-only handshake with a BLAKE3-keyed MAC carried
    in GenericHeader.reserved_command[0..57]. Encoding is versioned (1 =
    Blake3V1) with 32 B tag, 8 B sender-ns timestamp, 16 B random nonce,
    1 B kind, and 71 B zero-checked reserved padding.
    
    The listener verifies the envelope before the directional tiebreak so
    unauthenticated peers never drive the nonce ring's eviction cursor.
    Replay protection is layered: a 30 s timestamp window uses
    IggyTimestamp::now().as_nanos(), and a 256-entry NonceRing catches
    in-window replays. Credential failures collapse to
    IggyError::Unauthenticated (reusing the existing variant rather than
    adding a new one and dragging another From<_, IggyError> candidate
    into every downstream crate's orphan-rule set).
    
    auth.rs exposes the TokenSource trait, a StaticSharedSecret impl for
    all-replicas-share-one-secret deployments (and the integration-test
    harness via StaticSharedSecret::zero()), and pure encode/decode/verify
    helpers so transport-specific listeners can wire the envelope into
    other planes (client-plane Phase 2+) without re-implementing the
    framing. Compio is single-threaded, so TokenSource carries no
    Send+Sync bounds and the bus holds it as Rc<dyn TokenSource>.
    
    GenericHeader layout is unchanged: the envelope occupies previously
    zero-filled bytes in reserved_command, so invariant I3 (256 B wire
    frame, size offset == 48) is preserved.
    
    Rebuilds on top of #3134's async fire-and-forget transport without
    altering writer_task batching (I4), zero-copy Frozen ownership (I8),
    or the replica-plane FIFO byte-stream model (I1).
    
    BREAKING: a replica upgraded to this commit rejects any peer running
    the pre-auth handshake: the zero-filled reserved_command fails BLAKE3
    verification. All replicas in a cluster must be upgraded together and
    configured with matching cluster secrets; operators without a secret
    channel can bootstrap with StaticSharedSecret::zero() to preserve the
    old trust-the-network posture, acknowledging that it is no better than
    the previous state. Authentication policy (per-client secrets, SSO
    callouts, revocation) layers on top via additional TokenSource impls;
    this commit only lands the replica-plane static-secret baseline.
    
    Refs: Documents/silverhand/iggy/message_bus/transport-plan/ (P0-T2,
          designs/message-bus-auth.md).
---
 Cargo.lock                                         |   1 +
 core/message_bus/Cargo.toml                        |   1 +
 core/message_bus/src/auth.rs                       | 450 +++++++++++++++++++++
 core/message_bus/src/connector.rs                  |  53 ++-
 core/message_bus/src/lib.rs                        |   1 +
 core/message_bus/src/replica_io.rs                 |   7 +
 core/message_bus/src/replica_listener.rs           | 111 +++--
 core/message_bus/tests/backpressure.rs             |   4 +-
 core/message_bus/tests/common/mod.rs               |   8 +
 core/message_bus/tests/connection_lost_notify.rs   |   4 +-
 core/message_bus/tests/directional_connection.rs   |  16 +-
 core/message_bus/tests/handshake_auth.rs           | 144 +++++++
 core/message_bus/tests/head_of_line.rs             |   4 +-
 core/message_bus/tests/reconnect.rs                |  14 +-
 core/message_bus/tests/reconnect_skip_connected.rs |  14 +-
 core/message_bus/tests/replica_roundtrip.rs        |   4 +-
 core/message_bus/tests/shard_zero_gating.rs        |   5 +-
 core/message_bus/tests/vectored_batch.rs           |   4 +-
 18 files changed, 794 insertions(+), 51 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index fe7d8cafc..2c66fb79c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7009,6 +7009,7 @@ name = "message_bus"
 version = "0.1.0"
 dependencies = [
  "async-channel",
+ "blake3",
  "compio",
  "futures",
  "iggy_binary_protocol",
diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml
index 4d3569482..d0487db0f 100644
--- a/core/message_bus/Cargo.toml
+++ b/core/message_bus/Cargo.toml
@@ -30,6 +30,7 @@ publish = false
 
 [dependencies]
 async-channel = { workspace = true }
+blake3 = { workspace = true }
 compio = { workspace = true }
 futures = { workspace = true }
 iggy_binary_protocol = { workspace = true }
diff --git a/core/message_bus/src/auth.rs b/core/message_bus/src/auth.rs
new file mode 100644
index 000000000..6c2353a6b
--- /dev/null
+++ b/core/message_bus/src/auth.rs
@@ -0,0 +1,450 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Bus-layer authenticated handshake (IGGY-112).
+//!
+//! The replica-plane handshake (`replica_listener::handshake`) and the
+//! forthcoming SDK-client plane (Phase 2+) share one envelope, carried in
+//! `GenericHeader.reserved_command[0..57]`:
+//!
+//! ```text
+//! [  0 .. 32) auth_tag        BLAKE3 keyed_hash(secret, challenge) full 
output
+//! [ 32 .. 40) auth_timestamp  sender ns since UNIX_EPOCH, little-endian u64
+//! [ 40 .. 56) auth_nonce      random u128, little-endian
+//! [ 56 .. 57) auth_kind       envelope version tag; 1 = Blake3V1
+//! [ 57 ..128) reserved        zero on the wire; zero-checked on receive
+//! ```
+//!
+//! Kept out of `GenericHeader` itself so the 256 B wire layout is
+//! preserved (invariant **I3**). Encode / decode operate on a
+//! `&mut [u8; 128]` / `&[u8; 128]` view of `GenericHeader.reserved_command`.
+
+use iggy_common::IggyError;
+use thiserror::Error;
+
+/// BLAKE3 `keyed_hash` key size, in bytes.
+pub const SECRET_SIZE: usize = 32;
+
+/// Byte range of the envelope within `GenericHeader.reserved_command`.
+pub const ENVELOPE_LEN: usize = 57;
+
+/// Acceptance window around `auth_timestamp`, in nanoseconds. 30 s.
+pub const TIMESTAMP_WINDOW_NS: u128 = 30_000_000_000;
+
+/// Default nonce-dedup LRU capacity on the acceptor.
+pub const NONCE_LRU_CAPACITY: usize = 256;
+
+const LABEL: &[u8; 16] = b"iggy-bus-auth-v1";
+
+const KIND_OFFSET: usize = 56;
+const TAG_RANGE: std::ops::Range<usize> = 0..32;
+const TIMESTAMP_RANGE: std::ops::Range<usize> = 32..40;
+const NONCE_RANGE: std::ops::Range<usize> = 40..56;
+const RESERVED_RANGE: std::ops::Range<usize> = ENVELOPE_LEN..128;
+
+/// Envelope version tag.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(u8)]
+pub enum AuthKind {
+    Blake3V1 = 1,
+}
+
+impl AuthKind {
+    const fn from_byte(b: u8) -> Result<Self, AuthError> {
+        match b {
+            1 => Ok(Self::Blake3V1),
+            _ => Err(AuthError::UnsupportedKind),
+        }
+    }
+}
+
+/// Fully reconstructed challenge input. All fields come from the header
+/// (`GenericHeader.cluster`, `GenericHeader.replica`, `GenericHeader.release`)
+/// plus the envelope timestamp and nonce.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct AuthChallenge {
+    pub cluster: u128,
+    pub peer_id: u128,
+    pub timestamp_ns: u64,
+    pub release: u32,
+    pub nonce: u128,
+}
+
+impl AuthChallenge {
+    /// Serialize the challenge into a fixed-size buffer. Domain-separated by
+    /// a constant label so the same secret cannot be misused to forge a
+    /// different protocol's MAC.
+    fn encode(&self) -> [u8; 16 + 16 + 16 + 8 + 4 + 16] {
+        let mut out = [0u8; 76];
+        out[0..16].copy_from_slice(LABEL);
+        out[16..32].copy_from_slice(&self.cluster.to_le_bytes());
+        out[32..48].copy_from_slice(&self.peer_id.to_le_bytes());
+        out[48..56].copy_from_slice(&self.timestamp_ns.to_le_bytes());
+        out[56..60].copy_from_slice(&self.release.to_le_bytes());
+        out[60..76].copy_from_slice(&self.nonce.to_le_bytes());
+        out
+    }
+}
+
+/// MAC over an `AuthChallenge`. Wraps `blake3::Hash` for its
+/// constant-time `PartialEq` implementation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct AuthTag(pub blake3::Hash);
+
+impl AuthTag {
+    const fn as_bytes(&self) -> &[u8; 32] {
+        self.0.as_bytes()
+    }
+}
+
+/// Credential-layer errors surfaced by the verify path.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
+pub enum AuthError {
+    #[error("timestamp outside acceptance window")]
+    TimestampOutOfWindow,
+    #[error("nonce replay")]
+    NonceReplay,
+    #[error("tag mismatch")]
+    TagMismatch,
+    #[error("unknown peer")]
+    UnknownPeer,
+    #[error("unsupported envelope version")]
+    UnsupportedKind,
+    #[error("reserved bytes nonzero")]
+    ReservedNonzero,
+}
+
+/// Map a credential-layer `AuthError` to the public `IggyError` surface.
+///
+/// Not implemented as `From<AuthError> for IggyError` because the impl
+/// would leak into every crate transitively linking `message_bus`, adding
+/// an extra `impl From<_> for IggyError` candidate to the orphan-rule set
+/// and tripping type inference (`?` ambiguity) in downstream crates.
+#[must_use]
+pub const fn to_iggy_error(e: AuthError) -> IggyError {
+    match e {
+        AuthError::UnsupportedKind | AuthError::ReservedNonzero => 
IggyError::InvalidCommand,
+        AuthError::TagMismatch
+        | AuthError::UnknownPeer
+        | AuthError::TimestampOutOfWindow
+        | AuthError::NonceReplay => IggyError::Unauthenticated,
+    }
+}
+
+/// Signer + verifier of auth envelopes. The bus runs on a single-threaded
+/// compio runtime, so no `Send + Sync` bounds.
+pub trait TokenSource {
+    fn sign(&self, challenge: &AuthChallenge) -> AuthTag;
+    /// # Errors
+    /// Returns `AuthError::TagMismatch` when `tag` does not match the
+    /// MAC produced for `challenge`; may return other variants in
+    /// token-source impls that perform additional checks.
+    fn verify(&self, challenge: &AuthChallenge, tag: &AuthTag) -> Result<(), 
AuthError>;
+}
+
+/// `TokenSource` that holds a single 32-byte cluster-wide secret. All
+/// replicas share one secret on the replica plane; the integration-test
+/// harness instantiates this with a fixed zero-byte secret.
+pub struct StaticSharedSecret {
+    secret: [u8; SECRET_SIZE],
+}
+
+impl StaticSharedSecret {
+    #[must_use]
+    pub const fn new(secret: [u8; SECRET_SIZE]) -> Self {
+        Self { secret }
+    }
+
+    #[must_use]
+    pub const fn zero() -> Self {
+        Self::new([0u8; SECRET_SIZE])
+    }
+}
+
+impl TokenSource for StaticSharedSecret {
+    fn sign(&self, challenge: &AuthChallenge) -> AuthTag {
+        let input = challenge.encode();
+        AuthTag(blake3::keyed_hash(&self.secret, &input))
+    }
+
+    fn verify(&self, challenge: &AuthChallenge, tag: &AuthTag) -> Result<(), 
AuthError> {
+        let expected = self.sign(challenge);
+        if expected.0 == tag.0 {
+            Ok(())
+        } else {
+            Err(AuthError::TagMismatch)
+        }
+    }
+}
+
+/// Fixed-capacity ring-buffer nonce dedup.
+///
+/// Eviction is O(1) via a round-robin cursor; lookup is O(N) over
+/// `NONCE_LRU_CAPACITY` entries, which is a 256-wide linear scan on the
+/// hot path. Memory footprint is deterministic and bounded regardless of
+/// peer behaviour.
+#[derive(Debug)]
+pub struct NonceRing {
+    slots: Vec<u128>,
+    cursor: usize,
+    capacity: usize,
+}
+
+impl NonceRing {
+    #[must_use]
+    pub const fn new(capacity: usize) -> Self {
+        Self {
+            slots: Vec::new(),
+            cursor: 0,
+            capacity,
+        }
+    }
+
+    /// Returns `true` if `nonce` was not present (and has now been
+    /// recorded); `false` if it was already seen.
+    pub fn insert(&mut self, nonce: u128) -> bool {
+        if self.slots.contains(&nonce) {
+            return false;
+        }
+        if self.slots.len() < self.capacity {
+            self.slots.push(nonce);
+        } else {
+            self.slots[self.cursor] = nonce;
+            self.cursor = (self.cursor + 1) % self.capacity;
+        }
+        true
+    }
+}
+
+impl Default for NonceRing {
+    fn default() -> Self {
+        Self::new(NONCE_LRU_CAPACITY)
+    }
+}
+
+/// Encode an auth envelope into `reserved_command`.
+///
+/// Overwrites the first `ENVELOPE_LEN` bytes and zero-fills the trailing
+/// reserved range. Source of the tag is `token_source`; `timestamp_ns`
+/// and `nonce` are caller-controlled so the same routine can be used for
+/// deterministic tests (fake clock, fixed nonce) and production (system
+/// clock, CSPRNG).
+pub fn encode_envelope(
+    reserved: &mut [u8; 128],
+    token_source: &dyn TokenSource,
+    challenge: &AuthChallenge,
+) {
+    let tag = token_source.sign(challenge);
+    reserved[TAG_RANGE].copy_from_slice(tag.as_bytes());
+    
reserved[TIMESTAMP_RANGE].copy_from_slice(&challenge.timestamp_ns.to_le_bytes());
+    reserved[NONCE_RANGE].copy_from_slice(&challenge.nonce.to_le_bytes());
+    reserved[KIND_OFFSET] = AuthKind::Blake3V1 as u8;
+    for b in &mut reserved[RESERVED_RANGE] {
+        *b = 0;
+    }
+}
+
+/// Parsed view of an envelope's raw bytes. No cryptographic checks.
+#[derive(Debug, Clone, Copy)]
+pub struct DecodedEnvelope {
+    pub tag: AuthTag,
+    pub timestamp_ns: u64,
+    pub nonce: u128,
+}
+
+/// Decode an envelope and reject malformed layout.
+///
+/// Rejects unknown envelope kinds and nonzero reserved bytes. Does not
+/// check the tag; caller passes the returned `DecodedEnvelope` to
+/// [`verify_envelope`] with the full challenge context.
+///
+/// # Errors
+///
+/// Returns `AuthError::UnsupportedKind` for unknown envelope versions or
+/// `AuthError::ReservedNonzero` for non-zero reserved padding.
+pub fn decode_envelope(reserved: &[u8; 128]) -> Result<DecodedEnvelope, 
AuthError> {
+    AuthKind::from_byte(reserved[KIND_OFFSET])?;
+    if reserved[RESERVED_RANGE].iter().any(|&b| b != 0) {
+        return Err(AuthError::ReservedNonzero);
+    }
+    let mut tag_bytes = [0u8; 32];
+    tag_bytes.copy_from_slice(&reserved[TAG_RANGE]);
+    let mut ts_bytes = [0u8; 8];
+    ts_bytes.copy_from_slice(&reserved[TIMESTAMP_RANGE]);
+    let mut nonce_bytes = [0u8; 16];
+    nonce_bytes.copy_from_slice(&reserved[NONCE_RANGE]);
+    Ok(DecodedEnvelope {
+        tag: AuthTag(blake3::Hash::from_bytes(tag_bytes)),
+        timestamp_ns: u64::from_le_bytes(ts_bytes),
+        nonce: u128::from_le_bytes(nonce_bytes),
+    })
+}
+
+/// Full credential check: timestamp window -> nonce dedup -> tag.
+///
+/// # Errors
+///
+/// Returns the first `AuthError` triggered by the check sequence.
+pub fn verify_envelope(
+    token_source: &dyn TokenSource,
+    challenge: &AuthChallenge,
+    decoded: &DecodedEnvelope,
+    now_ns: u128,
+    nonces: &mut NonceRing,
+) -> Result<(), AuthError> {
+    let ts_u128 = u128::from(decoded.timestamp_ns);
+    let delta = now_ns.abs_diff(ts_u128);
+    if delta > TIMESTAMP_WINDOW_NS {
+        return Err(AuthError::TimestampOutOfWindow);
+    }
+    if !nonces.insert(decoded.nonce) {
+        return Err(AuthError::NonceReplay);
+    }
+    token_source.verify(challenge, &decoded.tag)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn sample_challenge() -> AuthChallenge {
+        AuthChallenge {
+            cluster: 0xdead_beef_1234_5678_9abc_def0_1122_3344,
+            peer_id: 7,
+            timestamp_ns: 1_700_000_000_000_000_000,
+            release: 1,
+            nonce: 0x11_2233_4455_6677_8899_aabb_ccdd_eeff,
+        }
+    }
+
+    #[test]
+    fn roundtrip_ok() {
+        let source = StaticSharedSecret::new([1u8; SECRET_SIZE]);
+        let challenge = sample_challenge();
+        let mut reserved = [0u8; 128];
+        encode_envelope(&mut reserved, &source, &challenge);
+
+        let decoded = decode_envelope(&reserved).expect("decode");
+        let mut ring = NonceRing::new(32);
+        verify_envelope(
+            &source,
+            &challenge,
+            &decoded,
+            u128::from(challenge.timestamp_ns),
+            &mut ring,
+        )
+        .expect("verify");
+    }
+
+    #[test]
+    fn wrong_secret_rejected() {
+        let signer = StaticSharedSecret::new([1u8; SECRET_SIZE]);
+        let verifier = StaticSharedSecret::new([2u8; SECRET_SIZE]);
+        let challenge = sample_challenge();
+        let mut reserved = [0u8; 128];
+        encode_envelope(&mut reserved, &signer, &challenge);
+
+        let decoded = decode_envelope(&reserved).unwrap();
+        let mut ring = NonceRing::new(32);
+        let err = verify_envelope(
+            &verifier,
+            &challenge,
+            &decoded,
+            u128::from(challenge.timestamp_ns),
+            &mut ring,
+        )
+        .unwrap_err();
+        assert_eq!(err, AuthError::TagMismatch);
+    }
+
+    #[test]
+    fn replay_rejected() {
+        let source = StaticSharedSecret::zero();
+        let challenge = sample_challenge();
+        let mut reserved = [0u8; 128];
+        encode_envelope(&mut reserved, &source, &challenge);
+
+        let decoded = decode_envelope(&reserved).unwrap();
+        let mut ring = NonceRing::new(32);
+        let now = u128::from(challenge.timestamp_ns);
+        verify_envelope(&source, &challenge, &decoded, now, &mut 
ring).unwrap();
+        let err = verify_envelope(&source, &challenge, &decoded, now, &mut 
ring).unwrap_err();
+        assert_eq!(err, AuthError::NonceReplay);
+    }
+
+    #[test]
+    fn stale_timestamp_rejected() {
+        let source = StaticSharedSecret::zero();
+        let challenge = sample_challenge();
+        let mut reserved = [0u8; 128];
+        encode_envelope(&mut reserved, &source, &challenge);
+
+        let decoded = decode_envelope(&reserved).unwrap();
+        let mut ring = NonceRing::new(32);
+        let now = u128::from(challenge.timestamp_ns) + TIMESTAMP_WINDOW_NS + 1;
+        let err = verify_envelope(&source, &challenge, &decoded, now, &mut 
ring).unwrap_err();
+        assert_eq!(err, AuthError::TimestampOutOfWindow);
+    }
+
+    #[test]
+    fn future_timestamp_rejected() {
+        let source = StaticSharedSecret::zero();
+        let challenge = sample_challenge();
+        let mut reserved = [0u8; 128];
+        encode_envelope(&mut reserved, &source, &challenge);
+
+        let decoded = decode_envelope(&reserved).unwrap();
+        let mut ring = NonceRing::new(32);
+        let now = 
u128::from(challenge.timestamp_ns).saturating_sub(TIMESTAMP_WINDOW_NS + 1);
+        let err = verify_envelope(&source, &challenge, &decoded, now, &mut 
ring).unwrap_err();
+        assert_eq!(err, AuthError::TimestampOutOfWindow);
+    }
+
+    #[test]
+    fn unsupported_kind_rejected() {
+        let source = StaticSharedSecret::zero();
+        let challenge = sample_challenge();
+        let mut reserved = [0u8; 128];
+        encode_envelope(&mut reserved, &source, &challenge);
+        reserved[KIND_OFFSET] = 99;
+        let err = decode_envelope(&reserved).unwrap_err();
+        assert_eq!(err, AuthError::UnsupportedKind);
+    }
+
+    #[test]
+    fn reserved_nonzero_rejected() {
+        let source = StaticSharedSecret::zero();
+        let challenge = sample_challenge();
+        let mut reserved = [0u8; 128];
+        encode_envelope(&mut reserved, &source, &challenge);
+        reserved[100] = 0xff;
+        let err = decode_envelope(&reserved).unwrap_err();
+        assert_eq!(err, AuthError::ReservedNonzero);
+    }
+
+    #[test]
+    fn nonce_ring_fills_and_evicts_fifo() {
+        let mut ring = NonceRing::new(3);
+        assert!(ring.insert(1));
+        assert!(ring.insert(2));
+        assert!(ring.insert(3));
+        assert!(!ring.insert(1));
+        assert!(ring.insert(4));
+        // Nonce 1 evicted; reinserting it should now succeed again.
+        assert!(ring.insert(1));
+    }
+}
diff --git a/core/message_bus/src/connector.rs 
b/core/message_bus/src/connector.rs
index 641d50683..01d18a255 100644
--- a/core/message_bus/src/connector.rs
+++ b/core/message_bus/src/connector.rs
@@ -25,12 +25,15 @@
 //! (see `shard::coordinator::ShardZeroCoordinator`).
 
 use crate::IggyMessageBus;
+use crate::auth::{self, TokenSource};
 use crate::framing;
 use crate::lifecycle::ShutdownToken;
 use crate::socket_opts::apply_keepalive_for_connection;
 use crate::{AcceptedReplicaFn, GenericHeader, Message};
 use compio::net::TcpStream;
 use iggy_binary_protocol::{Command2, HEADER_SIZE};
+use iggy_common::IggyTimestamp;
+use rand::Rng;
 use std::mem::size_of;
 use std::net::SocketAddr;
 use std::rc::Rc;
@@ -57,12 +60,14 @@ pub async fn start(
     peers: Vec<(u8, SocketAddr)>,
     on_dialed: AcceptedReplicaFn,
     reconnect_period: Duration,
+    token_source: Rc<dyn TokenSource>,
 ) {
-    connect_all(bus, cluster_id, self_id, &peers, &on_dialed).await;
+    connect_all(bus, cluster_id, self_id, &peers, &on_dialed, 
&token_source).await;
 
     let handler = on_dialed.clone();
     let token = bus.token();
     let bus_for_task = Rc::clone(bus);
+    let token_source_for_task = Rc::clone(&token_source);
     let handle = compio::runtime::spawn(async move {
         periodic_reconnect(
             &bus_for_task,
@@ -72,6 +77,7 @@ pub async fn start(
             handler,
             reconnect_period,
             token,
+            token_source_for_task,
         )
         .await;
     });
@@ -85,6 +91,7 @@ async fn connect_all(
     self_id: u8,
     peers: &[(u8, SocketAddr)],
     on_dialed: &AcceptedReplicaFn,
+    token_source: &Rc<dyn TokenSource>,
 ) {
     for &(peer_id, addr) in peers {
         if peer_id <= self_id {
@@ -105,11 +112,21 @@ async fn connect_all(
             );
             continue;
         }
-        connect_one(bus, cluster_id, self_id, peer_id, addr, on_dialed).await;
+        connect_one(
+            bus,
+            cluster_id,
+            self_id,
+            peer_id,
+            addr,
+            on_dialed,
+            token_source,
+        )
+        .await;
     }
 }
 
 #[allow(clippy::future_not_send)]
+#[allow(clippy::too_many_arguments)]
 async fn periodic_reconnect(
     bus: &Rc<IggyMessageBus>,
     cluster_id: u128,
@@ -118,9 +135,10 @@ async fn periodic_reconnect(
     on_dialed: AcceptedReplicaFn,
     period: Duration,
     token: ShutdownToken,
+    token_source: Rc<dyn TokenSource>,
 ) {
     while token.sleep_or_shutdown(period).await {
-        connect_all(bus, cluster_id, self_id, &peers, &on_dialed).await;
+        connect_all(bus, cluster_id, self_id, &peers, &on_dialed, 
&token_source).await;
     }
     debug!("replica reconnect periodic task exiting");
 }
@@ -129,6 +147,7 @@ async fn periodic_reconnect(
 /// `on_dialed` on success. Dial / handshake failures are logged and
 /// swallowed; VSR tolerates missing peers and the periodic sweep retries.
 #[allow(clippy::future_not_send)]
+#[allow(clippy::too_many_arguments)]
 async fn connect_one(
     bus: &Rc<IggyMessageBus>,
     cluster_id: u128,
@@ -136,6 +155,7 @@ async fn connect_one(
     peer_id: u8,
     addr: SocketAddr,
     on_dialed: &AcceptedReplicaFn,
+    token_source: &Rc<dyn TokenSource>,
 ) {
     let mut stream = match TcpStream::connect(addr).await {
         Ok(s) => s,
@@ -160,7 +180,7 @@ async fn connect_one(
         );
     }
 
-    let ping = build_ping_message(cluster_id, self_id);
+    let ping = build_ping_message(cluster_id, self_id, token_source.as_ref());
     if let Err(e) = framing::write_message(&mut stream, ping).await {
         warn!(replica = peer_id, %addr, "handshake write failed: {e}");
         return;
@@ -170,8 +190,21 @@ async fn connect_one(
     on_dialed(stream, peer_id);
 }
 
-/// Build a Ping handshake message identifying our replica id.
-fn build_ping_message(cluster_id: u128, replica_id: u8) -> 
Message<GenericHeader> {
+/// Build a Ping handshake message identifying our replica id and carrying
+/// a fresh auth envelope stamped by `token_source`.
+fn build_ping_message(
+    cluster_id: u128,
+    replica_id: u8,
+    token_source: &dyn TokenSource,
+) -> Message<GenericHeader> {
+    #[allow(clippy::cast_possible_truncation)]
+    let timestamp_ns = IggyTimestamp::now().as_nanos() as u64;
+    let nonce = {
+        let mut buf = [0u8; 16];
+        rand::rng().fill_bytes(&mut buf);
+        u128::from_le_bytes(buf)
+    };
+
     #[allow(clippy::cast_possible_truncation)]
     Message::<GenericHeader>::new(size_of::<GenericHeader>()).transmute_header(
         |_, h: &mut GenericHeader| {
@@ -179,6 +212,14 @@ fn build_ping_message(cluster_id: u128, replica_id: u8) -> 
Message<GenericHeader
             h.cluster = cluster_id;
             h.replica = replica_id;
             h.size = HEADER_SIZE as u32;
+            let challenge = auth::AuthChallenge {
+                cluster: cluster_id,
+                peer_id: u128::from(replica_id),
+                timestamp_ns,
+                release: h.release,
+                nonce,
+            };
+            auth::encode_envelope(&mut h.reserved_command, token_source, 
&challenge);
         },
     )
 }
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index bb993fc8c..99c76633a 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod auth;
 pub mod cache;
 pub mod client_listener;
 pub mod config;
diff --git a/core/message_bus/src/replica_io.rs 
b/core/message_bus/src/replica_io.rs
index 5ef866ff3..3731cc6d8 100644
--- a/core/message_bus/src/replica_io.rs
+++ b/core/message_bus/src/replica_io.rs
@@ -36,6 +36,7 @@ use std::time::Duration;
 
 use iggy_common::IggyError;
 
+use crate::auth::TokenSource;
 use crate::connector::start as start_connector;
 use crate::replica_listener::{bind as bind_replica_listener, run as 
run_replica_listener};
 use crate::{AcceptedClientFn, AcceptedReplicaFn, IggyMessageBus, 
client_listener};
@@ -71,6 +72,7 @@ pub async fn start_on_shard_zero(
     on_accepted_replica: AcceptedReplicaFn,
     on_accepted_client: AcceptedClientFn,
     reconnect_period: Duration,
+    token_source: Rc<dyn TokenSource>,
 ) -> Result<Option<BoundPlanes>, IggyError> {
     if bus.shard_id() != 0 {
         return Ok(None);
@@ -82,6 +84,7 @@ pub async fn start_on_shard_zero(
     let token_for_replica = bus.token();
     let on_accepted_replica_for_listener = on_accepted_replica.clone();
     let listener_max_message_size = bus.config().max_message_size;
+    let token_source_for_listener = Rc::clone(&token_source);
     let replica_handle = compio::runtime::spawn(async move {
         run_replica_listener(
             replica_listener,
@@ -91,6 +94,7 @@ pub async fn start_on_shard_zero(
             replica_count,
             on_accepted_replica_for_listener,
             listener_max_message_size,
+            token_source_for_listener,
         )
         .await;
     });
@@ -109,6 +113,7 @@ pub async fn start_on_shard_zero(
         peers,
         on_accepted_replica,
         reconnect_period,
+        token_source,
     )
     .await;
 
@@ -136,6 +141,7 @@ pub async fn start_on_shard_zero_default(
     peers: Vec<(u8, SocketAddr)>,
     on_accepted_replica: AcceptedReplicaFn,
     on_accepted_client: AcceptedClientFn,
+    token_source: Rc<dyn TokenSource>,
 ) -> Result<Option<BoundPlanes>, IggyError> {
     let reconnect_period = bus.config().reconnect_period;
     start_on_shard_zero(
@@ -149,6 +155,7 @@ pub async fn start_on_shard_zero_default(
         on_accepted_replica,
         on_accepted_client,
         reconnect_period,
+        token_source,
     )
     .await
 }
diff --git a/core/message_bus/src/replica_listener.rs 
b/core/message_bus/src/replica_listener.rs
index 9d9db9192..d2d82d586 100644
--- a/core/message_bus/src/replica_listener.rs
+++ b/core/message_bus/src/replica_listener.rs
@@ -29,20 +29,23 @@
 //!
 //! # Security
 //!
-//! The `Ping` handshake validates `cluster_id`, the directional bound, and
-//! `replica_count`. There is NO shared secret, no mTLS, no version
-//! negotiation. Deploy the replica listener on a trusted network boundary
-//! (cluster-local VPC, private subnet, overlay, or equivalent) - NEVER
-//! expose the replica port directly to the public internet. Follow-up work
-//! on an authenticated handshake is tracked under IGGY-112.
+//! The `Ping` handshake validates `cluster_id`, the directional bound,
+//! `replica_count`, and a BLAKE3-keyed MAC carried in
+//! `GenericHeader.reserved_command[0..57]` (see [`crate::auth`]). Peers
+//! that cannot produce a valid tag are rejected. Even so, TLS/encryption is
+//! NOT provided here: operators still need to deploy the replica port on a
+//! trusted network boundary (cluster-local VPC, private subnet, overlay)
+//! if wire confidentiality is required.
 
+use crate::auth::{self, NonceRing, TokenSource};
 use crate::framing;
 use crate::lifecycle::ShutdownToken;
 use crate::{AcceptedReplicaFn, GenericHeader, Message};
 use compio::net::{SocketOpts, TcpListener, TcpStream};
 use futures::FutureExt;
 use iggy_binary_protocol::Command2;
-use iggy_common::IggyError;
+use iggy_common::{IggyError, IggyTimestamp};
+use std::cell::RefCell;
 use std::net::SocketAddr;
 use std::rc::Rc;
 use tracing::{debug, error, info, warn};
@@ -87,11 +90,13 @@ pub async fn run(
     replica_count: u8,
     on_accepted: AcceptedReplicaFn,
     max_message_size: usize,
+    token_source: Rc<dyn TokenSource>,
 ) {
     info!(
         "Replica listener accepting on {:?}",
         listener.local_addr().ok()
     );
+    let nonces = RefCell::new(NonceRing::default());
     loop {
         futures::select! {
             () = token.wait().fuse() => {
@@ -101,14 +106,23 @@ pub async fn run(
             result = listener.accept().fuse() => {
                 match result {
                     Ok((mut stream, peer_addr)) => {
-                        match handshake(
-                            &mut stream,
-                            cluster_id,
-                            self_id,
-                            replica_count,
-                            max_message_size,
-                        )
-                        .await {
+                        let read = handshake_read(&mut stream, cluster_id, 
max_message_size).await;
+                        let outcome = match read {
+                            Ok(parsed) => {
+                                let now_ns = IggyTimestamp::now().as_nanos();
+                                let mut ring = nonces.borrow_mut();
+                                handshake_verify(
+                                    &parsed,
+                                    token_source.as_ref(),
+                                    &mut ring,
+                                    now_ns,
+                                    self_id,
+                                    replica_count,
+                                )
+                            }
+                            Err(e) => Err(e),
+                        };
+                        match outcome {
                             Ok(peer_id) => {
                                 on_accepted(stream, peer_id);
                             }
@@ -126,26 +140,23 @@ pub async fn run(
     }
 }
 
-/// Read and validate the `Ping` handshake from an inbound peer connection.
-///
-/// Returns the peer replica id on success.
-//
-// TODO(IGGY-112): authenticate the replica handshake.
-//
-// Current validation covers `cluster_id` and the directional id bound
-// only: no shared secret, no mTLS, no protocol version negotiation.
-// Any peer that reaches the replica port and knows the `cluster_id`
-// can pose as a replica. Acceptable on the assumed trusted network
-// boundary; hardening is tracked under IGGY-112 (authenticated
-// handshake + version negotiation).
+/// Parsed-but-unverified handshake captured from the wire. Held across
+/// the await boundary between I/O and the sync verify step.
+struct ParsedHandshake {
+    replica: u8,
+    decoded: auth::DecodedEnvelope,
+    challenge: auth::AuthChallenge,
+}
+
+/// I/O-only portion of the handshake: read the 256 B `Ping` frame, enforce
+/// command + cluster match, and parse the auth envelope's bytes. Does NOT
+/// touch the nonce ring or verify the tag.
 #[allow(clippy::future_not_send)]
-async fn handshake(
+async fn handshake_read(
     stream: &mut TcpStream,
     our_cluster: u128,
-    self_id: u8,
-    replica_count: u8,
     max_message_size: usize,
-) -> Result<u8, IggyError> {
+) -> Result<ParsedHandshake, IggyError> {
     let msg = framing::read_message(stream, max_message_size).await?;
     let header = msg.header();
     if header.command != Command2::Ping {
@@ -154,11 +165,45 @@ async fn handshake(
     if header.cluster != our_cluster {
         return Err(IggyError::InvalidCommand);
     }
+    let decoded = 
auth::decode_envelope(&header.reserved_command).map_err(auth::to_iggy_error)?;
+    let challenge = auth::AuthChallenge {
+        cluster: header.cluster,
+        peer_id: u128::from(header.replica),
+        timestamp_ns: decoded.timestamp_ns,
+        release: header.release,
+        nonce: decoded.nonce,
+    };
+    Ok(ParsedHandshake {
+        replica: header.replica,
+        decoded,
+        challenge,
+    })
+}
+
+/// Synchronous verifier: tag + replay, then the directional tiebreak.
+/// Runs outside any await so the `&mut NonceRing` borrow cannot span I/O.
+fn handshake_verify(
+    parsed: &ParsedHandshake,
+    token_source: &dyn TokenSource,
+    nonces: &mut NonceRing,
+    now_ns: u128,
+    self_id: u8,
+    replica_count: u8,
+) -> Result<u8, IggyError> {
+    auth::verify_envelope(
+        token_source,
+        &parsed.challenge,
+        &parsed.decoded,
+        now_ns,
+        nonces,
+    )
+    .map_err(auth::to_iggy_error)?;
+
     // Directional rule: a replica only accepts inbound from peers with
     // strictly lower ids. The peer is responsible for not dialing us if
     // it has the higher id; this is just defensive.
-    if header.replica >= replica_count || header.replica >= self_id {
+    if parsed.replica >= replica_count || parsed.replica >= self_id {
         return Err(IggyError::InvalidCommand);
     }
-    Ok(header.replica)
+    Ok(parsed.replica)
 }
diff --git a/core/message_bus/tests/backpressure.rs 
b/core/message_bus/tests/backpressure.rs
index 1a399aa04..7e849a981 100644
--- a/core/message_bus/tests/backpressure.rs
+++ b/core/message_bus/tests/backpressure.rs
@@ -24,7 +24,7 @@
 
 mod common;
 
-use common::{header_only, install_replicas_locally, loopback};
+use common::{header_only, install_replicas_locally, loopback, 
test_token_source};
 use iggy_binary_protocol::Command2;
 use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as 
start_connector};
 use message_bus::replica_listener::{MessageHandler, bind, run};
@@ -62,6 +62,7 @@ async fn try_send_returns_backpressure_when_queue_full() {
             2,
             accept_1,
             message_bus::framing::MAX_MESSAGE_SIZE,
+            test_token_source(),
         )
         .await;
     });
@@ -77,6 +78,7 @@ async fn try_send_returns_backpressure_when_queue_full() {
         vec![(1, addr1)],
         dial_0,
         DEFAULT_RECONNECT_PERIOD,
+        test_token_source(),
     )
     .await;
 
diff --git a/core/message_bus/tests/common/mod.rs 
b/core/message_bus/tests/common/mod.rs
index b00fbb310..81cdf1fa5 100644
--- a/core/message_bus/tests/common/mod.rs
+++ b/core/message_bus/tests/common/mod.rs
@@ -27,6 +27,7 @@
 #![allow(dead_code)] // each test binary uses a subset
 
 use iggy_binary_protocol::{Command2, GenericHeader, HEADER_SIZE, Message};
+use message_bus::auth::{StaticSharedSecret, TokenSource};
 use message_bus::client_listener::RequestHandler;
 use message_bus::replica_listener::MessageHandler;
 use message_bus::{AcceptedClientFn, AcceptedReplicaFn, IggyMessageBus, 
installer};
@@ -34,6 +35,13 @@ use std::cell::Cell;
 use std::net::SocketAddr;
 use std::rc::Rc;
 
+/// Default zero-byte shared secret for integration tests. All replicas
+/// authenticate against the same dummy source.
+#[must_use]
+pub fn test_token_source() -> Rc<dyn TokenSource> {
+    Rc::new(StaticSharedSecret::zero())
+}
+
 /// Loopback address with OS-chosen port.
 #[must_use]
 pub fn loopback() -> SocketAddr {
diff --git a/core/message_bus/tests/connection_lost_notify.rs 
b/core/message_bus/tests/connection_lost_notify.rs
index 8babea3ad..00a05b5f7 100644
--- a/core/message_bus/tests/connection_lost_notify.rs
+++ b/core/message_bus/tests/connection_lost_notify.rs
@@ -22,7 +22,7 @@
 
 mod common;
 
-use common::{install_replicas_locally, loopback};
+use common::{install_replicas_locally, loopback, test_token_source};
 use message_bus::IggyMessageBus;
 use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as 
start_connector};
 use message_bus::replica_listener::{MessageHandler, bind, run};
@@ -64,6 +64,7 @@ async fn 
connection_lost_fires_exactly_once_per_peer_disconnect() {
             2,
             accept_1,
             message_bus::framing::MAX_MESSAGE_SIZE,
+            test_token_source(),
         )
         .await;
     });
@@ -77,6 +78,7 @@ async fn 
connection_lost_fires_exactly_once_per_peer_disconnect() {
         vec![(1u8, addr1)],
         dial_0,
         DEFAULT_RECONNECT_PERIOD,
+        test_token_source(),
     )
     .await;
 
diff --git a/core/message_bus/tests/directional_connection.rs 
b/core/message_bus/tests/directional_connection.rs
index ac8a3a7d1..6251939ce 100644
--- a/core/message_bus/tests/directional_connection.rs
+++ b/core/message_bus/tests/directional_connection.rs
@@ -22,7 +22,7 @@
 
 mod common;
 
-use common::{install_replicas_locally, loopback};
+use common::{install_replicas_locally, loopback, test_token_source};
 use message_bus::IggyMessageBus;
 use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as 
start_connector};
 use message_bus::replica_listener::{MessageHandler, bind, run};
@@ -53,6 +53,7 @@ async fn lower_id_dials_higher_id_accepts() {
             2,
             accept_0,
             message_bus::framing::MAX_MESSAGE_SIZE,
+            test_token_source(),
         )
         .await;
     });
@@ -69,6 +70,7 @@ async fn lower_id_dials_higher_id_accepts() {
             2,
             accept_1,
             message_bus::framing::MAX_MESSAGE_SIZE,
+            test_token_source(),
         )
         .await;
     });
@@ -85,10 +87,20 @@ async fn lower_id_dials_higher_id_accepts() {
         peers.clone(),
         dial_0,
         DEFAULT_RECONNECT_PERIOD,
+        test_token_source(),
     )
     .await;
     let dial_1 = install_replicas_locally(bus1.clone(), on_message.clone());
-    start_connector(&bus1, CLUSTER, 1, peers, dial_1, 
DEFAULT_RECONNECT_PERIOD).await;
+    start_connector(
+        &bus1,
+        CLUSTER,
+        1,
+        peers,
+        dial_1,
+        DEFAULT_RECONNECT_PERIOD,
+        test_token_source(),
+    )
+    .await;
 
     // Wait for the directional connection to settle.
     let deadline = std::time::Instant::now() + Duration::from_secs(2);
diff --git a/core/message_bus/tests/handshake_auth.rs 
b/core/message_bus/tests/handshake_auth.rs
new file mode 100644
index 000000000..b833354e7
--- /dev/null
+++ b/core/message_bus/tests/handshake_auth.rs
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration coverage for the authenticated replica handshake.
+//!
+//! Complements the in-crate unit tests in `core/message_bus/src/auth.rs`
+//! by exercising the full end-to-end path (dial -> Ping frame -> listener
+//! verify) through `replica_listener::run` + `connector::start`.
+
+mod common;
+
+use common::{install_replicas_locally, loopback};
+use message_bus::IggyMessageBus;
+use message_bus::auth::{StaticSharedSecret, TokenSource};
+use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as 
start_connector};
+use message_bus::replica_listener::{MessageHandler, bind, run};
+use std::rc::Rc;
+use std::time::Duration;
+
+const CLUSTER: u128 = 0xCAFE;
+
+/// Matching secrets on both sides: handshake succeeds, peer registers.
+#[compio::test]
+async fn handshake_with_matching_secret_registers_peer() {
+    let bus_listener = Rc::new(IggyMessageBus::new(0));
+    let bus_dialer = Rc::new(IggyMessageBus::new(0));
+
+    let secret: Rc<dyn TokenSource> = Rc::new(StaticSharedSecret::new([7u8; 
32]));
+
+    let on_msg: MessageHandler = Rc::new(|_, _| {});
+
+    let (listener, addr) = bind(loopback()).await.unwrap();
+    let token_for_listener = bus_listener.token();
+    let accept = install_replicas_locally(bus_listener.clone(), 
on_msg.clone());
+    let secret_for_listener = Rc::clone(&secret);
+    let listener_handle = compio::runtime::spawn(async move {
+        run(
+            listener,
+            token_for_listener,
+            CLUSTER,
+            1,
+            2,
+            accept,
+            message_bus::framing::MAX_MESSAGE_SIZE,
+            secret_for_listener,
+        )
+        .await;
+    });
+    bus_listener.track_background(listener_handle);
+
+    let dial = install_replicas_locally(bus_dialer.clone(), on_msg);
+    start_connector(
+        &bus_dialer,
+        CLUSTER,
+        0,
+        vec![(1u8, addr)],
+        dial,
+        DEFAULT_RECONNECT_PERIOD,
+        Rc::clone(&secret),
+    )
+    .await;
+
+    let deadline = std::time::Instant::now() + Duration::from_secs(2);
+    while !bus_dialer.replicas().contains(1) {
+        assert!(
+            std::time::Instant::now() < deadline,
+            "matching-secret handshake did not register peer"
+        );
+        compio::time::sleep(Duration::from_millis(5)).await;
+    }
+
+    bus_dialer.shutdown(Duration::from_secs(2)).await;
+    bus_listener.shutdown(Duration::from_secs(2)).await;
+}
+
+/// Mismatched secrets: the listener rejects the dialed Ping, so neither
+/// side ends up with a registered peer within the test window.
+#[compio::test]
+async fn handshake_with_mismatched_secret_rejects_peer() {
+    let bus_listener = Rc::new(IggyMessageBus::new(0));
+    let bus_dialer = Rc::new(IggyMessageBus::new(0));
+
+    let listener_secret: Rc<dyn TokenSource> = 
Rc::new(StaticSharedSecret::new([1u8; 32]));
+    let dialer_secret: Rc<dyn TokenSource> = 
Rc::new(StaticSharedSecret::new([2u8; 32]));
+
+    let on_msg: MessageHandler = Rc::new(|_, _| {});
+
+    let (listener, addr) = bind(loopback()).await.unwrap();
+    let token_for_listener = bus_listener.token();
+    let accept = install_replicas_locally(bus_listener.clone(), 
on_msg.clone());
+    let listener_handle = compio::runtime::spawn(async move {
+        run(
+            listener,
+            token_for_listener,
+            CLUSTER,
+            1,
+            2,
+            accept,
+            message_bus::framing::MAX_MESSAGE_SIZE,
+            listener_secret,
+        )
+        .await;
+    });
+    bus_listener.track_background(listener_handle);
+
+    let dial = install_replicas_locally(bus_dialer.clone(), on_msg);
+    start_connector(
+        &bus_dialer,
+        CLUSTER,
+        0,
+        vec![(1u8, addr)],
+        dial,
+        Duration::from_millis(50),
+        dialer_secret,
+    )
+    .await;
+
+    compio::time::sleep(Duration::from_millis(400)).await;
+    assert!(
+        !bus_dialer.replicas().contains(1),
+        "mismatched-secret handshake must not register the peer"
+    );
+    assert!(
+        !bus_listener.replicas().contains(0),
+        "mismatched-secret handshake must not register the peer on the 
listener either"
+    );
+
+    bus_dialer.shutdown(Duration::from_secs(2)).await;
+    bus_listener.shutdown(Duration::from_secs(2)).await;
+}
diff --git a/core/message_bus/tests/head_of_line.rs 
b/core/message_bus/tests/head_of_line.rs
index 57c644971..b756897ce 100644
--- a/core/message_bus/tests/head_of_line.rs
+++ b/core/message_bus/tests/head_of_line.rs
@@ -25,7 +25,7 @@
 
 mod common;
 
-use common::{header_only, install_replicas_locally, loopback};
+use common::{header_only, install_replicas_locally, loopback, 
test_token_source};
 use compio::net::TcpListener;
 use iggy_binary_protocol::Command2;
 use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as 
start_connector};
@@ -58,6 +58,7 @@ async fn slow_peer_does_not_block_other_peers() {
             3,
             accept_a,
             message_bus::framing::MAX_MESSAGE_SIZE,
+            test_token_source(),
         )
         .await;
     });
@@ -91,6 +92,7 @@ async fn slow_peer_does_not_block_other_peers() {
         vec![(1, addr_a), (2, addr_b)],
         dial_delegate,
         DEFAULT_RECONNECT_PERIOD,
+        test_token_source(),
     )
     .await;
 
diff --git a/core/message_bus/tests/reconnect.rs 
b/core/message_bus/tests/reconnect.rs
index 0eef98e94..93ac64332 100644
--- a/core/message_bus/tests/reconnect.rs
+++ b/core/message_bus/tests/reconnect.rs
@@ -21,7 +21,7 @@
 
 mod common;
 
-use common::{install_replicas_locally, loopback};
+use common::{install_replicas_locally, loopback, test_token_source};
 use message_bus::IggyMessageBus;
 use message_bus::connector::start as start_connector;
 use message_bus::replica_listener::{MessageHandler, bind, run};
@@ -43,7 +43,16 @@ async fn periodic_retry_picks_up_late_listener() {
     let on_message: MessageHandler = Rc::new(|_, _| {});
     let period = Duration::from_millis(100);
     let dial_delegate = install_replicas_locally(bus0.clone(), 
on_message.clone());
-    start_connector(&bus0, CLUSTER, 0, vec![(1, addr)], dial_delegate, 
period).await;
+    start_connector(
+        &bus0,
+        CLUSTER,
+        0,
+        vec![(1, addr)],
+        dial_delegate,
+        period,
+        test_token_source(),
+    )
+    .await;
     assert!(!bus0.replicas().contains(1), "first connect should fail");
 
     // Bring bus 1 online on the same address.
@@ -60,6 +69,7 @@ async fn periodic_retry_picks_up_late_listener() {
             2,
             accept_delegate,
             message_bus::framing::MAX_MESSAGE_SIZE,
+            test_token_source(),
         )
         .await;
     });
diff --git a/core/message_bus/tests/reconnect_skip_connected.rs 
b/core/message_bus/tests/reconnect_skip_connected.rs
index 5d9a77b3b..31159e1db 100644
--- a/core/message_bus/tests/reconnect_skip_connected.rs
+++ b/core/message_bus/tests/reconnect_skip_connected.rs
@@ -22,7 +22,7 @@
 
 mod common;
 
-use common::{install_replicas_locally, loopback};
+use common::{install_replicas_locally, loopback, test_token_source};
 use message_bus::IggyMessageBus;
 use message_bus::connector::start as start_connector;
 use message_bus::replica_listener::{MessageHandler, bind, run};
@@ -59,6 +59,7 @@ async fn periodic_reconnect_skips_already_connected_peer() {
             2,
             accept_delegate,
             message_bus::framing::MAX_MESSAGE_SIZE,
+            test_token_source(),
         )
         .await;
     });
@@ -69,7 +70,16 @@ async fn periodic_reconnect_skips_already_connected_peer() {
     // missing, bus1's listener will see N extra accepts.
     let period = Duration::from_millis(50);
     let dial_delegate = install_replicas_locally(bus0.clone(), 
on_message.clone());
-    start_connector(&bus0, CLUSTER, 0, vec![(1u8, addr1)], dial_delegate, 
period).await;
+    start_connector(
+        &bus0,
+        CLUSTER,
+        0,
+        vec![(1u8, addr1)],
+        dial_delegate,
+        period,
+        test_token_source(),
+    )
+    .await;
 
     // Wait for the initial connection to settle on both sides.
     let deadline = std::time::Instant::now() + Duration::from_secs(2);
diff --git a/core/message_bus/tests/replica_roundtrip.rs 
b/core/message_bus/tests/replica_roundtrip.rs
index e557b501a..8d8fde80a 100644
--- a/core/message_bus/tests/replica_roundtrip.rs
+++ b/core/message_bus/tests/replica_roundtrip.rs
@@ -23,7 +23,7 @@
 mod common;
 
 use async_channel::Receiver;
-use common::{header_only, install_replicas_locally, loopback};
+use common::{header_only, install_replicas_locally, loopback, 
test_token_source};
 use iggy_binary_protocol::Command2;
 use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as 
start_connector};
 use message_bus::replica_listener::{MessageHandler, bind, run};
@@ -55,6 +55,7 @@ async fn two_replicas_exchange_prepare_and_ack() {
             2,
             accept_delegate_1,
             message_bus::framing::MAX_MESSAGE_SIZE,
+            test_token_source(),
         )
         .await;
     });
@@ -76,6 +77,7 @@ async fn two_replicas_exchange_prepare_and_ack() {
         vec![(1, addr1)],
         dial_delegate_0,
         DEFAULT_RECONNECT_PERIOD,
+        test_token_source(),
     )
     .await;
 
diff --git a/core/message_bus/tests/shard_zero_gating.rs 
b/core/message_bus/tests/shard_zero_gating.rs
index a65fd84d7..1dc0cc339 100644
--- a/core/message_bus/tests/shard_zero_gating.rs
+++ b/core/message_bus/tests/shard_zero_gating.rs
@@ -21,7 +21,7 @@
 
 mod common;
 
-use common::{install_clients_locally, install_replicas_locally, loopback};
+use common::{install_clients_locally, install_replicas_locally, loopback, 
test_token_source};
 use message_bus::IggyMessageBus;
 use message_bus::client_listener::RequestHandler;
 use message_bus::connector::DEFAULT_RECONNECT_PERIOD;
@@ -51,6 +51,7 @@ async fn shard_zero_binds_listener_and_starts_connector() {
             2,
             peer_accept,
             message_bus::framing::MAX_MESSAGE_SIZE,
+            test_token_source(),
         )
         .await;
     });
@@ -76,6 +77,7 @@ async fn shard_zero_binds_listener_and_starts_connector() {
         accepted_replica,
         accepted_client,
         DEFAULT_RECONNECT_PERIOD,
+        test_token_source(),
     )
     .await
     .expect("start_on_shard_zero must succeed on shard 0");
@@ -127,6 +129,7 @@ async fn non_zero_shard_skips_io() {
         accepted_replica,
         accepted_client,
         DEFAULT_RECONNECT_PERIOD,
+        test_token_source(),
     )
     .await
     .expect("start_on_shard_zero must succeed on non-zero shard (no-op)");
diff --git a/core/message_bus/tests/vectored_batch.rs 
b/core/message_bus/tests/vectored_batch.rs
index c8fc10c6b..b079725a5 100644
--- a/core/message_bus/tests/vectored_batch.rs
+++ b/core/message_bus/tests/vectored_batch.rs
@@ -20,7 +20,7 @@
 
 mod common;
 
-use common::{header_only, install_replicas_locally, loopback};
+use common::{header_only, install_replicas_locally, loopback, 
test_token_source};
 use iggy_binary_protocol::Command2;
 use message_bus::connector::{DEFAULT_RECONNECT_PERIOD, start as 
start_connector};
 use message_bus::replica_listener::{MessageHandler, bind, run};
@@ -57,6 +57,7 @@ async fn writer_batches_pipelined_sends_in_order() {
             2,
             accept_1,
             message_bus::framing::MAX_MESSAGE_SIZE,
+            test_token_source(),
         )
         .await;
     });
@@ -72,6 +73,7 @@ async fn writer_batches_pipelined_sends_in_order() {
         vec![(1, addr1)],
         dial_0,
         DEFAULT_RECONNECT_PERIOD,
+        test_token_source(),
     )
     .await;
 

Reply via email to