This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch feat/message-bus-transports in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 4f9af5ea514287701b5283950d15359ce74497a2 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Apr 24 12:03:28 2026 +0200 feat(message_bus): TCP impls behind Transport traits (IGGY-112, P1-T2) Phase 1 plugs TCP into the trait surface that landed in P1-T1. Without an impl, the traits are untestable scaffolding and no downstream transport (WS, QUIC) has a reference to compare against. Wire and socket behavior must not change: existing `write_vectored_all` batching, zero-copy `Frozen` ownership, and the per-peer drain cadence are load-bearing (I4, I8). Add `transports/tcp.rs` with `TcpTransportListener`, `TcpTransportConn`, `TcpTransportReader`, `TcpTransportWriter`. `send_batch` does `mem::take` + `write_vectored_all` + `clear`, returning the emptied Vec through the caller's `&mut` slot so the allocation is reused across iterations. Migrate `writer_task::run` to drive the trait via a new generic `run_transport<W: TransportWriter>`; the TCP entry point keeps its exact public signature and just wraps the owned write half in `TcpTransportWriter` before delegating. Fast path stays monomorphized - no dyn dispatch, no vtable. Behavior-preserving: 100 tests pass (96 existing + 4 new trait exercises covering listener accept, batched writes, empty batch, framing error). `installer::install_*_stream` still takes `TcpStream` directly; generic migration there is P1-T3. --- core/message_bus/src/transports/mod.rs | 4 + core/message_bus/src/transports/tcp.rs | 275 +++++++++++++++++++++++++++++++++ core/message_bus/src/writer_task.rs | 48 ++++-- 3 files changed, 314 insertions(+), 13 deletions(-) diff --git a/core/message_bus/src/transports/mod.rs b/core/message_bus/src/transports/mod.rs index f47e0c404..c98faf467 100644 --- a/core/message_bus/src/transports/mod.rs +++ b/core/message_bus/src/transports/mod.rs @@ -65,6 +65,10 @@ //! plug in behind the same surface; see //! `Documents/silverhand/iggy/message_bus/transport-plan/`.
+pub mod tcp; + +pub use tcp::{TcpTransportConn, TcpTransportListener, TcpTransportReader, TcpTransportWriter}; + use iggy_binary_protocol::consensus::MESSAGE_ALIGN; use iggy_binary_protocol::consensus::iobuf::Frozen; use iggy_binary_protocol::{GenericHeader, Message}; diff --git a/core/message_bus/src/transports/tcp.rs b/core/message_bus/src/transports/tcp.rs new file mode 100644 index 000000000..7478c99b4 --- /dev/null +++ b/core/message_bus/src/transports/tcp.rs @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! TCP impls of the [`super`] transport traits. +//! +//! Behavior-preserving wrappers around `compio::net::TcpListener`, +//! `compio::net::TcpStream`, and the split halves. The hot path, +//! [`TcpTransportWriter::send_batch`], is the one `write_vectored_all` +//! call [`crate::writer_task::run_transport`] makes per drained batch +//! (partial writes retried internally), with zero intermediate copies of `Frozen`. +//! +//! Callers that need listener / dialer ergonomics (socket options, +//! keepalive, directional accept rules) should continue to use the +//! free functions in [`crate::replica_listener`] and +//! [`crate::client_listener`]; this module is the minimal trait +//!
adapter, not a replacement for those call sites. Integration with +//! `installer::install_*_stream` lands in P1-T3. + +use super::{TransportConn, TransportListener, TransportReader, TransportWriter}; +use crate::framing; +use compio::io::AsyncWriteExt; +use compio::net::{OwnedReadHalf, OwnedWriteHalf, TcpListener, TcpStream}; +use iggy_binary_protocol::consensus::MESSAGE_ALIGN; +use iggy_binary_protocol::consensus::iobuf::Frozen; +use iggy_binary_protocol::{GenericHeader, Message}; +use iggy_common::IggyError; +use std::io; +use std::mem; +use std::net::SocketAddr; + +/// Inbound TCP listener wrapper. +/// +/// Constructed from an already-bound [`TcpListener`] so the caller +/// keeps control over socket options (`SO_REUSEPORT`, `nodelay`, +/// `keepalive`) via `compio::net::SocketOpts`. See +/// [`crate::replica_listener::bind`] and +/// [`crate::client_listener::bind`] for the canonical construction. +pub struct TcpTransportListener { + inner: TcpListener, +} + +impl TcpTransportListener { + #[must_use] + pub const fn new(inner: TcpListener) -> Self { + Self { inner } + } +} + +impl TransportListener for TcpTransportListener { + type Conn = TcpTransportConn; + + #[allow(clippy::future_not_send)] + async fn accept(&self) -> io::Result<(Self::Conn, SocketAddr)> { + let (stream, addr) = self.inner.accept().await?; + Ok((TcpTransportConn::new(stream), addr)) + } +} + +/// Single TCP connection. +/// +/// Produced by [`TcpTransportListener::accept`] or by wrapping the +/// result of a `TcpStream::connect` on the dialer path. Takes ownership +/// of the stream; [`Self::into_split`] transfers that ownership into +/// the read and write halves bound to the per-connection tasks.
+pub struct TcpTransportConn { + stream: TcpStream, +} + +impl TcpTransportConn { + #[must_use] + pub const fn new(stream: TcpStream) -> Self { + Self { stream } + } +} + +impl TransportConn for TcpTransportConn { + type Reader = TcpTransportReader; + type Writer = TcpTransportWriter; + + fn into_split(self) -> (Self::Reader, Self::Writer) { + let (read_half, write_half) = self.stream.into_split(); + ( + TcpTransportReader { inner: read_half }, + TcpTransportWriter { inner: write_half }, + ) + } +} + +/// Read half bound to the per-connection reader task. +/// +/// [`TransportReader::read_message`] delegates straight to +/// [`framing::read_message`], so it inherits the same header +/// decode, bounds check, and zero-copy `Owned<MESSAGE_ALIGN>` +/// allocation strategy. +pub struct TcpTransportReader { + inner: OwnedReadHalf<TcpStream>, +} + +impl TcpTransportReader { + #[must_use] + pub const fn new(inner: OwnedReadHalf<TcpStream>) -> Self { + Self { inner } + } +} + +impl TransportReader for TcpTransportReader { + #[allow(clippy::future_not_send)] + async fn read_message( + &mut self, + max_message_size: usize, + ) -> Result<Message<GenericHeader>, IggyError> { + framing::read_message(&mut self.inner, max_message_size).await + } +} + +/// Write half bound to the per-connection writer-batch task. +/// +/// [`TransportWriter::send_batch`] calls +/// [`compio::io::AsyncWriteExt::write_vectored_all`] exactly once per +/// invocation. The caller (e.g. [`crate::writer_task::run`]) is +/// responsible for capping the batch size to +/// `max_batch <= IOV_MAX / 2 = 512`; this impl does not enforce a cap +/// because the caller's admission control has already bounded the +/// Vec before handing it over.
+pub struct TcpTransportWriter { + inner: OwnedWriteHalf<TcpStream>, +} + +impl TcpTransportWriter { + #[must_use] + pub const fn new(inner: OwnedWriteHalf<TcpStream>) -> Self { + Self { inner } + } +} + +impl TransportWriter for TcpTransportWriter { + #[allow(clippy::future_not_send)] + async fn send_batch(&mut self, batch: &mut Vec<Frozen<MESSAGE_ALIGN>>) -> io::Result<()> { + // `write_vectored_all` consumes the Vec via compio's `IoVectoredBuf` + // surface and returns it through `BufResult` so we can reuse the + // allocation. Take the inner Vec, hand it to the kernel, then clear the + // returned Vec (on success and error alike) and put it back in the slot. + let owned = mem::take(batch); + let compio::BufResult(result, mut returned) = self.inner.write_vectored_all(owned).await; + returned.clear(); + *batch = returned; + result + } +} + +#[cfg(test)] +mod tests { + use super::*; + use iggy_binary_protocol::consensus::iobuf::Frozen; + use iggy_binary_protocol::{Command2, HEADER_SIZE}; + + #[allow(clippy::cast_possible_truncation)] + fn header_only(command: Command2) -> Frozen<MESSAGE_ALIGN> { + Message::<GenericHeader>::new(HEADER_SIZE) + .transmute_header(|_, h: &mut GenericHeader| { + h.command = command; + h.size = HEADER_SIZE as u32; + }) + .into_frozen() + } + + #[allow(clippy::future_not_send)] + async fn local_pair() -> (TcpStream, TcpStream) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let connect = TcpStream::connect(addr); + let accept = listener.accept(); + let (client_res, accept_res) = futures::join!(connect, accept); + let (server, _) = accept_res.unwrap(); + (client_res.unwrap(), server) + } + + #[compio::test] + #[allow(clippy::future_not_send)] + async fn listener_accept_yields_conn() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let wrapped = TcpTransportListener::new(listener); + + let connect = TcpStream::connect(addr); + let
accept = wrapped.accept(); + let (_client, accept_res) = futures::join!(connect, accept); + let (_conn, _peer_addr) = accept_res.expect("accept via trait"); + } + + #[compio::test] + #[allow(clippy::future_not_send)] + async fn send_batch_writes_all_and_drains_vec() { + let (client, server) = local_pair().await; + let client_conn = TcpTransportConn::new(client); + let (_client_read, mut client_write) = client_conn.into_split(); + + let server_conn = TcpTransportConn::new(server); + let (mut server_read, _server_write) = server_conn.into_split(); + + let mut batch = vec![ + header_only(Command2::Ping), + header_only(Command2::Prepare), + header_only(Command2::Request), + ]; + client_write + .send_batch(&mut batch) + .await + .expect("send_batch"); + assert!(batch.is_empty(), "Vec must be drained on success"); + assert!(batch.capacity() >= 3, "allocation must be reused"); + + // Verify all three frames land intact and in send order. + let a = server_read + .read_message(framing::MAX_MESSAGE_SIZE) + .await + .unwrap(); + let b = server_read + .read_message(framing::MAX_MESSAGE_SIZE) + .await + .unwrap(); + let c = server_read + .read_message(framing::MAX_MESSAGE_SIZE) + .await + .unwrap(); + assert_eq!(a.header().command, Command2::Ping); + assert_eq!(b.header().command, Command2::Prepare); + assert_eq!(c.header().command, Command2::Request); + } + + #[compio::test] + #[allow(clippy::future_not_send)] + async fn send_batch_empty_is_noop() { + let (client, _server) = local_pair().await; + let (_r, mut w) = TcpTransportConn::new(client).into_split(); + let mut batch: Vec<Frozen<MESSAGE_ALIGN>> = Vec::with_capacity(8); + w.send_batch(&mut batch).await.expect("empty batch ok"); + assert!(batch.is_empty()); + } + + #[compio::test] + #[allow(clippy::future_not_send)] + async fn read_message_reports_oversize_via_trait() { + use compio::io::AsyncWriteExt; + let (mut client, server) = local_pair().await; + let (mut r, _w) = TcpTransportConn::new(server).into_split(); + + // Hand-craft
a header whose `size` field (bytes 48..52) is oversize; the + // trait path must report the same `InvalidCommand` error the + // framing path does. + let mut buf = vec![0u8; HEADER_SIZE]; + let bogus = u32::try_from(framing::MAX_MESSAGE_SIZE + 1) + .unwrap_or(u32::MAX) + .to_le_bytes(); + buf[48..52].copy_from_slice(&bogus); + client.write_all(buf).await.0.unwrap(); + + let res = r.read_message(framing::MAX_MESSAGE_SIZE).await; + assert!(matches!(res, Err(IggyError::InvalidCommand))); + } +} diff --git a/core/message_bus/src/writer_task.rs b/core/message_bus/src/writer_task.rs index de3f3304e..75b09c827 100644 --- a/core/message_bus/src/writer_task.rs +++ b/core/message_bus/src/writer_task.rs @@ -31,7 +31,7 @@ //! - a write to the wire fails (broken connection). use crate::lifecycle::{BusMessage, BusReceiver, ShutdownToken}; -use compio::io::AsyncWriteExt; +use crate::transports::{TcpTransportWriter, TransportWriter}; use compio::net::{OwnedWriteHalf, TcpStream}; use futures::FutureExt; use tracing::{debug, error, trace}; @@ -42,10 +42,38 @@ use tracing::{debug, error, trace}; /// `max_batch` caps how many messages a single `writev` syscall coalesces. /// Larger batches reduce syscalls per N messages at the cost of memory /// per batch and worst-case latency for the head-of-batch message. +/// +/// TCP entry point. Wraps the owned write half in a +/// [`TcpTransportWriter`] and delegates the drain loop to +/// [`run_transport`]; every batched write now flows through +/// [`TransportWriter::send_batch`] so future transports drop in behind +/// the same drain logic. #[allow(clippy::future_not_send)] pub async fn run( rx: BusReceiver, - mut write_half: OwnedWriteHalf<TcpStream>, + write_half: OwnedWriteHalf<TcpStream>, + token: ShutdownToken, + label: &'static str, + peer: String, + max_batch: usize, +) { + let writer = TcpTransportWriter::new(write_half); + run_transport(rx, writer, token, label, peer, max_batch).await; +} + +/// Generic drain loop over any [`TransportWriter`].
+/// +/// Pulls `BusMessage`s off the per-peer mpsc, coalesces up to +/// `max_batch` into a single [`TransportWriter::send_batch`] call, and +/// exits cleanly on shutdown, channel close, or write error. +/// +/// Public so alternate transports (WS, QUIC) can reuse the admission +/// control and batch sizing identically to TCP; keep the body +/// transport-agnostic. +#[allow(clippy::future_not_send)] +pub async fn run_transport<W: TransportWriter>( + rx: BusReceiver, + mut writer: W, token: ShutdownToken, label: &'static str, peer: String, @@ -89,12 +117,10 @@ pub async fn run( let drained = batch.len(); trace!(%label, %peer, batch = drained, "writev batch"); - // Single writev for the whole batch. write_vectored_all loops - // internally on partial writes until the full batch lands or the - // socket errors. - let to_write = std::mem::take(&mut batch); - let compio::BufResult(result, mut returned) = write_half.write_vectored_all(to_write).await; - if let Err(e) = result { + // Single batch send. The `TransportWriter` impl is all-or-error (not + // atomic: bytes may already be on the wire when it fails) and empties + // `batch` in place on success so the allocation is reused. + if let Err(e) = writer.send_batch(&mut batch).await { // Error (not warn) because the batch is now on the floor: // VSR's prepare-timeout or view-change will recover, but the // operator needs a loud diagnostic to correlate with the @@ -105,13 +131,9 @@ pub async fn run( %peer, error = ?e, batch_len = drained, - "writer task: write_vectored_all failed, dropping batch" + "writer task: send_batch failed, dropping batch" ); return; } - - // Reuse the Vec allocation across iterations. - returned.clear(); - batch = returned; } }
