This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 5d62d128d6b447b056b290d125d57558eae1462b
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Apr 27 13:03:16 2026 +0200

    feat(message_bus): add shard-0 QUIC client listener
    
    client_listener_quic::{bind, run} drives the QUIC accept loop on
    shard 0. bind constructs a compio_quic::Endpoint from a ServerConfig
    (built by the bootstrap caller via transports::quic::server_config_with_cert
    with operator-provided cert chain + key DER). run drives
    wait_incoming -> Incoming::accept -> Connecting -> Connection::accept_bi
    to completion (reusing the existing QuicTransportListener trait impl
    which encodes the 0-RTT-refuse rule), then invokes
    AcceptedQuicClientFn(connection, streams).
    
    QUIC stays shard-0 terminal: a compio_quic::Endpoint binds one UDP
    socket and demuxes by Connection ID, and per-conn TLS / packet-number
    / congestion state in quinn-proto::Connection is non-serialisable.
    Cross-shard handover is fundamentally out of reach.
    
    New AcceptedQuicClientFn type alias in lib.rs (mirrors the
    AcceptedClientFn shape but carries the (Connection, (SendStream,
    RecvStream)) tuple). New into_parts accessor on QuicTransportConn
    deconstructs back into the raw tuple for the install path. Tests:
    new tests/quic_client_roundtrip.rs exercises a real rcgen-cert
    loopback Request -> Reply round trip via bus.send_to_client. New
    tests/common::install_quic_clients_locally helper mirrors the
    existing install_clients_locally for unit-test minting + install.
    143/143 tests pass on cargo nextest run -p message_bus -p server-ng
    -p shard. Workspace clippy clean.
    
    Drive-by: replica_io.rs panic! trailing comma removed (clippy lint
    on the P5-T1 commit landed after the pre-commit fmt rewrap).
---
 core/message_bus/src/client_listener_quic.rs    | 112 +++++++++++++++++++++
 core/message_bus/src/lib.rs                     |  28 ++++++
 core/message_bus/src/replica_io.rs              |   2 +-
 core/message_bus/src/transports/quic.rs         |  15 +++
 core/message_bus/tests/common/mod.rs            |  28 +++++-
 core/message_bus/tests/quic_client_roundtrip.rs | 124 ++++++++++++++++++++++++
 6 files changed, 307 insertions(+), 2 deletions(-)

