This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 4f9af5ea514287701b5283950d15359ce74497a2
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Apr 24 12:03:28 2026 +0200

    feat(message_bus): TCP impls behind Transport traits (IGGY-112, P1-T2)
    
    Phase 1 plugs TCP into the trait surface landed in P1-T1. Without
    an impl, the traits are untestable scaffolding and no downstream
    transport (WS, QUIC) has a reference to compare against. Wire and
    socket behaviour must not change: existing `write_vectored_all`
    batching, zero-copy `Frozen` ownership, and the per-peer drain
    cadence are load-bearing (I4, I8).
    
    Add `transports/tcp.rs` with `TcpTransportListener`,
    `TcpTransportConn`, `TcpTransportReader`, `TcpTransportWriter`.
    `send_batch` does `mem::take` + `write_vectored_all` + `clear`,
    returning the drained Vec through the caller's `&mut` slot so the
    allocation is reused across iterations. Migrate `writer_task::run`
    to drive the trait via a new generic `run_transport<W: TransportWriter>`;
    the TCP entry point keeps its exact public signature and just wraps
    the owned write half in `TcpTransportWriter` before delegating. Fast
    path stays monomorphized - no dyn dispatch, no vtable.
    
    Behavior-preserving: 100 tests pass (96 existing + 4 new trait
    exercises covering listener accept, batched writes, empty batch,
    framing error). `installer::install_*_stream` still takes
    `TcpStream` directly; generic migration there is P1-T3.
---
 core/message_bus/src/transports/mod.rs |   4 +
 core/message_bus/src/transports/tcp.rs | 275 +++++++++++++++++++++++++++++++++
 core/message_bus/src/writer_task.rs    |  48 ++++--
 3 files changed, 314 insertions(+), 13 deletions(-)

diff --git a/core/message_bus/src/transports/mod.rs b/core/message_bus/src/transports/mod.rs
index f47e0c404..c98faf467 100644
--- a/core/message_bus/src/transports/mod.rs
+++ b/core/message_bus/src/transports/mod.rs
@@ -65,6 +65,10 @@
 //! plug in behind the same surface; see
 //! `Documents/silverhand/iggy/message_bus/transport-plan/`.
 
+pub mod tcp;
+
+pub use tcp::{TcpTransportConn, TcpTransportListener, TcpTransportReader, TcpTransportWriter};
+
 use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
 use iggy_binary_protocol::consensus::iobuf::Frozen;
 use iggy_binary_protocol::{GenericHeader, Message};
