This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch feat/message-bus-transports in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0aeea4551c0ce4574757403775abaea47b8944b3 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Apr 24 12:08:26 2026 +0200 feat(message_bus): generic installer over TransportConn (IGGY-112, P1-T3) Closes out the installer side of Phase 1: `install_replica_stream` and `install_client_stream` were `TcpStream`-hardcoded, which would have forced every future transport (WS, QUIC) to duplicate the registry insert, the `InstanceToken` fencing, the `install_aborted` race handling, and the `conn_shutdown` reader-wake. Those invariants are load-bearing and must not fork. Split each install path into two entries: - TCP entry keeps the existing `TcpStream` signature and is where the per-connection socket options (keepalive, `TCP_NODELAY`) live. These are TCP-specific by construction; other transports terminate above the raw fd layer. - Generic entry `install_replica_conn<C: TransportConn>` / `install_client_conn<C: TransportConn>` carries the meat of the install: split into reader/writer halves via the trait, spawn the writer task on `writer_task::run_transport`, and run the reader loop through `TransportReader::read_message`. The read loops become `<R: TransportReader>`; the inline `framing::read_message` call turns into a trait method call without any behavioural change. `install_replica_fd` / `install_client_fd` continue to call the TCP entry (fd-delegation stays TCP-only per I5); existing test callers (`tests/common/mod.rs`, `tests/duplicate_client_id.rs`) keep the `TcpStream` signature unchanged. All 100 tests pass; `cargo clippy -p message_bus --all-features --all-targets -- -D warnings` clean; workspace builds. 
--- core/message_bus/src/installer.rs | 90 +++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 27 deletions(-) diff --git a/core/message_bus/src/installer.rs b/core/message_bus/src/installer.rs index 3ab6d615a..21d381296 100644 --- a/core/message_bus/src/installer.rs +++ b/core/message_bus/src/installer.rs @@ -26,12 +26,12 @@ use crate::client_listener::RequestHandler; use crate::fd_transfer::{self, DupedFd}; -use crate::framing; use crate::lifecycle::{InstanceToken, RejectedRegistration, Shutdown}; use crate::replica_listener::MessageHandler; use crate::socket_opts::{apply_keepalive_for_connection, apply_nodelay_for_connection}; +use crate::transports::{TcpTransportConn, TransportConn, TransportReader}; use crate::{IggyMessageBus, lifecycle::ShutdownToken}; -use compio::net::{OwnedReadHalf, TcpStream}; +use compio::net::TcpStream; use futures::FutureExt; use iggy_binary_protocol::Command2; use std::cell::Cell; @@ -89,25 +89,22 @@ impl ConnectionInstaller for Rc<IggyMessageBus> { } } -/// Install a pre-wrapped replica TCP stream on the bus. Shared by -/// `install_replica_fd` and, once shard 0 acts as the delegate target for -/// itself, the accept path. -#[allow(clippy::future_not_send, clippy::too_many_lines)] +/// TCP entry point: apply socket options (keepalive, `TCP_NODELAY`) on +/// the raw stream and delegate to the transport-generic install path. +/// +/// Socket options live here (not on the generic path) because they are +/// TCP-specific; other transports (WSS via TLS terminator, QUIC) lack a +/// raw-fd level at this layer. Kept as a separate function so the +/// `install_replica_fd` fd-delegation entry and the accept callbacks +/// (`AcceptedReplicaFn` in tests and shard-0 coordinator) converge on +/// one place for those options. 
+#[allow(clippy::future_not_send)] pub fn install_replica_stream( bus: &Rc<IggyMessageBus>, peer_id: u8, stream: TcpStream, on_message: MessageHandler, ) { - if bus.replicas().contains(peer_id) { - debug!( - replica = peer_id, - "replica already registered on this shard, dropping delegated fd" - ); - drop(stream); - return; - } - let cfg = bus.config(); if let Err(e) = apply_keepalive_for_connection( &stream, @@ -123,8 +120,34 @@ pub fn install_replica_stream( // A miss means we stay Nagle-on for this peer, not a failure. warn!(replica = peer_id, "nodelay failed on delegated fd: {e}"); } + install_replica_conn(bus, peer_id, TcpTransportConn::new(stream), on_message); +} + +/// Install a pre-wrapped replica connection on the bus. +/// +/// Generic over [`TransportConn`] so alternate transports (WS via +/// shard-0 TLS terminator, QUIC via `compio-quic`) plug in behind the +/// same registry-insert + instance-token fencing + install-race +/// handling. TCP-specific socket options live in +/// [`install_replica_stream`]; transports with no equivalent layer call +/// this entry directly with their already-configured connection. 
+#[allow(clippy::future_not_send, clippy::too_many_lines)] +pub fn install_replica_conn<C: TransportConn>( + bus: &Rc<IggyMessageBus>, + peer_id: u8, + conn: C, + on_message: MessageHandler, +) { + if bus.replicas().contains(peer_id) { + debug!( + replica = peer_id, + "replica already registered on this shard, dropping delegated fd" + ); + drop(conn); + return; + } - let (read_half, write_half) = stream.into_split(); + let (read_half, write_half) = conn.into_split(); let (tx, rx) = async_channel::bounded(bus.peer_queue_capacity()); // Writer and reader both observe abnormal close and used to fire @@ -168,7 +191,7 @@ pub fn install_replica_stream( let token_for_writer = Rc::clone(&install_token); let max_batch = bus.config().max_batch; let writer_handle = compio::runtime::spawn(async move { - crate::writer_task::run( + crate::writer_task::run_transport( rx, write_half, writer_token, @@ -262,8 +285,10 @@ pub fn install_replica_stream( } } -/// Install a pre-wrapped client TCP stream on the bus. -#[allow(clippy::future_not_send, clippy::too_many_lines)] +/// TCP entry point for client installs. Applies socket options and +/// delegates to [`install_client_conn`]. See [`install_replica_stream`] +/// for the plane-symmetric docs. +#[allow(clippy::future_not_send)] pub fn install_client_stream( bus: &Rc<IggyMessageBus>, client_id: u128, @@ -288,8 +313,19 @@ pub fn install_client_stream( "nodelay failed on delegated client fd: {e}" ); } + install_client_conn(bus, client_id, TcpTransportConn::new(stream), on_request); +} - let (read_half, write_half) = stream.into_split(); +/// Install a pre-wrapped client connection on the bus. Generic over +/// [`TransportConn`]; plane-symmetric with [`install_replica_conn`]. 
+#[allow(clippy::future_not_send, clippy::too_many_lines)] +pub fn install_client_conn<C: TransportConn>( + bus: &Rc<IggyMessageBus>, + client_id: u128, + conn: C, + on_request: RequestHandler, +) { + let (read_half, write_half) = conn.into_split(); let (tx, rx) = async_channel::bounded(bus.peer_queue_capacity()); // If the registry insert below loses a race for `client_id`, the @@ -313,7 +349,7 @@ pub fn install_client_stream( let token_for_writer = Rc::clone(&install_token); let max_batch = bus.config().max_batch; let writer_handle = compio::runtime::spawn(async move { - crate::writer_task::run( + crate::writer_task::run_transport( rx, write_half, writer_token, @@ -445,9 +481,9 @@ async fn drain_rejected_registration(rejected: RejectedRegistration, timeout: Du /// the insert-race path: it wakes the reader off its `io_uring` read SQE /// immediately rather than waiting for peer EOF. #[allow(clippy::future_not_send)] -async fn replica_read_loop( +async fn replica_read_loop<R: TransportReader>( replica_id: u8, - mut read_half: OwnedReadHalf<TcpStream>, + mut read_half: R, on_message: &MessageHandler, token: &ShutdownToken, conn_token: &ShutdownToken, @@ -464,7 +500,7 @@ async fn replica_read_loop( debug!(replica = replica_id, "replica read loop aborted by per-connection shutdown"); return; } - result = framing::read_message(&mut read_half, max_message_size).fuse() => { + result = read_half.read_message(max_message_size).fuse() => { match result { Ok(msg) => { if aborted.get() { @@ -495,9 +531,9 @@ async fn replica_read_loop( /// the insert-race path: it wakes the reader off its `io_uring` read SQE /// immediately rather than waiting for peer EOF. 
#[allow(clippy::future_not_send)] -async fn client_read_loop( +async fn client_read_loop<R: TransportReader>( client_id: u128, - mut read_half: OwnedReadHalf<TcpStream>, + mut read_half: R, on_request: &RequestHandler, token: &ShutdownToken, conn_token: &ShutdownToken, @@ -514,7 +550,7 @@ async fn client_read_loop( debug!(client = client_id, "client read loop aborted by per-connection shutdown"); return; } - result = framing::read_message(&mut read_half, max_message_size).fuse() => { + result = read_half.read_message(max_message_size).fuse() => { match result { Ok(msg) => { if aborted.get() {
