This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit bd321ebff297107d6eb19dc54ce3590e96461b13
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Apr 27 11:54:32 2026 +0200

    feat(message_bus): add install_client_quic_conn + install_client_ws_stream
    
    The QUIC and WebSocket transports landed at trait-level (`6f99acdad`,
    `963ccf851`) but had no install entry in `installer.rs`. The
    dispatcher branch needed both, which left a documentation gap and a
    load-bearing-claim asymmetry vs the TCP path.
    
    `install_client_quic_conn` wraps a `compio_quic::Connection` plus its
    first-bidi `(SendStream, RecvStream)` pair in a `QuicTransportConn`
    and delegates to the existing generic `install_client_conn`.
    `install_client_ws_stream` wraps a post-upgrade
    `compio_ws::WebSocketStream<TcpStream>` in a `WsTransportConn` and
    delegates the same way.
    
    Neither install path crosses an inter-shard boundary: both
    `compio_quic::Connection` and `compio_ws::WebSocketStream<TcpStream>`
    hold compio `Rc<...>` driver state and are `!Send`. Shard 0 therefore
    terminates QUIC + WS locally and uses the existing
    `ShardFramePayload::ForwardClientSend` / `Consensus` variants for
    outbound + inbound traffic respectively. This matches
    `core/message_bus/CLAUDE.md`'s plane-split rule
    ("TLS/QUIC-terminated connections... shard 0 terminates and forwards
    `Frozen<MESSAGE_ALIGN>` over the inter-shard flume").
    
    The `core/shard/src/lib.rs` `ShardFramePayload` doc-comment is
    amended on `ClientConnectionSetup` to call out the deliberate
    absence of QUIC/WS analogs and point at the rationale, so future
    contributors don't try to reinstate them and trip over the same
    `!Send` wall.
    
    The earlier P4-T2 design doc (silverhand-only) was wrong on
    cross-shard handover for QUIC and is now obsolete on that point;
    the in-tree code + CLAUDE.md are the source of truth.
    
    Tests: no new tests; both install entries are thin delegators over
    the existing generic `install_client_conn`, which is already covered
    by `transports::tcp::tests::send_batch_writes_all_and_drains_vec`,
    `transports::quic::tests::loopback_round_trip_three_frames`, and
    `transports::ws::tests::loopback_round_trip_three_frames`. 135/135
    pass on `cargo nextest run -p message_bus -p server-ng -p shard`.
    Workspace clippy clean. Doc clean modulo one pre-existing
    `coordinator.rs:276` warning inherited from master.
---
 core/message_bus/src/installer.rs | 58 +++++++++++++++++++++++++++++++++++++++
 core/shard/src/lib.rs             |  9 ++++++
 2 files changed, 67 insertions(+)

diff --git a/core/message_bus/src/installer.rs b/core/message_bus/src/installer.rs
index 4d274860c..0b52304c7 100644
--- a/core/message_bus/src/installer.rs
+++ b/core/message_bus/src/installer.rs
@@ -29,6 +29,8 @@ use crate::fd_transfer::{self, DupedFd};
 use crate::lifecycle::{InstanceToken, RejectedRegistration, Shutdown};
 use crate::replica_listener::MessageHandler;
 use crate::socket_opts::{apply_keepalive_for_connection, apply_nodelay_for_connection};
+use crate::transports::quic::QuicTransportConn;
+use crate::transports::ws::WsTransportConn;
 use crate::transports::{TcpTransportConn, TransportConn, TransportReader};
 use crate::{IggyMessageBus, lifecycle::ShutdownToken};
 use compio::net::TcpStream;
@@ -318,6 +320,62 @@ pub fn install_client_stream(
     install_client_conn(bus, client_id, TcpTransportConn::new(stream), on_request);
 }
 
+/// QUIC entry point for client installs.
+///
+/// Wraps the [`compio_quic::Connection`] and its first bidirectional
+/// `(SendStream, RecvStream)` pair (already accepted by shard 0 via
+/// `Connection::accept_bi().await` so the install path never
+/// re-handshakes) in a [`QuicTransportConn`] and delegates to the
+/// existing generic [`install_client_conn`].
+///
+/// No socket-options analog runs here: QUIC keepalive lives in
+/// [`compio_quic::TransportConfig::keep_alive_interval`] set at endpoint
+/// construction time. The connection is encrypted end-to-end and there
+/// is no plaintext fd to dup, which is why this never crosses an
+/// inter-shard channel: shard 0 owns the QUIC `Endpoint`, terminates
+/// every connection locally, and uses the existing
+/// `ForwardClientSend` / `Consensus` shard-frame variants for
+/// outbound + inbound traffic respectively.
+#[allow(clippy::future_not_send)]
+pub fn install_client_quic_conn(
+    bus: &Rc<IggyMessageBus>,
+    client_id: u128,
+    connection: compio_quic::Connection,
+    streams: (compio_quic::SendStream, compio_quic::RecvStream),
+    on_request: RequestHandler,
+) {
+    install_client_conn(
+        bus,
+        client_id,
+        QuicTransportConn::new(connection, streams),
+        on_request,
+    );
+}
+
+/// WebSocket entry point for client installs.
+///
+/// Wraps a post-upgrade [`compio_ws::WebSocketStream`] in a
+/// [`WsTransportConn`] and delegates to the existing generic
+/// [`install_client_conn`]. The HTTP-Upgrade handshake has already been
+/// driven on shard 0; the install path never re-runs it.
+///
+/// Like QUIC, this never crosses an inter-shard channel:
+/// `WebSocketStream<TcpStream>` is `!Send` (it holds compio `Rc<...>`
+/// driver state) so shard 0 terminates locally and uses the existing
+/// `ForwardClientSend` / `Consensus` variants. The post-upgrade socket
+/// IS still a raw TCP fd, but compio-ws has no "rebuild from fd" API
+/// and re-attaching the tungstenite state machine across shards would
+/// lose protocol invariants the dispatcher already enforced.
+#[allow(clippy::future_not_send)]
+pub fn install_client_ws_stream(
+    bus: &Rc<IggyMessageBus>,
+    client_id: u128,
+    stream: compio_ws::WebSocketStream<TcpStream>,
+    on_request: RequestHandler,
+) {
+    install_client_conn(bus, client_id, WsTransportConn::new(stream), on_request);
+}
+
 /// Install a pre-wrapped client connection on the bus. Generic over
 /// [`TransportConn`]; plane-symmetric with [`install_replica_conn`].
 #[allow(clippy::future_not_send, clippy::too_many_lines)]
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 579577eee..e420e508c 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -199,6 +199,15 @@ pub enum ShardFramePayload {
     /// owning shard. The receiving shard wraps the fd and installs client
     /// reader / writer tasks locally. The owning shard is encoded in the top
     /// 16 bits of `client_id`.
+    ///
+    /// QUIC + WS clients deliberately do NOT get an analog variant: the
+    /// inter-shard channel requires `Send` payloads, and
+    /// `compio_quic::Connection` / `compio_ws::WebSocketStream<TcpStream>`
+    /// are both `!Send` (they hold compio `Rc<...>` driver state). Shard 0
+    /// therefore terminates QUIC + WS locally and uses the existing
+    /// `ForwardClientSend` / `Consensus` variants for outbound + inbound
+    /// traffic respectively. See `core/message_bus/CLAUDE.md` plane
+    /// split section.
     ClientConnectionSetup { fd: DupedFd, client_id: u128 },
     /// Shard 0 broadcasts the owner for a replica to every shard so each
     /// bus' `send_to_replica` slow path can route through the correct owner.

Reply via email to