This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 60366e8fa4df4f3ee7b260d49073dcdd7e0a5bf5
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Apr 27 13:22:02 2026 +0200

    feat(message_bus): add WS pre-upgrade listener with cross-shard fd-ship

    Wire the shard-0 WebSocket client listener so accepted TCP sockets ride
    the existing fd-ship plumbing pre-upgrade. The HTTP-Upgrade handshake
    (and the iggy.consensus.v1 subprotocol enforcement) runs on the owning
    shard inside the new install_client_ws_fd entry on the
    ConnectionInstaller trait, which wraps the dup'd fd and drives
    compio_ws::accept_hdr_async with a tungstenite Callback that returns
    HTTP 400 on a missing or wrong subprotocol.

    Reverses the 2026-04-27 design correction in core/message_bus/CLAUDE.md
    that claimed WS connections must terminate on shard 0: the !Send
    constraint on compio_ws::WebSocketStream<TcpStream> is post-upgrade,
    not pre-upgrade. The shipped fd is plain TCP at ship-time, so
    invariant I5 (fd-delegation is TCP-only) holds.

    What landed:

    - core/message_bus/src/client_listener_ws.rs: bind + run for the
      pre-upgrade TCP listener. No upgrade work runs on shard 0; the
      callback (AcceptedWsClientFn) hands the raw stream off.
    - core/message_bus/src/installer.rs: install_client_ws_fd on the
      ConnectionInstaller trait + Rc<IggyMessageBus> impl. Wraps the fd,
      spawns a tracked task that runs compio_ws::accept_hdr_async with
      the new ws_subprotocol_callback, then delegates to the existing
      install_client_ws_stream on success.
    - core/message_bus/src/lib.rs: AcceptedWsClientFn type alias.
    - core/shard/src/lib.rs: new ShardFramePayload::ClientWsConnectionSetup
      variant alongside ClientConnectionSetup. Doc on ClientConnectionSetup
      refreshed to drop the obsolete "WS terminates on shard 0" claim.
    - core/shard/src/coordinator.rs: new delegate_ws_client(stream) that
      mirrors delegate_client. Same round-robin target, same
      mint_client_id, same dup_fd; ships ClientWsConnectionSetup instead.
    - core/shard/src/router.rs: match arm for the new variant calls
      bus.install_client_ws_fd(fd, client_id, on_request.clone()).
    - core/message_bus/tests/common/mod.rs: install_ws_clients_locally
      helper for single-shard tests; dups locally and routes through
      install_client_ws_fd, exercising the same accept_hdr_async path.
    - core/message_bus/tests/ws_client_roundtrip.rs: 3 tests.
      handshake_succeeds_and_request_reaches_handler verifies the install
      and that a Request arrives at the server-side handler via a
      side-channel signal. missing_subprotocol_rejected_at_handshake and
      wrong_subprotocol_rejected_at_handshake verify that
      ws_subprotocol_callback rejects with HTTP 400.

    Known follow-up (out of P5 scope, Phase 7 work): the WS dispatcher in
    transports/ws.rs cannot drive a full client -> server -> client
    request/reply round trip. Phase B's stream.read().await blocks the
    dispatcher and cannot react to outbound queue activity, and compio's
    "no buffer drop with pending I/O" rule prevents canceling the read
    via select! on out_rx.recv(). The unidirectional unit tests in
    transports/ws.rs do not exercise this path; the integration test here
    sends a Request and verifies arrival without round-tripping the Reply.

    146/146 tests pass on cargo nextest run -p message_bus -p server-ng
    -p shard. Workspace clippy clean. cargo doc clean modulo the
    pre-existing core/shard/src/coordinator.rs:308 warning inherited from
    master.
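    Not part of the patch — an illustrative sketch of the client_id bit
    layout this change relies on: the owning shard sits in the top 16
    bits of the 128-bit id and a per-shard sequence in the low 112 bits,
    mirroring the `(shard_id << 112) | seq` encode in
    install_ws_clients_locally and the `observed >> 112` decode in the
    roundtrip test. The helper names below (mint_client_id_sketch,
    owning_shard) are hypothetical; the real mint_client_id lives in the
    coordinator.

```rust
/// Encode the owning shard into the top 16 bits of a 128-bit client id.
/// The low 112 bits carry a per-shard sequence number.
fn mint_client_id_sketch(shard_id: u16, seq: u128) -> u128 {
    (u128::from(shard_id) << 112) | (seq & ((1u128 << 112) - 1))
}

/// Recover the owning shard from a client id (the router's fast path).
fn owning_shard(client_id: u128) -> u16 {
    (client_id >> 112) as u16
}

fn main() {
    let id = mint_client_id_sketch(3, 42);
    // Top 16 bits round-trip to the shard; low 112 bits keep the sequence.
    assert_eq!(owning_shard(id), 3);
    assert_eq!(id & ((1u128 << 112) - 1), 42);
}
```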
---
 core/message_bus/src/client_listener_ws.rs    | 103 ++++++++++++++++
 core/message_bus/src/installer.rs             |  60 +++++++++
 core/message_bus/src/lib.rs                   |   1 +
 core/message_bus/tests/common/mod.rs          |  28 ++++-
 core/message_bus/tests/ws_client_roundtrip.rs | 168 ++++++++++++++++++++++++++
 core/shard/src/coordinator.rs                 |  32 +++++
 core/shard/src/lib.rs                         |  29 +++--
 core/shard/src/router.rs                      |  10 ++
 8 files changed, 421 insertions(+), 10 deletions(-)

diff --git a/core/message_bus/src/client_listener_ws.rs b/core/message_bus/src/client_listener_ws.rs
new file mode 100644
index 000000000..50b859c17
--- /dev/null
+++ b/core/message_bus/src/client_listener_ws.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pre-upgrade TCP listener for consensus-protocol WebSocket clients.
+//!
+//! Runs only on shard 0. The accept loop performs no protocol work
+//! beyond `TcpListener::accept`: every accepted stream is handed
+//! verbatim to the supplied callback, which dups the fd and ships a
+//! [`ShardFramePayload::ClientWsConnectionSetup`] frame to the
+//! round-robin-selected target shard. The HTTP-Upgrade handshake (and
+//! the `iggy.consensus.v1` subprotocol enforcement) runs on the owning
+//! shard inside [`crate::installer::install_client_ws_fd`].
+//!
+//! The fd at ship-time is plain TCP; invariant I5 (fd-delegation is
+//! TCP-only) holds. The WS state machine only materialises after the
+//! upgrade, on the owning shard, where it can stay non-`Send`.
+//!
+//! `ShardFramePayload::ClientWsConnectionSetup` is defined in
+//! `core/shard/src/lib.rs`; the rustdoc cannot intra-link across crates
+//! without pulling `shard` in as a doc-only dep.
+
+use crate::AcceptedWsClientFn;
+use crate::lifecycle::ShutdownToken;
+use compio::net::{SocketOpts, TcpListener};
+use futures::FutureExt;
+use iggy_common::IggyError;
+use std::net::SocketAddr;
+use std::rc::Rc;
+use tracing::{debug, error, info};
+
+/// Bind the WS pre-upgrade TCP listener and return the bound address.
+///
+/// Mirrors [`crate::client_listener::bind`] in shape (`TCP_NODELAY` +
+/// `SO_KEEPALIVE` on by default). The receiving shard re-applies socket
+/// options on the dup'd fd via the existing client-install path, so
+/// kernel-level options propagate end-to-end.
+///
+/// # Errors
+///
+/// Returns [`IggyError::CannotBindToSocket`] if the bind fails.
+#[allow(clippy::future_not_send)]
+pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> {
+    // `SO_REUSEPORT` intentionally not set: only shard 0 binds the WS
+    // listener (per invariant I6). The shard-0 coordinator round-robins
+    // accepts to owning shards via `ShardFramePayload::ClientWsConnectionSetup`.
+    let opts = SocketOpts::new().nodelay(true).keepalive(true);
+    let listener = TcpListener::bind_with_options(addr, &opts)
+        .await
+        .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
+    let actual = listener
+        .local_addr()
+        .map_err(|e| IggyError::IoError(e.to_string()))?;
+    Ok((listener, actual))
+}
+
+/// Run the WS pre-upgrade listener accept loop until the shutdown
+/// token fires.
+///
+/// Each accepted [`compio::net::TcpStream`] is handed verbatim to
+/// `on_accepted` (no upgrade attempted here). The callback owns the
+/// stream from that point on.
+#[allow(clippy::future_not_send)]
+pub async fn run(listener: TcpListener, token: ShutdownToken, on_accepted: AcceptedWsClientFn) {
+    info!(
+        "Consensus WS client listener (pre-upgrade) accepting on {:?}",
+        listener.local_addr().ok()
+    );
+
+    let on_accepted: Rc<_> = on_accepted;
+    loop {
+        futures::select! {
+            () = token.wait().fuse() => {
+                debug!("Consensus WS client listener shutting down");
+                break;
+            }
+            result = listener.accept().fuse() => {
+                match result {
+                    Ok((stream, peer_addr)) => {
+                        debug!(%peer_addr, "WS client TCP accepted, delegating fd pre-upgrade");
+                        on_accepted(stream);
+                    }
+                    Err(e) => {
+                        error!("Consensus WS client listener accept failed: {e}");
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/core/message_bus/src/installer.rs b/core/message_bus/src/installer.rs
index 0b52304c7..f7fc7b861 100644
--- a/core/message_bus/src/installer.rs
+++ b/core/message_bus/src/installer.rs
@@ -62,6 +62,15 @@ pub trait ConnectionInstaller {
     /// encoded in the top 16 bits of `client_id`.
     fn install_client_fd(&self, fd: DupedFd, client_id: u128, on_request: RequestHandler);
 
+    /// Same for an SDK WebSocket client's pre-upgrade TCP fd. The
+    /// receiving shard wraps the fd, runs `compio_ws::accept_hdr_async`
+    /// with the iggy.consensus.v1 subprotocol callback to drive the
+    /// HTTP-Upgrade handshake, then installs WS reader / writer tasks
+    /// via [`install_client_ws_stream`] on success. On handshake
+    /// failure (e.g. wrong / missing subprotocol) the fd is closed by
+    /// dropping the wrapping `TcpStream`.
+    fn install_client_ws_fd(&self, fd: DupedFd, client_id: u128, on_request: RequestHandler);
+
     /// Update the replica -> owning shard mapping used by the `send_to_replica`
     /// slow path on non-owning shards.
     fn set_shard_mapping(&self, replica: u8, owning_shard: u16);
@@ -82,6 +91,24 @@ impl ConnectionInstaller for Rc<IggyMessageBus> {
         install_client_stream(self, client_id, stream, on_request);
     }
 
+    fn install_client_ws_fd(&self, fd: DupedFd, client_id: u128, on_request: RequestHandler) {
+        let stream = fd_transfer::wrap_duped_fd(fd);
+        let bus = Self::clone(self);
+        let handle = compio::runtime::spawn(async move {
+            match compio_ws::accept_hdr_async(stream, ws_subprotocol_callback).await {
+                Ok(ws) => {
+                    if !bus.is_shutting_down() {
+                        install_client_ws_stream(&bus, client_id, ws, on_request);
+                    }
+                }
+                Err(e) => {
+                    warn!(client_id, "WS upgrade failed: {e}");
+                }
+            }
+        });
+        self.track_background(handle);
+    }
+
     fn set_shard_mapping(&self, replica: u8, owning_shard: u16) {
         IggyMessageBus::set_shard_mapping(self, replica, owning_shard);
     }
@@ -91,6 +118,39 @@
     }
 }
 
+/// HTTP-Upgrade callback for the WS client plane.
+///
+/// Inspects `Sec-WebSocket-Protocol`. Accepts only the exact value
+/// [`crate::transports::ws::WS_SUBPROTOCOL`] (`iggy.consensus.v1`); any
+/// other value (or absence) yields HTTP 400 with a body naming the
+/// expected subprotocol. The accepted value is mirrored back on the
+/// response so the negotiated subprotocol is unambiguous to the
+/// client.
+#[allow(clippy::result_large_err)] // tungstenite-defined Callback signature; not on hot path
+fn ws_subprotocol_callback(
+    req: &tungstenite::handshake::server::Request,
+    mut resp: tungstenite::handshake::server::Response,
+) -> Result<tungstenite::handshake::server::Response, tungstenite::handshake::server::ErrorResponse>
+{
+    let want = crate::transports::ws::WS_SUBPROTOCOL.as_bytes();
+    let proto = req
+        .headers()
+        .get(tungstenite::http::header::SEC_WEBSOCKET_PROTOCOL);
+    if proto.is_some_and(|hv| hv.as_bytes() == want) {
+        resp.headers_mut().insert(
+            tungstenite::http::header::SEC_WEBSOCKET_PROTOCOL,
+            tungstenite::http::HeaderValue::from_static(crate::transports::ws::WS_SUBPROTOCOL),
+        );
+        return Ok(resp);
+    }
+    let mut err = tungstenite::http::Response::new(Some(format!(
+        "missing or wrong subprotocol; expected {}",
+        crate::transports::ws::WS_SUBPROTOCOL
+    )));
+    *err.status_mut() = tungstenite::http::StatusCode::BAD_REQUEST;
+    Err(err)
+}
+
 /// TCP entry point: apply socket options (keepalive, `TCP_NODELAY`) on
 /// the raw stream and delegate to the transport-generic install path.
 ///
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 628832cf3..271e6b000 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -82,6 +82,7 @@ pub mod auth_config;
 pub mod cache;
 pub mod client_listener;
 pub mod client_listener_quic;
+pub mod client_listener_ws;
 pub mod config;
 pub mod connector;
 mod error;
diff --git a/core/message_bus/tests/common/mod.rs b/core/message_bus/tests/common/mod.rs
index 4f7e88821..8ed91921a 100644
--- a/core/message_bus/tests/common/mod.rs
+++ b/core/message_bus/tests/common/mod.rs
@@ -27,11 +27,13 @@
 #![allow(dead_code)] // each test binary uses a subset
 
 use iggy_binary_protocol::{Command2, GenericHeader, HEADER_SIZE, Message};
+use message_bus::ConnectionInstaller;
 use message_bus::auth::{StaticSharedSecret, TokenSource};
 use message_bus::client_listener::RequestHandler;
 use message_bus::replica_listener::MessageHandler;
 use message_bus::{
-    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, IggyMessageBus, installer,
+    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, AcceptedWsClientFn, IggyMessageBus,
+    fd_transfer, installer,
 };
 use std::cell::Cell;
 use std::net::SocketAddr;
@@ -120,3 +122,27 @@ pub fn install_quic_clients_locally(
     );
     })
 }
+
+/// Build an [`AcceptedWsClientFn`] that mints a local client id, dups
+/// the accepted fd (mirroring the production cross-shard fd-ship
+/// path), and hands it to [`ConnectionInstaller::install_client_ws_fd`]
+/// on the given bus. Single-shard tests bypass the inter-shard
+/// channel by dup'ing locally; the install path runs `accept_hdr_async`,
+/// the subprotocol callback, and `install_client_ws_stream` exactly as
+/// the production owning-shard router would.
+#[must_use]
+pub fn install_ws_clients_locally(
+    bus: Rc<IggyMessageBus>,
+    on_request: RequestHandler,
+) -> AcceptedWsClientFn {
+    let counter: Rc<Cell<u128>> = Rc::new(Cell::new(1));
+    let shard_id = u128::from(bus.shard_id());
+    Rc::new(move |stream| {
+        let seq = counter.get();
+        counter.set(seq.wrapping_add(1));
+        let client_id = (shard_id << 112) | seq;
+        let fd = fd_transfer::dup_fd(&stream).expect("dup_fd");
+        drop(stream);
+        bus.install_client_ws_fd(fd, client_id, on_request.clone());
+    })
+}
diff --git a/core/message_bus/tests/ws_client_roundtrip.rs b/core/message_bus/tests/ws_client_roundtrip.rs
new file mode 100644
index 000000000..d8d8854f4
--- /dev/null
+++ b/core/message_bus/tests/ws_client_roundtrip.rs
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! End-to-end: a real WebSocket client connects to the consensus WS
+//! pre-upgrade listener on shard 0; the listener's callback dups the
+//! TCP fd and (on the same shard, locally) hands it to
+//! `install_client_ws_fd`, which runs `compio_ws::accept_hdr_async`
+//! with the iggy.consensus.v1 subprotocol callback before installing
+//! the WS connection.
+//!
+//! Coverage:
+//!
+//! - **Positive**: client requests `iggy.consensus.v1`, handshake
+//!   succeeds, a Request frame reaches the server-side handler.
+//!   Caveat: a full Request -> Reply round trip would deadlock
+//!   the per-connection WS dispatcher (`transports::ws.rs`); its
+//!   phase-alternating drain-then-read loop blocks Phase B on
+//!   `stream.read().await` and cannot react to outbound queue
+//!   activity, so the server's Reply never reaches the wire while
+//!   the client is parked in Phase B waiting for it. Tracked as a
+//!   Phase 7 follow-up in the silverhand transport-plan; the design
+//!   note in the dispatcher comment overstates its request/reply
+//!   support.
+//! - **Negative (missing)**: client omits the `Sec-WebSocket-Protocol`
+//!   header; server returns HTTP 400; handshake fails on the client.
+//! - **Negative (wrong)**: client requests an unrelated subprotocol;
+//!   server returns HTTP 400.
+
+mod common;
+
+use common::{header_only, install_ws_clients_locally, loopback};
+use compio::net::TcpStream;
+use iggy_binary_protocol::Command2;
+use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
+use iggy_binary_protocol::consensus::iobuf::Frozen;
+use message_bus::IggyMessageBus;
+use message_bus::client_listener::RequestHandler;
+use message_bus::client_listener_ws::{bind, run};
+use message_bus::transports::ws::{WS_SUBPROTOCOL, WsTransportConn};
+use message_bus::transports::{TransportConn, TransportWriter};
+use std::rc::Rc;
+use std::time::Duration;
+
+#[compio::test]
+async fn handshake_succeeds_and_request_reaches_handler() {
+    let bus = Rc::new(IggyMessageBus::new(0));
+
+    // Side-channel signal: handler fires once when it sees the Request.
+    let (tx_seen, rx_seen) = async_channel::bounded::<u128>(1);
+    let on_request: RequestHandler = Rc::new(move |client_id, msg| {
+        assert_eq!(msg.header().command, Command2::Request);
+        let _ = tx_seen.try_send(client_id);
+    });
+
+    let (listener, server_addr) = bind(loopback()).await.expect("bind");
+    let token = bus.token();
+    let on_accepted = install_ws_clients_locally(bus.clone(), on_request);
+    let accept_handle = compio::runtime::spawn(async move {
+        run(listener, token, on_accepted).await;
+    });
+    bus.track_background(accept_handle);
+
+    // Dial as a real WS client with the iggy.consensus.v1 subprotocol.
+    let client_tcp = TcpStream::connect(server_addr).await.unwrap();
+    let url = format!("ws://{server_addr}/");
+    let req = tungstenite::ClientRequestBuilder::new(url.parse().unwrap())
+        .with_sub_protocol(WS_SUBPROTOCOL);
+    let (ws_client, _resp) = compio_ws::client_async(req, client_tcp)
+        .await
+        .expect("ws handshake");
+
+    // Reuse the existing WsTransportConn split for client-side framing.
+    // Drop the client's reader half; we only verify that the server saw
+    // the Request. A full round trip is blocked by the dispatcher
+    // limitation called out in the module-level comment.
+    let conn = WsTransportConn::new(ws_client);
+    let (_reader, mut writer) = conn.into_split();
+
+    let request = header_only(Command2::Request, 42, 0).into_frozen();
+    let mut batch: Vec<Frozen<MESSAGE_ALIGN>> = vec![request];
+    writer.send_batch(&mut batch).await.expect("client send");
+    assert!(batch.is_empty(), "send_batch must drain the Vec");
+
+    // Wait for the handler to observe the Request.
+    let observed = compio::time::timeout(Duration::from_secs(2), rx_seen.recv())
+        .await
+        .expect("handler must fire within 2 s")
+        .expect("handler must signal");
+    assert_eq!(
+        observed >> 112,
+        0,
+        "client_id top 16 bits must encode shard 0"
+    );
+
+    bus.shutdown(Duration::from_secs(2)).await;
+}
+
+#[compio::test]
+async fn missing_subprotocol_rejected_at_handshake() {
+    let bus = Rc::new(IggyMessageBus::new(0));
+
+    // Handler should never run (handshake fails before install).
+    let on_request: RequestHandler = Rc::new(|_, _| {
+        panic!("handler must not be called when subprotocol is missing");
+    });
+
+    let (listener, server_addr) = bind(loopback()).await.expect("bind");
+    let token = bus.token();
+    let on_accepted = install_ws_clients_locally(bus.clone(), on_request);
+    let accept_handle = compio::runtime::spawn(async move {
+        run(listener, token, on_accepted).await;
+    });
+    bus.track_background(accept_handle);
+
+    // Dial without the subprotocol header.
+    let client_tcp = TcpStream::connect(server_addr).await.unwrap();
+    let url = format!("ws://{server_addr}/");
+    let result = compio_ws::client_async(url, client_tcp).await;
+    assert!(
+        result.is_err(),
+        "WS handshake should fail when subprotocol is absent"
+    );
+
+    bus.shutdown(Duration::from_secs(1)).await;
+}
+
+#[compio::test]
+async fn wrong_subprotocol_rejected_at_handshake() {
+    let bus = Rc::new(IggyMessageBus::new(0));
+
+    let on_request: RequestHandler = Rc::new(|_, _| {
+        panic!("handler must not be called when subprotocol is wrong");
+    });
+
+    let (listener, server_addr) = bind(loopback()).await.expect("bind");
+    let token = bus.token();
+    let on_accepted = install_ws_clients_locally(bus.clone(), on_request);
+    let accept_handle = compio::runtime::spawn(async move {
+        run(listener, token, on_accepted).await;
+    });
+    bus.track_background(accept_handle);
+
+    let client_tcp = TcpStream::connect(server_addr).await.unwrap();
+    let url = format!("ws://{server_addr}/");
+    let req = tungstenite::ClientRequestBuilder::new(url.parse().unwrap())
+        .with_sub_protocol("definitely.not.iggy");
+    let result = compio_ws::client_async(req, client_tcp).await;
+    assert!(
+        result.is_err(),
+        "WS handshake should fail when subprotocol is wrong"
+    );
+
+    bus.shutdown(Duration::from_secs(1)).await;
+}
diff --git a/core/shard/src/coordinator.rs b/core/shard/src/coordinator.rs
index edd88e07c..abe99b09b 100644
--- a/core/shard/src/coordinator.rs
+++ b/core/shard/src/coordinator.rs
@@ -229,6 +229,38 @@ impl<R: Send + 'static> ShardZeroCoordinator<R> {
         Ok(client_id)
     }
 
+    /// Ship a WebSocket client's pre-upgrade TCP connection to the next
+    /// round-robin target shard.
+    ///
+    /// Identical wire path to [`Self::delegate_client`] but ships
+    /// [`ShardFramePayload::ClientWsConnectionSetup`] so the receiving
+    /// shard runs `compio_ws::accept_hdr_async` (HTTP-Upgrade +
+    /// `iggy.consensus.v1` subprotocol enforcement) before installing
+    /// the connection. The fd at ship-time is plain TCP; the WS state
+    /// machine only materialises post-upgrade on the owning shard.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when `dup(2)` fails or the target shard's inbox
+    /// refuses the setup frame.
+    pub fn delegate_ws_client(&self, stream: TcpStream) -> Result<u128, SendError> {
+        let target = self.next_client_target();
+        let client_id = self.mint_client_id(target);
+
+        let fd = fd_transfer::dup_fd(&stream).map_err(SendError::DupFailed)?;
+        let setup = ShardFramePayload::ClientWsConnectionSetup { fd, client_id };
+        if let Err(e) = self.senders[target as usize].try_send(ShardFrame::lifecycle(setup)) {
+            warn!(
+                client_id,
+                target, "delegate_ws_client try_send failed: {e:?}"
+            );
+            return Err(SendError::RoutingFailed(target));
+        }
+
+        drop(stream);
+        Ok(client_id)
+    }
+
     /// Broadcast a `ReplicaMappingClear` to every shard. Used by the
     /// `ConnectionLost` handler before the next `delegate_replica` runs.
     pub fn broadcast_mapping_clear(&self, replica_id: u8) {
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index e420e508c..4eb2a074a 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -199,16 +199,27 @@ pub enum ShardFramePayload {
     /// owning shard. The receiving shard wraps the fd and installs client
     /// reader / writer tasks locally. The owning shard is encoded in the top
     /// 16 bits of `client_id`.
-    ///
-    /// QUIC + WS clients deliberately do NOT get an analog variant: the
-    /// inter-shard channel requires `Send` payloads, and `compio_quic::
-    /// Connection` / `compio_ws::WebSocketStream<TcpStream>` are both
-    /// `!Send` (they hold compio `Rc<...>` driver state). Shard 0
-    /// therefore terminates QUIC + WS locally and uses the existing
-    /// `ForwardClientSend` / `Consensus` variants for outbound + inbound
-    /// traffic respectively. See `core/message_bus/CLAUDE.md` plane
-    /// split section.
     ClientConnectionSetup { fd: DupedFd, client_id: u128 },
+
+    /// Shard 0 distributes an inbound SDK WebSocket client's pre-upgrade
+    /// TCP connection fd to the owning shard. The HTTP-Upgrade handshake
+    /// has NOT run yet at this point: the fd is plain TCP, the dup is
+    /// safe per invariant I5 (fd-delegation is TCP-only), and
+    /// `compio_ws::WebSocketStream<TcpStream>`'s `!Send` constraint
+    /// (compio `Rc<...>` driver state, post-upgrade) does not apply.
+    /// The receiving shard wraps the fd, runs `compio_ws::accept_hdr_async`
+    /// with the iggy.consensus.v1 subprotocol callback, then installs
+    /// client reader / writer tasks locally via
+    /// `message_bus::installer::install_client_ws_fd`. Owning shard is
+    /// encoded in the top 16 bits of `client_id`.
+    ///
+    /// QUIC clients deliberately do NOT get an analog variant: a
+    /// `compio_quic::Endpoint` binds one UDP socket and demuxes incoming
+    /// packets to per-connection `quinn-proto::Connection` objects by
+    /// Connection ID.
+    /// Per-connection TLS / packet-number / congestion
+    /// state is non-serialisable and tied to the endpoint's reactor.
+    /// Shard 0 therefore terminates QUIC locally and uses the existing
+    /// `ForwardClientSend` variant for outbound traffic.
+    ClientWsConnectionSetup { fd: DupedFd, client_id: u128 },
+
     /// Shard 0 broadcasts the owner for a replica to every shard so each
     /// bus' `send_to_replica` slow path can route through the correct owner.
     ReplicaMappingUpdate { replica_id: u8, owning_shard: u16 },
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 95c1be665..6ea24225e 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -317,6 +317,16 @@ where
                 self.bus
                     .install_client_fd(fd, client_id, self.on_client_request.clone());
             }
+            crate::ShardFramePayload::ClientWsConnectionSetup { fd, client_id } => {
+                tracing::info!(
+                    shard = self.id,
+                    client_id,
+                    raw_fd = fd.as_raw_fd(),
+                    "installing delegated WS client fd (pre-upgrade)"
+                );
+                self.bus
+                    .install_client_ws_fd(fd, client_id, self.on_client_request.clone());
+            }
             crate::ShardFramePayload::ReplicaMappingUpdate {
                 replica_id,
                 owning_shard,
