This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 45b4197c224e91099f5a85565e431521e8e2641a
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Apr 27 11:37:20 2026 +0200

    feat(message_bus): add WebSocket transport for SDK-client plane

    The client plane lacked WS support; Phase 2 of the transport plan
    delivers it for browser SDKs and for operators that need WS behind a
    TLS-terminating load balancer. This lands the message-bus side behind
    the existing `Transport*` trait family.

    `WsTransport{Listener,Conn,Reader,Writer}` in `transports/ws.rs` wrap
    `compio-ws 0.3.1` so the runtime stays single-stack (no tokio bridge,
    invariant I10).

    `WebSocketStream` cannot be split: tungstenite's protocol state
    machine spans both directions. The transport's `into_split` therefore
    spawns a per-connection dispatcher task that owns the stream and
    bridges two `async_channel::bounded` queues to the Reader/Writer
    halves. A `select!`-based dispatcher hit compio's "no buffer drop
    with pending I/O" panic on cancelled reads, so the dispatcher is
    phase-alternating: drain up to 16 outbound frames non-blocking via
    `try_recv`, then await one `stream.read()`. A condensed sketch of the
    loop follows at the end of this message. For request/reply workloads
    outbound latency is bounded by the next inbound RTT; server-push-heavy
    traffic gets a follow-up keepalive sweeper.

    Outbound stays zero-copy: `bytes::Bytes::from_owner` wraps a newtype
    around `Frozen<MESSAGE_ALIGN>` that impls `AsRef<[u8]>` (the
    underlying `Frozen` has `Deref` only). Inbound copies once into a
    `MESSAGE_ALIGN`-aligned `Owned` because `Bytes` carries no alignment
    guarantee.

    WS is always-on (no Cargo feature), matching the QUIC posture and
    keeping the build matrix small. `compio-ws`, `tungstenite 0.28`, and
    `bytes` move to message_bus's regular `[dependencies]`.
    `tungstenite = "=0.28.0"` is pinned directly (not via the workspace's
    0.29 used by tokio-tungstenite elsewhere) because compio-ws does not
    re-export `Message`, and our call sites must use the exact enum
    compio-ws expects.

    Out of scope on this branch: shard-0 HTTP-Upgrade routing in
    `client_listener` and the `install_client_ws_stream` entry point.
    Both ride the dispatcher branch alongside QUIC's
    `ShardFramePayload::ClientQuicConnectionSetup` variant.

    Tests: +2 in `transports::ws::tests` (three-frame round trip;
    malformed frame closes the connection). 125/125 pass on
    `cargo nextest run -p message_bus -p server-ng`. Workspace clippy and
    cargo doc are clean.
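    The condensed sketch of the dispatcher loop described above. Toy
    types only: `Stream` stands in for the owned WebSocket stream and
    frames are plain `Vec<u8>`; the real, compio-ws-backed loop is in
    `transports/ws.rs` below.

        use async_channel::{Receiver, Sender};

        const OUTBOUND_DRAIN_BURST: usize = 16;

        /// Toy stand-in for the owned `WebSocketStream<TcpStream>`.
        struct Stream;

        impl Stream {
            async fn send(&mut self, _frame: Vec<u8>) -> Result<(), ()> {
                Ok(())
            }
            async fn read(&mut self) -> Option<Vec<u8>> {
                None
            }
        }

        async fn dispatcher(
            mut stream: Stream,
            in_tx: Sender<Vec<u8>>,
            out_rx: Receiver<Vec<u8>>,
        ) {
            loop {
                // Phase A: `try_recv` never blocks; each send runs to
                // completion, so no buffer is dropped with I/O pending.
                for _ in 0..OUTBOUND_DRAIN_BURST {
                    let Ok(frame) = out_rx.try_recv() else { break };
                    if stream.send(frame).await.is_err() {
                        return;
                    }
                }
                // Phase B: exactly one read; the await owns the I/O
                // submission until it completes, so no cancelled read
                // ever races a write.
                match stream.read().await {
                    Some(frame) => {
                        if in_tx.send(frame).await.is_err() {
                            return; // reader half dropped
                        }
                    }
                    None => return, // peer closed or read error
                }
            }
        }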
---
 Cargo.lock                             |   3 +
 Cargo.toml                             |   1 +
 core/message_bus/Cargo.toml            |   5 +
 core/message_bus/src/transports/mod.rs |   1 +
 core/message_bus/src/transports/ws.rs  | 448 +++++++++++++++++++++++++++++++++
 5 files changed, 458 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index cb1463495..36f299edd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7010,8 +7010,10 @@ version = "0.1.0"
 dependencies = [
  "async-channel",
  "blake3",
+ "bytes",
  "compio",
  "compio-quic",
+ "compio-ws",
  "futures",
  "iggy_binary_protocol",
  "iggy_common",
@@ -7023,6 +7025,7 @@ dependencies = [
  "tempfile",
  "thiserror 2.0.18",
  "tracing",
+ "tungstenite 0.28.0",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 87d8878d1..aa367d9a5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -119,6 +119,7 @@ compio-buf = "0.8.1"
 # Pin compio-driver >= 0.11.2 to fix musl compilation (compio-rs/compio#668)
 compio-driver = "0.11.4"
 compio-quic = "=0.7.2"
+compio-ws = "=0.3.1"
 configs = { path = "core/configs", version = "0.1.0" }
 configs_derive = { path = "core/configs_derive", version = "0.1.0" }
 consensus = { path = "core/consensus" }
diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml
index ace090f16..2be7d01a9 100644
--- a/core/message_bus/Cargo.toml
+++ b/core/message_bus/Cargo.toml
@@ -31,8 +31,10 @@ publish = false
 [dependencies]
 async-channel = { workspace = true }
 blake3 = { workspace = true }
+bytes = { workspace = true }
 compio = { workspace = true }
 compio-quic = { workspace = true }
+compio-ws = { workspace = true }
 futures = { workspace = true }
 iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
@@ -42,6 +44,9 @@ rustls = { workspace = true }
 socket2 = { workspace = true, features = ["all"] }
 thiserror = { workspace = true }
 tracing = { workspace = true }
+# Pinned to the major compio-ws 0.3.1 wraps (it does not re-export Message),
+# so our send/receive sites use the same Message enum compio-ws expects.
+tungstenite = "=0.28.0"
 
 [dev-dependencies]
 rcgen = { workspace = true }
diff --git a/core/message_bus/src/transports/mod.rs b/core/message_bus/src/transports/mod.rs
index 76e1965f3..2e1875919 100644
--- a/core/message_bus/src/transports/mod.rs
+++ b/core/message_bus/src/transports/mod.rs
@@ -67,6 +67,7 @@
 pub mod quic;
 pub mod tcp;
+pub mod ws;
 
 // Only `Conn` and `Writer` have crate-internal callers today
 // (`installer.rs` wraps the dialed/accepted stream in a `TcpTransportConn`;
diff --git a/core/message_bus/src/transports/ws.rs b/core/message_bus/src/transports/ws.rs
new file mode 100644
index 000000000..9a52a40de
--- /dev/null
+++ b/core/message_bus/src/transports/ws.rs
@@ -0,0 +1,448 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! WebSocket impls of the [`super`] transport traits.
+//!
+//! SDK-client plane only; the replica plane stays TCP forever
+//! (invariant I1). `compio-ws 0.3.1` binds `tungstenite`'s protocol
+//! state machine to compio I/O directly; no tokio bridge (invariant
+//! I10).
+//!
+//! # Connection model
+//!
+//! One binary WS frame per consensus message. The 256 B `GenericHeader`
+//! plus the optional body is the frame payload; the frame format
+//! constraints (FIN=1, opcode=0x2 binary, no fragmentation, no
+//! extensions) come from the WS spec layered on top of our 256 B
+//! framing invariant (I3).
+//!
+//! # Stream split via actor pattern
+//!
+//! `tungstenite`'s state machine spans both directions: control frames
+//! (Ping/Pong/Close) and the masked-from-client/unmasked-from-server
+//! rules force the read and write halves to cooperate on one
+//! `WebSocketStream` instance. We cannot satisfy
+//! [`TransportConn::into_split`]'s ownership-split contract by handing
+//! out two halves of the underlying TCP connection.
+//!
+//! Instead [`WsTransportConn::into_split`] spawns a **dispatcher task**
+//! that owns the [`WebSocketStream`] and bridges two
+//! `async_channel::bounded` queues:
+//!
+//! - inbound: the dispatcher reads frames and pushes parsed
+//!   `Message<GenericHeader>` values to the reader's queue.
+//! - outbound: the dispatcher pulls `Frozen<MESSAGE_ALIGN>` from the
+//!   writer's queue, wraps each in a binary WS frame, and sends it.
+//!
+//! The per-frame channel hop costs one queue push and one queue pop.
+//! On single-threaded compio the hop is a scheduling no-op; the real
+//! cost is one bounded buffer slot per inflight frame, which the
+//! per-peer queue capacity already pays for on the bus side.
+//!
+//! # Zero-copy payload
+//!
+//! `tungstenite::Message::Binary` takes `bytes::Bytes`. Wrapping
+//! `Frozen<MESSAGE_ALIGN>` via [`bytes::Bytes::from_owner`] is an O(1)
+//! Arc allocation with no memcpy. The 256 B header + body slice is
+//! handed to the WS frame writer as-is.
+
+use super::{TransportConn, TransportListener, TransportReader, TransportWriter};
+use crate::framing;
+use async_channel::{Receiver, Sender, bounded};
+use bytes::Bytes;
+use compio::net::{TcpListener, TcpStream};
+use compio_ws::WebSocketStream;
+use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
+use iggy_binary_protocol::consensus::iobuf::Frozen;
+use iggy_binary_protocol::{GenericHeader, Message};
+use iggy_common::IggyError;
+use std::io;
+use std::net::SocketAddr;
+use tracing::{debug, warn};
+use tungstenite::protocol::CloseFrame;
+use tungstenite::protocol::frame::coding::CloseCode;
+
+/// Newtype wrapper bridging [`Frozen<MESSAGE_ALIGN>`] into the
+/// [`bytes::Bytes::from_owner`] surface (which requires `AsRef<[u8]>`,
+/// not `Deref`). The underlying byte slice is shared with the
+/// `Bytes`'s atomic-refcount holder; no memcpy. Cost is one
+/// `Arc<dyn>` allocation per outbound frame.
+struct FrozenBytesOwner(Frozen<MESSAGE_ALIGN>);
+
+impl AsRef<[u8]> for FrozenBytesOwner {
+    fn as_ref(&self) -> &[u8] {
+        self.0.as_slice()
+    }
+}
+
+/// Subprotocol identifier carried in `Sec-WebSocket-Protocol`. Bump the
+/// trailing version digits when the wire format changes (matches the
+/// QUIC ALPN constant in `transports/quic.rs`).
+pub const WS_SUBPROTOCOL: &str = "iggy.consensus.v1";
+
+/// Per-connection inbound queue depth. Bounded so an idle reader cannot
+/// let memory grow without bound under a chatty peer; mirrors the bus's
+/// per-peer outbound queue convention.
+const INBOUND_QUEUE_CAPACITY: usize = 256;
+
+/// Per-connection outbound queue depth. Same shape as the inbound side;
+/// the per-peer writer task already enforces backpressure upstream.
+const OUTBOUND_QUEUE_CAPACITY: usize = 256;
+
+/// Inbound WS listener.
+///
+/// Wraps a [`TcpListener`]. `accept` performs the WS HTTP-Upgrade
+/// handshake via `compio_ws::accept_async` before yielding a
+/// [`WsTransportConn`], so the install path on the owning shard never
+/// sees the upgrade request.
+pub struct WsTransportListener {
+    inner: TcpListener,
+}
+
+impl WsTransportListener {
+    /// Wrap a pre-bound [`TcpListener`].
+    #[must_use]
+    pub const fn new(inner: TcpListener) -> Self {
+        Self { inner }
+    }
+}
+
+impl TransportListener for WsTransportListener {
+    type Conn = WsTransportConn;
+
+    #[allow(clippy::future_not_send)]
+    async fn accept(&self) -> io::Result<(Self::Conn, SocketAddr)> {
+        loop {
+            let (stream, addr) = self.inner.accept().await?;
+            match compio_ws::accept_async(stream).await {
+                Ok(ws) => return Ok((WsTransportConn::new(ws), addr)),
+                Err(e) => {
+                    warn!(%addr, "WS handshake failed: {e}");
+                    // Drop the failed stream and accept the next; do
+                    // not surface as a listener-fatal error.
+                }
+            }
+        }
+    }
+}
+
+/// A single WS connection holding the bidirectional [`WebSocketStream`]
+/// until [`Self::into_split`] hands ownership to a per-connection
+/// dispatcher task.
+pub struct WsTransportConn {
+    stream: WebSocketStream<TcpStream>,
+}
+
+impl WsTransportConn {
+    /// Construct from an already-upgraded [`WebSocketStream`].
+    #[must_use]
+    pub const fn new(stream: WebSocketStream<TcpStream>) -> Self {
+        Self { stream }
+    }
+}
+
+impl TransportConn for WsTransportConn {
+    type Reader = WsTransportReader;
+    type Writer = WsTransportWriter;
+
+    fn into_split(self) -> (Self::Reader, Self::Writer) {
+        let (in_tx, in_rx) = bounded::<Message<GenericHeader>>(INBOUND_QUEUE_CAPACITY);
+        let (out_tx, out_rx) = bounded::<Frozen<MESSAGE_ALIGN>>(OUTBOUND_QUEUE_CAPACITY);
+        compio::runtime::spawn(dispatcher(self.stream, in_tx, out_rx)).detach();
+        (
+            WsTransportReader { rx: in_rx },
+            WsTransportWriter { tx: out_tx },
+        )
+    }
+}
+
+/// Max outbound frames drained in one Phase A pass of the dispatcher.
+const OUTBOUND_DRAIN_BURST: usize = 16;
+
+/// Per-connection dispatcher loop. Phase-alternating design.
+///
+/// `tungstenite`'s state machine and compio's "no buffer drop with
+/// pending I/O" rule together rule out a `select!`-cancelling read +
+/// write race on a shared `&mut WebSocketStream`. Instead the loop
+/// alternates phases:
+///
+/// 1. **Phase A**: non-blocking drain of up to
+///    [`OUTBOUND_DRAIN_BURST`] outbound frames. `try_recv` returns
+///    immediately when the queue is empty; no in-flight I/O is held
+///    across an await point.
+/// 2. **Phase B**: one blocking `stream.read()`. Returns when the
+///    peer sends a frame, on EOF, on error, or on peer-close.
+///
+/// Outbound latency under this design is at most one inbound RTT (the
+/// time to receive the next frame). For request/reply workloads
+/// (every client request triggers a server reply) this is identical
+/// to a `select!`-based design. For server-push-heavy workloads
+/// the upper bound is the keepalive interval; future work can add a
+/// keepalive Ping/Pong sweeper that nudges the read past idle.
+#[allow(clippy::future_not_send)]
+async fn dispatcher(
+    mut stream: WebSocketStream<TcpStream>,
+    in_tx: Sender<Message<GenericHeader>>,
+    out_rx: Receiver<Frozen<MESSAGE_ALIGN>>,
+) {
+    loop {
+        // Phase A: drain outbound non-blocking, capped at the burst.
+        for _ in 0..OUTBOUND_DRAIN_BURST {
+            let Ok(frame) = out_rx.try_recv() else {
+                break;
+            };
+            let payload = Bytes::from_owner(FrozenBytesOwner(frame));
+            if let Err(e) = stream.send(tungstenite::Message::Binary(payload)).await {
+                warn!("WS write failed: {e}; dispatcher exiting");
+                let _ = stream.close(None).await;
+                return;
+            }
+        }
+
+        // Phase B: one read. The compio I/O submission is owned by
+        // the await and only dropped after completion, matching
+        // compio's "no buffer drop with pending I/O" rule.
+        let ws_msg = stream.read().await;
+        match ws_msg {
+            Ok(tungstenite::Message::Binary(body)) => match decode_consensus_frame(&body) {
+                Ok(msg) => {
+                    if in_tx.send(msg).await.is_err() {
+                        debug!("WS reader dropped; dispatcher exiting");
+                        break;
+                    }
+                }
+                Err(e) => {
+                    warn!("WS frame decode failed: {e:?}; closing connection");
+                    let _ = stream
+                        .close(Some(CloseFrame {
+                            code: CloseCode::Protocol,
+                            reason: "iggy frame decode failed".into(),
+                        }))
+                        .await;
+                    break;
+                }
+            },
+            Ok(tungstenite::Message::Ping(payload)) => {
+                if let Err(e) = stream.send(tungstenite::Message::Pong(payload)).await {
+                    debug!("WS pong write failed: {e}; dispatcher exiting");
+                    break;
+                }
+            }
+            Ok(tungstenite::Message::Pong(_)) => {
+                // Replies to server-initiated keepalives; nothing to do.
+            }
+            Ok(tungstenite::Message::Close(_)) => {
+                debug!("WS peer initiated close");
+                break;
+            }
+            Ok(tungstenite::Message::Text(_)) => {
+                warn!("WS text frame received on a binary plane; closing");
+                let _ = stream
+                    .close(Some(CloseFrame {
+                        code: CloseCode::Unsupported,
+                        reason: "binary frames only".into(),
+                    }))
+                    .await;
+                break;
+            }
+            Ok(tungstenite::Message::Frame(_)) => {
+                // Tungstenite's contract: `Message::Frame` is never
+                // produced on the read path. Defensive log + drop.
+                warn!("unexpected raw WS frame on read path; ignoring");
+            }
+            Err(e) => {
+                debug!("WS read error: {e}; dispatcher exiting");
+                break;
+            }
+        }
+    }
+    let _ = stream.close(None).await;
+}
+
+#[derive(Debug)]
+enum FrameDecodeError {
+    BadHeader,
+    BadSize,
+}
+
+/// Parse a binary WS frame's payload into a `Message<GenericHeader>`.
+///
+/// Reuses the framing invariant (I3): `[256 B header][optional body]`
+/// where `body_len = header.size - HEADER_SIZE`. No partial frames; the
+/// WS layer guarantees a single message per binary frame (FIN=1).
+fn decode_consensus_frame(body: &Bytes) -> Result<Message<GenericHeader>, FrameDecodeError> {
+    if body.len() < iggy_binary_protocol::HEADER_SIZE {
+        return Err(FrameDecodeError::BadHeader);
+    }
+    let total_size = u32::from_le_bytes(
+        body[48..52]
+            .try_into()
+            .map_err(|_| FrameDecodeError::BadHeader)?,
+    ) as usize;
+    if !(iggy_binary_protocol::HEADER_SIZE..=framing::MAX_MESSAGE_SIZE).contains(&total_size)
+        || total_size != body.len()
+    {
+        return Err(FrameDecodeError::BadSize);
+    }
+    // Aligned MESSAGE_ALIGN backing memory is required by the
+    // `Message<GenericHeader>` invariant; `Bytes` carries no alignment
+    // guarantee, so this is a one-time copy from the WS-framed payload
+    // into a freshly allocated `Owned<MESSAGE_ALIGN>`. The cost is
+    // unavoidable on inbound for the WS plane (the TCP path also does a
+    // copy via the staged read-into-Owned dance in framing.rs); the
+    // outbound side stays zero-copy via `Bytes::from_owner`.
+    let owned =
+        iggy_binary_protocol::consensus::iobuf::Owned::<MESSAGE_ALIGN>::copy_from_slice(body);
+    Message::<GenericHeader>::try_from(owned).map_err(|_| FrameDecodeError::BadHeader)
+}
+
+/// Read half. Polls the dispatcher's inbound queue.
+pub struct WsTransportReader {
+    rx: Receiver<Message<GenericHeader>>,
+}
+
+impl TransportReader for WsTransportReader {
+    #[allow(clippy::future_not_send)]
+    async fn read_message(
+        &mut self,
+        _max_message_size: usize,
+    ) -> Result<Message<GenericHeader>, IggyError> {
+        // Bound check happens inside `decode_consensus_frame` against
+        // the build-time `framing::MAX_MESSAGE_SIZE`; the per-call
+        // override exists for parity with the trait surface and will
+        // be plumbed through when a runtime-configurable cap lands.
+        self.rx
+            .recv()
+            .await
+            .map_err(|_| IggyError::ConnectionClosed)
+    }
+}
+
+/// Write half. Pushes frames into the dispatcher's outbound queue.
+pub struct WsTransportWriter {
+    tx: Sender<Frozen<MESSAGE_ALIGN>>,
+}
+
+impl TransportWriter for WsTransportWriter {
+    #[allow(clippy::future_not_send)]
+    async fn send_batch(&mut self, batch: &mut Vec<Frozen<MESSAGE_ALIGN>>) -> io::Result<()> {
+        // Drain in order; one queue push per frame. Atomic-or-error
+        // contract holds because the dispatcher writes each frame
+        // before pulling the next from the queue.
+        for buf in batch.drain(..) {
+            self.tx
+                .send(buf)
+                .await
+                .map_err(|_| io::Error::other("WS dispatcher queue closed"))?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use compio::net::TcpStream;
+    use iggy_binary_protocol::{Command2, HEADER_SIZE};
+
+    #[allow(clippy::cast_possible_truncation)]
+    fn header_only(command: Command2) -> Frozen<MESSAGE_ALIGN> {
+        Message::<GenericHeader>::new(HEADER_SIZE)
+            .transmute_header(|_, h: &mut GenericHeader| {
+                h.command = command;
+                h.size = HEADER_SIZE as u32;
+            })
+            .into_frozen()
+    }
+
+    #[compio::test]
+    #[allow(clippy::future_not_send)]
+    async fn loopback_round_trip_three_frames() {
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let server_addr = listener.local_addr().unwrap();
+        let ws_listener = WsTransportListener::new(listener);
+
+        let server_task = compio::runtime::spawn(async move {
+            let (conn, _peer) = ws_listener.accept().await.expect("accept");
+            let (mut reader, _writer) = conn.into_split();
+            let a = reader
+                .read_message(framing::MAX_MESSAGE_SIZE)
+                .await
+                .unwrap();
+            let b = reader
+                .read_message(framing::MAX_MESSAGE_SIZE)
+                .await
+                .unwrap();
+            let c = reader
+                .read_message(framing::MAX_MESSAGE_SIZE)
+                .await
+                .unwrap();
+            (a.header().command, b.header().command, c.header().command)
+        });
+
+        let client_tcp = TcpStream::connect(server_addr).await.unwrap();
+        let url = format!("ws://{server_addr}/");
+        let (ws_client, _resp) = compio_ws::client_async(url, client_tcp)
+            .await
+            .expect("ws handshake");
+        let conn = WsTransportConn::new(ws_client);
+        let (_r, mut w) = conn.into_split();
+
+        let mut batch = vec![
+            header_only(Command2::Ping),
+            header_only(Command2::Prepare),
+            header_only(Command2::Request),
+        ];
+        w.send_batch(&mut batch).await.expect("send_batch");
+        assert!(batch.is_empty(), "Vec must be drained on success");
+
+        let (a, b, c) = server_task.await.unwrap();
+        assert_eq!(a, Command2::Ping);
+        assert_eq!(b, Command2::Prepare);
+        assert_eq!(c, Command2::Request);
+    }
+
+    #[compio::test]
+    #[allow(clippy::future_not_send)]
+    async fn malformed_frame_closes_connection() {
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let server_addr = listener.local_addr().unwrap();
+        let ws_listener = WsTransportListener::new(listener);
+
+        let server_task = compio::runtime::spawn(async move {
+            let (conn, _peer) = ws_listener.accept().await.expect("accept");
+            let (mut reader, _writer) = conn.into_split();
+            reader.read_message(framing::MAX_MESSAGE_SIZE).await
+        });
+
+        let client_tcp = TcpStream::connect(server_addr).await.unwrap();
+        let url = format!("ws://{server_addr}/");
+        let (mut ws_client, _) = compio_ws::client_async(url, client_tcp).await.unwrap();
+        // Deliberately wrong total_size at offset 48 of the 256 B header.
+        let mut buf = vec![0u8; HEADER_SIZE];
+        let bogus = u32::try_from(framing::MAX_MESSAGE_SIZE + 1)
+            .unwrap_or(u32::MAX)
+            .to_le_bytes();
+        buf[48..52].copy_from_slice(&bogus);
+        ws_client
+            .send(tungstenite::Message::Binary(Bytes::from(buf)))
+            .await
+            .unwrap();
+        // Server's dispatcher closes the WS; reader queue closes,
+        // surfacing as ConnectionClosed.
+        let res = server_task.await.unwrap();
+        assert!(matches!(res, Err(IggyError::ConnectionClosed)));
+    }
+}
