This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch feat/message-bus-transports in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 901629132329be1a276e49c55511ce7bf25a3fd3 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Apr 24 11:56:56 2026 +0200 feat(message_bus): define Transport trait family (IGGY-112, P1-T1) Phase 1 of the transport-plan needs a stable seam so TCP, WS, and QUIC impls can slot in without each reinventing accept / split / batched-write plumbing. The current `transports/mod.rs` was a placeholder; nothing existed for downstream phases to build on. Introduce four traits with associated types (monomorphic hot path): `TransportListener::accept`, `TransportConn::into_split`, `TransportReader::read_message`, `TransportWriter::send_batch`. `send_batch` is atomic-or-error over a reusable `Vec<Frozen<..>>` so the TCP `writev` batching (I4) and zero-copy `Frozen` ownership (I8) stay expressible; the `async fn` sits inside the writer task so `send_to_*` keeps its no-yield caller-side contract (I2). `'static` bounds let the halves move into spawned compio tasks. Behavior-preserving: no impls change, no call sites touched. P1-T2 moves TCP behind the trait; WS / QUIC plug in under Phase 2+ when their demand gates fire. --- core/message_bus/src/transports/mod.rs | 199 ++++++++++++++++++++++++++++++++- 1 file changed, 195 insertions(+), 4 deletions(-) diff --git a/core/message_bus/src/transports/mod.rs b/core/message_bus/src/transports/mod.rs index aa0c7c27b..f47e0c404 100644 --- a/core/message_bus/src/transports/mod.rs +++ b/core/message_bus/src/transports/mod.rs @@ -15,8 +15,199 @@ // specific language governing permissions and limitations // under the License. -//! Transport abstraction - placeholder. +//! Transport abstraction for the `message_bus` wire planes. //! -//! Tracked by IGGY-112. This module is deliberately empty until the -//! `Transport` / `TransportListener` / `TransportConn` trait family lands; -//! do not grow speculative scaffolding here. +//! # Trait family +//! +//! - [`TransportListener`] binds a local address and yields inbound +//! [`TransportConn`]s on `accept`. +//! - [`TransportConn`] is a single byte-oriented connection; it splits +//! into a [`TransportReader`] and a [`TransportWriter`] that move into +//! independent compio tasks (reader loop, writer-batch task). +//! - [`TransportReader`] reads one framed `Message<GenericHeader>` per +//! call. +//! - [`TransportWriter`] sends a batch of `Frozen<MESSAGE_ALIGN>` +//! atomically in a single `writev` (TCP) or per-transport analog +//! (framed binary for WS, one STREAM per message for QUIC). +//! +//! # Invariants the trait surface must preserve +//! +//! - **No-yield on caller-side** (I2): producers call +//! [`MessageBus::send_to_client`](crate::MessageBus::send_to_client) / +//! [`send_to_replica`](crate::MessageBus::send_to_replica), which push +//! onto a bounded `async_channel` and return `Ready` on the first +//! poll. The single yield point in the wire path lives inside +//! [`TransportWriter::send_batch`], invoked by the per-peer writer +//! task. Consensus code relies on this for reentrancy reasoning. +//! - **Batch atomicity** (I4): `send_batch` either writes every buffer +//! in the batch or returns an error; no partial success is exposed. +//! TCP satisfies this via `compio::io::AsyncWriteExt::write_vectored_all`; +//! WS/QUIC impls loop internally until the batch lands or fail. +//! - **Zero-copy `Frozen` ownership** (I8): the batch is handed to the +//! kernel without intermediate copies. Impls MUST NOT clone +//! `Frozen<MESSAGE_ALIGN>` or materialize a `Vec<u8>` on the hot +//! path. +//! +//! # Design notes +//! +//! - Associated types (not trait objects): keep the hot path +//! monomorphic. A `dyn TransportConn` surface can be introduced later +//! if a config-time transport selector needs runtime polymorphism. +//! - `'static` bound on [`TransportConn`], [`TransportReader`], and +//! [`TransportWriter`]: the halves are moved into +//! `compio::runtime::spawn`'d tasks, which require owned data. +//! - fd-delegation (`F_DUPFD_CLOEXEC`) stays TCP-only and lives outside +//! this trait (see [`crate::fd_transfer`]). Other transports terminate +//! on shard 0 and forward `Frozen<MESSAGE_ALIGN>` over the inter-shard +//! flume via [`crate::ShardForwardFn`]. +//! +//! The TCP impl lives in `transports/tcp.rs` (P1-T2). Phase 2+ transports +//! plug in behind the same surface; see +//! `Documents/silverhand/iggy/message_bus/transport-plan/`. + +use iggy_binary_protocol::consensus::MESSAGE_ALIGN; +use iggy_binary_protocol::consensus::iobuf::Frozen; +use iggy_binary_protocol::{GenericHeader, Message}; +use iggy_common::IggyError; +use std::io; +use std::net::SocketAddr; + +/// Inbound listener. The accept loops in [`crate::replica_listener`] +/// and [`crate::client_listener`] invoke `accept` inside a `select!` +/// against the bus-wide shutdown token. +#[allow(async_fn_in_trait)] +pub trait TransportListener { + /// Connection type produced by this listener. + type Conn: TransportConn; + + /// Accept the next inbound connection. + /// + /// Returns once a peer has completed the transport-level handshake + /// (TCP SYN/ACK; TLS; QUIC handshake). The `message_bus` + /// authenticated `Ping` handshake runs on the returned connection, + /// after this call. See [`crate::auth`]. + /// + /// # Errors + /// + /// Returns [`io::Error`] on socket faults. The caller logs and + /// continues; a fatal error must be surfaced by dropping the + /// listener rather than through this return value. + async fn accept(&self) -> io::Result<(Self::Conn, SocketAddr)>; +} + +/// Single-connection handle produced by [`TransportListener::accept`] +/// or by a transport-specific dialer (`TcpStream::connect`, etc.). +/// +/// `'static` so the reader and writer halves can move into spawned +/// compio tasks. +pub trait TransportConn: 'static { + /// Read half moved into the per-connection reader task. + type Reader: TransportReader; + /// Write half moved into the per-connection writer-batch task. + type Writer: TransportWriter; + + /// Split the connection into independently-owned halves. + /// + /// Mirrors [`compio::net::TcpStream::into_split`] on TCP. WS and + /// QUIC impls map onto their own half-splits or stream wrappers. + fn into_split(self) -> (Self::Reader, Self::Writer); +} + +/// Read half. Produces one framed [`Message<GenericHeader>`] per call. +#[allow(async_fn_in_trait)] +pub trait TransportReader: Unpin + 'static { + /// Read the next `GenericHeader`-framed message off the wire. + /// + /// Validates the 256 B header and bounds the total frame size to + /// `max_message_size`. TCP delegates to + /// [`crate::framing::read_message`]; WS wraps the single binary + /// frame; QUIC consumes one STREAM worth of bytes. + /// + /// # Errors + /// + /// - [`IggyError::ConnectionClosed`] on clean EOF. + /// - [`IggyError::TcpError`] on transport I/O faults. + /// - [`IggyError::InvalidCommand`] on framing violations (bad size + /// field, total frame exceeding `max_message_size`, header decode + /// failure). + async fn read_message( + &mut self, + max_message_size: usize, + ) -> Result<Message<GenericHeader>, IggyError>; +} + +/// Write half. Atomic-or-error batched writer. +#[allow(async_fn_in_trait)] +pub trait TransportWriter: Unpin + 'static { + /// Send every buffer in `batch` atomically. + /// + /// On success the Vec is drained (empty on return); the same + /// allocation is reused by the writer loop on the next iteration, + /// so impls must not free it. On error the buffers may or may not + /// have partially landed on the wire; the caller MUST treat a + /// failed batch as lost data and drop the connection. VSR + /// retransmits via the WAL. + /// + /// TCP implements this via + /// [`compio::io::AsyncWriteExt::write_vectored_all`] on the owned + /// write half, with `max_batch <= IOV_MAX / 2 = 512`. WS/QUIC impls + /// loop internally until every buffer lands or fail the whole + /// batch. + /// + /// # Errors + /// + /// Returns [`io::Error`] on transport faults. No partial-success + /// variant is exposed. + async fn send_batch(&mut self, batch: &mut Vec<Frozen<MESSAGE_ALIGN>>) -> io::Result<()>; +} + +#[cfg(test)] +#[allow(dead_code)] +mod tests { + //! Compile-only stubs that exercise each trait's associated-type + //! plumbing and method signatures. Method bodies `unimplemented!()` + //! so every compile-time obligation (bounds, lifetimes, `'static`) + //! is enforced without needing a runtime harness. + use super::{TransportConn, TransportListener, TransportReader, TransportWriter}; + use iggy_binary_protocol::consensus::MESSAGE_ALIGN; + use iggy_binary_protocol::consensus::iobuf::Frozen; + use iggy_binary_protocol::{GenericHeader, Message}; + use iggy_common::IggyError; + use std::io; + use std::net::SocketAddr; + + struct StubReader; + struct StubWriter; + struct StubConn; + struct StubListener; + + impl TransportReader for StubReader { + async fn read_message( + &mut self, + _max_message_size: usize, + ) -> Result<Message<GenericHeader>, IggyError> { + unimplemented!() + } + } + + impl TransportWriter for StubWriter { + async fn send_batch(&mut self, _batch: &mut Vec<Frozen<MESSAGE_ALIGN>>) -> io::Result<()> { + unimplemented!() + } + } + + impl TransportConn for StubConn { + type Reader = StubReader; + type Writer = StubWriter; + fn into_split(self) -> (Self::Reader, Self::Writer) { + unimplemented!() + } + } + + impl TransportListener for StubListener { + type Conn = StubConn; + async fn accept(&self) -> io::Result<(Self::Conn, SocketAddr)> { + unimplemented!() + } + } +}