diff --git a/core/message_bus/src/transports/tcp.rs b/core/message_bus/src/transports/tcp.rs
new file mode 100644
index 000000000..7478c99b4
--- /dev/null
+++ b/core/message_bus/src/transports/tcp.rs
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TCP impls of the [`super`] transport traits.
+//!
+//! Behavior-preserving wrappers around `compio::net::TcpListener`,
+//! `compio::net::TcpStream`, and the split halves. The hot path on
+//! [`TcpTransportWriter::send_batch`] is identical to
+//! [`crate::writer_task::run`]'s inner `write_vectored_all` call: one
+//! syscall per batch, zero intermediate copies of `Frozen`.
+//!
+//! Callers that need listener / dialer ergonomics (socket options,
+//! keepalive, directional accept rules) should continue to use the
+//! free functions in [`crate::replica_listener`] and
+//! [`crate::client_listener`]; this module is the minimal trait
+//! adapter, not a replacement for those call sites. Integration with
+//! `installer::install_*_stream` lands in P1-T3.
+
+use super::{TransportConn, TransportListener, TransportReader, TransportWriter};
+use crate::framing;
+use compio::io::AsyncWriteExt;
+use compio::net::{OwnedReadHalf, OwnedWriteHalf, TcpListener, TcpStream};
+use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
+use iggy_binary_protocol::consensus::iobuf::Frozen;
+use iggy_binary_protocol::{GenericHeader, Message};
+use iggy_common::IggyError;
+use std::io;
+use std::mem;
+use std::net::SocketAddr;
+
+/// Inbound TCP listener wrapper.
+///
+/// Constructed from an already-bound [`TcpListener`] so the caller
+/// keeps control over socket options (`SO_REUSEPORT`, `nodelay`,
+/// `keepalive`) via `compio::net::SocketOpts`. See
+/// [`crate::replica_listener::bind`] and
+/// [`crate::client_listener::bind`] for the canonical construction.
+pub struct TcpTransportListener {
+    inner: TcpListener,
+}
+
+impl TcpTransportListener {
+    #[must_use]
+    pub const fn new(inner: TcpListener) -> Self {
+        Self { inner }
+    }
+}
+
+impl TransportListener for TcpTransportListener {
+    type Conn = TcpTransportConn;
+
+    #[allow(clippy::future_not_send)]
+    async fn accept(&self) -> io::Result<(Self::Conn, SocketAddr)> {
+        let (stream, addr) = self.inner.accept().await?;
+        Ok((TcpTransportConn::new(stream), addr))
+    }
+}
+
+/// Single TCP connection.
+///
+/// Produced by [`TcpTransportListener::accept`] or by wrapping the
+/// result of a `TcpStream::connect` on the dialer path. Takes ownership
+/// of the stream; [`Self::into_split`] transfers that ownership into
+/// the read and write halves bound to the per-connection tasks.
+pub struct TcpTransportConn {
+    stream: TcpStream,
+}
+
+impl TcpTransportConn {
+    #[must_use]
+    pub const fn new(stream: TcpStream) -> Self {
+        Self { stream }
+    }
+}
+
+impl TransportConn for TcpTransportConn {
+    type Reader = TcpTransportReader;
+    type Writer = TcpTransportWriter;
+
+    fn into_split(self) -> (Self::Reader, Self::Writer) {
+        let (read_half, write_half) = self.stream.into_split();
+        (
+            TcpTransportReader { inner: read_half },
+            TcpTransportWriter { inner: write_half },
+        )
+    }
+}
+
+/// Read half bound to the per-connection reader task.
+///
+/// [`TransportReader::read_message`] delegates to
+/// [`framing::read_message`]; the two paths share the same header
+/// decode, bounds check, and zero-copy `Owned<MESSAGE_ALIGN>`
+/// allocation strategy.
+pub struct TcpTransportReader {
+    inner: OwnedReadHalf<TcpStream>,
+}
+
+impl TcpTransportReader {
+    #[must_use]
+    pub const fn new(inner: OwnedReadHalf<TcpStream>) -> Self {
+        Self { inner }
+    }
+}
+
+impl TransportReader for TcpTransportReader {
+    #[allow(clippy::future_not_send)]
+    async fn read_message(
+        &mut self,
+        max_message_size: usize,
+    ) -> Result<Message<GenericHeader>, IggyError> {
+        framing::read_message(&mut self.inner, max_message_size).await
+    }
+}
+
+/// Write half bound to the per-connection writer-batch task.
+///
+/// [`TransportWriter::send_batch`] calls
+/// [`compio::io::AsyncWriteExt::write_vectored_all`] exactly once per
+/// invocation. The caller (e.g. [`crate::writer_task::run`]) is
+/// responsible for capping the batch size to
+/// `max_batch <= IOV_MAX / 2 = 512`; this impl does not enforce a cap
+/// because the Vec is already drained by the caller's admission
+/// control.
+pub struct TcpTransportWriter {
+    inner: OwnedWriteHalf<TcpStream>,
+}
+
+impl TcpTransportWriter {
+    #[must_use]
+    pub const fn new(inner: OwnedWriteHalf<TcpStream>) -> Self {
+        Self { inner }
+    }
+}
+
+impl TransportWriter for TcpTransportWriter {
+    #[allow(clippy::future_not_send)]
+    async fn send_batch(&mut self, batch: &mut Vec<Frozen<MESSAGE_ALIGN>>) -> io::Result<()> {
+        // `write_vectored_all` consumes the Vec via compio's `IoVectoredBuf`
+        // surface and returns it through `BufResult` so we can reuse the
+        // allocation. Take the inner Vec, hand it to the kernel, put the
+        // (now-empty) returned Vec back into the caller's slot.
+        let owned = mem::take(batch);
+        let compio::BufResult(result, mut returned) = self.inner.write_vectored_all(owned).await;
+        returned.clear();
+        *batch = returned;
+        result
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use iggy_binary_protocol::consensus::iobuf::Frozen;
+    use iggy_binary_protocol::{Command2, HEADER_SIZE};
+
+    #[allow(clippy::cast_possible_truncation)]
+    fn header_only(command: Command2) -> Frozen<MESSAGE_ALIGN> {
+        Message::<GenericHeader>::new(HEADER_SIZE)
+            .transmute_header(|_, h: &mut GenericHeader| {
+                h.command = command;
+                h.size = HEADER_SIZE as u32;
+            })
+            .into_frozen()
+    }
+
+    #[allow(clippy::future_not_send)]
+    async fn local_pair() -> (TcpStream, TcpStream) {
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let addr = listener.local_addr().unwrap();
+        let connect = TcpStream::connect(addr);
+        let accept = listener.accept();
+        let (client_res, accept_res) = futures::join!(connect, accept);
+        let (server, _) = accept_res.unwrap();
+        (client_res.unwrap(), server)
+    }
+
+    #[compio::test]
+    #[allow(clippy::future_not_send)]
+    async fn listener_accept_yields_conn() {
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let addr = listener.local_addr().unwrap();
+        let wrapped = TcpTransportListener::new(listener);
+
+        let connect = TcpStream::connect(addr);
+        let accept = wrapped.accept();
+        let (_client, accept_res) = futures::join!(connect, accept);
+        let (_conn, _peer_addr) = accept_res.expect("accept via trait");
+    }
+
+    #[compio::test]
+    #[allow(clippy::future_not_send)]
+    async fn send_batch_writes_all_and_drains_vec() {
+        let (client, server) = local_pair().await;
+        let client_conn = TcpTransportConn::new(client);
+        let (_client_read, mut client_write) = client_conn.into_split();
+
+        let server_conn = TcpTransportConn::new(server);
+        let (mut server_read, _server_write) = server_conn.into_split();
+
+        let mut batch = vec![
+            header_only(Command2::Ping),
+            header_only(Command2::Prepare),
+            header_only(Command2::Request),
+        ];
+        client_write
+            .send_batch(&mut batch)
+            .await
+            .expect("send_batch");
+        assert!(batch.is_empty(), "Vec must be drained on success");
+        assert!(batch.capacity() >= 3, "allocation must be reused");
+
+        // Verify all three frames land intact in order.
+        let a = server_read
+            .read_message(framing::MAX_MESSAGE_SIZE)
+            .await
+            .unwrap();
+        let b = server_read
+            .read_message(framing::MAX_MESSAGE_SIZE)
+            .await
+            .unwrap();
+        let c = server_read
+            .read_message(framing::MAX_MESSAGE_SIZE)
+            .await
+            .unwrap();
+        assert_eq!(a.header().command, Command2::Ping);
+        assert_eq!(b.header().command, Command2::Prepare);
+        assert_eq!(c.header().command, Command2::Request);
+    }
+
+    #[compio::test]
+    #[allow(clippy::future_not_send)]
+    async fn send_batch_empty_is_noop() {
+        let (client, _server) = local_pair().await;
+        let (_r, mut w) = TcpTransportConn::new(client).into_split();
+        let mut batch: Vec<Frozen<MESSAGE_ALIGN>> = Vec::with_capacity(8);
+        w.send_batch(&mut batch).await.expect("empty batch ok");
+        assert!(batch.is_empty());
+    }
+
+    #[compio::test]
+    #[allow(clippy::future_not_send)]
+    async fn read_message_reports_oversize_via_trait() {
+        use compio::io::AsyncWriteExt;
+        let (mut client, server) = local_pair().await;
+        let (mut r, _w) = TcpTransportConn::new(server).into_split();
+
+        // Hand-craft a header with a bogus oversize `size` field; the
+        // trait surface must report the same `InvalidCommand` error the
+        // framing path does.
+        let mut buf = vec![0u8; HEADER_SIZE];
+        let bogus = u32::try_from(framing::MAX_MESSAGE_SIZE + 1)
+            .unwrap_or(u32::MAX)
+            .to_le_bytes();
+        buf[48..52].copy_from_slice(&bogus);
+        client.write_all(buf).await.0.unwrap();
+
+        let res = r.read_message(framing::MAX_MESSAGE_SIZE).await;
+        assert!(matches!(res, Err(IggyError::InvalidCommand)));
+    }
+}
diff --git a/core/message_bus/src/writer_task.rs b/core/message_bus/src/writer_task.rs
index de3f3304e..75b09c827 100644
--- a/core/message_bus/src/writer_task.rs
+++ b/core/message_bus/src/writer_task.rs
@@ -31,7 +31,7 @@
 //! - a write to the wire fails (broken connection).
 
 use crate::lifecycle::{BusMessage, BusReceiver, ShutdownToken};
-use compio::io::AsyncWriteExt;
+use crate::transports::{TcpTransportWriter, TransportWriter};
 use compio::net::{OwnedWriteHalf, TcpStream};
 use futures::FutureExt;
 use tracing::{debug, error, trace};
@@ -42,10 +42,38 @@ use tracing::{debug, error, trace};
 /// `max_batch` caps how many messages a single `writev` syscall coalesces.
 /// Larger batches reduce syscalls per N messages at the cost of memory
 /// per batch and worst-case latency for the head-of-batch message.
+///
+/// TCP entry point. Wraps the owned write half in a
+/// [`TcpTransportWriter`] and delegates the drain loop to
+/// [`run_transport`]; every syscall still flows through
+/// [`TransportWriter::send_batch`] so future transports drop in behind
+/// the same drain logic.
 #[allow(clippy::future_not_send)]
 pub async fn run(
     rx: BusReceiver,
-    mut write_half: OwnedWriteHalf<TcpStream>,
+    write_half: OwnedWriteHalf<TcpStream>,
+    token: ShutdownToken,
+    label: &'static str,
+    peer: String,
+    max_batch: usize,
+) {
+    let writer = TcpTransportWriter::new(write_half);
+    run_transport(rx, writer, token, label, peer, max_batch).await;
+}
+
+/// Generic drain loop over any [`TransportWriter`].
+///
+/// Pulls `BusMessage`s off the per-peer mpsc, coalesces up to
+/// `max_batch` into a single [`TransportWriter::send_batch`] call, and
+/// exits cleanly on shutdown, channel close, or write error.
+///
+/// Public so alternate transports (WS, QUIC) can reuse the admission
+/// control and batch sizing identically to TCP; keep the body
+/// transport-agnostic.
+#[allow(clippy::future_not_send)]
+pub async fn run_transport<W: TransportWriter>(
+    rx: BusReceiver,
+    mut writer: W,
     token: ShutdownToken,
     label: &'static str,
     peer: String,
@@ -89,12 +117,10 @@ pub async fn run(
         let drained = batch.len();
         trace!(%label, %peer, batch = drained, "writev batch");
 
-        // Single writev for the whole batch. write_vectored_all loops
-        // internally on partial writes until the full batch lands or the
-        // socket errors.
-        let to_write = std::mem::take(&mut batch);
-        let compio::BufResult(result, mut returned) = write_half.write_vectored_all(to_write).await;
-        if let Err(e) = result {
+        // Single batch send. The `TransportWriter` impl is atomic-or-error
+        // and drains `batch` in place on success so the allocation is
+        // reused across iterations.
+        if let Err(e) = writer.send_batch(&mut batch).await {
             // Error (not warn) because the batch is now on the floor:
             // VSR's prepare-timeout or view-change will recover, but the
             // operator needs a loud diagnostic to correlate with the
@@ -105,13 +131,9 @@ pub async fn run(
                 %peer,
                 error = ?e,
                 batch_len = drained,
-                "writer task: write_vectored_all failed, dropping batch"
+                "writer task: send_batch failed, dropping batch"
             );
             return;
         }
-
-        // Reuse the Vec allocation across iterations.
-        returned.clear();
-        batch = returned;
     }
 }

Reply via email to