This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 6f99acdadc4c94830c27e623123a9058fa844c27
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Apr 27 11:22:49 2026 +0200

    feat(message_bus): add QUIC transport for SDK-client plane
    
    The client plane was TCP-only. Phase 4 of the transport plan calls
    for QUIC parity with legacy `core/server`; this lands the message-
    bus side behind the existing `Transport*` trait family.
    
    `QuicTransport{Listener,Conn,Reader,Writer}` in
    `transports/quic.rs` wrap `compio-quic 0.7.2` so the runtime stays
    single-stack (no tokio bridge, invariant I10). The listener accepts
    the connection AND drains the first bidi `accept_bi` pair before
    yielding, so the install path on the owning shard sees a fully-
    established data plane.
    
    Reusing `framing::read_message` against `RecvStream` required
    dropping the `Unpin` bound from the framing helpers; `RecvStream`
    is `!Unpin` in compio-quic. The bound was incidental - neither
    `read_exact` nor `write_all` actually need it.
    
    QUIC is always-on, not feature-gated, matching legacy server's
    posture and shrinking the build matrix.
    
    Defenses: 0-RTT off at the rustls layer
    (`max_early_data_size = 0`); listener refuses any 0-RTT-tagged
    stream with `QUIC_PROTOCOL_VIOLATION`. ALPN `iggy.consensus.v1`.
    Default `TransportConfig`: 1 bidi/peer, 30 s idle timeout, 64 MiB
    windows, CUBIC congestion (upstream default).
    
    Tag-along: demote the only stray broken intra-doc link
    (`ClientTable` in `core/server-ng/src/session_manager.rs:26`,
    inherited from master) to a code span so `cargo doc` runs warning-
    free on this branch.
    
    Tests: +3 in `transports::quic::tests` (round-trip three frames,
    oversize-frame surfacing `IggyError::InvalidCommand`, default
    config smoke). 123/123 pass on `cargo nextest run -p message_bus
    -p server-ng`. Workspace clippy + cargo doc clean.
