This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 0aeea4551c0ce4574757403775abaea47b8944b3
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Apr 24 12:08:26 2026 +0200

    feat(message_bus): generic installer over TransportConn (IGGY-112, P1-T3)
    
    Closes out the installer side of Phase 1: `install_replica_stream` and
    `install_client_stream` were `TcpStream`-hardcoded, which forced every
    future transport (WS, QUIC) to duplicate the registry insert, the
    `InstanceToken` fencing, the `install_aborted` race handling, and the
    `conn_shutdown` reader-wake. Those invariants are load-bearing and
    must not fork.
    
    Split each install path into two entries:
      - TCP entry keeps the existing `TcpStream` signature and is where
        the per-connection socket options (keepalive, `TCP_NODELAY`)
        live. These are TCP-specific by construction; other transports
        terminate above the raw fd layer.
      - Generic entry `install_replica_conn<C: TransportConn>` /
        `install_client_conn<C: TransportConn>` carries the meat of the
        install: split into reader/writer halves via the trait, spawn
        the writer task on `writer_task::run_transport`, and run the
        reader loop through `TransportReader::read_message`.
    
    The read loops become `<R: TransportReader>`; the inline
    `framing::read_message` call turns into a trait method call without
    any behavioural change.
    
    `install_replica_fd` / `install_client_fd` continue to call the TCP
    entry (fd-delegation stays TCP-only per I5); existing test callers
    (`tests/common/mod.rs`, `tests/duplicate_client_id.rs`) keep the
    `TcpStream` signature unchanged.
    
    All 100 tests pass; `cargo clippy -p message_bus --all-features
    --all-targets -- -D warnings` clean; workspace builds.
---
 core/message_bus/src/installer.rs | 90 +++++++++++++++++++++++++++------------
 1 file changed, 63 insertions(+), 27 deletions(-)

diff --git a/core/message_bus/src/installer.rs 
b/core/message_bus/src/installer.rs
index 3ab6d615a..21d381296 100644
--- a/core/message_bus/src/installer.rs
+++ b/core/message_bus/src/installer.rs
@@ -26,12 +26,12 @@
 
 use crate::client_listener::RequestHandler;
 use crate::fd_transfer::{self, DupedFd};
-use crate::framing;
 use crate::lifecycle::{InstanceToken, RejectedRegistration, Shutdown};
 use crate::replica_listener::MessageHandler;
 use crate::socket_opts::{apply_keepalive_for_connection, 
apply_nodelay_for_connection};
+use crate::transports::{TcpTransportConn, TransportConn, TransportReader};
 use crate::{IggyMessageBus, lifecycle::ShutdownToken};
-use compio::net::{OwnedReadHalf, TcpStream};
+use compio::net::TcpStream;
 use futures::FutureExt;
 use iggy_binary_protocol::Command2;
 use std::cell::Cell;
@@ -89,25 +89,22 @@ impl ConnectionInstaller for Rc<IggyMessageBus> {
     }
 }
 
