This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch feat/message-bus-transports in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 6f99acdadc4c94830c27e623123a9058fa844c27 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Apr 27 11:22:49 2026 +0200 feat(message_bus): add QUIC transport for SDK-client plane The client plane was TCP-only. Phase 4 of the transport plan calls for QUIC parity with legacy `core/server`; this lands the message-bus side behind the existing `Transport*` trait family. `QuicTransport{Listener,Conn,Reader,Writer}` in `transports/quic.rs` wrap `compio-quic 0.7.2` so the runtime stays single-stack (no tokio bridge, invariant I10). The listener accepts the connection AND drains the first bidi `accept_bi` pair before yielding, so the install path on the owning shard sees a fully-established data plane. Reusing `framing::read_message` against `RecvStream` required dropping the `Unpin` bound from the framing helpers; `RecvStream` is `!Unpin` in compio-quic. The bound was incidental - neither `read_exact` nor `write_all` actually needs it. QUIC is always-on, not feature-gated, matching legacy server's posture and shrinking the build matrix. Defenses: 0-RTT off at the rustls layer (`max_early_data_size = 0`); listener refuses any 0-RTT-tagged stream with `QUIC_PROTOCOL_VIOLATION`. ALPN `iggy.consensus.v1`. Default `TransportConfig`: 1 bidi/peer, 30 s idle timeout, 64 MiB windows, CUBIC congestion (upstream default). Tag-along: demote the only stray broken intra-doc link (`ClientTable` in `core/server-ng/src/session_manager.rs:26`, inherited from master) to a code span so `cargo doc` runs warning-free on this branch. Tests: +3 in `transports::quic::tests` (round-trip three frames, oversize-frame surfacing `IggyError::InvalidCommand`, default config smoke). 123/123 pass on `cargo nextest run -p message_bus -p server-ng`. Workspace clippy + cargo doc clean. 
--- Cargo.lock | 3 + Cargo.toml | 1 + core/message_bus/Cargo.toml | 3 + core/message_bus/src/framing.rs | 4 +- core/message_bus/src/transports/mod.rs | 1 + core/message_bus/src/transports/quic.rs | 483 ++++++++++++++++++++++++++++++++ core/server-ng/src/session_manager.rs | 2 +- 7 files changed, 494 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af920f2d3..cb1463495 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7011,11 +7011,14 @@ dependencies = [ "async-channel", "blake3", "compio", + "compio-quic", "futures", "iggy_binary_protocol", "iggy_common", "libc", "rand 0.10.1", + "rcgen", + "rustls", "socket2 0.6.3", "tempfile", "thiserror 2.0.18", diff --git a/Cargo.toml b/Cargo.toml index dbb14f5ff..87d8878d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,6 +118,7 @@ compio = { version = "0.18.0", features = [ compio-buf = "0.8.1" # Pin compio-driver >= 0.11.2 to fix musl compilation (compio-rs/compio#668) compio-driver = "0.11.4" +compio-quic = "=0.7.2" configs = { path = "core/configs", version = "0.1.0" } configs_derive = { path = "core/configs_derive", version = "0.1.0" } consensus = { path = "core/consensus" } diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml index 6fa1ae1f4..ace090f16 100644 --- a/core/message_bus/Cargo.toml +++ b/core/message_bus/Cargo.toml @@ -32,16 +32,19 @@ publish = false async-channel = { workspace = true } blake3 = { workspace = true } compio = { workspace = true } +compio-quic = { workspace = true } futures = { workspace = true } iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } libc = { workspace = true } rand = { workspace = true } +rustls = { workspace = true } socket2 = { workspace = true, features = ["all"] } thiserror = { workspace = true } tracing = { workspace = true } [dev-dependencies] +rcgen = { workspace = true } tempfile = { workspace = true } [lints.clippy] diff --git a/core/message_bus/src/framing.rs b/core/message_bus/src/framing.rs index 
0f3af8576..315234e32 100644 --- a/core/message_bus/src/framing.rs +++ b/core/message_bus/src/framing.rs @@ -61,7 +61,7 @@ const _: () = { /// /// Returns `IggyError::TcpError` if the write fails. #[allow(clippy::future_not_send)] -pub async fn write_message<S: AsyncWriteExt + Unpin>( +pub async fn write_message<S: AsyncWriteExt>( stream: &mut S, message: Message<GenericHeader>, ) -> Result<(), IggyError> { @@ -90,7 +90,7 @@ pub async fn write_message<S: AsyncWriteExt + Unpin>( /// Returns `IggyError::TcpError` on I/O errors. /// Returns `IggyError::InvalidCommand` if the header fails validation. #[allow(clippy::future_not_send)] -pub async fn read_message<S: AsyncReadExt + Unpin>( +pub async fn read_message<S: AsyncReadExt>( stream: &mut S, max_message_size: usize, ) -> Result<Message<GenericHeader>, IggyError> { diff --git a/core/message_bus/src/transports/mod.rs b/core/message_bus/src/transports/mod.rs index 157a5780a..76e1965f3 100644 --- a/core/message_bus/src/transports/mod.rs +++ b/core/message_bus/src/transports/mod.rs @@ -65,6 +65,7 @@ //! plug in behind the same surface; see //! `Documents/silverhand/iggy/message_bus/transport-plan/`. +pub mod quic; pub mod tcp; // Only `Conn` and `Writer` have crate-internal callers today diff --git a/core/message_bus/src/transports/quic.rs b/core/message_bus/src/transports/quic.rs new file mode 100644 index 000000000..a407b3954 --- /dev/null +++ b/core/message_bus/src/transports/quic.rs @@ -0,0 +1,483 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! QUIC impls of the [`super`] transport traits. +//! +//! SDK-client plane only; replica plane stays TCP forever (invariant I1). +//! `compio-quic 0.7.2` binds `quinn-proto` natively to the compio runtime, +//! so no tokio bridge (invariant I10). +//! +//! # Connection model +//! +//! One bidirectional stream per peer (no multiplexing). The connection's +//! first `accept_bi` pair carries every consensus frame. The +//! [`QuicTransportListener`] accept loop drives the QUIC handshake to +//! completion AND drains the first bidi pair before yielding the +//! [`QuicTransportConn`], so the owning shard's install path doesn't +//! need an additional `await` boundary before data flows. See +//! `Documents/silverhand/iggy/message_bus/transport-plan/designs/message-bus-quic-install.md` +//! for the full rationale. +//! +//! # Zero-copy +//! +//! `compio_quic::SendStream::write<T: IoBuf>` accepts +//! `Frozen<MESSAGE_ALIGN>` directly; `QuicTransportWriter::send_batch` +//! loops one `write` per frame (QUIC has no `sendmmsg` analog). Per-message +//! syscalls are the documented trade-off versus the TCP `writev` path; small +//! high-RPS workloads stay on TCP. See plan §4 risks. +//! +//! # 0-RTT +//! +//! 0-RTT is off by default at the rustls layer (`max_early_data_size = 0`). +//! `RecvStream::is_0rtt()` exists for defense-in-depth; the listener treats +//! a `true` here as a misconfiguration and refuses the connection. Any +//! future per-command 0-RTT enablement requires an audit per +//! `transport-plan/designs/quic-0rtt-audit-template.md` (P4-T5). 
+ +use super::{TransportConn, TransportListener, TransportReader, TransportWriter}; +use crate::framing; +use compio::BufResult; +use compio::io::AsyncWrite; +use compio_quic::{ + Connection, Endpoint, Incoming, RecvStream, SendStream, VarInt, congestion, + crypto::rustls::QuicServerConfig, +}; +use iggy_binary_protocol::consensus::MESSAGE_ALIGN; +use iggy_binary_protocol::consensus::iobuf::Frozen; +use iggy_binary_protocol::{GenericHeader, Message}; +use iggy_common::IggyError; +use std::io; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; +use tracing::{debug, warn}; + +/// ALPN protocol identifier carried by every Iggy QUIC client + server. +/// +/// Bump the trailing version digits when the wire format changes (e.g. a +/// new `GenericHeader` layout). See invariant I3 for the framing contract. +pub const ALPN_IGGY_CONSENSUS_V1: &[u8] = b"iggy.consensus.v1"; + +/// QUIC application close codes. Sized so the table fits a single cache +/// line; matches the design doc allocation (P4-T2 §8). +pub const QUIC_HANDSHAKE_FAILED: u32 = 0x1001; +pub const QUIC_AUTH_FAILED: u32 = 0x1002; +pub const QUIC_SHUTDOWN: u32 = 0x1003; +pub const QUIC_PROTOCOL_VIOLATION: u32 = 0x1004; + +/// Idle-timeout default for client-plane connections. Matches the 30 s +/// replica handshake / dedup window so operators see one number across +/// the bus. Override at endpoint construction time if needed. +const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(30); + +/// Keepalive interval default. Half of [`DEFAULT_IDLE_TIMEOUT`]'s lower +/// bound so a single dropped keepalive never closes the connection. +const DEFAULT_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10); + +/// Stream / receive / send window default. Matches +/// `MessageBusConfig::default().max_message_size = 64 MiB` so a single +/// in-flight max-size frame fits without head-of-line wait. 
+const DEFAULT_WINDOW: u32 = 64 * 1024 * 1024; + +/// Build a [`compio_quic::TransportConfig`] tuned for the SDK-client plane. +/// +/// Defaults: 1 bidi stream per peer, 30 s idle timeout, CUBIC congestion +/// (upstream default), 64 MiB send/receive windows. Settings match the +/// audit's knob table (`findings/compio-quic-audit.md` §2). +/// +/// # Panics +/// +/// Panics only if `DEFAULT_IDLE_TIMEOUT` is changed to a value outside +/// quinn-proto's `IdleTimeout` range (its `TryFrom<Duration>` rejects +/// values above `VarInt::MAX_U64` ms). 30 s is well inside the range. +#[must_use] +pub fn default_transport_config() -> compio_quic::TransportConfig { + let mut cfg = compio_quic::TransportConfig::default(); + cfg.max_concurrent_bidi_streams(VarInt::from_u32(1)) + .max_concurrent_uni_streams(VarInt::from_u32(0)) + .stream_receive_window(VarInt::from_u32(DEFAULT_WINDOW)) + .receive_window(VarInt::from_u32(DEFAULT_WINDOW)) + .send_window(u64::from(DEFAULT_WINDOW)) + .max_idle_timeout(Some( + DEFAULT_IDLE_TIMEOUT + .try_into() + .expect("30 s fits in IdleTimeout"), + )) + .keep_alive_interval(Some(DEFAULT_KEEP_ALIVE_INTERVAL)) + .congestion_controller_factory(Arc::new(congestion::CubicConfig::default())); + cfg +} + +/// Inbound QUIC listener. +/// +/// Wraps a bound [`Endpoint`]. `accept` drives the QUIC handshake and the +/// first `accept_bi` pair to completion before yielding a +/// [`QuicTransportConn`], so the caller (`replica_listener::run`'s QUIC +/// twin in P4-T2's design) hands a fully-set-up data plane to the install +/// path. +pub struct QuicTransportListener { + endpoint: Endpoint, +} + +impl QuicTransportListener { + /// Wrap a pre-bound [`Endpoint`]. + /// + /// Caller is responsible for the [`Endpoint`]'s ALPN set + /// ([`ALPN_IGGY_CONSENSUS_V1`]), [`compio_quic::ServerConfig`] crypto + /// material, and [`default_transport_config`] hookup. 
+ #[must_use] + pub const fn new(endpoint: Endpoint) -> Self { + Self { endpoint } + } + + /// Borrow the underlying endpoint for `local_addr`, `set_server_config`, + /// `close`, or `shutdown` calls. + #[must_use] + pub const fn endpoint(&self) -> &Endpoint { + &self.endpoint + } +} + +impl TransportListener for QuicTransportListener { + type Conn = QuicTransportConn; + + #[allow(clippy::future_not_send)] + async fn accept(&self) -> io::Result<(Self::Conn, SocketAddr)> { + loop { + let Some(incoming) = self.endpoint.wait_incoming().await else { + return Err(io::Error::new( + io::ErrorKind::ConnectionAborted, + "QUIC endpoint closed", + )); + }; + match accept_one(incoming).await { + Ok(Some((conn, addr))) => return Ok((conn, addr)), + Ok(None) => {} + Err(e) => return Err(e), + } + } + } +} + +#[allow(clippy::future_not_send)] +async fn accept_one(incoming: Incoming) -> io::Result<Option<(QuicTransportConn, SocketAddr)>> { + let connecting = match incoming.accept() { + Ok(c) => c, + Err(e) => { + warn!("QUIC incoming.accept failed: {e}"); + return Ok(None); + } + }; + let connection = match connecting.await { + Ok(c) => c, + Err(e) => { + warn!("QUIC handshake failed: {e}"); + return Ok(None); + } + }; + let addr = connection.remote_address(); + + let (send, recv) = match connection.accept_bi().await { + Ok(streams) => streams, + Err(e) => { + warn!(%addr, "QUIC accept_bi failed: {e}"); + connection.close(VarInt::from_u32(QUIC_HANDSHAKE_FAILED), b"accept_bi failed"); + return Ok(None); + } + }; + + if recv.is_0rtt() { + warn!( + %addr, + "QUIC stream accepted in 0-RTT window; refusing (audit P4-T5 not run)" + ); + connection.close( + VarInt::from_u32(QUIC_PROTOCOL_VIOLATION), + b"0-RTT not permitted", + ); + return Ok(None); + } + + debug!(%addr, "QUIC connection accepted, first bidi stream ready"); + Ok(Some(( + QuicTransportConn::new(connection, (send, recv)), + addr, + ))) +} + +/// A single QUIC connection plus its first bidirectional stream. 
+/// +/// The owned `Connection` handle is retained on the writer half so +/// graceful shutdown can fire `Connection::close(QUIC_SHUTDOWN, _)` +/// independent of stream drop order. +pub struct QuicTransportConn { + connection: Connection, + streams: (SendStream, RecvStream), +} + +impl QuicTransportConn { + /// Construct from an already-established connection + bidi pair. + #[must_use] + pub const fn new(connection: Connection, streams: (SendStream, RecvStream)) -> Self { + Self { + connection, + streams, + } + } +} + +impl TransportConn for QuicTransportConn { + type Reader = QuicTransportReader; + type Writer = QuicTransportWriter; + + fn into_split(self) -> (Self::Reader, Self::Writer) { + let (send, recv) = self.streams; + ( + QuicTransportReader { recv }, + QuicTransportWriter { + send, + connection: self.connection, + }, + ) + } +} + +/// Read half. Decodes one framed `Message<GenericHeader>` per call by +/// reusing [`framing::read_message`] against the [`RecvStream`]. +pub struct QuicTransportReader { + recv: RecvStream, +} + +impl TransportReader for QuicTransportReader { + #[allow(clippy::future_not_send)] + async fn read_message( + &mut self, + max_message_size: usize, + ) -> Result<Message<GenericHeader>, IggyError> { + framing::read_message(&mut self.recv, max_message_size).await + } +} + +/// Write half. One [`SendStream::write`] per frame; the inner +/// [`Connection`] handle is retained so graceful shutdown can issue +/// `connection.close(QUIC_SHUTDOWN, _)` after the writer task drains. +pub struct QuicTransportWriter { + send: SendStream, + connection: Connection, +} + +impl QuicTransportWriter { + /// Borrow the underlying connection (for stats, graceful close). 
+ #[must_use] + pub const fn connection(&self) -> &Connection { + &self.connection + } +} + +impl TransportWriter for QuicTransportWriter { + #[allow(clippy::future_not_send)] + async fn send_batch(&mut self, batch: &mut Vec<Frozen<MESSAGE_ALIGN>>) -> io::Result<()> { + // Drain in order; one write per frame. compio-quic's + // SendStream::write takes T: IoBuf and Frozen<MESSAGE_ALIGN> + // implements IoBuf (binary_protocol::consensus::iobuf), so the + // hand-off is zero-copy. No batched syscall (no sendmmsg on + // QUIC); see plan §4 risks. + for buf in batch.drain(..) { + let BufResult(result, _frozen) = self.send.write(buf).await; + result.map_err(|e| io::Error::other(format!("QUIC send failed: {e}")))?; + } + // Flush the QUIC frame onto the wire before returning so the + // batch's atomic-or-error contract (TransportWriter doc) holds: + // a partial flush on cancellation must not advertise success. + self.send + .flush() + .await + .map_err(|e| io::Error::other(format!("QUIC flush failed: {e}"))) + } +} + +/// Build a [`compio_quic::ServerConfig`] from a single cert + key pair. +/// +/// Sets the Iggy ALPN ([`ALPN_IGGY_CONSENSUS_V1`]) and the bus-tuned +/// [`default_transport_config`]; otherwise inherits upstream defaults +/// including `migration: true`. +/// +/// # Errors +/// +/// Returns the underlying `rustls::Error` on cert/key import failure. 
+pub fn server_config_with_cert( + cert_chain: Vec<rustls::pki_types::CertificateDer<'static>>, + key_der: rustls::pki_types::PrivateKeyDer<'static>, +) -> Result<compio_quic::ServerConfig, rustls::Error> { + let mut rustls_cfg = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(cert_chain, key_der)?; + rustls_cfg.alpn_protocols = vec![ALPN_IGGY_CONSENSUS_V1.to_vec()]; + rustls_cfg.max_early_data_size = 0; // 0-RTT off (invariant I7) + let crypto = QuicServerConfig::try_from(rustls_cfg) + .map_err(|e| rustls::Error::General(format!("QUIC crypto: {e}")))?; + let mut server_cfg = compio_quic::ServerConfig::with_crypto(Arc::new(crypto)); + server_cfg.transport_config(Arc::new(default_transport_config())); + Ok(server_cfg) +} + +#[cfg(test)] +mod tests { + use super::*; + use compio_quic::ClientBuilder; + use iggy_binary_protocol::{Command2, HEADER_SIZE}; + use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; + + fn install_crypto_provider() { + // Idempotent. Tests in the same process race on + // `install_default`, so swallow the second-installer error. 
+ let _ = rustls::crypto::ring::default_provider().install_default(); + } + + fn header_only(command: Command2) -> Frozen<MESSAGE_ALIGN> { + #[allow(clippy::cast_possible_truncation)] + Message::<GenericHeader>::new(HEADER_SIZE) + .transmute_header(|_, h: &mut GenericHeader| { + h.command = command; + h.size = HEADER_SIZE as u32; + }) + .into_frozen() + } + + fn self_signed() -> (CertificateDer<'static>, PrivateKeyDer<'static>) { + let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_owned()]).expect("rcgen"); + let cert_der = CertificateDer::from(cert.cert); + let key_der: PrivateKeyDer<'static> = + PrivatePkcs8KeyDer::from(cert.signing_key.serialize_der()).into(); + (cert_der, key_der) + } + + #[allow(clippy::future_not_send)] + async fn server_endpoint( + cert: CertificateDer<'static>, + key: PrivateKeyDer<'static>, + ) -> Endpoint { + let server_cfg = server_config_with_cert(vec![cert], key).expect("server config"); + Endpoint::server("127.0.0.1:0", server_cfg) + .await + .expect("bind") + } + + #[allow(clippy::future_not_send)] + async fn client_endpoint(server_cert: CertificateDer<'static>) -> Endpoint { + let mut builder = ClientBuilder::new_with_empty_roots() + .with_custom_certificate(server_cert) + .expect("trust cert") + .with_no_crls(); + builder = builder.with_alpn_protocols(&["iggy.consensus.v1"]); + builder.bind("127.0.0.1:0").await.expect("client bind") + } + + #[compio::test] + #[allow(clippy::future_not_send)] + async fn loopback_round_trip_three_frames() { + install_crypto_provider(); + let (cert, key) = self_signed(); + let server = server_endpoint(cert.clone(), key).await; + let server_addr = server.local_addr().unwrap(); + + let listener = QuicTransportListener::new(server); + let server_task = compio::runtime::spawn(async move { + let (conn, _peer) = listener.accept().await.expect("accept"); + let (mut reader, _writer) = conn.into_split(); + let a = reader + .read_message(framing::MAX_MESSAGE_SIZE) + .await + .unwrap(); + let b = 
reader + .read_message(framing::MAX_MESSAGE_SIZE) + .await + .unwrap(); + let c = reader + .read_message(framing::MAX_MESSAGE_SIZE) + .await + .unwrap(); + (a.header().command, b.header().command, c.header().command) + }); + + let client = client_endpoint(cert).await; + let connecting = client + .connect(server_addr, "localhost", None) + .expect("connect"); + let connection = connecting.await.expect("client handshake"); + let (send, recv) = connection.open_bi_wait().await.expect("open_bi"); + let conn = QuicTransportConn::new(connection, (send, recv)); + let (_r, mut w) = conn.into_split(); + + let mut batch = vec![ + header_only(Command2::Ping), + header_only(Command2::Prepare), + header_only(Command2::Request), + ]; + w.send_batch(&mut batch).await.expect("send_batch"); + assert!(batch.is_empty(), "Vec must be drained on success"); + + let (a, b, c) = server_task.await.unwrap(); + assert_eq!(a, Command2::Ping); + assert_eq!(b, Command2::Prepare); + assert_eq!(c, Command2::Request); + } + + #[compio::test] + #[allow(clippy::future_not_send)] + async fn read_message_reports_oversize_via_trait() { + install_crypto_provider(); + let (cert, key) = self_signed(); + let server = server_endpoint(cert.clone(), key).await; + let server_addr = server.local_addr().unwrap(); + + let listener = QuicTransportListener::new(server); + let server_task = compio::runtime::spawn(async move { + let (conn, _peer) = listener.accept().await.expect("accept"); + let (mut reader, _writer) = conn.into_split(); + reader.read_message(framing::MAX_MESSAGE_SIZE).await + }); + + let client = client_endpoint(cert).await; + let connecting = client + .connect(server_addr, "localhost", None) + .expect("connect"); + let connection = connecting.await.expect("handshake"); + let (mut send, recv) = connection.open_bi_wait().await.expect("open_bi"); + // Bogus oversize size field at offset 48. 
+ let mut buf = vec![0u8; HEADER_SIZE]; + let bogus = u32::try_from(framing::MAX_MESSAGE_SIZE + 1) + .unwrap_or(u32::MAX) + .to_le_bytes(); + buf[48..52].copy_from_slice(&bogus); + let BufResult(result, _) = send.write(buf).await; + result.expect("write"); + send.flush().await.expect("flush"); + drop(send); // signal end-of-stream so reader sees the framed bytes + drop(recv); + + let res = server_task.await.unwrap(); + assert!(matches!(res, Err(IggyError::InvalidCommand))); + } + + #[test] + fn default_transport_config_caps_streams_at_one() { + // Lightweight smoke: confirm the helper builds without panicking + // and the underlying defaults the audit named are reachable. + let _cfg = default_transport_config(); + } +} diff --git a/core/server-ng/src/session_manager.rs b/core/server-ng/src/session_manager.rs index b89ae4a00..6fead47a4 100644 --- a/core/server-ng/src/session_manager.rs +++ b/core/server-ng/src/session_manager.rs @@ -23,7 +23,7 @@ //! Each connection goes through: `connect → login → register → bound`. //! //! The [`SessionManager`] is the server-side counterpart of the SDK's -//! session lifecycle. It does **not** own the [`ClientTable`]. That lives +//! session lifecycle. It does **not** own the `ClientTable`. That lives //! in the consensus layer. This module tracks the binding between a //! transport connection and the consensus-level `(client_id, session)` pair.
