This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit b15f3966797c9363f044fed71bcce32c3446a925
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Apr 27 13:22:02 2026 +0200

    feat(message_bus): add WS pre-upgrade listener with cross-shard fd-ship
    
    Wire the shard-0 WebSocket client listener so accepted TCP sockets
    ride the existing fd-ship plumbing pre-upgrade. The HTTP-Upgrade
    handshake (and the iggy.consensus.v1 subprotocol enforcement) runs on
    the owning shard inside the new install_client_ws_fd entry on the
    ConnectionInstaller trait; the impl wraps the dup'd fd and drives
    compio_ws::accept_hdr_async with a tungstenite Callback that returns
    HTTP 400 on a missing or wrong subprotocol.
    
    Reverses the 2026-04-27 design correction in core/message_bus/CLAUDE.md
    that claimed WS connections must terminate on shard 0: the !Send
    constraint on compio_ws::WebSocketStream<TcpStream> is post-upgrade,
    not pre-upgrade. The shipped fd is plain TCP at ship-time, so
    invariant I5 (fd-delegation is TCP-only) holds.
    
    What landed:
    
    - core/message_bus/src/client_listener_ws.rs: bind + run for the
      pre-upgrade TCP listener. No upgrade work runs on shard 0; the
      callback (AcceptedWsClientFn) hands the raw stream off.
    - core/message_bus/src/installer.rs: install_client_ws_fd on the
      ConnectionInstaller trait + Rc<IggyMessageBus> impl. Wraps fd,
      spawns a tracked task that runs compio_ws::accept_hdr_async with
      the new ws_subprotocol_callback, then delegates to the existing
      install_client_ws_stream on success.
    - core/message_bus/src/lib.rs: AcceptedWsClientFn type alias.
    - core/shard/src/lib.rs: new ShardFramePayload::ClientWsConnectionSetup
      variant alongside ClientConnectionSetup. Doc on ClientConnectionSetup
      refreshed to drop the obsolete "WS terminates on shard 0" claim.
    - core/shard/src/coordinator.rs: new delegate_ws_client(stream) that
      mirrors delegate_client. Same round-robin target, same mint_client_id,
      same dup_fd; ships ClientWsConnectionSetup instead.
    - core/shard/src/router.rs: match arm for the new variant calls
      bus.install_client_ws_fd(fd, client_id, on_request.clone()).
    - core/message_bus/tests/common/mod.rs: install_ws_clients_locally
      helper for single-shard tests; dups locally and routes through
      install_client_ws_fd, exercising the same accept_hdr_async path.
    - core/message_bus/tests/ws_client_roundtrip.rs: 3 tests.
      handshake_succeeds_and_request_reaches_handler verifies the install
      + Request arrives at the server-side handler via a side-channel
      signal. missing_subprotocol_rejected_at_handshake and
      wrong_subprotocol_rejected_at_handshake verify that the
      ws_subprotocol_callback rejects with HTTP 400.
    
    Known follow-up (out of P5 scope, Phase 7 work): the WS dispatcher in
    transports/ws.rs cannot drive a full client -> server -> client
    request/reply round trip. Phase B's stream.read().await blocks the
    dispatcher and cannot react to outbound queue activity, and compio's
    "no buffer drop with pending I/O" rule prevents canceling the read
    via select! on out_rx.recv(). The unidirectional unit tests in
    transports/ws.rs do not exercise this path; the integration test here
    sends a Request and verifies arrival without round-tripping the Reply.
    
    146/146 tests pass on cargo nextest run -p message_bus -p server-ng
    -p shard. Workspace clippy clean. cargo doc clean modulo the
    pre-existing core/shard/src/coordinator.rs:308 warning inherited from
    master.