diff --git a/core/message_bus/src/client_listener_quic.rs 
b/core/message_bus/src/client_listener_quic.rs
new file mode 100644
index 000000000..16079885a
--- /dev/null
+++ b/core/message_bus/src/client_listener_quic.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! QUIC listener for consensus-protocol SDK clients.
+//!
+//! Runs only on shard 0. The accept loop drives the QUIC handshake to
+//! completion AND accepts the first bidirectional stream pair before
+//! invoking the supplied callback, so the callback receives a
+//! ready-for-traffic [`compio_quic::Connection`] plus its
+//! `(SendStream, RecvStream)` pair. Subprotocol enforcement runs at
+//! the rustls ALPN layer (set to `iggy.consensus.v1` in
+//! [`crate::transports::quic::server_config_with_cert`]); no
+//! application-layer code runs for clients with the wrong ALPN.
+//!
+//! 0-RTT data is refused at accept time
+//! ([`crate::transports::quic::server_config_with_cert`] sets
+//! `max_early_data_size = 0` at the rustls layer; the accept path
+//! double-checks `RecvStream::is_0rtt()` for defense in depth and
+//! closes with `QUIC_PROTOCOL_VIOLATION` on mismatch).
+//!
+//! QUIC stays shard-0 terminal: a `compio_quic::Endpoint` binds a
+//! single UDP socket and demuxes incoming packets to in-flight
+//! connections by Connection ID, and per-connection TLS / packet-number
+//! / congestion state lives in `quinn-proto::Connection`, which is
+//! non-serialisable. Cross-shard handover is fundamentally out of
+//! reach for QUIC; the callback always installs locally on shard 0.
+
+use crate::lifecycle::ShutdownToken;
+use crate::transports::quic::QuicTransportListener;
+use crate::{AcceptedQuicClientFn, transports::TransportListener};
+use compio_quic::{Endpoint, ServerConfig};
+use futures::FutureExt;
+use iggy_common::IggyError;
+use std::net::SocketAddr;
+use tracing::{debug, error, info};
+
+/// Bind a QUIC [`Endpoint`] without starting the accept loop.
+///
+/// Returns the endpoint plus its `local_addr()`. `server_config` is built by
+/// the bootstrap caller via [`crate::transports::quic::server_config_with_cert`]
+/// from the operator-provided cert chain + key DER.
+///
+/// # Errors
+///
+/// [`IggyError::CannotBindToSocket`] if binding fails, [`IggyError::IoError`] if `local_addr()` fails.
+#[allow(clippy::future_not_send)]
+pub async fn bind(
+    addr: SocketAddr,
+    server_config: ServerConfig,
+) -> Result<(Endpoint, SocketAddr), IggyError> {
+    let endpoint = Endpoint::server(addr, server_config)
+        .await
+        .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; // NOTE(review): the underlying bind error is dropped here
+    let actual = endpoint
+        .local_addr() // address the endpoint itself reports after binding
+        .map_err(|e| IggyError::IoError(e.to_string()))?;
+    Ok((endpoint, actual))
+}
+
+/// Run the QUIC accept loop until the shutdown token fires or `accept` fails.
+///
+/// Each accepted connection is handed to `on_accepted` along with its
+/// first bidirectional stream pair. The callback owns the connection
+/// from that point on; it mints a `client_id` and calls
+/// [`crate::installer::install_client_quic_conn`].
+#[allow(clippy::future_not_send)]
+pub async fn run(endpoint: Endpoint, token: ShutdownToken, on_accepted: 
AcceptedQuicClientFn) {
+    info!(
+        "Consensus QUIC client listener accepting on {:?}",
+        endpoint.local_addr().ok()
+    );
+
+    let listener = QuicTransportListener::new(endpoint); // the endpoint is moved into the listener
+    loop {
+        futures::select! {
+            () = token.wait().fuse() => {
+                debug!("Consensus QUIC client listener shutting down");
+                break; // shutdown requested: stop accepting
+            }
+            result = listener.accept().fuse() => {
+                match result {
+                    Ok((conn, peer_addr)) => {
+                        debug!(%peer_addr, "QUIC client accepted, handing to 
installer");
+                        let (connection, streams) = conn.into_parts();
+                        on_accepted(connection, streams); // plain synchronous call; the loop resumes once it returns
+                    }
+                    Err(e) => {
+                        // Log and exit: treated as endpoint-fatal (e.g. endpoint
+                        // closed); the shutdown token will normally fire just before.
+                        // NOTE(review): confirm accept() never yields Err for one failed handshake.
+                        error!("Consensus QUIC client listener accept failed: 
{e}");
+                        break;
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 811102e51..628832cf3 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -81,6 +81,7 @@ pub mod auth;
 pub mod auth_config;
 pub mod cache;
 pub mod client_listener;
+pub mod client_listener_quic;
 pub mod config;
 pub mod connector;
 mod error;
@@ -157,6 +158,33 @@ pub type AcceptedReplicaFn = std::rc::Rc<dyn 
Fn(compio::net::TcpStream, u8)>;
 /// assigning the client id as part of its delegation policy.
 pub type AcceptedClientFn = std::rc::Rc<dyn Fn(compio::net::TcpStream)>;
 
+/// Callback invoked on every accepted SDK QUIC client connection.
+///
+/// Fires after shard 0's QUIC listener drives the handshake to
+/// completion AND accepts the first bidirectional stream pair, so the
+/// callback receives a ready-for-traffic [`compio_quic::Connection`]
+/// plus its `(SendStream, RecvStream)` pair. The callback mints a
+/// client id, then calls [`installer::install_client_quic_conn`] on
+/// the local bus.
+///
+/// QUIC stays shard-0 terminal (per `INVARIANTS.md` I6 and the
+/// non-serialisable nature of `quinn-proto::Connection` state); no
+/// cross-shard handover analog exists for this plane.
+pub type AcceptedQuicClientFn = std::rc::Rc<
+    dyn Fn(compio_quic::Connection, (compio_quic::SendStream, 
compio_quic::RecvStream)),
+>;
+
+/// Callback invoked on every accepted SDK WebSocket client connection.
+///
+/// Fires after shard 0's WS listener accepts a raw TCP socket. The
+/// HTTP-Upgrade handshake has NOT run yet; the callback hands the
+/// raw stream off to the owning shard via inter-shard fd-shipping
+/// (`ShardFramePayload::ClientWsConnectionSetup`). The owning shard
+/// runs the upgrade and the subprotocol check locally. This shape
+/// preserves invariant I5 (fd-delegation is TCP-only) because the
+/// shipped fd is plain TCP at ship-time.
+pub type AcceptedWsClientFn = std::rc::Rc<dyn Fn(compio::net::TcpStream)>;
+
 /// Notifier fired when a delegated replica connection dies.
 ///
 /// The delegated replica connection's writer / reader tasks invoke this on
diff --git a/core/message_bus/src/replica_io.rs 
b/core/message_bus/src/replica_io.rs
index f0abddecb..2b3b3802a 100644
--- a/core/message_bus/src/replica_io.rs
+++ b/core/message_bus/src/replica_io.rs
@@ -83,7 +83,7 @@ pub fn assert_listen_addrs_distinct(
             if let (Some(a), Some(b)) = (a, b)
                 && a == b
             {
-                panic!("listener address conflict: {a_name} and {b_name} both 
bind to {a}",);
+                panic!("listener address conflict: {a_name} and {b_name} both 
bind to {a}");
             }
         }
     }
diff --git a/core/message_bus/src/transports/quic.rs 
b/core/message_bus/src/transports/quic.rs
index a407b3954..b96b16408 100644
--- a/core/message_bus/src/transports/quic.rs
+++ b/core/message_bus/src/transports/quic.rs
@@ -238,6 +238,21 @@ impl QuicTransportConn {
             streams,
         }
     }
+
+    /// Consume `self` and return the raw `(Connection, (SendStream, RecvStream))`
+    /// tuple, mirror of [`Self::new`].
+    ///
+    /// `client_listener_quic::run` uses this to pass the already-accepted
+    /// connection + first bidi pair to its `on_accepted` callback, which
+    /// calls [`crate::installer::install_client_quic_conn`] (that wraps
+    /// them in a fresh `QuicTransportConn` and dispatches via the
+    /// generic install path). [`TransportConn::into_split`] cannot be
+    /// used at that boundary because the install path needs the raw
+    /// tuple, not the split halves.
+    #[must_use]
+    pub fn into_parts(self) -> (Connection, (SendStream, RecvStream)) {
+        (self.connection, self.streams) // both fields are moved out as-is
+    }
 }
 
 impl TransportConn for QuicTransportConn {
diff --git a/core/message_bus/tests/common/mod.rs 
b/core/message_bus/tests/common/mod.rs
index 81cdf1fa5..4f7e88821 100644
--- a/core/message_bus/tests/common/mod.rs
+++ b/core/message_bus/tests/common/mod.rs
@@ -30,7 +30,9 @@ use iggy_binary_protocol::{Command2, GenericHeader, 
HEADER_SIZE, Message};
 use message_bus::auth::{StaticSharedSecret, TokenSource};
 use message_bus::client_listener::RequestHandler;
 use message_bus::replica_listener::MessageHandler;
-use message_bus::{AcceptedClientFn, AcceptedReplicaFn, IggyMessageBus, 
installer};
+use message_bus::{
+    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, IggyMessageBus, 
installer,
+};
 use std::cell::Cell;
 use std::net::SocketAddr;
 use std::rc::Rc;
@@ -94,3 +96,27 @@ pub fn install_clients_locally(
         installer::install_client_stream(&bus, client_id, stream, 
on_request.clone());
     })
 }
+
+/// Build an [`AcceptedQuicClientFn`] that mints `(shard_id << 112) | seq`
+/// client ids (`seq` starts at 1) and installs each QUIC connection on the
+/// given bus. Mirror of [`install_clients_locally`] for the QUIC plane.
+#[must_use]
+pub fn install_quic_clients_locally(
+    bus: Rc<IggyMessageBus>,
+    on_request: RequestHandler,
+) -> AcceptedQuicClientFn {
+    let counter: Rc<Cell<u128>> = Rc::new(Cell::new(1));
+    let shard_id = u128::from(bus.shard_id()); // read once, when the callback is built
+    Rc::new(move |connection, streams| {
+        let seq = counter.get();
+        counter.set(seq.wrapping_add(1)); // post-increment: this call uses `seq`, the next one `seq + 1`
+        let client_id = (shard_id << 112) | seq; // shard id occupies bits 112 and up, sequence the bits below
+        installer::install_client_quic_conn(
+            &bus,
+            client_id,
+            connection,
+            streams,
+            on_request.clone(), // each installed connection gets its own handle to the shared handler
+        );
+    })
+}
diff --git a/core/message_bus/tests/quic_client_roundtrip.rs 
b/core/message_bus/tests/quic_client_roundtrip.rs
new file mode 100644
index 000000000..c3d96723a
--- /dev/null
+++ b/core/message_bus/tests/quic_client_roundtrip.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! End-to-end: a real QUIC client connects to the consensus QUIC client
+//! listener, sends a Request, the handler echoes a Reply back via
+//! `bus.send_to_client`, the client reads the Reply.
+
+mod common;
+
+use common::{header_only, install_quic_clients_locally, loopback};
+use compio_quic::{ClientBuilder, Endpoint};
+use iggy_binary_protocol::Command2;
+use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
+use iggy_binary_protocol::consensus::iobuf::Frozen;
+use message_bus::client_listener::RequestHandler;
+use message_bus::client_listener_quic::{bind, run};
+use message_bus::framing;
+use message_bus::transports::quic::{QuicTransportConn, 
server_config_with_cert};
+use message_bus::transports::{TransportConn, TransportReader, TransportWriter};
+use message_bus::{IggyMessageBus, MessageBus};
+use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
+use std::rc::Rc;
+use std::time::Duration;
+
+fn install_crypto_provider() {
+    // Result deliberately ignored: idempotent across same-process retries.
+    let _ = rustls::crypto::ring::default_provider().install_default();
+}
+
+fn self_signed() -> (CertificateDer<'static>, PrivateKeyDer<'static>) { // throwaway cert + key for the loopback test
+    let cert = 
rcgen::generate_simple_self_signed(vec!["localhost".to_owned()]).expect("rcgen");
+    let cert_der = CertificateDer::from(cert.cert);
+    let key_der: PrivateKeyDer<'static> =
+        PrivatePkcs8KeyDer::from(cert.signing_key.serialize_der()).into(); // key is wrapped as PKCS#8 DER
+    (cert_der, key_der)
+}
+
+#[allow(clippy::future_not_send)]
+async fn client_endpoint(server_cert: CertificateDer<'static>) -> Endpoint {
+    let mut builder = ClientBuilder::new_with_empty_roots() // start with no trusted roots at all
+        .with_custom_certificate(server_cert) // then trust exactly the certificate passed in
+        .expect("trust cert")
+        .with_no_crls();
+    builder = builder.with_alpn_protocols(&["iggy.consensus.v1"]); // ALPN string the listener module docs name for the server side
+    builder.bind("127.0.0.1:0").await.expect("client bind") // port 0: let the OS pick the client port
+}
+
+#[compio::test]
+async fn request_reply_round_trip() {
+    install_crypto_provider();
+
+    let bus = Rc::new(IggyMessageBus::new(7)); // NOTE(review): 7 is presumably the shard id; confirm against IggyMessageBus::new
+
+    let bus_for_handler = bus.clone();
+    let on_request: RequestHandler = Rc::new(move |client_id, msg| {
+        assert_eq!(msg.header().command, Command2::Request);
+        let bus = bus_for_handler.clone();
+        compio::runtime::spawn(async move { // the handler closure is synchronous, so the async reply runs in a spawned task
+            let reply = header_only(Command2::Reply, 42, 0); // 42 is checked below via header().cluster
+            bus.send_to_client(client_id, reply.into_frozen())
+                .await
+                .expect("send_to_client should succeed");
+        })
+        .detach();
+    });
+
+    let (cert, key) = self_signed();
+    let server_cfg = server_config_with_cert(vec![cert.clone()], 
key).expect("server config");
+    let (endpoint, server_addr) = bind(loopback(), 
server_cfg).await.expect("bind");
+
+    let token = bus.token();
+    let on_accepted = install_quic_clients_locally(bus.clone(), on_request);
+    let accept_handle = compio::runtime::spawn(async move {
+        run(endpoint, token, on_accepted).await;
+    });
+    bus.track_background(accept_handle); // NOTE(review): presumably lets bus.shutdown() wait for the accept task; confirm
+
+    // Dial as a real QUIC client, trusting only the server's self-signed cert.
+    let client = client_endpoint(cert).await;
+    let connecting = client
+        .connect(server_addr, "localhost", None) // same name the self-signed cert was generated for
+        .expect("connect");
+    let connection = connecting.await.expect("client handshake");
+    let (send, recv) = connection.open_bi_wait().await.expect("open_bi");
+
+    // Reuse the existing QuicTransportConn split for client-side framing.
+    let conn = QuicTransportConn::new(connection, (send, recv));
+    let (mut reader, mut writer) = conn.into_split();
+
+    // Send a Request as a single-frame batch via the writer's atomic
+    // send_batch (mirrors the production hot path).
+    let request = header_only(Command2::Request, 42, 0).into_frozen();
+    let mut batch: Vec<Frozen<MESSAGE_ALIGN>> = vec![request];
+    writer.send_batch(&mut batch).await.expect("client send");
+    assert!(batch.is_empty(), "send_batch must drain the Vec");
+
+    // Read the Reply on the RecvStream.
+    let reply = reader
+        .read_message(framing::MAX_MESSAGE_SIZE)
+        .await
+        .expect("client read");
+    assert_eq!(reply.header().command, Command2::Reply);
+    assert_eq!(reply.header().cluster, 42);
+
+    let outcome = bus.shutdown(Duration::from_secs(2)).await; // 2 s shutdown budget
+    assert_eq!(
+        outcome.force, 0,
+        "graceful shutdown should not force-cancel"
+    );
+}

Reply via email to