This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch feat/message-bus-transports in repository https://gitbox.apache.org/repos/asf/iggy.git
commit bc77c69a706b37b8d177172844f3b498d07a8333 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Apr 27 12:56:16 2026 +0200 feat(message_bus): add listen-address overlap validator assert_listen_addrs_distinct(replica, client, ws, quic) helper in replica_io.rs catches operator misconfiguration at boot. Pairwise- compare populated TCP-family SocketAddrs; same (ip, port) panics with operator-actionable message naming the conflicting pair. UDP (QUIC) shares port namespace with TCP and is allowed to share a port with any TCP listener (separate kernel namespaces). Backs the upcoming Phase 5 listener wiring: start_on_shard_zero will call this helper before binding QUIC + WS listeners alongside the existing TCP replica + client listeners. --- core/message_bus/src/replica_io.rs | 90 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/core/message_bus/src/replica_io.rs b/core/message_bus/src/replica_io.rs index cc2eca8c1..f0abddecb 100644 --- a/core/message_bus/src/replica_io.rs +++ b/core/message_bus/src/replica_io.rs @@ -48,6 +48,47 @@ pub struct BoundPlanes { pub client: SocketAddr, } +/// Boot-time check: every TCP listener address must occupy a distinct +/// `(ip, port)` slot. +/// +/// TCP listeners (`replica`, `client`, `ws`) all share the TCP port +/// space, so any pair sharing `(ip, port)` is a bind-time conflict. +/// The QUIC listener binds a UDP socket: it occupies a separate port +/// namespace and is allowed to share a port with any TCP listener (a +/// common operator choice for "QUIC on 443 over UDP, HTTPS on 443 over +/// TCP"). UDP-vs-UDP conflicts are not possible today because QUIC is +/// the only UDP listener. +/// +/// # Panics +/// +/// Panics with a message naming the conflicting pair. Boot-time +/// validation; surfaces operator misconfiguration loudly rather than +/// letting one listener silently lose a `EADDRINUSE` race against +/// another. +pub fn assert_listen_addrs_distinct( + replica: SocketAddr, + client: SocketAddr, + ws: Option<SocketAddr>, + _quic: Option<SocketAddr>, +) { + let tcp_slots: [(&str, Option<SocketAddr>); 3] = [ + ("replica", Some(replica)), + ("client", Some(client)), + ("ws", ws), + ]; + for i in 0..tcp_slots.len() { + for j in (i + 1)..tcp_slots.len() { + let (a_name, a) = tcp_slots[i]; + let (b_name, b) = tcp_slots[j]; + if let (Some(a), Some(b)) = (a, b) + && a == b + { + panic!("listener address conflict: {a_name} and {b_name} both bind to {a}",); + } + } + } +} + /// Bind the replica + client listeners on shard 0 and start the outbound /// replica connector. Non-zero shards early-return `Ok(None)`. /// @@ -161,3 +202,52 @@ pub async fn start_on_shard_zero_default( ) .await } + +#[cfg(test)] +mod tests { + use super::*; + + fn addr(port: u16) -> SocketAddr { + format!("127.0.0.1:{port}").parse().unwrap() + } + + #[test] + fn distinct_addrs_pass() { + assert_listen_addrs_distinct(addr(9090), addr(8090), Some(addr(8092)), Some(addr(8080))); + } + + #[test] + fn ws_unset_passes() { + assert_listen_addrs_distinct(addr(9090), addr(8090), None, Some(addr(8080))); + } + + #[test] + fn quic_shares_port_with_tcp_replica_ok() { + // QUIC on UDP and replica on TCP can share a port (separate + // kernel namespaces). Operator choice "443 on TCP and UDP". + assert_listen_addrs_distinct(addr(443), addr(8090), Some(addr(8092)), Some(addr(443))); + } + + #[test] + fn quic_shares_port_with_tcp_client_ok() { + assert_listen_addrs_distinct(addr(9090), addr(443), Some(addr(8092)), Some(addr(443))); + } + + #[test] + #[should_panic(expected = "listener address conflict: replica and client")] + fn replica_client_overlap_panics() { + assert_listen_addrs_distinct(addr(8090), addr(8090), Some(addr(8092)), None); + } + + #[test] + #[should_panic(expected = "listener address conflict: replica and ws")] + fn replica_ws_overlap_panics() { + assert_listen_addrs_distinct(addr(9090), addr(8090), Some(addr(9090)), None); + } + + #[test] + #[should_panic(expected = "listener address conflict: client and ws")] + fn client_ws_overlap_panics() { + assert_listen_addrs_distinct(addr(9090), addr(8090), Some(addr(8090)), None); + } +}
