This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit a89de206070796e3a90b16085609d5950a0f035c
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Apr 27 13:27:03 2026 +0200

    feat(message_bus): wire QUIC + WS binds into start_on_shard_zero
    
    Extends start_on_shard_zero with five new parameters: ws_listen_addr,
    quic_listen_addr, quic_credentials (cert chain + key DER struct),
    on_accepted_ws_client, on_accepted_quic_client. Each new listener
    binds only when its parameter group (the WS addr/callback pair or the
    QUIC addr/credentials/callback trio) is fully populated; a partially
    populated group returns IggyError::InvalidConfiguration so operator
    misconfigurations surface at boot. Pre-bind validation calls
    assert_listen_addrs_distinct on the populated TCP-family slots.
    
    BoundPlanes grows two optional SocketAddr fields (ws, quic) populated
    from each successful bind. start_on_shard_zero_default mirrors with
    None defaults so existing TCP-only call sites compile unchanged.
    
    Drive-by: assert_listen_addrs_distinct now skips the equality check
    when port == 0. Two slots at port 0 ("OS-assigned") receive distinct
    kernel ports at bind time and are not a real conflict; loopback
    fixtures rely on this.
    
    New shard_zero_gating::shard_zero_binds_all_four_planes_when_configured
    test exercises the full bind path with a self-signed rcgen cert and
    verifies all four BoundPlanes fields are populated with distinct
    TCP-family ports. 147/147 tests pass on cargo nextest run -p
    message_bus -p server-ng -p shard. Workspace clippy clean.
---
 core/message_bus/src/replica_io.rs          | 115 +++++++++++++++++++++++++---
 core/message_bus/tests/shard_zero_gating.rs |  88 ++++++++++++++++++++-
 2 files changed, 192 insertions(+), 11 deletions(-)

diff --git a/core/message_bus/src/replica_io.rs b/core/message_bus/src/replica_io.rs
index 2b3b3802a..44fbab75a 100644
--- a/core/message_bus/src/replica_io.rs
+++ b/core/message_bus/src/replica_io.rs
@@ -35,17 +35,39 @@ use std::rc::Rc;
 use std::time::Duration;
 
 use iggy_common::IggyError;
+use rustls::pki_types::{CertificateDer, PrivateKeyDer};
 
 use crate::auth::TokenSource;
 use crate::connector::start as start_connector;
 use crate::replica_listener::{bind as bind_replica_listener, run as 
run_replica_listener};
-use crate::{AcceptedClientFn, AcceptedReplicaFn, IggyMessageBus, 
client_listener};
+use crate::transports::quic::server_config_with_cert;
+use crate::{
+    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, 
AcceptedWsClientFn, IggyMessageBus,
+    client_listener, client_listener_quic, client_listener_ws,
+};
+
+/// QUIC server credentials passed by the bootstrap layer.
+///
+/// The cert chain is the leaf-first sequence rustls expects; the key
+/// is the server's private key in DER form. Tests use rcgen to mint a
+/// throwaway pair; production callers load real PKI material via
+/// `core/server-ng`'s `[quic.certificate]` config section.
+pub struct QuicServerCredentials {
+    pub cert_chain: Vec<CertificateDer<'static>>,
+    pub key_der: PrivateKeyDer<'static>,
+}
 
 /// Bound addresses returned to shard 0 after the listeners come up.
-#[derive(Debug, Clone, Copy)]
+///
+/// `ws` and `quic` are populated only when the corresponding
+/// `start_on_shard_zero` parameters are `Some`; an unconfigured plane
+/// stays `None`.
+#[derive(Debug, Clone)]
 pub struct BoundPlanes {
     pub replica: SocketAddr,
     pub client: SocketAddr,
+    pub ws: Option<SocketAddr>,
+    pub quic: Option<SocketAddr>,
 }
 
 /// Boot-time check: every TCP listener address must occupy a distinct