---
 core/message_bus/src/client_listener_ws.rs    | 103 ++++++++++++++++
 core/message_bus/src/installer.rs             |  60 +++++++++
 core/message_bus/src/lib.rs                   |   1 +
 core/message_bus/tests/common/mod.rs          |  28 ++++-
 core/message_bus/tests/ws_client_roundtrip.rs | 168 ++++++++++++++++++++++++++
 core/shard/src/coordinator.rs                 |  32 +++++
 core/shard/src/lib.rs                         |  29 +++--
 core/shard/src/router.rs                      |  10 ++
 8 files changed, 421 insertions(+), 10 deletions(-)

diff --git a/core/message_bus/src/client_listener_ws.rs b/core/message_bus/src/client_listener_ws.rs
new file mode 100644
index 000000000..50b859c17
--- /dev/null
+++ b/core/message_bus/src/client_listener_ws.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pre-upgrade TCP listener for consensus-protocol WebSocket clients.
+//!
+//! Runs only on shard 0. The accept loop performs no protocol work
+//! beyond `TcpListener::accept`: every accepted stream is handed
+//! verbatim to the supplied callback, which dups the fd and ships a
+//! [`ShardFramePayload::ClientWsConnectionSetup`] frame to the
+//! round-robin-selected target shard. The HTTP-Upgrade handshake (and
+//! the `iggy.consensus.v1` subprotocol enforcement) runs on the owning
+//! shard inside [`crate::installer::install_client_ws_fd`].
+//!
+//! The fd at ship-time is plain TCP; invariant I5 (fd-delegation is
+//! TCP-only) holds. The WS state machine only materialises after the
+//! upgrade, on the owning shard, where it can stay non-`Send`.
+//!
+//! `ShardFramePayload::ClientWsConnectionSetup` is defined in
+//! `core/shard/src/lib.rs`; the rustdoc cannot intra-link across crates
+//! without pulling `shard` in as a doc-only dep.
+
+use crate::AcceptedWsClientFn;
+use crate::lifecycle::ShutdownToken;
+use compio::net::{SocketOpts, TcpListener};
+use futures::FutureExt;
+use iggy_common::IggyError;
+use std::net::SocketAddr;
+use std::rc::Rc;
+use tracing::{debug, error, info};
+
+/// Bind the WS pre-upgrade TCP listener and return the bound address.
+///
+/// Mirrors [`crate::client_listener::bind`] in shape (`TCP_NODELAY` +
+/// `SO_KEEPALIVE` on by default). The receiving shard re-applies socket
+/// options on the dup'd fd via the existing client-install path, so
+/// kernel-level options propagate end-to-end.
+///
+/// # Errors
+///
+/// Returns [`IggyError::CannotBindToSocket`] if the bind fails.
+#[allow(clippy::future_not_send)]
+pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> {
+    // `SO_REUSEPORT` intentionally not set: only shard 0 binds the WS
+    // listener (per invariant I6). The shard-0 coordinator round-robins
+    // accepts to owning shards via `ShardFramePayload::ClientWsConnectionSetup`.
+    let opts = SocketOpts::new().nodelay(true).keepalive(true);
+    let listener = TcpListener::bind_with_options(addr, &opts)
+        .await
+        .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
+    let actual = listener
+        .local_addr()
+        .map_err(|e| IggyError::IoError(e.to_string()))?;
+    Ok((listener, actual))
+}
+
+/// Run the WS pre-upgrade listener accept loop until the shutdown
+/// token fires.
+///
+/// Each accepted [`compio::net::TcpStream`] is handed verbatim to
+/// `on_accepted` (no upgrade attempted here). The callback owns the
+/// stream from that point on.
+#[allow(clippy::future_not_send)]
+pub async fn run(listener: TcpListener, token: ShutdownToken, on_accepted: AcceptedWsClientFn) {
+    info!(
+        "Consensus WS client listener (pre-upgrade) accepting on {:?}",
+        listener.local_addr().ok()
+    );
+
+    let on_accepted: Rc<_> = on_accepted;
+    loop {
+        futures::select! {
+            () = token.wait().fuse() => {
+                debug!("Consensus WS client listener shutting down");
+                break;
+            }
+            result = listener.accept().fuse() => {
+                match result {
+                    Ok((stream, peer_addr)) => {
+                        debug!(%peer_addr, "WS client TCP accepted, delegating fd pre-upgrade");
+                        on_accepted(stream);
+                    }
+                    Err(e) => {
+                        error!("Consensus WS client listener accept failed: {e}");
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/core/message_bus/src/installer.rs b/core/message_bus/src/installer.rs
index 0b52304c7..f7fc7b861 100644
--- a/core/message_bus/src/installer.rs
+++ b/core/message_bus/src/installer.rs
@@ -62,6 +62,15 @@ pub trait ConnectionInstaller {
     /// encoded in the top 16 bits of `client_id`.
     fn install_client_fd(&self, fd: DupedFd, client_id: u128, on_request: RequestHandler);
 
+    /// Same for an SDK WebSocket client's pre-upgrade TCP fd. The
+    /// receiving shard wraps the fd, runs `compio_ws::accept_hdr_async`
+    /// with the iggy.consensus.v1 subprotocol callback to drive the
+    /// HTTP-Upgrade handshake, then installs WS reader / writer tasks
+    /// via [`install_client_ws_stream`] on success. On handshake
+    /// failure (e.g. wrong / missing subprotocol) the fd is closed by
+    /// dropping the wrapping `TcpStream`.
+    fn install_client_ws_fd(&self, fd: DupedFd, client_id: u128, on_request: RequestHandler);
+
     /// Update the replica -> owning shard mapping used by the `send_to_replica`
     /// slow path on non-owning shards.
     fn set_shard_mapping(&self, replica: u8, owning_shard: u16);
@@ -82,6 +91,24 @@ impl ConnectionInstaller for Rc<IggyMessageBus> {
         install_client_stream(self, client_id, stream, on_request);
     }
 
+    fn install_client_ws_fd(&self, fd: DupedFd, client_id: u128, on_request: RequestHandler) {
+        let stream = fd_transfer::wrap_duped_fd(fd);
+        let bus = Self::clone(self);
+        let handle = compio::runtime::spawn(async move {
+            match compio_ws::accept_hdr_async(stream, ws_subprotocol_callback).await {
+                Ok(ws) => {
+                    if !bus.is_shutting_down() {
+                        install_client_ws_stream(&bus, client_id, ws, on_request);
+                    }
+                }
+                Err(e) => {
+                    warn!(client_id, "WS upgrade failed: {e}");
+                }
+            }
+        });
+        self.track_background(handle);
+    }
+
     fn set_shard_mapping(&self, replica: u8, owning_shard: u16) {
         IggyMessageBus::set_shard_mapping(self, replica, owning_shard);
     }
@@ -91,6 +118,39 @@ impl ConnectionInstaller for Rc<IggyMessageBus> {
     }
 }
 
+/// HTTP-Upgrade callback for the WS client plane.
+///
+/// Inspects `Sec-WebSocket-Protocol`. Accepts only the exact value
+/// [`crate::transports::ws::WS_SUBPROTOCOL`] (`iggy.consensus.v1`); any
+/// other value (or absence) yields HTTP 400 with a body naming the
+/// expected subprotocol. The accepted value is mirrored back on the
+/// response so the negotiated subprotocol is unambiguous to the
+/// client.
+#[allow(clippy::result_large_err)] // tungstenite-defined Callback signature; not on hot path
+fn ws_subprotocol_callback(
+    req: &tungstenite::handshake::server::Request,
+    mut resp: tungstenite::handshake::server::Response,
+) -> Result<tungstenite::handshake::server::Response, tungstenite::handshake::server::ErrorResponse>
+{
+    let want = crate::transports::ws::WS_SUBPROTOCOL.as_bytes();
+    let proto = req
+        .headers()
+        .get(tungstenite::http::header::SEC_WEBSOCKET_PROTOCOL);
+    if proto.is_some_and(|hv| hv.as_bytes() == want) {
+        resp.headers_mut().insert(
+            tungstenite::http::header::SEC_WEBSOCKET_PROTOCOL,
+            tungstenite::http::HeaderValue::from_static(crate::transports::ws::WS_SUBPROTOCOL),
+        );
+        return Ok(resp);
+    }
+    let mut err = tungstenite::http::Response::new(Some(format!(
+        "missing or wrong subprotocol; expected {}",
+        crate::transports::ws::WS_SUBPROTOCOL
+    )));
+    *err.status_mut() = tungstenite::http::StatusCode::BAD_REQUEST;
+    Err(err)
+}
+
 /// TCP entry point: apply socket options (keepalive, `TCP_NODELAY`) on
 /// the raw stream and delegate to the transport-generic install path.
 ///
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 628832cf3..271e6b000 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -82,6 +82,7 @@ pub mod auth_config;
 pub mod cache;
 pub mod client_listener;
 pub mod client_listener_quic;
+pub mod client_listener_ws;
 pub mod config;
 pub mod connector;
 mod error;
diff --git a/core/message_bus/tests/common/mod.rs b/core/message_bus/tests/common/mod.rs
index 4f7e88821..8ed91921a 100644
--- a/core/message_bus/tests/common/mod.rs
+++ b/core/message_bus/tests/common/mod.rs
@@ -27,11 +27,13 @@
 #![allow(dead_code)] // each test binary uses a subset
 
 use iggy_binary_protocol::{Command2, GenericHeader, HEADER_SIZE, Message};
+use message_bus::ConnectionInstaller;
 use message_bus::auth::{StaticSharedSecret, TokenSource};
 use message_bus::client_listener::RequestHandler;
 use message_bus::replica_listener::MessageHandler;
 use message_bus::{
-    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, IggyMessageBus, installer,
+    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, AcceptedWsClientFn, IggyMessageBus,
+    fd_transfer, installer,
 };
 use std::cell::Cell;
 use std::net::SocketAddr;
@@ -120,3 +122,27 @@ pub fn install_quic_clients_locally(
         );
     })
 }
+
+/// Build an [`AcceptedWsClientFn`] that mints a local client id, dups
+/// the accepted fd (mirroring the production cross-shard fd-ship
+/// path), and hands it to [`ConnectionInstaller::install_client_ws_fd`]
+/// on the given bus. Single-shard tests bypass the inter-shard
+/// channel by dup'ing locally; the install path runs `accept_hdr_async`,
+/// the subprotocol callback, and `install_client_ws_stream` exactly as
+/// the production owning-shard router would.
+#[must_use]
+pub fn install_ws_clients_locally(
+    bus: Rc<IggyMessageBus>,
+    on_request: RequestHandler,
+) -> AcceptedWsClientFn {
+    let counter: Rc<Cell<u128>> = Rc::new(Cell::new(1));
+    let shard_id = u128::from(bus.shard_id());
+    Rc::new(move |stream| {
+        let seq = counter.get();
+        counter.set(seq.wrapping_add(1));
+        let client_id = (shard_id << 112) | seq;
+        let fd = fd_transfer::dup_fd(&stream).expect("dup_fd");
+        drop(stream);
+        bus.install_client_ws_fd(fd, client_id, on_request.clone());
+    })
+}
diff --git a/core/message_bus/tests/ws_client_roundtrip.rs b/core/message_bus/tests/ws_client_roundtrip.rs
new file mode 100644
index 000000000..d8d8854f4
--- /dev/null
+++ b/core/message_bus/tests/ws_client_roundtrip.rs
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! End-to-end: a real WebSocket client connects to the consensus WS
+//! pre-upgrade listener on shard 0; the listener's callback dups the
+//! TCP fd and (on the same shard, locally) hands it to
+//! `install_client_ws_fd`, which runs `compio_ws::accept_hdr_async`
+//! with the iggy.consensus.v1 subprotocol callback before installing
+//! the WS connection.
+//!
+//! Coverage:
+//!
+//! - **Positive**: client requests `iggy.consensus.v1`, handshake
+//!   succeeds, a Request frame reaches the server-side handler.
+//!   Caveat: a full Request -> Reply round trip would deadlock
+//!   the per-connection WS dispatcher (`transports::ws.rs`); its
+//!   phase-alternating drain-then-read loop blocks Phase B on
+//!   `stream.read().await` and cannot react to outbound queue
+//!   activity, so the server's Reply never reaches the wire while
+//!   the client is parked in Phase B waiting for it. Tracked as a
+//!   Phase 7 follow-up in the silverhand transport-plan; the design
+//!   note in the dispatcher comment overstates its request/reply
+//!   support.
+//! - **Negative (missing)**: client omits the `Sec-WebSocket-Protocol`
+//!   header; server returns HTTP 400; handshake fails on the client.
+//! - **Negative (wrong)**: client requests an unrelated subprotocol;
+//!   server returns HTTP 400.
+
+mod common;
+
+use common::{header_only, install_ws_clients_locally, loopback};
+use compio::net::TcpStream;
+use iggy_binary_protocol::Command2;
+use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
+use iggy_binary_protocol::consensus::iobuf::Frozen;
+use message_bus::IggyMessageBus;
+use message_bus::client_listener::RequestHandler;
+use message_bus::client_listener_ws::{bind, run};
+use message_bus::transports::ws::{WS_SUBPROTOCOL, WsTransportConn};
+use message_bus::transports::{TransportConn, TransportWriter};
+use std::rc::Rc;
+use std::time::Duration;
+
+#[compio::test]
+async fn handshake_succeeds_and_request_reaches_handler() {
+    let bus = Rc::new(IggyMessageBus::new(0));
+
+    // Side-channel signal: handler fires once when it sees the Request.
+    let (tx_seen, rx_seen) = async_channel::bounded::<u128>(1);
+    let on_request: RequestHandler = Rc::new(move |client_id, msg| {
+        assert_eq!(msg.header().command, Command2::Request);
+        let _ = tx_seen.try_send(client_id);
+    });
+
+    let (listener, server_addr) = bind(loopback()).await.expect("bind");
+    let token = bus.token();
+    let on_accepted = install_ws_clients_locally(bus.clone(), on_request);
+    let accept_handle = compio::runtime::spawn(async move {
+        run(listener, token, on_accepted).await;
+    });
+    bus.track_background(accept_handle);
+
+    // Dial as a real WS client with the iggy.consensus.v1 subprotocol.
+    let client_tcp = TcpStream::connect(server_addr).await.unwrap();
+    let url = format!("ws://{server_addr}/");
+    let req = tungstenite::ClientRequestBuilder::new(url.parse().unwrap())
+        .with_sub_protocol(WS_SUBPROTOCOL);
+    let (ws_client, _resp) = compio_ws::client_async(req, client_tcp)
+        .await
+        .expect("ws handshake");
+
+    // Reuse the existing WsTransportConn split for client-side framing.
+    // Drop the client's reader half; we only verify that the server saw
+    // the Request. A full round trip is blocked by the dispatcher
+    // limitation called out in the module-level comment.
+    let conn = WsTransportConn::new(ws_client);
+    let (_reader, mut writer) = conn.into_split();
+
+    let request = header_only(Command2::Request, 42, 0).into_frozen();
+    let mut batch: Vec<Frozen<MESSAGE_ALIGN>> = vec![request];
+    writer.send_batch(&mut batch).await.expect("client send");
+    assert!(batch.is_empty(), "send_batch must drain the Vec");
+
+    // Wait for the handler to observe the Request.
+    let observed = compio::time::timeout(Duration::from_secs(2), rx_seen.recv())
+        .await
+        .expect("handler must fire within 2 s")
+        .expect("handler must signal");
+    assert_eq!(
+        observed >> 112,
+        0,
+        "client_id top 16 bits must encode shard 0"
+    );
+
+    bus.shutdown(Duration::from_secs(2)).await;
+}
+
+#[compio::test]
+async fn missing_subprotocol_rejected_at_handshake() {
+    let bus = Rc::new(IggyMessageBus::new(0));
+
+    // Handler should never run (handshake fails before install).
+    let on_request: RequestHandler = Rc::new(|_, _| {
+        panic!("handler must not be called when subprotocol is missing");
+    });
+
+    let (listener, server_addr) = bind(loopback()).await.expect("bind");
+    let token = bus.token();
+    let on_accepted = install_ws_clients_locally(bus.clone(), on_request);
+    let accept_handle = compio::runtime::spawn(async move {
+        run(listener, token, on_accepted).await;
+    });
+    bus.track_background(accept_handle);
+
+    // Dial without the subprotocol header.
+    let client_tcp = TcpStream::connect(server_addr).await.unwrap();
+    let url = format!("ws://{server_addr}/");
+    let result = compio_ws::client_async(url, client_tcp).await;
+    assert!(
+        result.is_err(),
+        "WS handshake should fail when subprotocol is absent"
+    );
+
+    bus.shutdown(Duration::from_secs(1)).await;
+}
+
+#[compio::test]
+async fn wrong_subprotocol_rejected_at_handshake() {
+    let bus = Rc::new(IggyMessageBus::new(0));
+
+    let on_request: RequestHandler = Rc::new(|_, _| {
+        panic!("handler must not be called when subprotocol is wrong");
+    });
+
+    let (listener, server_addr) = bind(loopback()).await.expect("bind");
+    let token = bus.token();
+    let on_accepted = install_ws_clients_locally(bus.clone(), on_request);
+    let accept_handle = compio::runtime::spawn(async move {
+        run(listener, token, on_accepted).await;
+    });
+    bus.track_background(accept_handle);
+
+    let client_tcp = TcpStream::connect(server_addr).await.unwrap();
+    let url = format!("ws://{server_addr}/");
+    let req = tungstenite::ClientRequestBuilder::new(url.parse().unwrap())
+        .with_sub_protocol("definitely.not.iggy");
+    let result = compio_ws::client_async(req, client_tcp).await;
+    assert!(
+        result.is_err(),
+        "WS handshake should fail when subprotocol is wrong"
+    );
+
+    bus.shutdown(Duration::from_secs(1)).await;
+}
diff --git a/core/shard/src/coordinator.rs b/core/shard/src/coordinator.rs
index edd88e07c..abe99b09b 100644
--- a/core/shard/src/coordinator.rs
+++ b/core/shard/src/coordinator.rs
@@ -229,6 +229,38 @@ impl<R: Send + 'static> ShardZeroCoordinator<R> {
         Ok(client_id)
     }
 
+    /// Ship a WebSocket client's pre-upgrade TCP connection to the next
+    /// round-robin target shard.
+    ///
+    /// Identical wire path to [`Self::delegate_client`] but ships
+    /// [`ShardFramePayload::ClientWsConnectionSetup`] so the receiving
+    /// shard runs `compio_ws::accept_hdr_async` (HTTP-Upgrade +
+    /// `iggy.consensus.v1` subprotocol enforcement) before installing
+    /// the connection. The fd at ship-time is plain TCP; the WS state
+    /// machine only materialises post-upgrade on the owning shard.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when `dup(2)` fails or the target shard's inbox
+    /// refuses the setup frame.
+    pub fn delegate_ws_client(&self, stream: TcpStream) -> Result<u128, SendError> {
+        let target = self.next_client_target();
+        let client_id = self.mint_client_id(target);
+
+        let fd = fd_transfer::dup_fd(&stream).map_err(SendError::DupFailed)?;
+        let setup = ShardFramePayload::ClientWsConnectionSetup { fd, client_id };
+        if let Err(e) = self.senders[target as usize].try_send(ShardFrame::lifecycle(setup)) {
+            warn!(
+                client_id,
+                target, "delegate_ws_client try_send failed: {e:?}"
+            );
+            return Err(SendError::RoutingFailed(target));
+        }
+
+        drop(stream);
+        Ok(client_id)
+    }
+
     /// Broadcast a `ReplicaMappingClear` to every shard. Used by the
     /// `ConnectionLost` handler before the next `delegate_replica` runs.
     pub fn broadcast_mapping_clear(&self, replica_id: u8) {
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index e420e508c..4eb2a074a 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -199,16 +199,27 @@ pub enum ShardFramePayload {
     /// owning shard. The receiving shard wraps the fd and installs client
     /// reader / writer tasks locally. The owning shard is encoded in the top
     /// 16 bits of `client_id`.
-    ///
-    /// QUIC + WS clients deliberately do NOT get an analog variant: the
-    /// inter-shard channel requires `Send` payloads, and `compio_quic::
-    /// Connection` / `compio_ws::WebSocketStream<TcpStream>` are both
-    /// `!Send` (they hold compio `Rc<...>` driver state). Shard 0
-    /// therefore terminates QUIC + WS locally and uses the existing
-    /// `ForwardClientSend` / `Consensus` variants for outbound + inbound
-    /// traffic respectively. See `core/message_bus/CLAUDE.md` plane
-    /// split section.
     ClientConnectionSetup { fd: DupedFd, client_id: u128 },
+    /// Shard 0 distributes an inbound SDK WebSocket client's pre-upgrade
+    /// TCP connection fd to the owning shard. The HTTP-Upgrade handshake
+    /// has NOT run yet at this point: the fd is plain TCP, the dup is
+    /// safe per invariant I5 (fd-delegation is TCP-only), and
+    /// `compio_ws::WebSocketStream<TcpStream>`'s `!Send` constraint
+    /// (compio `Rc<...>` driver state, post-upgrade) does not apply.
+    /// The receiving shard wraps the fd, runs `compio_ws::accept_hdr_async`
+    /// with the iggy.consensus.v1 subprotocol callback, then installs
+    /// client reader / writer tasks locally via
+    /// `message_bus::installer::install_client_ws_fd`. Owning shard is
+    /// encoded in the top 16 bits of `client_id`.
+    ///
+    /// QUIC clients deliberately do NOT get an analog variant: a
+    /// `compio_quic::Endpoint` binds one UDP socket and demuxes incoming
+    /// packets to per-connection `quinn-proto::Connection` objects by
+    /// Connection ID. Per-connection TLS / packet-number / congestion
+    /// state is non-serialisable and tied to the endpoint's reactor.
+    /// Shard 0 therefore terminates QUIC locally and uses the existing
+    /// `ForwardClientSend` variant for outbound traffic.
+    ClientWsConnectionSetup { fd: DupedFd, client_id: u128 },
     /// Shard 0 broadcasts the owner for a replica to every shard so each
     /// bus' `send_to_replica` slow path can route through the correct owner.
     ReplicaMappingUpdate { replica_id: u8, owning_shard: u16 },
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 95c1be665..6ea24225e 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -317,6 +317,16 @@ where
                 self.bus
                     .install_client_fd(fd, client_id, self.on_client_request.clone());
             }
+            crate::ShardFramePayload::ClientWsConnectionSetup { fd, client_id } => {
+                tracing::info!(
+                    shard = self.id,
+                    client_id,
+                    raw_fd = fd.as_raw_fd(),
+                    "installing delegated WS client fd (pre-upgrade)"
+                );
+                self.bus
+                    .install_client_ws_fd(fd, client_id, self.on_client_request.clone());
+            }
             crate::ShardFramePayload::ReplicaMappingUpdate {
                 replica_id,
                 owning_shard,
