This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch feat/message-bus-transports in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 8d82f1673a79bfdbf7216dab02c3fda5e06bdfd4 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Apr 27 13:27:03 2026 +0200 feat(message_bus): wire QUIC + WS binds into start_on_shard_zero Extends start_on_shard_zero with five new parameters: ws_listen_addr, quic_listen_addr, quic_credentials (a QuicServerCredentials struct holding the cert chain and the private key in DER form), on_accepted_ws_client, on_accepted_quic_client. Each new listener binds only when its parameter group is fully populated: the WS pair (ws_listen_addr + on_accepted_ws_client) or the QUIC trio (quic_listen_addr + quic_credentials + on_accepted_quic_client). A partially populated group returns IggyError::InvalidConfiguration so operator misconfigurations surface at boot. Pre-bind validation calls assert_listen_addrs_distinct on the populated TCP-family slots. BoundPlanes grows two optional SocketAddr fields (ws, quic), each populated on a successful bind of the corresponding listener; BoundPlanes no longer derives Copy (it remains Debug + Clone). start_on_shard_zero_default passes None for all of the new parameters, so existing TCP-only call sites compile unchanged. Drive-by: assert_listen_addrs_distinct now skips the equality check when port == 0. Two slots at port 0 ("OS-assigned") receive distinct kernel ports at bind time and are not a real conflict; loopback fixtures rely on this. New shard_zero_gating::shard_zero_binds_all_four_planes_when_configured test exercises the full bind path with a self-signed rcgen cert. It verifies that all four BoundPlanes fields are populated with non-zero ports and that the TCP-family listeners (replica, client, ws) use pairwise-distinct ports. 147/147 tests pass on cargo nextest run -p message_bus -p server-ng -p shard. Workspace clippy clean. 
--- core/message_bus/src/replica_io.rs | 115 +++++++++++++++++++++++++--- core/message_bus/tests/shard_zero_gating.rs | 88 ++++++++++++++++++++- 2 files changed, 192 insertions(+), 11 deletions(-) diff --git a/core/message_bus/src/replica_io.rs b/core/message_bus/src/replica_io.rs index 2b3b3802a..44fbab75a 100644 --- a/core/message_bus/src/replica_io.rs +++ b/core/message_bus/src/replica_io.rs @@ -35,17 +35,39 @@ use std::rc::Rc; use std::time::Duration; use iggy_common::IggyError; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; use crate::auth::TokenSource; use crate::connector::start as start_connector; use crate::replica_listener::{bind as bind_replica_listener, run as run_replica_listener}; -use crate::{AcceptedClientFn, AcceptedReplicaFn, IggyMessageBus, client_listener}; +use crate::transports::quic::server_config_with_cert; +use crate::{ + AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, AcceptedWsClientFn, IggyMessageBus, + client_listener, client_listener_quic, client_listener_ws, +}; + +/// QUIC server credentials passed by the bootstrap layer. +/// +/// The cert chain is the leaf-first sequence rustls expects; the key +/// is the server's private key in DER form. Tests use rcgen to mint a +/// throwaway pair; production callers load real PKI material via +/// `core/server-ng`'s `[quic.certificate]` config section. +pub struct QuicServerCredentials { + pub cert_chain: Vec<CertificateDer<'static>>, + pub key_der: PrivateKeyDer<'static>, +} /// Bound addresses returned to shard 0 after the listeners come up. -#[derive(Debug, Clone, Copy)] +/// +/// `ws` and `quic` are populated only when the corresponding +/// `start_on_shard_zero` parameters are `Some`; an unconfigured plane +/// stays `None`. 
+#[derive(Debug, Clone)] pub struct BoundPlanes { pub replica: SocketAddr, pub client: SocketAddr, + pub ws: Option<SocketAddr>, + pub quic: Option<SocketAddr>, } /// Boot-time check: every TCP listener address must occupy a distinct @@ -80,8 +102,12 @@ pub fn assert_listen_addrs_distinct( for j in (i + 1)..tcp_slots.len() { let (a_name, a) = tcp_slots[i]; let (b_name, b) = tcp_slots[j]; + // Port 0 means "OS-assigned"; two slots at port 0 receive + // distinct kernel ports at bind time, so they don't + // conflict. Tests rely on this for loopback fixtures. if let (Some(a), Some(b)) = (a, b) && a == b + && a.port() != 0 { panic!("listener address conflict: {a_name} and {b_name} both bind to {a}"); } @@ -89,29 +115,47 @@ pub fn assert_listen_addrs_distinct( } } -/// Bind the replica + client listeners on shard 0 and start the outbound -/// replica connector. Non-zero shards early-return `Ok(None)`. +/// Bind the replica + client listeners on shard 0 and start the +/// outbound replica connector. Optionally bind WS and QUIC client +/// listeners alongside. Non-zero shards early-return `Ok(None)`. +/// +/// Each accepted / dialed connection is handed to the supplied +/// delegate callback. The TCP callbacks (`on_accepted_replica`, +/// `on_accepted_client`) and the WS callback (`on_accepted_ws_client`) +/// are responsible for the dup-fd + inter-shard send. The QUIC +/// callback (`on_accepted_quic_client`) installs locally on shard 0 +/// (QUIC has no cross-shard handover). /// -/// Each accepted / dialed TCP stream is handed to the supplied delegate -/// callback: `on_accepted_replica` for inbound replicas and for freshly -/// dialed higher-id peers; `on_accepted_client` for SDK client accepts. -/// The callbacks are responsible for the dup-fd + inter-shard send. +/// `ws_listen_addr` / `on_accepted_ws_client` are paired: if either is +/// `Some`, both must be. Same for the QUIC trio +/// (`quic_listen_addr` / `quic_credentials` / `on_accepted_quic_client`). 
+/// +/// # Panics +/// +/// Panics on TCP-family `(ip, port)` overlap among the populated +/// `replica`, `client`, `ws` slots — see +/// [`assert_listen_addrs_distinct`]. /// /// # Errors /// -/// Returns `IggyError::CannotBindToSocket` if either listener bind fails. +/// Returns `IggyError::CannotBindToSocket` if any listener bind fails. #[allow(clippy::future_not_send)] #[allow(clippy::too_many_arguments)] pub async fn start_on_shard_zero( bus: &Rc<IggyMessageBus>, replica_listen_addr: SocketAddr, client_listen_addr: SocketAddr, + ws_listen_addr: Option<SocketAddr>, + quic_listen_addr: Option<SocketAddr>, + quic_credentials: Option<QuicServerCredentials>, cluster_id: u128, self_id: u8, replica_count: u8, peers: Vec<(u8, SocketAddr)>, on_accepted_replica: AcceptedReplicaFn, on_accepted_client: AcceptedClientFn, + on_accepted_ws_client: Option<AcceptedWsClientFn>, + on_accepted_quic_client: Option<AcceptedQuicClientFn>, reconnect_period: Duration, token_source: Rc<dyn TokenSource>, ) -> Result<Option<BoundPlanes>, IggyError> { @@ -119,6 +163,13 @@ pub async fn start_on_shard_zero( return Ok(None); } + assert_listen_addrs_distinct( + replica_listen_addr, + client_listen_addr, + ws_listen_addr, + quic_listen_addr, + ); + let (replica_listener, replica_bound) = bind_replica_listener(replica_listen_addr).await?; let (clients_listener, client_bound) = client_listener::bind(client_listen_addr).await?; @@ -149,6 +200,40 @@ pub async fn start_on_shard_zero( }); bus.track_background(client_handle); + let ws_bound = match (ws_listen_addr, on_accepted_ws_client) { + (Some(addr), Some(on_accepted_ws)) => { + let (ws_listener, ws_bound) = client_listener_ws::bind(addr).await?; + let token_for_ws = bus.token(); + let ws_handle = compio::runtime::spawn(async move { + client_listener_ws::run(ws_listener, token_for_ws, on_accepted_ws).await; + }); + bus.track_background(ws_handle); + Some(ws_bound) + } + (None, None) => None, + _ => { + return 
Err(IggyError::InvalidConfiguration); + } + }; + + let quic_bound = match (quic_listen_addr, quic_credentials, on_accepted_quic_client) { + (Some(addr), Some(creds), Some(on_accepted_quic)) => { + let server_config = server_config_with_cert(creds.cert_chain, creds.key_der) + .map_err(|e| IggyError::IoError(format!("QUIC server config build failed: {e}")))?; + let (endpoint, quic_bound) = client_listener_quic::bind(addr, server_config).await?; + let token_for_quic = bus.token(); + let quic_handle = compio::runtime::spawn(async move { + client_listener_quic::run(endpoint, token_for_quic, on_accepted_quic).await; + }); + bus.track_background(quic_handle); + Some(quic_bound) + } + (None, None, None) => None, + _ => { + return Err(IggyError::InvalidConfiguration); + } + }; + start_connector( bus, cluster_id, @@ -163,12 +248,19 @@ pub async fn start_on_shard_zero( Ok(Some(BoundPlanes { replica: replica_bound, client: client_bound, + ws: ws_bound, + quic: quic_bound, })) } /// [`start_on_shard_zero`] defaulting `reconnect_period` to the bus's /// [`crate::MessageBusConfig::reconnect_period`]. /// +/// Leaves the WS + QUIC listener slots unconfigured (`None`). +/// Convenience entry for TCP-only deployments and existing tests; +/// prefer the full [`start_on_shard_zero`] in production where WS / +/// QUIC come from `core/server-ng`'s config. +/// /// # Errors /// /// Returns `IggyError::CannotBindToSocket` if either listener bind fails. 
@@ -191,12 +283,17 @@ pub async fn start_on_shard_zero_default( bus, replica_listen_addr, client_listen_addr, + None, + None, + None, cluster_id, self_id, replica_count, peers, on_accepted_replica, on_accepted_client, + None, + None, reconnect_period, token_source, ) diff --git a/core/message_bus/tests/shard_zero_gating.rs b/core/message_bus/tests/shard_zero_gating.rs index 1b99d0a5b..3a1b1f265 100644 --- a/core/message_bus/tests/shard_zero_gating.rs +++ b/core/message_bus/tests/shard_zero_gating.rs @@ -21,12 +21,16 @@ mod common; -use common::{install_clients_locally, install_replicas_locally, loopback, test_token_source}; +use common::{ + install_clients_locally, install_quic_clients_locally, install_replicas_locally, + install_ws_clients_locally, loopback, test_token_source, +}; use message_bus::IggyMessageBus; use message_bus::client_listener::RequestHandler; use message_bus::connector::DEFAULT_RECONNECT_PERIOD; -use message_bus::replica_io::start_on_shard_zero; +use message_bus::replica_io::{QuicServerCredentials, start_on_shard_zero}; use message_bus::replica_listener::{MessageHandler, bind, run}; +use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use std::net::SocketAddr; use std::rc::Rc; use std::time::Duration; @@ -72,12 +76,17 @@ async fn shard_zero_binds_listener_and_starts_connector() { &bus_zero, loopback(), loopback(), + None, + None, + None, CLUSTER, 0, 2, vec![(1u8, peer_addr)], accepted_replica, accepted_client, + None, + None, DEFAULT_RECONNECT_PERIOD, test_token_source(), ) @@ -124,12 +133,17 @@ async fn non_zero_shard_skips_io() { &bus_one, loopback(), loopback(), + None, + None, + None, CLUSTER, 1, 2, vec![(0u8, dead_peer)], accepted_replica, accepted_client, + None, + None, DEFAULT_RECONNECT_PERIOD, test_token_source(), ) @@ -155,3 +169,73 @@ async fn non_zero_shard_skips_io() { "helper must not register any background tasks on non-zero shards" ); } + +fn self_signed() -> (CertificateDer<'static>, 
PrivateKeyDer<'static>) { + let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_owned()]).expect("rcgen"); + let cert_der = CertificateDer::from(cert.cert); + let key_der: PrivateKeyDer<'static> = + PrivatePkcs8KeyDer::from(cert.signing_key.serialize_der()).into(); + (cert_der, key_der) +} + +#[compio::test] +async fn shard_zero_binds_all_four_planes_when_configured() { + // Idempotent across same-process retries. + let _ = rustls::crypto::ring::default_provider().install_default(); + + let bus_zero = Rc::new(IggyMessageBus::new(0)); + let on_message: MessageHandler = Rc::new(|_, _| {}); + let on_request: RequestHandler = Rc::new(|_, _| {}); + let accepted_replica = install_replicas_locally(bus_zero.clone(), on_message.clone()); + let accepted_client = install_clients_locally(bus_zero.clone(), on_request.clone()); + let accepted_ws = install_ws_clients_locally(bus_zero.clone(), on_request.clone()); + let accepted_quic = install_quic_clients_locally(bus_zero.clone(), on_request); + + let (cert, key) = self_signed(); + let creds = QuicServerCredentials { + cert_chain: vec![cert], + key_der: key, + }; + + let bound = start_on_shard_zero( + &bus_zero, + loopback(), + loopback(), + Some(loopback()), + Some(loopback()), + Some(creds), + CLUSTER, + 0, + 1, + vec![], + accepted_replica, + accepted_client, + Some(accepted_ws), + Some(accepted_quic), + DEFAULT_RECONNECT_PERIOD, + test_token_source(), + ) + .await + .expect("start_on_shard_zero must succeed"); + + let bound = bound.expect("shard 0 must return bound listeners"); + assert_ne!(bound.replica.port(), 0, "replica must bind a port"); + assert_ne!(bound.client.port(), 0, "client must bind a port"); + let ws = bound.ws.expect("ws plane must be bound"); + let quic = bound.quic.expect("quic plane must be bound"); + assert_ne!(ws.port(), 0, "ws must bind a port"); + assert_ne!(quic.port(), 0, "quic must bind a port"); + + // Ports must be pairwise distinct on the TCP-family slots. 
+ let tcp_ports = [bound.replica.port(), bound.client.port(), ws.port()]; + for i in 0..tcp_ports.len() { + for j in (i + 1)..tcp_ports.len() { + assert_ne!( + tcp_ports[i], tcp_ports[j], + "TCP-family listeners must use distinct ports", + ); + } + } + + bus_zero.shutdown(Duration::from_secs(2)).await; +}