@@ -80,8 +102,12 @@ pub fn assert_listen_addrs_distinct(
         for j in (i + 1)..tcp_slots.len() {
             let (a_name, a) = tcp_slots[i];
             let (b_name, b) = tcp_slots[j];
+            // Port 0 means "OS-assigned"; two slots at port 0 receive
+            // distinct kernel ports at bind time, so they don't
+            // conflict. Tests rely on this for loopback fixtures.
             if let (Some(a), Some(b)) = (a, b)
                 && a == b
+                && a.port() != 0
             {
                 panic!("listener address conflict: {a_name} and {b_name} both 
bind to {a}");
             }
@@ -89,29 +115,47 @@ pub fn assert_listen_addrs_distinct(
     }
 }
 
-/// Bind the replica + client listeners on shard 0 and start the outbound
-/// replica connector. Non-zero shards early-return `Ok(None)`.
+/// Bind the replica + client listeners on shard 0 and start the
+/// outbound replica connector. Optionally bind WS and QUIC client
+/// listeners alongside. Non-zero shards early-return `Ok(None)`.
+///
+/// Each accepted / dialed connection is handed to the supplied
+/// delegate callback. The TCP callbacks (`on_accepted_replica`,
+/// `on_accepted_client`) and the WS callback (`on_accepted_ws_client`)
+/// are responsible for the dup-fd + inter-shard send. The QUIC
+/// callback (`on_accepted_quic_client`) installs locally on shard 0
+/// (QUIC has no cross-shard handover).
 ///
-/// Each accepted / dialed TCP stream is handed to the supplied delegate
-/// callback: `on_accepted_replica` for inbound replicas and for freshly
-/// dialed higher-id peers; `on_accepted_client` for SDK client accepts.
-/// The callbacks are responsible for the dup-fd + inter-shard send.
+/// `ws_listen_addr` / `on_accepted_ws_client` are paired: if either is
+/// `Some`, both must be. Same for the QUIC trio
+/// (`quic_listen_addr` / `quic_credentials` / `on_accepted_quic_client`).
+///
+/// # Panics
+///
+/// Panics on TCP-family `(ip, port)` overlap among the populated
+/// `replica`, `client`, `ws` slots — see
+/// [`assert_listen_addrs_distinct`].
 ///
 /// # Errors
 ///
-/// Returns `IggyError::CannotBindToSocket` if either listener bind fails.
+/// Returns `IggyError::CannotBindToSocket` if any listener bind fails.
 #[allow(clippy::future_not_send)]
 #[allow(clippy::too_many_arguments)]
 pub async fn start_on_shard_zero(
     bus: &Rc<IggyMessageBus>,
     replica_listen_addr: SocketAddr,
     client_listen_addr: SocketAddr,
+    ws_listen_addr: Option<SocketAddr>,
+    quic_listen_addr: Option<SocketAddr>,
+    quic_credentials: Option<QuicServerCredentials>,
     cluster_id: u128,
     self_id: u8,
     replica_count: u8,
     peers: Vec<(u8, SocketAddr)>,
     on_accepted_replica: AcceptedReplicaFn,
     on_accepted_client: AcceptedClientFn,
+    on_accepted_ws_client: Option<AcceptedWsClientFn>,
+    on_accepted_quic_client: Option<AcceptedQuicClientFn>,
     reconnect_period: Duration,
     token_source: Rc<dyn TokenSource>,
 ) -> Result<Option<BoundPlanes>, IggyError> {
@@ -119,6 +163,13 @@ pub async fn start_on_shard_zero(
         return Ok(None);
     }
 
+    assert_listen_addrs_distinct(
+        replica_listen_addr,
+        client_listen_addr,
+        ws_listen_addr,
+        quic_listen_addr,
+    );
+
     let (replica_listener, replica_bound) = 
bind_replica_listener(replica_listen_addr).await?;
     let (clients_listener, client_bound) = 
client_listener::bind(client_listen_addr).await?;
 
@@ -149,6 +200,40 @@ pub async fn start_on_shard_zero(
     });
     bus.track_background(client_handle);
 
+    let ws_bound = match (ws_listen_addr, on_accepted_ws_client) {
+        (Some(addr), Some(on_accepted_ws)) => {
+            let (ws_listener, ws_bound) = 
client_listener_ws::bind(addr).await?;
+            let token_for_ws = bus.token();
+            let ws_handle = compio::runtime::spawn(async move {
+                client_listener_ws::run(ws_listener, token_for_ws, 
on_accepted_ws).await;
+            });
+            bus.track_background(ws_handle);
+            Some(ws_bound)
+        }
+        (None, None) => None,
+        _ => {
+            return Err(IggyError::InvalidConfiguration);
+        }
+    };
+
+    let quic_bound = match (quic_listen_addr, quic_credentials, 
on_accepted_quic_client) {
+        (Some(addr), Some(creds), Some(on_accepted_quic)) => {
+            let server_config = server_config_with_cert(creds.cert_chain, 
creds.key_der)
+                .map_err(|e| IggyError::IoError(format!("QUIC server config 
build failed: {e}")))?;
+            let (endpoint, quic_bound) = client_listener_quic::bind(addr, 
server_config).await?;
+            let token_for_quic = bus.token();
+            let quic_handle = compio::runtime::spawn(async move {
+                client_listener_quic::run(endpoint, token_for_quic, 
on_accepted_quic).await;
+            });
+            bus.track_background(quic_handle);
+            Some(quic_bound)
+        }
+        (None, None, None) => None,
+        _ => {
+            return Err(IggyError::InvalidConfiguration);
+        }
+    };
+
     start_connector(
         bus,
         cluster_id,
@@ -163,12 +248,19 @@ pub async fn start_on_shard_zero(
     Ok(Some(BoundPlanes {
         replica: replica_bound,
         client: client_bound,
+        ws: ws_bound,
+        quic: quic_bound,
     }))
 }
 
 /// [`start_on_shard_zero`] defaulting `reconnect_period` to the bus's
 /// [`crate::MessageBusConfig::reconnect_period`].
 ///
+/// Leaves the WS + QUIC listener slots unconfigured (`None`).
+/// Convenience entry for TCP-only deployments and existing tests;
+/// prefer the full [`start_on_shard_zero`] in production where WS /
+/// QUIC come from `core/server-ng`'s config.
+///
 /// # Errors
 ///
 /// Returns `IggyError::CannotBindToSocket` if either listener bind fails.
@@ -191,12 +283,17 @@ pub async fn start_on_shard_zero_default(
         bus,
         replica_listen_addr,
         client_listen_addr,
+        None,
+        None,
+        None,
         cluster_id,
         self_id,
         replica_count,
         peers,
         on_accepted_replica,
         on_accepted_client,
+        None,
+        None,
         reconnect_period,
         token_source,
     )
diff --git a/core/message_bus/tests/shard_zero_gating.rs b/core/message_bus/tests/shard_zero_gating.rs
index 1b99d0a5b..3a1b1f265 100644
--- a/core/message_bus/tests/shard_zero_gating.rs
+++ b/core/message_bus/tests/shard_zero_gating.rs
@@ -21,12 +21,16 @@
 
 mod common;
 
-use common::{install_clients_locally, install_replicas_locally, loopback, 
test_token_source};
+use common::{
+    install_clients_locally, install_quic_clients_locally, 
install_replicas_locally,
+    install_ws_clients_locally, loopback, test_token_source,
+};
 use message_bus::IggyMessageBus;
 use message_bus::client_listener::RequestHandler;
 use message_bus::connector::DEFAULT_RECONNECT_PERIOD;
-use message_bus::replica_io::start_on_shard_zero;
+use message_bus::replica_io::{QuicServerCredentials, start_on_shard_zero};
 use message_bus::replica_listener::{MessageHandler, bind, run};
+use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
 use std::net::SocketAddr;
 use std::rc::Rc;
 use std::time::Duration;
@@ -72,12 +76,17 @@ async fn shard_zero_binds_listener_and_starts_connector() {
         &bus_zero,
         loopback(),
         loopback(),
+        None,
+        None,
+        None,
         CLUSTER,
         0,
         2,
         vec![(1u8, peer_addr)],
         accepted_replica,
         accepted_client,
+        None,
+        None,
         DEFAULT_RECONNECT_PERIOD,
         test_token_source(),
     )
@@ -124,12 +133,17 @@ async fn non_zero_shard_skips_io() {
         &bus_one,
         loopback(),
         loopback(),
+        None,
+        None,
+        None,
         CLUSTER,
         1,
         2,
         vec![(0u8, dead_peer)],
         accepted_replica,
         accepted_client,
+        None,
+        None,
         DEFAULT_RECONNECT_PERIOD,
         test_token_source(),
     )
@@ -155,3 +169,73 @@ async fn non_zero_shard_skips_io() {
         "helper must not register any background tasks on non-zero shards"
     );
 }
+
+fn self_signed() -> (CertificateDer<'static>, PrivateKeyDer<'static>) {
+    let cert = 
rcgen::generate_simple_self_signed(vec!["localhost".to_owned()]).expect("rcgen");
+    let cert_der = CertificateDer::from(cert.cert);
+    let key_der: PrivateKeyDer<'static> =
+        PrivatePkcs8KeyDer::from(cert.signing_key.serialize_der()).into();
+    (cert_der, key_der)
+}
+
+#[compio::test]
+async fn shard_zero_binds_all_four_planes_when_configured() {
+    // Idempotent across same-process retries.
+    let _ = rustls::crypto::ring::default_provider().install_default();
+
+    let bus_zero = Rc::new(IggyMessageBus::new(0));
+    let on_message: MessageHandler = Rc::new(|_, _| {});
+    let on_request: RequestHandler = Rc::new(|_, _| {});
+    let accepted_replica = install_replicas_locally(bus_zero.clone(), 
on_message.clone());
+    let accepted_client = install_clients_locally(bus_zero.clone(), 
on_request.clone());
+    let accepted_ws = install_ws_clients_locally(bus_zero.clone(), 
on_request.clone());
+    let accepted_quic = install_quic_clients_locally(bus_zero.clone(), 
on_request);
+
+    let (cert, key) = self_signed();
+    let creds = QuicServerCredentials {
+        cert_chain: vec![cert],
+        key_der: key,
+    };
+
+    let bound = start_on_shard_zero(
+        &bus_zero,
+        loopback(),
+        loopback(),
+        Some(loopback()),
+        Some(loopback()),
+        Some(creds),
+        CLUSTER,
+        0,
+        1,
+        vec![],
+        accepted_replica,
+        accepted_client,
+        Some(accepted_ws),
+        Some(accepted_quic),
+        DEFAULT_RECONNECT_PERIOD,
+        test_token_source(),
+    )
+    .await
+    .expect("start_on_shard_zero must succeed");
+
+    let bound = bound.expect("shard 0 must return bound listeners");
+    assert_ne!(bound.replica.port(), 0, "replica must bind a port");
+    assert_ne!(bound.client.port(), 0, "client must bind a port");
+    let ws = bound.ws.expect("ws plane must be bound");
+    let quic = bound.quic.expect("quic plane must be bound");
+    assert_ne!(ws.port(), 0, "ws must bind a port");
+    assert_ne!(quic.port(), 0, "quic must bind a port");
+
+    // Ports must be pairwise distinct on the TCP-family slots.
+    let tcp_ports = [bound.replica.port(), bound.client.port(), ws.port()];
+    for i in 0..tcp_ports.len() {
+        for j in (i + 1)..tcp_ports.len() {
+            assert_ne!(
+                tcp_ports[i], tcp_ports[j],
+                "TCP-family listeners must use distinct ports",
+            );
+        }
+    }
+
+    bus_zero.shutdown(Duration::from_secs(2)).await;
+}

Reply via email to