-/// Install a pre-wrapped replica TCP stream on the bus. Shared by
-/// `install_replica_fd` and, once shard 0 acts as the delegate target for
-/// itself, the accept path.
-#[allow(clippy::future_not_send, clippy::too_many_lines)]
+/// TCP entry point: apply socket options (keepalive, `TCP_NODELAY`) on
+/// the raw stream and delegate to the transport-generic install path.
+///
+/// Socket options live here (not on the generic path) because they are
+/// TCP-specific; other transports (WSS via TLS terminator, QUIC) lack a
+/// raw-fd level at this layer. Kept as a separate function so the
+/// `install_replica_fd` fd-delegation entry and the accept callbacks
+/// (`AcceptedReplicaFn` in tests and shard-0 coordinator) converge on
+/// one place for those options.
+#[allow(clippy::future_not_send)]
 pub fn install_replica_stream(
     bus: &Rc<IggyMessageBus>,
     peer_id: u8,
     stream: TcpStream,
     on_message: MessageHandler,
 ) {
-    if bus.replicas().contains(peer_id) {
-        debug!(
-            replica = peer_id,
-            "replica already registered on this shard, dropping delegated fd"
-        );
-        drop(stream);
-        return;
-    }
-
     let cfg = bus.config();
     if let Err(e) = apply_keepalive_for_connection(
         &stream,
@@ -123,8 +120,34 @@ pub fn install_replica_stream(
         // A miss means we stay Nagle-on for this peer, not a failure.
         warn!(replica = peer_id, "nodelay failed on delegated fd: {e}");
     }
+    install_replica_conn(bus, peer_id, TcpTransportConn::new(stream), 
on_message);
+}
+
+/// Install a pre-wrapped replica connection on the bus.
+///
+/// Generic over [`TransportConn`] so alternate transports (WS via
+/// shard-0 TLS terminator, QUIC via `compio-quic`) plug in behind the
+/// same registry-insert + instance-token fencing + install-race
+/// handling. TCP-specific socket options live in
+/// [`install_replica_stream`]; transports with no equivalent layer call
+/// this entry directly with their already-configured connection.
+#[allow(clippy::future_not_send, clippy::too_many_lines)]
+pub fn install_replica_conn<C: TransportConn>(
+    bus: &Rc<IggyMessageBus>,
+    peer_id: u8,
+    conn: C,
+    on_message: MessageHandler,
+) {
+    if bus.replicas().contains(peer_id) {
+        debug!(
+            replica = peer_id,
+            "replica already registered on this shard, dropping delegated fd"
+        );
+        drop(conn);
+        return;
+    }
 
-    let (read_half, write_half) = stream.into_split();
+    let (read_half, write_half) = conn.into_split();
     let (tx, rx) = async_channel::bounded(bus.peer_queue_capacity());
 
     // Writer and reader both observe abnormal close and used to fire
@@ -168,7 +191,7 @@ pub fn install_replica_stream(
     let token_for_writer = Rc::clone(&install_token);
     let max_batch = bus.config().max_batch;
     let writer_handle = compio::runtime::spawn(async move {
-        crate::writer_task::run(
+        crate::writer_task::run_transport(
             rx,
             write_half,
             writer_token,
@@ -262,8 +285,10 @@ pub fn install_replica_stream(
     }
 }
 
-/// Install a pre-wrapped client TCP stream on the bus.
-#[allow(clippy::future_not_send, clippy::too_many_lines)]
+/// TCP entry point for client installs. Applies socket options and
+/// delegates to [`install_client_conn`]. See [`install_replica_stream`]
+/// for the plane-symmetric docs.
+#[allow(clippy::future_not_send)]
 pub fn install_client_stream(
     bus: &Rc<IggyMessageBus>,
     client_id: u128,
@@ -288,8 +313,19 @@ pub fn install_client_stream(
             "nodelay failed on delegated client fd: {e}"
         );
     }
+    install_client_conn(bus, client_id, TcpTransportConn::new(stream), 
on_request);
+}
 
-    let (read_half, write_half) = stream.into_split();
+/// Install a pre-wrapped client connection on the bus. Generic over
+/// [`TransportConn`]; plane-symmetric with [`install_replica_conn`].
+#[allow(clippy::future_not_send, clippy::too_many_lines)]
+pub fn install_client_conn<C: TransportConn>(
+    bus: &Rc<IggyMessageBus>,
+    client_id: u128,
+    conn: C,
+    on_request: RequestHandler,
+) {
+    let (read_half, write_half) = conn.into_split();
     let (tx, rx) = async_channel::bounded(bus.peer_queue_capacity());
 
     // If the registry insert below loses a race for `client_id`, the
@@ -313,7 +349,7 @@ pub fn install_client_stream(
     let token_for_writer = Rc::clone(&install_token);
     let max_batch = bus.config().max_batch;
     let writer_handle = compio::runtime::spawn(async move {
-        crate::writer_task::run(
+        crate::writer_task::run_transport(
             rx,
             write_half,
             writer_token,
@@ -445,9 +481,9 @@ async fn drain_rejected_registration(rejected: 
RejectedRegistration, timeout: Du
 /// the insert-race path: it wakes the reader off its `io_uring` read SQE
 /// immediately rather than waiting for peer EOF.
 #[allow(clippy::future_not_send)]
-async fn replica_read_loop(
+async fn replica_read_loop<R: TransportReader>(
     replica_id: u8,
-    mut read_half: OwnedReadHalf<TcpStream>,
+    mut read_half: R,
     on_message: &MessageHandler,
     token: &ShutdownToken,
     conn_token: &ShutdownToken,
@@ -464,7 +500,7 @@ async fn replica_read_loop(
                 debug!(replica = replica_id, "replica read loop aborted by 
per-connection shutdown");
                 return;
             }
-            result = framing::read_message(&mut read_half, 
max_message_size).fuse() => {
+            result = read_half.read_message(max_message_size).fuse() => {
                 match result {
                     Ok(msg) => {
                         if aborted.get() {
@@ -495,9 +531,9 @@ async fn replica_read_loop(
 /// the insert-race path: it wakes the reader off its `io_uring` read SQE
 /// immediately rather than waiting for peer EOF.
 #[allow(clippy::future_not_send)]
-async fn client_read_loop(
+async fn client_read_loop<R: TransportReader>(
     client_id: u128,
-    mut read_half: OwnedReadHalf<TcpStream>,
+    mut read_half: R,
     on_request: &RequestHandler,
     token: &ShutdownToken,
     conn_token: &ShutdownToken,
@@ -514,7 +550,7 @@ async fn client_read_loop(
                 debug!(client = client_id, "client read loop aborted by 
per-connection shutdown");
                 return;
             }
-            result = framing::read_message(&mut read_half, 
max_message_size).fuse() => {
+            result = read_half.read_message(max_message_size).fuse() => {
                 match result {
                     Ok(msg) => {
                         if aborted.get() {

Reply via email to