This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch feat/message-bus-transports in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0d626e7b711cd83cb886307f7e4db0e495e2f3c1 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Apr 27 14:17:27 2026 +0200 refactor(message_bus): split WS dispatcher into reader/writer tasks Phase-alternating dispatcher could not drive client -> server -> client request/reply round trips: Phase B's stream.read().await blocked the loop, outbound queue activity could not wake it, and select!-cancelling the read collided with both tungstenite's single state machine and compio's buffer-drop rule. Replace with a two-task split. WsTransportConn::into_split unwraps the post-handshake WebSocketStream<TcpStream> down to the bare TcpStream via the compio_ws -> tungstenite::WebSocket -> compio_io::compat::SyncStream chain (recovering BufRead::fill_buf leftover bytes defensively per RFC 6455 §4.1), splits TCP into owned read/write halves, and spawns: - reader_task: persistent Vec<u8> accumulator, parse_one_frame loop over tungstenite::FrameHeader::parse, role-aware mask validation, control_tx for inbound Ping payloads + peer-Close. - writer_task: select_biased! over shutdown / control_rx / out_rx; server-role outbound is two-write zero-copy (header scratch then Frozen<MESSAGE_ALIGN> body as IoBuf), client-role outbound is single-write copy (header + masked body in one scratch). In-house apply_mask (4-line XOR per RFC 6455 §5.3; tungstenite's apply_mask is pub(crate)). Per-conn Shutdown coordinates teardown between halves. Constructor split: WsTransportConn::new_server / new_client. Default new removed; installer::install_client_ws_stream + WsTransportListener take server role, test-side dialer takes client role. tests/ws_client_roundtrip.rs upgraded to drive a full Request -> Reply round trip via bus.send_to_client; the side-channel-signal workaround is dropped. transports/ws.rs gains 10 new unit tests covering server- push, 16 MiB body, mask round-trip, role-aware mask rejection, frag rejection, Text rejection, partial parse, Pong consumption, and peer- initiated Close. Drive-by: demote two broken intra-doc links in client_listener_ws.rs (ShardFramePayload across-crate; install_client_ws_fd is a trait method) to clean up cargo doc warnings introduced in P5-T3. 157/157 tests pass on cargo nextest run -p message_bus -p server-ng -p shard. Workspace clippy + cargo doc clean. No invariant impact (I3 header byte-identical, I5 fd-delegation TCP-only preserved, I8 zero- copy Frozen preserved on server outbound, I10 no tokio bridge). --- core/message_bus/src/client_listener_ws.rs | 4 +- core/message_bus/src/installer.rs | 7 +- core/message_bus/src/transports/ws.rs | 915 +++++++++++++++++++++----- core/message_bus/tests/ws_client_roundtrip.rs | 64 +- 4 files changed, 783 insertions(+), 207 deletions(-) diff --git a/core/message_bus/src/client_listener_ws.rs b/core/message_bus/src/client_listener_ws.rs index 50b859c17..bebb8cc3a 100644 --- a/core/message_bus/src/client_listener_ws.rs +++ b/core/message_bus/src/client_listener_ws.rs @@ -20,10 +20,10 @@ //! Runs only on shard 0. The accept loop performs no protocol work //! beyond `TcpListener::accept`: every accepted stream is handed //! verbatim to the supplied callback, which dups the fd and ships a -//! [`ShardFramePayload::ClientWsConnectionSetup`] frame to the +//! `ShardFramePayload::ClientWsConnectionSetup` frame to the //! round-robin-selected target shard. The HTTP-Upgrade handshake (and //! the `iggy.consensus.v1` subprotocol enforcement) runs on the owning -//! shard inside [`crate::installer::install_client_ws_fd`]. +//! shard inside [`crate::ConnectionInstaller::install_client_ws_fd`]. //! //! The fd at ship-time is plain TCP; invariant I5 (fd-delegation is //! TCP-only) holds. The WS state machine only materialises after the diff --git a/core/message_bus/src/installer.rs b/core/message_bus/src/installer.rs index f7fc7b861..91ff38ddd 100644 --- a/core/message_bus/src/installer.rs +++ b/core/message_bus/src/installer.rs @@ -433,7 +433,12 @@ pub fn install_client_ws_stream( stream: compio_ws::WebSocketStream<TcpStream>, on_request: RequestHandler, ) { - install_client_conn(bus, client_id, WsTransportConn::new(stream), on_request); + install_client_conn( + bus, + client_id, + WsTransportConn::new_server(stream), + on_request, + ); } /// Install a pre-wrapped client connection on the bus. Generic over diff --git a/core/message_bus/src/transports/ws.rs b/core/message_bus/src/transports/ws.rs index 9a52a40de..277f9cf71 100644 --- a/core/message_bus/src/transports/ws.rs +++ b/core/message_bus/src/transports/ws.rs @@ -18,8 +18,11 @@ //! WebSocket impls of the [`super`] transport traits. //! //! SDK-client plane only; replica plane stays TCP forever (invariant I1). -//! `compio-ws 0.3.1` binds `tungstenite`'s protocol state machine to -//! compio I/O directly, no tokio bridge (invariant I10). +//! `compio-ws 0.3.1` drives the HTTP-Upgrade handshake; after that the +//! [`compio_ws::WebSocketStream`] is unwrapped down to the bare +//! [`compio::net::TcpStream`] and split into owned read / write halves +//! handed to a per-connection reader / writer task pair. No tokio bridge +//! (invariant I10). //! //! # Connection model //! @@ -29,64 +32,72 @@ //! extensions) come from the WS spec layered on top of our 256 B //! framing invariant (I3). //! -//! # Stream split via actor pattern +//! # Two-task split //! -//! `tungstenite`'s state machine spans both directions: control frames -//! (Ping/Pong/Close) and the masked-from-client/unmasked-from-server -//! rules force the read and write halves to cooperate on one -//! `WebSocketStream` instance. We cannot satisfy -//! [`TransportConn::into_split`]'s ownership-split contract by handing -//! out two halves of the underlying TCP connection. +//! `tungstenite::WebSocket` holds a single state machine that spans both +//! directions on a `&mut self`, and `compio_ws::WebSocketStream::read` +//! cannot be cancelled across its internal `fill_read_buf().await` +//! without risking the in-flight buffer. Either constraint alone +//! precludes a `select!`-cancelling read + write race on a shared +//! stream. //! -//! Instead [`WsTransportConn::into_split`] spawns a **dispatcher task** -//! that owns the [`WebSocketStream`] and bridges two -//! `async_channel::bounded` queues: +//! [`WsTransportConn::into_split`] therefore unwraps the post-handshake +//! `WebSocketStream<TcpStream>` to the bare `TcpStream`, splits it into +//! `(OwnedReadHalf, OwnedWriteHalf)`, and spawns: //! -//! - inbound: dispatcher reads frames, pushes parsed -//! `Message<GenericHeader>` to the reader's queue. -//! - outbound: dispatcher pulls `Frozen<MESSAGE_ALIGN>` from the -//! writer's queue, wraps in a binary WS frame, sends. +//! - **reader task**: drains bytes from `OwnedReadHalf` into a persistent +//! accumulator, parses WS frames via [`tungstenite::protocol::frame::FrameHeader`], +//! pushes [`Message<GenericHeader>`] frames to the inbound queue, +//! forwards Ping payloads to the writer via the control channel. +//! - **writer task**: pulls outbound consensus frames + control replies +//! (Pong, Close), encodes them with the appropriate WS framing +//! (server unmasked, client masked per RFC 6455), writes to +//! `OwnedWriteHalf`. //! -//! Per-frame channel hop costs one queue push + one queue pop. Single- -//! threaded compio means the hop is a noop scheduling-wise; the cost is -//! one bounded buffer slot per inflight frame, which the per-peer queue -//! capacity already pays for on the bus side. +//! Reader and writer share no `&mut` state, so a parked read never +//! blocks an outbound send. Per-connection [`Shutdown`] coordinates +//! teardown: either task triggers it on EOF / I/O error / framing +//! violation, the other observes via [`ShutdownToken::wait`] and exits. //! -//! # Zero-copy payload +//! # Zero-copy on the server-side outbound path //! -//! `tungstenite::Message::Binary` takes `bytes::Bytes`. Wrapping -//! `Frozen<MESSAGE_ALIGN>` via [`bytes::Bytes::from_owner`] is -//! O(1) Arc-allocation, no memcpy. The 256 B header + body slice is -//! handed to the WS frame writer as-is. +//! Server-role writes split each frame into two compio submissions: the +//! ≤ 14 B WS header from a small scratch [`Vec<u8>`], then the +//! [`Frozen<MESSAGE_ALIGN>`] payload as an `IoBuf` directly. No +//! intermediate copy of the body. Client-role writes (test path) build a +//! single `Vec<u8>` containing the header + masked body, paying one copy +//! per frame. +//! +//! # Post-handshake leftover bytes +//! +//! RFC 6455 §4.1 forbids clients sending data frames before receiving +//! the 101 response. Defensively, [`WsTransportConn::into_split`] reads +//! any bytes already buffered in the `compio_io::compat::SyncStream` +//! (via [`std::io::BufRead::fill_buf`]) before unwrapping to the bare +//! `TcpStream`, and seeds the reader's accumulator with them. Expected +//! to be empty in production traffic; non-empty case logged at debug. use super::{TransportConn, TransportListener, TransportReader, TransportWriter}; use crate::framing; +use crate::lifecycle::{Shutdown, ShutdownToken}; use async_channel::{Receiver, Sender, bounded}; use bytes::Bytes; -use compio::net::{TcpListener, TcpStream}; +use compio::BufResult; +use compio::buf::IntoInner; +use compio::io::{AsyncRead, AsyncWriteExt}; +use compio::net::{OwnedReadHalf, OwnedWriteHalf, TcpListener, TcpStream}; use compio_ws::WebSocketStream; +use futures::FutureExt; use iggy_binary_protocol::consensus::MESSAGE_ALIGN; use iggy_binary_protocol::consensus::iobuf::Frozen; use iggy_binary_protocol::{GenericHeader, Message}; use iggy_common::IggyError; -use std::io; +use std::io::{self, BufRead, Cursor}; use std::net::SocketAddr; +use std::rc::Rc; use tracing::{debug, warn}; -use tungstenite::protocol::CloseFrame; -use tungstenite::protocol::frame::coding::CloseCode; - -/// Newtype wrapper bridging [`Frozen<MESSAGE_ALIGN>`] into the -/// [`bytes::Bytes::from_owner`] surface (which requires `AsRef<[u8]>`, -/// not `Deref`). The underlying byte slice is shared with the -/// `Bytes`'s atomic-refcount holder; no memcpy. Cost is one -/// `Arc<dyn>` allocation per outbound frame. -struct FrozenBytesOwner(Frozen<MESSAGE_ALIGN>); - -impl AsRef<[u8]> for FrozenBytesOwner { - fn as_ref(&self) -> &[u8] { - self.0.as_slice() - } -} +use tungstenite::protocol::frame::FrameHeader; +use tungstenite::protocol::frame::coding::{Control, Data, OpCode}; /// Subprotocol identifier carried in `Sec-WebSocket-Protocol`. Bump the /// trailing version digits when the wire format changes (matches the @@ -102,12 +113,28 @@ const INBOUND_QUEUE_CAPACITY: usize = 256; /// the per-peer writer task already enforces backpressure upstream. const OUTBOUND_QUEUE_CAPACITY: usize = 256; +/// Reader -> writer control channel depth. Pong replies + Close are +/// rare under nominal traffic; a small bound keeps a chatty peer's +/// Ping flood from displacing consensus traffic. +const CONTROL_QUEUE_CAPACITY: usize = 8; + +/// Per-fill chunk size for the reader task's TCP read. Matches +/// compio-ws's [`Config::DEFAULT_BUF_SIZE`] so kernel buffer sizing on +/// `io_uring` is symmetric pre- and post-rewrite. +const READ_FILL_CHUNK: usize = 128 * 1024; + +/// Maximum WS frame header length per RFC 6455 §5.2 (2 base bytes + +/// 8 extended length bytes + 4 mask bytes). Sized for a per-frame +/// scratch [`Vec<u8>`] reused across the writer's hot loop. +const WS_HEADER_MAX: usize = 14; + /// Inbound WS listener. /// /// Wraps a [`TcpListener`]. `accept` performs the WS HTTP-Upgrade /// handshake via `compio_ws::accept_async` before yielding a /// [`WsTransportConn`], so the install path on the owning shard never -/// sees the upgrade request. +/// sees the upgrade request. Accepted connections take the server role +/// (outbound unmasked, inbound expected masked per RFC 6455). pub struct WsTransportListener { inner: TcpListener, } @@ -128,7 +155,7 @@ impl TransportListener for WsTransportListener { loop { let (stream, addr) = self.inner.accept().await?; match compio_ws::accept_async(stream).await { - Ok(ws) => return Ok((WsTransportConn::new(ws), addr)), + Ok(ws) => return Ok((WsTransportConn::new_server(ws), addr)), Err(e) => { warn!(%addr, "WS handshake failed: {e}"); // Drop the failed stream and accept the next; do @@ -139,18 +166,48 @@ impl TransportListener for WsTransportListener { } } +/// Role of the local end on a WebSocket connection. Mask handling per +/// RFC 6455 §5.3 differs by direction: +/// +/// - **Server** writes unmasked, expects inbound masked. +/// - **Client** writes masked (random per-frame mask), expects inbound +/// unmasked. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum Role { + Server, + Client, +} + /// A single WS connection holding the bidirectional [`WebSocketStream`] -/// until [`Self::into_split`] hands ownership to a per-connection -/// dispatcher task. +/// until [`Self::into_split`] unwraps it to the bare [`TcpStream`] and +/// hands the read / write halves to per-task actors. pub struct WsTransportConn { stream: WebSocketStream<TcpStream>, + role: Role, } impl WsTransportConn { - /// Construct from an already-upgraded [`WebSocketStream`]. + /// Construct a server-role connection from an already-upgraded + /// [`WebSocketStream`]. Used by the listener accept path and by the + /// owning-shard install path post-fd-ship. + #[must_use] + pub const fn new_server(stream: WebSocketStream<TcpStream>) -> Self { + Self { + stream, + role: Role::Server, + } + } + + /// Construct a client-role connection from an already-upgraded + /// [`WebSocketStream`]. Used by tests dialing + /// [`compio_ws::client_async`]; production WS dialer paths (if / + /// when shipped) follow the same shape. #[must_use] - pub const fn new(stream: WebSocketStream<TcpStream>) -> Self { - Self { stream } + pub const fn new_client(stream: WebSocketStream<TcpStream>) -> Self { + Self { + stream, + role: Role::Client, + } } } @@ -161,7 +218,54 @@ impl TransportConn for WsTransportConn { fn into_split(self) -> (Self::Reader, Self::Writer) { let (in_tx, in_rx) = bounded::<Message<GenericHeader>>(INBOUND_QUEUE_CAPACITY); let (out_tx, out_rx) = bounded::<Frozen<MESSAGE_ALIGN>>(OUTBOUND_QUEUE_CAPACITY); - compio::runtime::spawn(dispatcher(self.stream, in_tx, out_rx)).detach(); + let (control_tx, control_rx) = bounded::<ControlFrame>(CONTROL_QUEUE_CAPACITY); + let (shutdown, shutdown_token_reader) = Shutdown::new(); + let shutdown = Rc::new(shutdown); + let shutdown_token_writer = shutdown_token_reader.clone(); + + // Unwrap layers: WebSocketStream -> WebSocket -> SyncStream -> TcpStream. + // Recover any post-handshake bytes from the SyncStream's read buffer + // before consuming it (RFC 6455 §4.1 says clients must not pipeline + // before the 101, so this is normally empty; defensive copy keeps a + // misbehaving client from losing its first frame). + let ws_inner = self.stream.into_inner(); + let mut sync_stream = ws_inner.into_inner(); + let leftover: Vec<u8> = match sync_stream.fill_buf() { + Ok(buf) if !buf.is_empty() => { + debug!( + "WS into_split: {} leftover bytes in SyncStream buffer post-handshake", + buf.len() + ); + buf.to_vec() + } + _ => Vec::new(), + }; + let tcp = sync_stream.into_inner(); + let (read_half, write_half) = tcp.into_split(); + + let role = self.role; + let reader_shutdown = Rc::clone(&shutdown); + compio::runtime::spawn(reader_task( + read_half, + leftover, + in_tx, + control_tx, + reader_shutdown, + shutdown_token_reader, + role, + )) + .detach(); + let writer_shutdown = Rc::clone(&shutdown); + compio::runtime::spawn(writer_task( + write_half, + out_rx, + control_rx, + writer_shutdown, + shutdown_token_writer, + role, + )) + .detach(); + ( WsTransportReader { rx: in_rx }, WsTransportWriter { tx: out_tx }, @@ -169,120 +273,122 @@ impl TransportConn for WsTransportConn { } } -/// Per-connection dispatcher loop. Phase-alternating design. -/// -/// `tungstenite`'s state machine and compio's "no buffer drop with -/// pending I/O" rule together rule out a `select!`-cancelling read + -/// write race on a shared `&mut WebSocketStream`. Instead the loop -/// alternates phases: -/// -/// 1. **Phase A** — non-blocking drain of up to -/// [`OUTBOUND_DRAIN_BURST`] outbound frames. `try_recv` returns -/// immediately when the queue is empty; no in-flight I/O is held -/// across the await. -/// 2. **Phase B** — one blocking `stream.read()`. Returns when the -/// peer sends a frame, EOF, error, or peer-close. -/// -/// Outbound latency under this design = at most one inbound RTT (the -/// time to receive the next frame). For request/reply workloads -/// (every client request triggers a server reply) this is identical -/// to a `select!`-based design. For server-push-heavy workloads -/// the upper bound is the keepalive interval; future work can add a -/// keepalive Ping/Pong sweeper that nudges the read past idle. -const OUTBOUND_DRAIN_BURST: usize = 16; +/// Reader -> writer signal: outstanding control work the writer must +/// emit on the wire. +#[derive(Debug)] +enum ControlFrame { + /// Reply to an inbound Ping. Payload is post-unmask. + Pong(Bytes), + /// Send a graceful close (empty body, normal closure). + Close, +} -#[allow(clippy::future_not_send)] -async fn dispatcher( - mut stream: WebSocketStream<TcpStream>, - in_tx: Sender<Message<GenericHeader>>, - out_rx: Receiver<Frozen<MESSAGE_ALIGN>>, -) { - loop { - // Phase A: drain outbound non-blocking, capped at the burst. - for _ in 0..OUTBOUND_DRAIN_BURST { - let Ok(frame) = out_rx.try_recv() else { - break; - }; - let payload = Bytes::from_owner(FrozenBytesOwner(frame)); - if let Err(e) = stream.send(tungstenite::Message::Binary(payload)).await { - warn!("WS write failed: {e}; dispatcher exiting"); - let _ = stream.close(None).await; - return; - } - } +/// Reader-task local representation of one parsed WS frame. +#[derive(Debug)] +enum InboundFrame { + Binary(Vec<u8>), + Ping(Vec<u8>), + /// Server-initiated keepalive reply; reader silently ignores. + Pong, + Close, +} - // Phase B: one read. The compio I/O submission is owned by - // the await and only dropped after completion - matches - // compio's "no buffer drop with pending I/O" rule. - let ws_msg = stream.read().await; - match ws_msg { - Ok(tungstenite::Message::Binary(body)) => match decode_consensus_frame(&body) { - Ok(msg) => { - if in_tx.send(msg).await.is_err() { - debug!("WS reader dropped; dispatcher exiting"); - break; - } - } - Err(e) => { - warn!("WS frame decode failed: {e:?}; closing connection"); - let _ = stream - .close(Some(CloseFrame { - code: CloseCode::Protocol, - reason: "iggy frame decode failed".into(), - })) - .await; - break; - } - }, - Ok(tungstenite::Message::Ping(payload)) => { - if let Err(e) = stream.send(tungstenite::Message::Pong(payload)).await { - debug!("WS pong write failed: {e}; dispatcher exiting"); - break; - } - } - Ok(tungstenite::Message::Pong(_)) => { - // Server-initiated keepalive replies; nothing to do. - } - Ok(tungstenite::Message::Close(_)) => { - debug!("WS peer initiated close"); - break; - } - Ok(tungstenite::Message::Text(_)) => { - warn!("WS text frame received on a binary plane; closing"); - let _ = stream - .close(Some(CloseFrame { - code: CloseCode::Unsupported, - reason: "binary frames only".into(), - })) - .await; - break; - } - Ok(tungstenite::Message::Frame(_)) => { - // Tungstenite's contract: `Message::Frame` is never - // produced on the read path. Defensive log + drop. - warn!("unexpected raw WS frame on read path; ignoring"); - } - Err(e) => { - debug!("WS read error: {e}; dispatcher exiting"); - break; - } - } +/// Categorical reasons the reader may abort. Constructed from inline +/// matches in [`reader_task`] and [`parse_one_frame`]; the variants +/// only reach `tracing::warn!` via the derived `Debug` impl, so the +/// payload fields exist for diagnostic logging only. +#[derive(Debug)] +#[allow(dead_code)] +enum FramingError { + /// Frame's mask bit conflicts with the local role (server got + /// unmasked client frame, or client got masked server frame). + UnexpectedMask, + /// `is_final == false` — fragmentation is not part of the consensus + /// plane invariant. + Fragmented, + /// Continuation, Text, or reserved opcode received. + UnsupportedOpcode, + /// Header parse failed at the tungstenite layer. + InvalidHeader, + /// Total frame body exceeds the bus-wide max message size. + OversizedPayload(u64), +} + +/// 4-byte XOR cycle per RFC 6455 §5.3. Re-implemented in tree because +/// tungstenite's `apply_mask` is `pub(crate)`. No SIMD; profile if mask +/// throughput ever becomes a hot path. +fn apply_mask(buf: &mut [u8], mask: [u8; 4]) { + for (i, byte) in buf.iter_mut().enumerate() { + *byte ^= mask[i & 3]; } - let _ = stream.close(None).await; } -#[derive(Debug)] -enum FrameDecodeError { - BadHeader, - BadSize, +/// Try to parse one complete WS frame off the front of `accumulator`. +/// Returns `Ok(Some(frame))` on a complete frame (bytes consumed), +/// `Ok(None)` if more bytes are needed (accumulator unchanged), or +/// `Err` on a protocol violation. +fn parse_one_frame( + accumulator: &mut Vec<u8>, + role: Role, +) -> Result<Option<InboundFrame>, FramingError> { + let mut cursor = Cursor::new(accumulator.as_slice()); + let parse = FrameHeader::parse(&mut cursor).map_err(|_| FramingError::InvalidHeader)?; + let Some((header, payload_len)) = parse else { + return Ok(None); + }; + // Header is at most 14 bytes per RFC 6455 §5.2; cursor.position() + // returns a u64 only because of the underlying cursor type. + let header_len = usize::try_from(cursor.position()).map_err(|_| FramingError::InvalidHeader)?; + let payload_len_usize = + usize::try_from(payload_len).map_err(|_| FramingError::OversizedPayload(payload_len))?; + if payload_len_usize > framing::MAX_MESSAGE_SIZE { + return Err(FramingError::OversizedPayload(payload_len)); + } + let total = header_len.saturating_add(payload_len_usize); + if accumulator.len() < total { + return Ok(None); + } + + if !header.is_final { + return Err(FramingError::Fragmented); + } + if header.rsv1 || header.rsv2 || header.rsv3 { + return Err(FramingError::InvalidHeader); + } + + // RFC 6455 §5.1: client->server frames MUST be masked, server->client + // MUST NOT be. Validate per local role. + match (role, header.mask) { + (Role::Server, None) | (Role::Client, Some(_)) => { + return Err(FramingError::UnexpectedMask); + } + _ => {} + } + + // Drain header + body bytes from the accumulator. + let mut payload: Vec<u8> = accumulator.drain(..total).collect(); + payload.drain(..header_len); + if let Some(mask) = header.mask { + apply_mask(&mut payload, mask); + } + + let frame = match header.opcode { + OpCode::Data(Data::Binary) => InboundFrame::Binary(payload), + OpCode::Data(Data::Continue | Data::Text | Data::Reserved(_)) + | OpCode::Control(Control::Reserved(_)) => return Err(FramingError::UnsupportedOpcode), + OpCode::Control(Control::Ping) => InboundFrame::Ping(payload), + OpCode::Control(Control::Pong) => InboundFrame::Pong, + OpCode::Control(Control::Close) => InboundFrame::Close, + }; + Ok(Some(frame)) } -/// Parse a binary WS frame's payload into a `Message<GenericHeader>`. +/// Decode a binary WS frame's payload into a `Message<GenericHeader>`. /// /// Reuses the framing invariant (I3): `[256 B header][optional body]` -/// where `body_len = header.size - HEADER_SIZE`. No partial frames; the -/// WS layer guarantees a single message per binary frame (FIN=1). -fn decode_consensus_frame(body: &Bytes) -> Result<Message<GenericHeader>, FrameDecodeError> { +/// where `body_len = header.size - HEADER_SIZE`. The WS layer guarantees +/// a single message per binary frame (FIN=1). +fn decode_consensus_frame(body: &[u8]) -> Result<Message<GenericHeader>, FrameDecodeError> { if body.len() < iggy_binary_protocol::HEADER_SIZE { return Err(FrameDecodeError::BadHeader); } @@ -297,18 +403,274 @@ fn decode_consensus_frame(body: &Bytes) -> Result<Message<GenericHeader>, FrameD return Err(FrameDecodeError::BadSize); } // Aligned MESSAGE_ALIGN backing memory is required by the - // `Message<GenericHeader>` invariant; `Bytes` carries no alignment - // guarantee, so this is a one-time copy from the WS-framed payload - // into a freshly allocated `Owned<MESSAGE_ALIGN>`. The cost is - // unavoidable on inbound for the WS plane (TCP path also does a - // copy via the staged read-into-Owned dance in framing.rs); the - // outbound side stays zero-copy via `Bytes::from_owner`. + // `Message<GenericHeader>` invariant; the unmasked WS payload is a + // plain byte slice with no alignment guarantee, so this is a + // one-time copy from the WS-framed payload into a freshly allocated + // `Owned<MESSAGE_ALIGN>`. Same trade-off as before the rewrite. let owned = iggy_binary_protocol::consensus::iobuf::Owned::<MESSAGE_ALIGN>::copy_from_slice(body); Message::<GenericHeader>::try_from(owned).map_err(|_| FrameDecodeError::BadHeader) } -/// Read half. Polls the dispatcher's inbound queue. +#[derive(Debug)] +enum FrameDecodeError { + BadHeader, + BadSize, +} + +#[allow(clippy::future_not_send)] +async fn reader_task( + mut read_half: OwnedReadHalf<TcpStream>, + initial: Vec<u8>, + in_tx: Sender<Message<GenericHeader>>, + control_tx: Sender<ControlFrame>, + shutdown: Rc<Shutdown>, + shutdown_token: ShutdownToken, + role: Role, +) { + let mut accumulator: Vec<u8> = initial; + loop { + // Drain any complete frames already buffered. + loop { + match parse_one_frame(&mut accumulator, role) { + Ok(Some(InboundFrame::Binary(payload))) => match decode_consensus_frame(&payload) { + Ok(msg) => { + if in_tx.send(msg).await.is_err() { + debug!("WS reader: inbound queue dropped, exiting"); + shutdown.trigger(); + return; + } + } + Err(e) => { + warn!("WS reader: consensus frame decode failed: {e:?}"); + let _ = control_tx.try_send(ControlFrame::Close); + shutdown.trigger(); + return; + } + }, + Ok(Some(InboundFrame::Ping(payload))) => { + if control_tx + .try_send(ControlFrame::Pong(Bytes::from(payload))) + .is_err() + { + warn!("WS reader: control queue full, dropping inbound Ping"); + } + } + Ok(Some(InboundFrame::Pong)) => { + // Server-initiated keepalive reply; nothing to do. + } + Ok(Some(InboundFrame::Close)) => { + debug!("WS reader: peer initiated close"); + let _ = control_tx.try_send(ControlFrame::Close); + shutdown.trigger(); + return; + } + Ok(None) => break, + Err(e) => { + warn!("WS reader: framing violation: {e:?}"); + let _ = control_tx.try_send(ControlFrame::Close); + shutdown.trigger(); + return; + } + } + } + + // Need more bytes. Race the read against the shutdown token so a + // peer that goes silent does not pin the task across graceful + // shutdown. compio TcpStream reads use IoBuf; cancellation + // (drop of the future) is supported and leaks the buffer until + // the kernel completes the SQE. + let buf: Vec<u8> = Vec::with_capacity(READ_FILL_CHUNK); + let read_fut = read_half.read(buf); + let outcome = futures::select_biased! { + () = shutdown_token.wait().fuse() => { + debug!("WS reader: shutdown observed during fill, exiting"); + return; + } + BufResult(result, buf) = read_fut.fuse() => (result, buf), + }; + let (result, buf) = outcome; + match result { + Ok(0) => { + debug!("WS reader: TCP EOF"); + shutdown.trigger(); + return; + } + Ok(n) => accumulator.extend_from_slice(&buf[..n]), + Err(e) => { + debug!("WS reader: TCP read error: {e}"); + shutdown.trigger(); + return; + } + } + } +} + +#[allow(clippy::future_not_send)] +async fn writer_task( + mut write_half: OwnedWriteHalf<TcpStream>, + out_rx: Receiver<Frozen<MESSAGE_ALIGN>>, + control_rx: Receiver<ControlFrame>, + shutdown: Rc<Shutdown>, + shutdown_token: ShutdownToken, + role: Role, +) { + let mut header_scratch: Vec<u8> = Vec::with_capacity(WS_HEADER_MAX); + loop { + let event = futures::select_biased! { + () = shutdown_token.wait().fuse() => Event::Shutdown, + ctrl = control_rx.recv().fuse() => ctrl + .map_or(Event::Shutdown, Event::Control), + out = out_rx.recv().fuse() => out + .map_or(Event::Shutdown, Event::Outbound), + }; + match event { + Event::Outbound(frozen) => { + if let Err(e) = + write_consensus_binary(&mut write_half, &mut header_scratch, frozen, role).await + { + debug!("WS writer: outbound write failed: {e}"); + shutdown.trigger(); + return; + } + } + Event::Control(ControlFrame::Pong(payload)) => { + if let Err(e) = write_control( + &mut write_half, + &mut header_scratch, + payload, + OpCode::Control(Control::Pong), + role, + ) + .await + { + debug!("WS writer: pong write failed: {e}"); + shutdown.trigger(); + return; + } + } + Event::Control(ControlFrame::Close) => { + let _ = write_control( + &mut write_half, + &mut header_scratch, + Bytes::new(), + OpCode::Control(Control::Close), + role, + ) + .await; + shutdown.trigger(); + return; + } + Event::Shutdown => { + let _ = write_control( + &mut write_half, + &mut header_scratch, + Bytes::new(), + OpCode::Control(Control::Close), + role, + ) + .await; + return; + } + } + } +} + +enum Event { + Outbound(Frozen<MESSAGE_ALIGN>), + Control(ControlFrame), + Shutdown, +} + +/// Write one outbound consensus binary frame. +/// +/// Server role: header to scratch, frozen payload directly to compio +/// (zero-copy of body). Client role: header + masked body in one +/// scratch, single write. +#[allow(clippy::future_not_send)] +async fn write_consensus_binary( + write_half: &mut OwnedWriteHalf<TcpStream>, + header_scratch: &mut Vec<u8>, + frozen: Frozen<MESSAGE_ALIGN>, + role: Role, +) -> io::Result<()> { + let payload_len = frozen.len(); + header_scratch.clear(); + match role { + Role::Server => { + let header = build_header(OpCode::Data(Data::Binary), None); + format_header(&header, payload_len as u64, header_scratch); + let scratch = std::mem::take(header_scratch); + let BufResult(r1, returned) = write_half.write_all(scratch).await; + *header_scratch = returned; + r1?; + let BufResult(r2, _) = write_half.write_all(frozen).await; + r2 + } + Role::Client => { + let mask: [u8; 4] = rand::random::<u32>().to_ne_bytes(); + let header = build_header(OpCode::Data(Data::Binary), Some(mask)); + format_header(&header, payload_len as u64, header_scratch); + let header_len = header_scratch.len(); + header_scratch.extend_from_slice(frozen.as_slice()); + apply_mask(&mut header_scratch[header_len..], mask); + let scratch = std::mem::take(header_scratch); + let BufResult(r, returned) = write_half.write_all(scratch).await; + *header_scratch = returned; + r + } + } +} + +/// Write one outbound control frame (Pong or Close). Always single +/// write: control payloads are at most 125 B per RFC 6455 §5.5. +#[allow(clippy::future_not_send)] +async fn write_control( + write_half: &mut OwnedWriteHalf<TcpStream>, + header_scratch: &mut Vec<u8>, + payload: Bytes, + opcode: OpCode, + role: Role, +) -> io::Result<()> { + header_scratch.clear(); + let mask = match role { + Role::Server => None, + Role::Client => Some(rand::random::<[u8; 4]>()), + }; + let header = build_header(opcode, mask); + format_header(&header, payload.len() as u64, header_scratch); + let header_len = header_scratch.len(); + header_scratch.extend_from_slice(&payload); + if let Some(mask) = mask { + apply_mask(&mut header_scratch[header_len..], mask); + } + let scratch = std::mem::take(header_scratch); + let BufResult(r, returned) = write_half.write_all(scratch).await; + *header_scratch = returned; + r +} + +const fn build_header(opcode: OpCode, mask: Option<[u8; 4]>) -> FrameHeader { + FrameHeader { + is_final: true, + rsv1: false, + rsv2: false, + rsv3: false, + opcode, + mask, + } +} + +/// Format a [`FrameHeader`] into `out`. Tungstenite's +/// [`FrameHeader::format`] only fails on `Write::write_all` errors; +/// `Vec<u8>::write_all` is infallible. +fn format_header(header: &FrameHeader, payload_len: u64, out: &mut Vec<u8>) { + header + .format(payload_len, out) + .expect("FrameHeader::format on Vec<u8> is infallible"); +} + +/// Read half. Polls the reader task's inbound queue. pub struct WsTransportReader { rx: Receiver<Message<GenericHeader>>, } @@ -319,10 +681,11 @@ impl TransportReader for WsTransportReader { &mut self, _max_message_size: usize, ) -> Result<Message<GenericHeader>, IggyError> { - // Bound check happens inside `decode_consensus_frame` against - // the build-time `framing::MAX_MESSAGE_SIZE`; the per-call - // override exists for parity with the trait surface and will - // be plumbed through when a runtime-configurable cap lands. + // Bound check happens inside [`parse_one_frame`] (against + // [`framing::MAX_MESSAGE_SIZE`]) and inside [`decode_consensus_frame`] + // (against the build-time max). The per-call override exists for + // parity with the trait surface; runtime override plumbing lands + // when a runtime-configurable cap is needed. self.rx .recv() .await @@ -330,7 +693,7 @@ impl TransportReader for WsTransportReader { } } -/// Write half. Pushes frames into the dispatcher's outbound queue. +/// Write half. Pushes frames into the writer task's outbound queue. pub struct WsTransportWriter { tx: Sender<Frozen<MESSAGE_ALIGN>>, } @@ -339,13 +702,13 @@ impl TransportWriter for WsTransportWriter { #[allow(clippy::future_not_send)] async fn send_batch(&mut self, batch: &mut Vec<Frozen<MESSAGE_ALIGN>>) -> io::Result<()> { // Drain in order; one queue push per frame. Atomic-or-error - // contract holds because the dispatcher writes each frame + // contract holds because the writer task writes each frame // before pulling the next from the queue. for buf in batch.drain(..) { self.tx .send(buf) .await - .map_err(|_| io::Error::other("WS dispatcher queue closed"))?; + .map_err(|_| io::Error::other("WS writer queue closed"))?; } Ok(()) } @@ -356,6 +719,7 @@ mod tests { use super::*; use compio::net::TcpStream; use iggy_binary_protocol::{Command2, HEADER_SIZE}; + use std::time::Duration; #[allow(clippy::cast_possible_truncation)] fn header_only(command: Command2) -> Frozen<MESSAGE_ALIGN> { @@ -367,6 +731,16 @@ mod tests { .into_frozen() } + #[allow(clippy::cast_possible_truncation)] + fn padded(command: Command2, total_size: usize) -> Frozen<MESSAGE_ALIGN> { + Message::<GenericHeader>::new(total_size) + .transmute_header(|_, h: &mut GenericHeader| { + h.command = command; + h.size = total_size as u32; + }) + .into_frozen() + } + #[compio::test] #[allow(clippy::future_not_send)] async fn loopback_round_trip_three_frames() { @@ -397,7 +771,7 @@ mod tests { let (ws_client, _resp) = compio_ws::client_async(url, client_tcp) .await .expect("ws handshake"); - let conn = WsTransportConn::new(ws_client); + let conn = WsTransportConn::new_client(ws_client); let (_r, mut w) = conn.into_split(); let mut batch = vec![ @@ -440,9 +814,206 @@ mod tests { .send(tungstenite::Message::Binary(Bytes::from(buf))) .await .unwrap(); - // Server's dispatcher closes the WS; reader queue closes, - // surfacing as ConnectionClosed. + // Server's reader closes; queue closes, surfacing as ConnectionClosed. let res = server_task.await.unwrap(); assert!(matches!(res, Err(IggyError::ConnectionClosed))); } + + /// Full bidirectional round-trip: client sends one frame, server + /// reader receives it and the server's writer pushes a Reply that + /// the client's reader observes. This is the regression test the + /// pre-rewrite dispatcher could not pass. + #[compio::test] + #[allow(clippy::future_not_send)] + async fn server_to_client_reply_round_trip() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let server_addr = listener.local_addr().unwrap(); + let ws_listener = WsTransportListener::new(listener); + + let server_task = compio::runtime::spawn(async move { + let (conn, _peer) = ws_listener.accept().await.expect("accept"); + let (mut reader, mut writer) = conn.into_split(); + let req = reader + .read_message(framing::MAX_MESSAGE_SIZE) + .await + .expect("server read"); + assert_eq!(req.header().command, Command2::Request); + let mut batch = vec![header_only(Command2::Reply)]; + writer.send_batch(&mut batch).await.expect("server reply"); + }); + + let client_tcp = TcpStream::connect(server_addr).await.unwrap(); + let url = format!("ws://{server_addr}/"); + let (ws_client, _resp) = compio_ws::client_async(url, client_tcp) + .await + .expect("ws handshake"); + let (mut client_reader, mut client_writer) = + WsTransportConn::new_client(ws_client).into_split(); + + let mut batch = vec![header_only(Command2::Request)]; + client_writer + .send_batch(&mut batch) + .await + .expect("client send"); + + let reply = compio::time::timeout( + Duration::from_secs(2), + client_reader.read_message(framing::MAX_MESSAGE_SIZE), + ) + .await + .expect("client must receive reply within 2 s") + .expect("reply frame"); + assert_eq!(reply.header().command, Command2::Reply); + server_task.await.unwrap(); + } + + /// Verify a 16 MiB body round-trips intact through both reader and + /// writer paths. + #[compio::test] + #[allow(clippy::future_not_send)] + async fn large_frame_round_trip() { + const BODY_SIZE: usize = 16 * 1024 * 1024; + let total = HEADER_SIZE + BODY_SIZE; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let server_addr = listener.local_addr().unwrap(); + let ws_listener = WsTransportListener::new(listener); + + let server_task = compio::runtime::spawn(async move { + let (conn, _peer) = ws_listener.accept().await.expect("accept"); + let (mut reader, _writer) = conn.into_split(); + reader.read_message(framing::MAX_MESSAGE_SIZE).await + }); + + let client_tcp = TcpStream::connect(server_addr).await.unwrap(); + let url = format!("ws://{server_addr}/"); + let (ws_client, _resp) = compio_ws::client_async(url, client_tcp) + .await + .expect("ws handshake"); + let (_r, mut w) = WsTransportConn::new_client(ws_client).into_split(); + let mut batch = vec![padded(Command2::Request, total)]; + w.send_batch(&mut batch).await.expect("send"); + + let msg = compio::time::timeout(Duration::from_secs(5), server_task) + .await + .expect("server task within 5 s") + .unwrap() + .expect("msg"); + assert_eq!(msg.header().command, Command2::Request); + assert_eq!(msg.header().size as usize, total); + } + + /// Apply-mask round-trips a payload back to itself. + #[test] + fn apply_mask_round_trip() { + let mask = [0x11, 0x22, 0x33, 0x44]; + let original: Vec<u8> = (0u8..200).collect(); + let mut masked = original.clone(); + apply_mask(&mut masked, mask); + assert_ne!(masked, original); + apply_mask(&mut masked, mask); + assert_eq!(masked, original); + } + + /// Server reading an unmasked client frame must be rejected. + #[test] + fn server_role_rejects_unmasked_inbound() { + // Build an unmasked Binary frame with a 0-byte payload. + let mut buf = Vec::new(); + let header = build_header(OpCode::Data(Data::Binary), None); + format_header(&header, 0, &mut buf); + let res = parse_one_frame(&mut buf, Role::Server); + assert!(matches!(res, Err(FramingError::UnexpectedMask))); + } + + /// Client reading a masked server frame must be rejected. + #[test] + fn client_role_rejects_masked_inbound() { + let mut buf = Vec::new(); + let header = build_header(OpCode::Data(Data::Binary), Some([1, 2, 3, 4])); + format_header(&header, 0, &mut buf); + let res = parse_one_frame(&mut buf, Role::Client); + assert!(matches!(res, Err(FramingError::UnexpectedMask))); + } + + /// Fragmented inbound frame is rejected on this plane. + #[test] + fn fragmented_inbound_rejected() { + let mut buf = Vec::new(); + let header = FrameHeader { + is_final: false, + rsv1: false, + rsv2: false, + rsv3: false, + opcode: OpCode::Data(Data::Binary), + mask: Some([1, 2, 3, 4]), + }; + format_header(&header, 0, &mut buf); + let res = parse_one_frame(&mut buf, Role::Server); + assert!(matches!(res, Err(FramingError::Fragmented))); + } + + /// Text frame is rejected (binary-only plane). + #[test] + fn text_inbound_rejected() { + let mut buf = Vec::new(); + let header = build_header(OpCode::Data(Data::Text), Some([1, 2, 3, 4])); + format_header(&header, 0, &mut buf); + let res = parse_one_frame(&mut buf, Role::Server); + assert!(matches!(res, Err(FramingError::UnsupportedOpcode))); + } + + /// Partial frame returns `Ok(None)` and leaves the accumulator + /// untouched. + #[test] + fn partial_frame_keeps_accumulator() { + let mut buf: Vec<u8> = vec![0x82]; // FIN + Binary opcode, but no length byte + let before = buf.clone(); + let res = parse_one_frame(&mut buf, Role::Server); + assert!(matches!(res, Ok(None))); + assert_eq!(buf, before, "accumulator must not be drained on partial"); + } + + /// Inbound Pong frames are silently consumed and the reader keeps + /// going. Verified by sending a Pong followed by a real consensus + /// frame and confirming the consensus frame still surfaces. + #[test] + fn pong_inbound_silently_consumed() { + let mut buf = Vec::new(); + let header = build_header(OpCode::Control(Control::Pong), Some([1, 2, 3, 4])); + format_header(&header, 0, &mut buf); + let res = parse_one_frame(&mut buf, Role::Server); + assert!(matches!(res, Ok(Some(InboundFrame::Pong)))); + assert!(buf.is_empty()); + } + + /// Peer-initiated Close on the server-side reader surfaces as + /// `ConnectionClosed` on the inbound queue. + #[compio::test] + #[allow(clippy::future_not_send)] + async fn peer_initiated_close_drains_inbound_queue() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let server_addr = listener.local_addr().unwrap(); + let ws_listener = WsTransportListener::new(listener); + + let server_task = compio::runtime::spawn(async move { + let (conn, _peer) = ws_listener.accept().await.expect("accept"); + let (mut reader, _writer) = conn.into_split(); + reader.read_message(framing::MAX_MESSAGE_SIZE).await + }); + + let client_tcp = TcpStream::connect(server_addr).await.unwrap(); + let url = format!("ws://{server_addr}/"); + let (mut ws_client, _) = compio_ws::client_async(url, client_tcp).await.unwrap(); + // Client sends a clean Close. compio_ws::WebSocketStream::close + // emits a Close frame and waits for the server's Close reply or + // EOF. + ws_client.close(None).await.expect("client close"); + + let res = compio::time::timeout(Duration::from_secs(2), server_task) + .await + .expect("server task within 2 s") + .unwrap(); + assert!(matches!(res, Err(IggyError::ConnectionClosed))); + } } diff --git a/core/message_bus/tests/ws_client_roundtrip.rs b/core/message_bus/tests/ws_client_roundtrip.rs index d8d8854f4..0d95f2dc6 100644 --- a/core/message_bus/tests/ws_client_roundtrip.rs +++ b/core/message_bus/tests/ws_client_roundtrip.rs @@ -25,16 +25,10 @@ //! Coverage: //! //! - **Positive**: client requests `iggy.consensus.v1`, handshake -//! succeeds, a Request frame reaches the server-side handler. -//! Caveat: a full Request -> Reply round trip would deadlock -//! the per-connection WS dispatcher (`transports::ws.rs`); its -//! phase-alternating drain-then-read loop blocks Phase B on -//! `stream.read().await` and cannot react to outbound queue -//! activity, so the server's Reply never reaches the wire while -//! the client is parked in Phase B waiting for it. Tracked as a -//! Phase 7 follow-up in the silverhand transport-plan; the design -//! note in the dispatcher comment overstates its request/reply -//! support. +//! succeeds, the server-side handler observes a Request, and the +//! server's `bus.send_to_client` reply lands on the client's +//! reader. Verifies the full bidirectional plane through the +//! reader / writer two-task split. //! - **Negative (missing)**: client omits the `Sec-WebSocket-Protocol` //! header; server returns HTTP 400; handshake fails on the client. //! - **Negative (wrong)**: client requests an unrelated subprotocol; @@ -47,23 +41,34 @@ use compio::net::TcpStream; use iggy_binary_protocol::Command2; use iggy_binary_protocol::consensus::MESSAGE_ALIGN; use iggy_binary_protocol::consensus::iobuf::Frozen; -use message_bus::IggyMessageBus; use message_bus::client_listener::RequestHandler; use message_bus::client_listener_ws::{bind, run}; use message_bus::transports::ws::{WS_SUBPROTOCOL, WsTransportConn}; -use message_bus::transports::{TransportConn, TransportWriter}; +use message_bus::transports::{TransportConn, TransportReader, TransportWriter}; +use message_bus::{IggyMessageBus, MessageBus, framing}; use std::rc::Rc; use std::time::Duration; #[compio::test] -async fn handshake_succeeds_and_request_reaches_handler() { +async fn handshake_succeeds_and_round_trip_completes() { let bus = Rc::new(IggyMessageBus::new(0)); - // Side-channel signal: handler fires once when it sees the Request. - let (tx_seen, rx_seen) = async_channel::bounded::<u128>(1); + // Handler echoes a Reply back to the originating client_id via the + // bus's send_to_client surface — same path a real dispatcher would + // take. Spawned because the handler signature is synchronous; the + // bus surface returns Ready-on-first-poll (I2) so the spawned task + // completes within the same runtime tick. + let bus_for_handler = Rc::clone(&bus); let on_request: RequestHandler = Rc::new(move |client_id, msg| { assert_eq!(msg.header().command, Command2::Request); - let _ = tx_seen.try_send(client_id); + let bus = Rc::clone(&bus_for_handler); + compio::runtime::spawn(async move { + let reply = header_only(Command2::Reply, 42, 0).into_frozen(); + bus.send_to_client(client_id, reply) + .await + .expect("server send_to_client"); + }) + .detach(); }); let (listener, server_addr) = bind(loopback()).await.expect("bind"); @@ -83,28 +88,23 @@ async fn handshake_succeeds_and_request_reaches_handler() { .await .expect("ws handshake"); - // Reuse the existing WsTransportConn split for client-side framing. - // Drop the client's reader half; we only verify that the server saw - // the Request. A full round trip is blocked by the dispatcher - // limitation called out in the module-level comment. - let conn = WsTransportConn::new(ws_client); - let (_reader, mut writer) = conn.into_split(); + let conn = WsTransportConn::new_client(ws_client); + let (mut reader, mut writer) = conn.into_split(); let request = header_only(Command2::Request, 42, 0).into_frozen(); let mut batch: Vec<Frozen<MESSAGE_ALIGN>> = vec![request]; writer.send_batch(&mut batch).await.expect("client send"); assert!(batch.is_empty(), "send_batch must drain the Vec"); - // Wait for the handler to observe the Request. - let observed = compio::time::timeout(Duration::from_secs(2), rx_seen.recv()) - .await - .expect("handler must fire within 2 s") - .expect("handler must signal"); - assert_eq!( - observed >> 112, - 0, - "client_id top 16 bits must encode shard 0" - ); + // Read the server's Reply off the WS reader within 2 s. + let reply = compio::time::timeout( + Duration::from_secs(2), + reader.read_message(framing::MAX_MESSAGE_SIZE), + ) + .await + .expect("client must receive reply within 2 s") + .expect("reply frame"); + assert_eq!(reply.header().command, Command2::Reply); bus.shutdown(Duration::from_secs(2)).await; }