---
 Cargo.lock                              |   3 +
 Cargo.toml                              |   1 +
 core/message_bus/Cargo.toml             |   3 +
 core/message_bus/src/framing.rs         |   4 +-
 core/message_bus/src/transports/mod.rs  |   1 +
 core/message_bus/src/transports/quic.rs | 483 ++++++++++++++++++++++++++++++++
 core/server-ng/src/session_manager.rs   |   2 +-
 7 files changed, 494 insertions(+), 3 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index af920f2d3..cb1463495 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7011,11 +7011,14 @@ dependencies = [
  "async-channel",
  "blake3",
  "compio",
+ "compio-quic",
  "futures",
  "iggy_binary_protocol",
  "iggy_common",
  "libc",
  "rand 0.10.1",
+ "rcgen",
+ "rustls",
  "socket2 0.6.3",
  "tempfile",
  "thiserror 2.0.18",
diff --git a/Cargo.toml b/Cargo.toml
index dbb14f5ff..87d8878d1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -118,6 +118,7 @@ compio = { version = "0.18.0", features = [
 compio-buf = "0.8.1"
 # Pin compio-driver >= 0.11.2 to fix musl compilation (compio-rs/compio#668)
 compio-driver = "0.11.4"
+compio-quic = "=0.7.2"
 configs = { path = "core/configs", version = "0.1.0" }
 configs_derive = { path = "core/configs_derive", version = "0.1.0" }
 consensus = { path = "core/consensus" }
diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml
index 6fa1ae1f4..ace090f16 100644
--- a/core/message_bus/Cargo.toml
+++ b/core/message_bus/Cargo.toml
@@ -32,16 +32,19 @@ publish = false
 async-channel = { workspace = true }
 blake3 = { workspace = true }
 compio = { workspace = true }
+compio-quic = { workspace = true }
 futures = { workspace = true }
 iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
 libc = { workspace = true }
 rand = { workspace = true }
+rustls = { workspace = true }
 socket2 = { workspace = true, features = ["all"] }
 thiserror = { workspace = true }
 tracing = { workspace = true }
 
 [dev-dependencies]
+rcgen = { workspace = true }
 tempfile = { workspace = true }
 
 [lints.clippy]
diff --git a/core/message_bus/src/framing.rs b/core/message_bus/src/framing.rs
index 0f3af8576..315234e32 100644
--- a/core/message_bus/src/framing.rs
+++ b/core/message_bus/src/framing.rs
@@ -61,7 +61,7 @@ const _: () = {
 ///
 /// Returns `IggyError::TcpError` if the write fails.
 #[allow(clippy::future_not_send)]
-pub async fn write_message<S: AsyncWriteExt + Unpin>(
+pub async fn write_message<S: AsyncWriteExt>(
     stream: &mut S,
     message: Message<GenericHeader>,
 ) -> Result<(), IggyError> {
@@ -90,7 +90,7 @@ pub async fn write_message<S: AsyncWriteExt + Unpin>(
 /// Returns `IggyError::TcpError` on I/O errors.
 /// Returns `IggyError::InvalidCommand` if the header fails validation.
 #[allow(clippy::future_not_send)]
-pub async fn read_message<S: AsyncReadExt + Unpin>(
+pub async fn read_message<S: AsyncReadExt>(
     stream: &mut S,
     max_message_size: usize,
 ) -> Result<Message<GenericHeader>, IggyError> {
diff --git a/core/message_bus/src/transports/mod.rs 
b/core/message_bus/src/transports/mod.rs
index 157a5780a..76e1965f3 100644
--- a/core/message_bus/src/transports/mod.rs
+++ b/core/message_bus/src/transports/mod.rs
@@ -65,6 +65,7 @@
 //! plug in behind the same surface; see
 //! `Documents/silverhand/iggy/message_bus/transport-plan/`.
 
+pub mod quic;
 pub mod tcp;
 
 // Only `Conn` and `Writer` have crate-internal callers today
diff --git a/core/message_bus/src/transports/quic.rs 
b/core/message_bus/src/transports/quic.rs
new file mode 100644
index 000000000..a407b3954
--- /dev/null
+++ b/core/message_bus/src/transports/quic.rs
@@ -0,0 +1,483 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! QUIC impls of the [`super`] transport traits.
+//!
+//! SDK-client plane only; replica plane stays TCP forever (invariant I1).
+//! `compio-quic 0.7.2` binds `quinn-proto` natively to the compio runtime,
+//! so no tokio bridge (invariant I10).
+//!
+//! # Connection model
+//!
+//! One bidirectional stream per peer (no multiplexing). The connection's
+//! first `accept_bi` pair carries every consensus frame. The
+//! [`QuicTransportListener`] accept loop drives the QUIC handshake to
+//! completion AND drains the first bidi pair before yielding the
+//! [`QuicTransportConn`], so the owning shard's install path doesn't
+//! need an additional `await` boundary before data flows. See
+//! 
`Documents/silverhand/iggy/message_bus/transport-plan/designs/message-bus-quic-install.md`
+//! for the full rationale.
+//!
+//! # Zero-copy
+//!
+//! `compio_quic::SendStream::write<T: IoBuf>` accepts
+//! `Frozen<MESSAGE_ALIGN>` directly; `QuicTransportWriter::send_batch`
+//! loops one `write` per frame (QUIC has no `sendmmsg` analog). Per-message
+//! syscalls are the documented trade-off versus the TCP `writev` path; small
+//! high-RPS workloads stay on TCP. See plan §4 risks.
+//!
+//! # 0-RTT
+//!
+//! 0-RTT is off by default at the rustls layer (`max_early_data_size = 0`).
+//! `RecvStream::is_0rtt()` exists for defense-in-depth; the listener treats
+//! a `true` here as a misconfiguration and refuses the connection. Any
+//! future per-command 0-RTT enablement requires an audit per
+//! `transport-plan/designs/quic-0rtt-audit-template.md` (P4-T5).
+
+use super::{TransportConn, TransportListener, TransportReader, 
TransportWriter};
+use crate::framing;
+use compio::BufResult;
+use compio::io::AsyncWrite;
+use compio_quic::{
+    Connection, Endpoint, Incoming, RecvStream, SendStream, VarInt, congestion,
+    crypto::rustls::QuicServerConfig,
+};
+use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
+use iggy_binary_protocol::consensus::iobuf::Frozen;
+use iggy_binary_protocol::{GenericHeader, Message};
+use iggy_common::IggyError;
+use std::io;
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{debug, warn};
+
+/// ALPN protocol identifier carried by every Iggy QUIC client + server.
+///
+/// Bump the trailing version digits when the wire format changes (e.g. a
+/// new `GenericHeader` layout). See invariant I3 for the framing contract.
+pub const ALPN_IGGY_CONSENSUS_V1: &[u8] = b"iggy.consensus.v1";
+
+/// QUIC application close codes. Sized so the table fits a single cache
+/// line; matches the design doc allocation (P4-T2 §8).
+pub const QUIC_HANDSHAKE_FAILED: u32 = 0x1001;
+pub const QUIC_AUTH_FAILED: u32 = 0x1002;
+pub const QUIC_SHUTDOWN: u32 = 0x1003;
+pub const QUIC_PROTOCOL_VIOLATION: u32 = 0x1004;
+
+/// Idle-timeout default for client-plane connections. Matches the 30 s
+/// replica handshake / dedup window so operators see one number across
+/// the bus. Override at endpoint construction time if needed.
+const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
+
+/// Keepalive interval default. Half of [`DEFAULT_IDLE_TIMEOUT`]'s lower
+/// bound so a single dropped keepalive never closes the connection.
+const DEFAULT_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10);
+
+/// Stream / receive / send window default. Matches
+/// `MessageBusConfig::default().max_message_size = 64 MiB` so a single
+/// in-flight max-size frame fits without head-of-line wait.
+const DEFAULT_WINDOW: u32 = 64 * 1024 * 1024;
+
+/// Build a [`compio_quic::TransportConfig`] tuned for the SDK-client plane.
+///
+/// Defaults: 1 bidi stream per peer, 30 s idle timeout, CUBIC congestion
+/// (upstream default), 64 MiB send/receive windows. Settings match the
+/// audit's knob table (`findings/compio-quic-audit.md` §2).
+///
+/// # Panics
+///
+/// Panics only if `DEFAULT_IDLE_TIMEOUT` is changed to a value outside
+/// quinn-proto's `IdleTimeout` range (its `TryFrom<Duration>` rejects
+/// values above `VarInt::MAX_U64` ms). 30 s is well inside the range.
+#[must_use]
+pub fn default_transport_config() -> compio_quic::TransportConfig {
+    let mut cfg = compio_quic::TransportConfig::default();
+    cfg.max_concurrent_bidi_streams(VarInt::from_u32(1))
+        .max_concurrent_uni_streams(VarInt::from_u32(0))
+        .stream_receive_window(VarInt::from_u32(DEFAULT_WINDOW))
+        .receive_window(VarInt::from_u32(DEFAULT_WINDOW))
+        .send_window(u64::from(DEFAULT_WINDOW))
+        .max_idle_timeout(Some(
+            DEFAULT_IDLE_TIMEOUT
+                .try_into()
+                .expect("30 s fits in IdleTimeout"),
+        ))
+        .keep_alive_interval(Some(DEFAULT_KEEP_ALIVE_INTERVAL))
+        
.congestion_controller_factory(Arc::new(congestion::CubicConfig::default()));
+    cfg
+}
+
+/// Inbound QUIC listener.
+///
+/// Wraps a bound [`Endpoint`]. `accept` drives the QUIC handshake and the
+/// first `accept_bi` pair to completion before yielding a
+/// [`QuicTransportConn`], so the caller (`replica_listener::run`'s QUIC
+/// twin in P4-T2's design) hands a fully-set-up data plane to the install
+/// path.
+pub struct QuicTransportListener {
+    endpoint: Endpoint,
+}
+
+impl QuicTransportListener {
+    /// Wrap a pre-bound [`Endpoint`].
+    ///
+    /// Caller is responsible for the [`Endpoint`]'s ALPN set
+    /// ([`ALPN_IGGY_CONSENSUS_V1`]), [`compio_quic::ServerConfig`] crypto
+    /// material, and [`default_transport_config`] hookup.
+    #[must_use]
+    pub const fn new(endpoint: Endpoint) -> Self {
+        Self { endpoint }
+    }
+
+    /// Borrow the underlying endpoint for `local_addr`, `set_server_config`,
+    /// `close`, or `shutdown` calls.
+    #[must_use]
+    pub const fn endpoint(&self) -> &Endpoint {
+        &self.endpoint
+    }
+}
+
+impl TransportListener for QuicTransportListener {
+    type Conn = QuicTransportConn;
+
+    #[allow(clippy::future_not_send)]
+    async fn accept(&self) -> io::Result<(Self::Conn, SocketAddr)> {
+        loop {
+            let Some(incoming) = self.endpoint.wait_incoming().await else {
+                return Err(io::Error::new(
+                    io::ErrorKind::ConnectionAborted,
+                    "QUIC endpoint closed",
+                ));
+            };
+            match accept_one(incoming).await {
+                Ok(Some((conn, addr))) => return Ok((conn, addr)),
+                Ok(None) => {}
+                Err(e) => return Err(e),
+            }
+        }
+    }
+}
+
+#[allow(clippy::future_not_send)]
+async fn accept_one(incoming: Incoming) -> 
io::Result<Option<(QuicTransportConn, SocketAddr)>> {
+    let connecting = match incoming.accept() {
+        Ok(c) => c,
+        Err(e) => {
+            warn!("QUIC incoming.accept failed: {e}");
+            return Ok(None);
+        }
+    };
+    let connection = match connecting.await {
+        Ok(c) => c,
+        Err(e) => {
+            warn!("QUIC handshake failed: {e}");
+            return Ok(None);
+        }
+    };
+    let addr = connection.remote_address();
+
+    let (send, recv) = match connection.accept_bi().await {
+        Ok(streams) => streams,
+        Err(e) => {
+            warn!(%addr, "QUIC accept_bi failed: {e}");
+            connection.close(VarInt::from_u32(QUIC_HANDSHAKE_FAILED), 
b"accept_bi failed");
+            return Ok(None);
+        }
+    };
+
+    if recv.is_0rtt() {
+        warn!(
+            %addr,
+            "QUIC stream accepted in 0-RTT window; refusing (audit P4-T5 not 
run)"
+        );
+        connection.close(
+            VarInt::from_u32(QUIC_PROTOCOL_VIOLATION),
+            b"0-RTT not permitted",
+        );
+        return Ok(None);
+    }
+
+    debug!(%addr, "QUIC connection accepted, first bidi stream ready");
+    Ok(Some((
+        QuicTransportConn::new(connection, (send, recv)),
+        addr,
+    )))
+}
+
+/// A single QUIC connection plus its first bidirectional stream.
+///
+/// The owned `Connection` handle is retained on the writer half so
+/// graceful shutdown can fire `Connection::close(QUIC_SHUTDOWN, _)`
+/// independent of stream drop order.
+pub struct QuicTransportConn {
+    connection: Connection,
+    streams: (SendStream, RecvStream),
+}
+
+impl QuicTransportConn {
+    /// Construct from an already-established connection + bidi pair.
+    #[must_use]
+    pub const fn new(connection: Connection, streams: (SendStream, 
RecvStream)) -> Self {
+        Self {
+            connection,
+            streams,
+        }
+    }
+}
+
+impl TransportConn for QuicTransportConn {
+    type Reader = QuicTransportReader;
+    type Writer = QuicTransportWriter;
+
+    fn into_split(self) -> (Self::Reader, Self::Writer) {
+        let (send, recv) = self.streams;
+        (
+            QuicTransportReader { recv },
+            QuicTransportWriter {
+                send,
+                connection: self.connection,
+            },
+        )
+    }
+}
+
+/// Read half. Decodes one framed `Message<GenericHeader>` per call by
+/// reusing [`framing::read_message`] against the [`RecvStream`].
+pub struct QuicTransportReader {
+    recv: RecvStream,
+}
+
+impl TransportReader for QuicTransportReader {
+    #[allow(clippy::future_not_send)]
+    async fn read_message(
+        &mut self,
+        max_message_size: usize,
+    ) -> Result<Message<GenericHeader>, IggyError> {
+        framing::read_message(&mut self.recv, max_message_size).await
+    }
+}
+
+/// Write half. One [`SendStream::write`] per frame; the inner
+/// [`Connection`] handle is retained so graceful shutdown can issue
+/// `connection.close(QUIC_SHUTDOWN, _)` after the writer task drains.
+pub struct QuicTransportWriter {
+    send: SendStream,
+    connection: Connection,
+}
+
+impl QuicTransportWriter {
+    /// Borrow the underlying connection (for stats, graceful close).
+    #[must_use]
+    pub const fn connection(&self) -> &Connection {
+        &self.connection
+    }
+}
+
+impl TransportWriter for QuicTransportWriter {
+    #[allow(clippy::future_not_send)]
+    async fn send_batch(&mut self, batch: &mut Vec<Frozen<MESSAGE_ALIGN>>) -> 
io::Result<()> {
+        // Drain in order; one write per frame. compio-quic's
+        // SendStream::write takes T: IoBuf and Frozen<MESSAGE_ALIGN>
+        // implements IoBuf (binary_protocol::consensus::iobuf), so the
+        // hand-off is zero-copy. No batched syscall (no sendmmsg on
+        // QUIC); see plan §4 risks.
+        for buf in batch.drain(..) {
+            let BufResult(result, _frozen) = self.send.write(buf).await;
+            result.map_err(|e| io::Error::other(format!("QUIC send failed: 
{e}")))?;
+        }
+        // Flush the QUIC frame onto the wire before returning so the
+        // batch's atomic-or-error contract (TransportWriter doc) holds:
+        // a partial flush on cancellation must not advertise success.
+        self.send
+            .flush()
+            .await
+            .map_err(|e| io::Error::other(format!("QUIC flush failed: {e}")))
+    }
+}
+
+/// Build a [`compio_quic::ServerConfig`] from a single cert + key pair.
+///
+/// Sets the Iggy ALPN ([`ALPN_IGGY_CONSENSUS_V1`]) and the bus-tuned
+/// [`default_transport_config`]; otherwise inherits upstream defaults
+/// including `migration: true`.
+///
+/// # Errors
+///
+/// Returns the underlying `rustls::Error` on cert/key import failure.
+pub fn server_config_with_cert(
+    cert_chain: Vec<rustls::pki_types::CertificateDer<'static>>,
+    key_der: rustls::pki_types::PrivateKeyDer<'static>,
+) -> Result<compio_quic::ServerConfig, rustls::Error> {
+    let mut rustls_cfg = rustls::ServerConfig::builder()
+        .with_no_client_auth()
+        .with_single_cert(cert_chain, key_der)?;
+    rustls_cfg.alpn_protocols = vec![ALPN_IGGY_CONSENSUS_V1.to_vec()];
+    rustls_cfg.max_early_data_size = 0; // 0-RTT off (invariant I7)
+    let crypto = QuicServerConfig::try_from(rustls_cfg)
+        .map_err(|e| rustls::Error::General(format!("QUIC crypto: {e}")))?;
+    let mut server_cfg = 
compio_quic::ServerConfig::with_crypto(Arc::new(crypto));
+    server_cfg.transport_config(Arc::new(default_transport_config()));
+    Ok(server_cfg)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use compio_quic::ClientBuilder;
+    use iggy_binary_protocol::{Command2, HEADER_SIZE};
+    use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
+
+    fn install_crypto_provider() {
+        // Idempotent. Tests in the same process race on
+        // `install_default`, so swallow the second-installer error.
+        let _ = rustls::crypto::ring::default_provider().install_default();
+    }
+
+    fn header_only(command: Command2) -> Frozen<MESSAGE_ALIGN> {
+        #[allow(clippy::cast_possible_truncation)]
+        Message::<GenericHeader>::new(HEADER_SIZE)
+            .transmute_header(|_, h: &mut GenericHeader| {
+                h.command = command;
+                h.size = HEADER_SIZE as u32;
+            })
+            .into_frozen()
+    }
+
+    fn self_signed() -> (CertificateDer<'static>, PrivateKeyDer<'static>) {
+        let cert = 
rcgen::generate_simple_self_signed(vec!["localhost".to_owned()]).expect("rcgen");
+        let cert_der = CertificateDer::from(cert.cert);
+        let key_der: PrivateKeyDer<'static> =
+            PrivatePkcs8KeyDer::from(cert.signing_key.serialize_der()).into();
+        (cert_der, key_der)
+    }
+
+    #[allow(clippy::future_not_send)]
+    async fn server_endpoint(
+        cert: CertificateDer<'static>,
+        key: PrivateKeyDer<'static>,
+    ) -> Endpoint {
+        let server_cfg = server_config_with_cert(vec![cert], 
key).expect("server config");
+        Endpoint::server("127.0.0.1:0", server_cfg)
+            .await
+            .expect("bind")
+    }
+
+    #[allow(clippy::future_not_send)]
+    async fn client_endpoint(server_cert: CertificateDer<'static>) -> Endpoint 
{
+        let mut builder = ClientBuilder::new_with_empty_roots()
+            .with_custom_certificate(server_cert)
+            .expect("trust cert")
+            .with_no_crls();
+        builder = builder.with_alpn_protocols(&["iggy.consensus.v1"]);
+        builder.bind("127.0.0.1:0").await.expect("client bind")
+    }
+
+    #[compio::test]
+    #[allow(clippy::future_not_send)]
+    async fn loopback_round_trip_three_frames() {
+        install_crypto_provider();
+        let (cert, key) = self_signed();
+        let server = server_endpoint(cert.clone(), key).await;
+        let server_addr = server.local_addr().unwrap();
+
+        let listener = QuicTransportListener::new(server);
+        let server_task = compio::runtime::spawn(async move {
+            let (conn, _peer) = listener.accept().await.expect("accept");
+            let (mut reader, _writer) = conn.into_split();
+            let a = reader
+                .read_message(framing::MAX_MESSAGE_SIZE)
+                .await
+                .unwrap();
+            let b = reader
+                .read_message(framing::MAX_MESSAGE_SIZE)
+                .await
+                .unwrap();
+            let c = reader
+                .read_message(framing::MAX_MESSAGE_SIZE)
+                .await
+                .unwrap();
+            (a.header().command, b.header().command, c.header().command)
+        });
+
+        let client = client_endpoint(cert).await;
+        let connecting = client
+            .connect(server_addr, "localhost", None)
+            .expect("connect");
+        let connection = connecting.await.expect("client handshake");
+        let (send, recv) = connection.open_bi_wait().await.expect("open_bi");
+        let conn = QuicTransportConn::new(connection, (send, recv));
+        let (_r, mut w) = conn.into_split();
+
+        let mut batch = vec![
+            header_only(Command2::Ping),
+            header_only(Command2::Prepare),
+            header_only(Command2::Request),
+        ];
+        w.send_batch(&mut batch).await.expect("send_batch");
+        assert!(batch.is_empty(), "Vec must be drained on success");
+
+        let (a, b, c) = server_task.await.unwrap();
+        assert_eq!(a, Command2::Ping);
+        assert_eq!(b, Command2::Prepare);
+        assert_eq!(c, Command2::Request);
+    }
+
+    #[compio::test]
+    #[allow(clippy::future_not_send)]
+    async fn read_message_reports_oversize_via_trait() {
+        install_crypto_provider();
+        let (cert, key) = self_signed();
+        let server = server_endpoint(cert.clone(), key).await;
+        let server_addr = server.local_addr().unwrap();
+
+        let listener = QuicTransportListener::new(server);
+        let server_task = compio::runtime::spawn(async move {
+            let (conn, _peer) = listener.accept().await.expect("accept");
+            let (mut reader, _writer) = conn.into_split();
+            reader.read_message(framing::MAX_MESSAGE_SIZE).await
+        });
+
+        let client = client_endpoint(cert).await;
+        let connecting = client
+            .connect(server_addr, "localhost", None)
+            .expect("connect");
+        let connection = connecting.await.expect("handshake");
+        let (mut send, recv) = 
connection.open_bi_wait().await.expect("open_bi");
+        // Bogus oversize size field at offset 48.
+        let mut buf = vec![0u8; HEADER_SIZE];
+        let bogus = u32::try_from(framing::MAX_MESSAGE_SIZE + 1)
+            .unwrap_or(u32::MAX)
+            .to_le_bytes();
+        buf[48..52].copy_from_slice(&bogus);
+        let BufResult(result, _) = send.write(buf).await;
+        result.expect("write");
+        send.flush().await.expect("flush");
+        drop(send); // signal end-of-stream so reader sees the framed bytes
+        drop(recv);
+
+        let res = server_task.await.unwrap();
+        assert!(matches!(res, Err(IggyError::InvalidCommand)));
+    }
+
+    #[test]
+    fn default_transport_config_caps_streams_at_one() {
+        // Lightweight smoke: confirm the helper builds without panicking
+        // and the underlying defaults the audit named are reachable.
+        let _cfg = default_transport_config();
+    }
+}
diff --git a/core/server-ng/src/session_manager.rs 
b/core/server-ng/src/session_manager.rs
index b89ae4a00..6fead47a4 100644
--- a/core/server-ng/src/session_manager.rs
+++ b/core/server-ng/src/session_manager.rs
@@ -23,7 +23,7 @@
 //! Each connection goes through: `connect → login → register → bound`.
 //!
 //! The [`SessionManager`] is the server-side counterpart of the SDK's
-//! session lifecycle. It does **not** own the [`ClientTable`]. That lives
+//! session lifecycle. It does **not** own the `ClientTable`. That lives
 //! in the consensus layer. This module tracks the binding between a
 //! transport connection and the consensus-level `(client_id, session)` pair.
 

Reply via email to