This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch feat/message-bus-transports in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 5d62d128d6b447b056b290d125d57558eae1462b Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Apr 27 13:03:16 2026 +0200 feat(message_bus): add shard-0 QUIC client listener client_listener_quic::{bind, run} drives the QUIC accept loop on shard 0. bind constructs a compio_quic::Endpoint from a ServerConfig (built by the bootstrap caller via transports::quic::server_config_with_cert with the operator-provided cert chain + key DER). run drives wait_incoming -> Incoming::accept -> Connecting -> Connection::accept_bi to completion (reusing the existing QuicTransportListener trait impl, which encodes the 0-RTT-refuse rule), then invokes AcceptedQuicClientFn(connection, streams). QUIC stays shard-0 terminal: a compio_quic::Endpoint binds one UDP socket and demuxes by Connection ID, and per-conn TLS / packet-number / congestion state in quinn-proto::Connection is non-serialisable, so cross-shard handover is fundamentally out of reach. New AcceptedQuicClientFn type alias in lib.rs (mirrors the AcceptedClientFn shape but carries the (Connection, (SendStream, RecvStream)) tuple). lib.rs also gains an AcceptedWsClientFn alias for the WebSocket accept path (the raw TCP stream is handed to the owning shard before the HTTP upgrade runs). New into_parts accessor on QuicTransportConn deconstructs it back into the raw tuple for the install path. Tests: new tests/quic_client_roundtrip.rs exercises a real rcgen-cert loopback Request -> Reply round trip via bus.send_to_client. New tests/common::install_quic_clients_locally helper mirrors the existing install_clients_locally for unit-test client-id minting + install. 143/143 tests pass on cargo nextest run -p message_bus -p server-ng -p shard. Workspace clippy clean. Drive-by: removed a trailing comma from a panic! in replica_io.rs (a clippy lint on the P5-T1 commit, which landed after the pre-commit fmt rewrap).
--- core/message_bus/src/client_listener_quic.rs | 112 +++++++++++++++++++++ core/message_bus/src/lib.rs | 28 ++++++ core/message_bus/src/replica_io.rs | 2 +- core/message_bus/src/transports/quic.rs | 15 +++ core/message_bus/tests/common/mod.rs | 28 +++++- core/message_bus/tests/quic_client_roundtrip.rs | 124 ++++++++++++++++++++++++ 6 files changed, 307 insertions(+), 2 deletions(-) diff --git a/core/message_bus/src/client_listener_quic.rs b/core/message_bus/src/client_listener_quic.rs new file mode 100644 index 000000000..16079885a --- /dev/null +++ b/core/message_bus/src/client_listener_quic.rs @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! QUIC listener for consensus-protocol SDK clients. +//! +//! Runs only on shard 0. The accept loop drives the QUIC handshake to +//! completion AND accepts the first bidirectional stream pair before +//! invoking the supplied callback, so the callback receives a +//! ready-for-traffic [`compio_quic::Connection`] plus its +//! `(SendStream, RecvStream)` pair. Subprotocol enforcement runs at +//! the rustls ALPN layer (set to `iggy.consensus.v1` in +//! [`crate::transports::quic::server_config_with_cert`]); no +//! 
application-layer code runs for clients with the wrong ALPN. +//! +//! 0-RTT data is refused at accept time +//! ([`crate::transports::quic::server_config_with_cert`] sets +//! `max_early_data_size = 0` at the rustls layer; the accept path +//! double-checks `RecvStream::is_0rtt()` for defense in depth and +//! closes with `QUIC_PROTOCOL_VIOLATION` on mismatch). +//! +//! QUIC stays shard-0 terminal: a `compio_quic::Endpoint` binds a +//! single UDP socket and demuxes incoming packets to in-flight +//! connections by Connection ID, and per-connection TLS / packet-number +//! / congestion state lives in `quinn-proto::Connection`, which is +//! non-serialisable. Cross-shard handover is fundamentally out of +//! reach for QUIC; the callback always installs locally on shard 0. + +use crate::lifecycle::ShutdownToken; +use crate::transports::quic::QuicTransportListener; +use crate::{AcceptedQuicClientFn, transports::TransportListener}; +use compio_quic::{Endpoint, ServerConfig}; +use futures::FutureExt; +use iggy_common::IggyError; +use std::net::SocketAddr; +use tracing::{debug, error, info}; + +/// Bind a QUIC [`Endpoint`] without starting the accept loop, returning it together with the address it actually bound to (as reported by `Endpoint::local_addr`). +/// +/// `server_config` is built by the bootstrap caller via +/// [`crate::transports::quic::server_config_with_cert`] from the +/// operator-provided cert chain + key DER. +/// +/// # Errors +/// +/// Returns [`IggyError::CannotBindToSocket`] if the bind fails (the underlying bind error is discarded), or [`IggyError::IoError`] if the bound endpoint's local address cannot be read. +#[allow(clippy::future_not_send)] +pub async fn bind( + addr: SocketAddr, + server_config: ServerConfig, +) -> Result<(Endpoint, SocketAddr), IggyError> { + let endpoint = Endpoint::server(addr, server_config) + .await + .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; + let actual = endpoint + .local_addr() + .map_err(|e| IggyError::IoError(e.to_string()))?; + Ok((endpoint, actual)) +} + +/// Run the QUIC listener accept loop until the shutdown token fires or `accept` returns an error (the error is logged and the loop exits; it is not returned to the caller).
+/// +/// Each accepted connection is handed to `on_accepted` along with its +/// first bidirectional stream pair. The callback owns the connection +/// from that point on; it mints a `client_id` and calls +/// [`crate::installer::install_client_quic_conn`]. Connections are taken one at a time: the next incoming connection is not accepted until the current `accept()` future (handshake plus first `accept_bi`) resolves, so a slow peer delays later accepts unless `accept` bounds that wait internally. +#[allow(clippy::future_not_send)] +pub async fn run(endpoint: Endpoint, token: ShutdownToken, on_accepted: AcceptedQuicClientFn) { + info!( + "Consensus QUIC client listener accepting on {:?}", + endpoint.local_addr().ok() + ); + + let listener = QuicTransportListener::new(endpoint); + loop { + futures::select! { + () = token.wait().fuse() => { + debug!("Consensus QUIC client listener shutting down"); + break; + } + result = listener.accept().fuse() => { + match result { + Ok((conn, peer_addr)) => { + debug!(%peer_addr, "QUIC client accepted, handing to installer"); + let (connection, streams) = conn.into_parts(); + on_accepted(connection, streams); + } + Err(e) => { + // Endpoint-fatal (e.g. endpoint closed). The + // shutdown token will normally fire just before + // this; surface the error and exit. NOTE(review): confirm that `QuicTransportListener::accept` fails only for endpoint-fatal conditions; if a per-connection failure (handshake / ALPN rejection, 0-RTT refusal, peer closing before its first bidi stream) also lands here, one misbehaving client stops the whole listener. + error!("Consensus QUIC client listener accept failed: {e}"); + break; + } + } + } + } + } +} diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs index 811102e51..628832cf3 100644 --- a/core/message_bus/src/lib.rs +++ b/core/message_bus/src/lib.rs @@ -81,6 +81,7 @@ pub mod auth; pub mod auth_config; pub mod cache; pub mod client_listener; +pub mod client_listener_quic; pub mod config; pub mod connector; mod error; @@ -157,6 +158,33 @@ pub type AcceptedReplicaFn = std::rc::Rc<dyn Fn(compio::net::TcpStream, u8)>; /// assigning the client id as part of its delegation policy. pub type AcceptedClientFn = std::rc::Rc<dyn Fn(compio::net::TcpStream)>; +/// Callback invoked on every accepted SDK QUIC client connection.
+/// +/// Fires after shard 0's QUIC listener drives the handshake to +/// completion AND accepts the first bidirectional stream pair, so the +/// callback receives a ready-for-traffic [`compio_quic::Connection`] +/// plus its `(SendStream, RecvStream)` pair. The callback mints a +/// client id, then calls [`installer::install_client_quic_conn`] on +/// the local bus. +/// +/// QUIC stays shard-0 terminal (per `INVARIANTS.md` I6 and the +/// non-serialisable nature of `quinn-proto::Connection` state); no +/// cross-shard handover analog exists for this plane. +pub type AcceptedQuicClientFn = std::rc::Rc< + dyn Fn(compio_quic::Connection, (compio_quic::SendStream, compio_quic::RecvStream)), +>; + +/// Callback invoked on every accepted SDK WebSocket client connection. +/// +/// Fires after shard 0's WS listener accepts a raw TCP socket. The +/// HTTP-Upgrade handshake has NOT run yet; the callback hands the +/// raw stream off to the owning shard via inter-shard fd-shipping +/// (`ShardFramePayload::ClientWsConnectionSetup`). The owning shard +/// runs the upgrade and the subprotocol check locally. This shape +/// preserves invariant I5 (fd-delegation is TCP-only) because the +/// shipped fd is plain TCP at ship-time. +pub type AcceptedWsClientFn = std::rc::Rc<dyn Fn(compio::net::TcpStream)>; + /// Notifier fired when a delegated replica connection dies.
/// /// The delegated replica connection's writer / reader tasks invoke this on diff --git a/core/message_bus/src/replica_io.rs b/core/message_bus/src/replica_io.rs index f0abddecb..2b3b3802a 100644 --- a/core/message_bus/src/replica_io.rs +++ b/core/message_bus/src/replica_io.rs @@ -83,7 +83,7 @@ pub fn assert_listen_addrs_distinct( if let (Some(a), Some(b)) = (a, b) && a == b { - panic!("listener address conflict: {a_name} and {b_name} both bind to {a}",); + panic!("listener address conflict: {a_name} and {b_name} both bind to {a}"); } } } diff --git a/core/message_bus/src/transports/quic.rs b/core/message_bus/src/transports/quic.rs index a407b3954..b96b16408 100644 --- a/core/message_bus/src/transports/quic.rs +++ b/core/message_bus/src/transports/quic.rs @@ -238,6 +238,21 @@ impl QuicTransportConn { streams, } } + + /// Deconstruct into the raw `(Connection, (SendStream, RecvStream))` + /// tuple, mirror of [`Self::new`]; a plain field move that consumes `self` and performs no I/O. + /// + /// `client_listener_quic::run` uses this to hand the + /// already-accepted connection + first bidi pair to + /// [`crate::installer::install_client_quic_conn`] (which then wraps + /// them in a fresh `QuicTransportConn` and dispatches via the + /// generic install path). [`TransportConn::into_split`] cannot be + /// used at that boundary because the install path needs the raw + /// tuple, not the split halves.
+ #[must_use] + pub fn into_parts(self) -> (Connection, (SendStream, RecvStream)) { + (self.connection, self.streams) + } } impl TransportConn for QuicTransportConn { diff --git a/core/message_bus/tests/common/mod.rs b/core/message_bus/tests/common/mod.rs index 81cdf1fa5..4f7e88821 100644 --- a/core/message_bus/tests/common/mod.rs +++ b/core/message_bus/tests/common/mod.rs @@ -30,7 +30,9 @@ use iggy_binary_protocol::{Command2, GenericHeader, HEADER_SIZE, Message}; use message_bus::auth::{StaticSharedSecret, TokenSource}; use message_bus::client_listener::RequestHandler; use message_bus::replica_listener::MessageHandler; -use message_bus::{AcceptedClientFn, AcceptedReplicaFn, IggyMessageBus, installer}; +use message_bus::{ + AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, IggyMessageBus, installer, +}; use std::cell::Cell; use std::net::SocketAddr; use std::rc::Rc; @@ -94,3 +96,27 @@ pub fn install_clients_locally( installer::install_client_stream(&bus, client_id, stream, on_request.clone()); }) } + +/// Build an [`AcceptedQuicClientFn`] that mints a local client id and +/// installs the QUIC connection directly on the given bus. Mirror of +/// [`install_clients_locally`] for the QUIC plane. Ids are `(shard_id << 112) | seq`, where `seq` comes from a counter owned by the returned callback and starting at 1, so ids are unique only among connections installed through that one callback.
+#[must_use] +pub fn install_quic_clients_locally( + bus: Rc<IggyMessageBus>, + on_request: RequestHandler, +) -> AcceptedQuicClientFn { + let counter: Rc<Cell<u128>> = Rc::new(Cell::new(1)); + let shard_id = u128::from(bus.shard_id()); + Rc::new(move |connection, streams| { + let seq = counter.get(); + counter.set(seq.wrapping_add(1)); + let client_id = (shard_id << 112) | seq; + installer::install_client_quic_conn( + &bus, + client_id, + connection, + streams, + on_request.clone(), + ); + }) +} diff --git a/core/message_bus/tests/quic_client_roundtrip.rs b/core/message_bus/tests/quic_client_roundtrip.rs new file mode 100644 index 000000000..c3d96723a --- /dev/null +++ b/core/message_bus/tests/quic_client_roundtrip.rs @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! End-to-end: a real QUIC client connects to the consensus QUIC client +//! listener, sends a Request, the handler echoes a Reply back via +//! `bus.send_to_client`, the client reads the Reply.
+ +mod common; + +use common::{header_only, install_quic_clients_locally, loopback}; +use compio_quic::{ClientBuilder, Endpoint}; +use iggy_binary_protocol::Command2; +use iggy_binary_protocol::consensus::MESSAGE_ALIGN; +use iggy_binary_protocol::consensus::iobuf::Frozen; +use message_bus::client_listener::RequestHandler; +use message_bus::client_listener_quic::{bind, run}; +use message_bus::framing; +use message_bus::transports::quic::{QuicTransportConn, server_config_with_cert}; +use message_bus::transports::{TransportConn, TransportReader, TransportWriter}; +use message_bus::{IggyMessageBus, MessageBus}; +use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; +use std::rc::Rc; +use std::time::Duration; + +fn install_crypto_provider() { + // Idempotent across same-process retries: the result of install_default is deliberately ignored, so a provider that is already installed is not treated as a failure. + let _ = rustls::crypto::ring::default_provider().install_default(); +} + +fn self_signed() -> (CertificateDer<'static>, PrivateKeyDer<'static>) { + let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_owned()]).expect("rcgen"); + let cert_der = CertificateDer::from(cert.cert); + let key_der: PrivateKeyDer<'static> = + PrivatePkcs8KeyDer::from(cert.signing_key.serialize_der()).into(); + (cert_der, key_der) +} + +#[allow(clippy::future_not_send)] +async fn client_endpoint(server_cert: CertificateDer<'static>) -> Endpoint { + let mut builder = ClientBuilder::new_with_empty_roots() + .with_custom_certificate(server_cert) + .expect("trust cert") + .with_no_crls(); + builder = builder.with_alpn_protocols(&["iggy.consensus.v1"]); + builder.bind("127.0.0.1:0").await.expect("client bind") +} + +#[compio::test] +async fn request_reply_round_trip() { + install_crypto_provider(); + + let bus = Rc::new(IggyMessageBus::new(7)); + + let bus_for_handler = bus.clone(); + let on_request: RequestHandler = Rc::new(move |client_id, msg| { + assert_eq!(msg.header().command, Command2::Request); + let bus = bus_for_handler.clone(); + compio::runtime::spawn(async move { + let
reply = header_only(Command2::Reply, 42, 0); + bus.send_to_client(client_id, reply.into_frozen()) + .await + .expect("send_to_client should succeed"); + }) + .detach(); + }); + + let (cert, key) = self_signed(); + let server_cfg = server_config_with_cert(vec![cert.clone()], key).expect("server config"); + let (endpoint, server_addr) = bind(loopback(), server_cfg).await.expect("bind"); + + let token = bus.token(); + let on_accepted = install_quic_clients_locally(bus.clone(), on_request); + let accept_handle = compio::runtime::spawn(async move { + run(endpoint, token, on_accepted).await; + }); + bus.track_background(accept_handle); + + // Dial as a real QUIC client that trusts only the self-signed server cert and offers the `iggy.consensus.v1` ALPN (see `client_endpoint`). + let client = client_endpoint(cert).await; + let connecting = client + .connect(server_addr, "localhost", None) + .expect("connect"); + let connection = connecting.await.expect("client handshake"); + let (send, recv) = connection.open_bi_wait().await.expect("open_bi"); + + // Reuse the existing QuicTransportConn split for client-side framing. + let conn = QuicTransportConn::new(connection, (send, recv)); + let (mut reader, mut writer) = conn.into_split(); + + // Send a Request as a single-frame batch via the writer's atomic + // send_batch (mirrors the production hot path). + let request = header_only(Command2::Request, 42, 0).into_frozen(); + let mut batch: Vec<Frozen<MESSAGE_ALIGN>> = vec![request]; + writer.send_batch(&mut batch).await.expect("client send"); + assert!(batch.is_empty(), "send_batch must drain the Vec"); + + // Read the Reply on the RecvStream; the handler builds it with the same `header_only(_, 42, 0)` arguments, so `cluster` is expected to round-trip as 42. + let reply = reader + .read_message(framing::MAX_MESSAGE_SIZE) + .await + .expect("client read"); + assert_eq!(reply.header().command, Command2::Reply); + assert_eq!(reply.header().cluster, 42); + + let outcome = bus.shutdown(Duration::from_secs(2)).await; + assert_eq!( + outcome.force, 0, + "graceful shutdown should not force-cancel" + ); +}
