This is an automated email from the ASF dual-hosted git repository. numinnex pushed a commit to branch integration_tests in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 6c339b10893883e9e097d55822c34621971feaa2 Author: numinex <[email protected]> AuthorDate: Thu May 14 08:54:42 2026 +0200 test --- .../traits/binary_impls/personal_access_tokens.rs | 17 +++--- core/common/src/traits/binary_impls/users.rs | 8 +-- core/harness_derive/src/attrs.rs | 3 +- core/message_bus/src/client_listener/quic.rs | 30 +++++++++- core/message_bus/src/client_listener/tcp.rs | 10 +--- core/message_bus/src/client_listener/tcp_tls.rs | 7 +-- core/message_bus/src/client_listener/ws.rs | 10 +--- core/message_bus/src/client_listener/wss.rs | 7 +-- core/message_bus/src/replica/listener.rs | 10 +--- core/message_bus/src/socket_opts.rs | 69 +++++++++++++++++++++- core/sdk/src/tcp/tcp_client.rs | 4 +- core/sdk/src/vsr.rs | 17 +++--- core/sdk/src/websocket/websocket_client.rs | 7 ++- 13 files changed, 137 insertions(+), 62 deletions(-) diff --git a/core/common/src/traits/binary_impls/personal_access_tokens.rs b/core/common/src/traits/binary_impls/personal_access_tokens.rs index 4c740d79d..b7a46d84f 100644 --- a/core/common/src/traits/binary_impls/personal_access_tokens.rs +++ b/core/common/src/traits/binary_impls/personal_access_tokens.rs @@ -25,27 +25,28 @@ use crate::{ }; use iggy_binary_protocol::WireName; use iggy_binary_protocol::codec::WireEncode; -use iggy_binary_protocol::codes::{ - CREATE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN_CODE, - GET_PERSONAL_ACCESS_TOKENS_CODE, -}; #[cfg(feature = "vsr")] use iggy_binary_protocol::codes::LOGIN_REGISTER_WITH_PAT_CODE; #[cfg(not(feature = "vsr"))] use iggy_binary_protocol::codes::LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE; -use iggy_binary_protocol::requests::personal_access_tokens::{ - CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest, GetPersonalAccessTokensRequest, +use iggy_binary_protocol::codes::{ + CREATE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN_CODE, + GET_PERSONAL_ACCESS_TOKENS_CODE, }; #[cfg(not(feature = "vsr"))] use iggy_binary_protocol::requests::personal_access_tokens::LoginWithPersonalAccessTokenRequest; +use iggy_binary_protocol::requests::personal_access_tokens::{ + CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest, + GetPersonalAccessTokensRequest, +}; #[cfg(feature = "vsr")] use iggy_binary_protocol::requests::users::LoginRegisterWithPatRequest; use iggy_binary_protocol::responses::personal_access_tokens::create_personal_access_token::RawPersonalAccessTokenResponse; use iggy_binary_protocol::responses::personal_access_tokens::get_personal_access_tokens::GetPersonalAccessTokensResponse; -#[cfg(not(feature = "vsr"))] -use iggy_binary_protocol::responses::users::login_user::IdentityResponse; #[cfg(feature = "vsr")] use iggy_binary_protocol::responses::users::LoginRegisterResponse; +#[cfg(not(feature = "vsr"))] +use iggy_binary_protocol::responses::users::login_user::IdentityResponse; #[cfg(feature = "vsr")] use secrecy::SecretString; diff --git a/core/common/src/traits/binary_impls/users.rs b/core/common/src/traits/binary_impls/users.rs index 183d942ca..06ec4a7bc 100644 --- a/core/common/src/traits/binary_impls/users.rs +++ b/core/common/src/traits/binary_impls/users.rs @@ -24,20 +24,20 @@ use crate::{ }; use iggy_binary_protocol::WireName; use iggy_binary_protocol::codec::WireEncode; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::codes::LOGIN_REGISTER_CODE; use iggy_binary_protocol::codes::{ CHANGE_PASSWORD_CODE, CREATE_USER_CODE, DELETE_USER_CODE, GET_USER_CODE, GET_USERS_CODE, UPDATE_PERMISSIONS_CODE, UPDATE_USER_CODE, }; -#[cfg(feature = "vsr")] -use iggy_binary_protocol::codes::LOGIN_REGISTER_CODE; #[cfg(not(feature = "vsr"))] use iggy_binary_protocol::codes::{LOGIN_USER_CODE, LOGOUT_USER_CODE}; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::requests::users::LoginRegisterRequest; use iggy_binary_protocol::requests::users::{ ChangePasswordRequest, CreateUserRequest, DeleteUserRequest, GetUserRequest, GetUsersRequest, UpdatePermissionsRequest, UpdateUserRequest, }; -#[cfg(feature = "vsr")] -use iggy_binary_protocol::requests::users::LoginRegisterRequest; #[cfg(not(feature = "vsr"))] use iggy_binary_protocol::requests::users::{LoginUserRequest, LogoutUserRequest}; #[cfg(feature = "vsr")] diff --git a/core/harness_derive/src/attrs.rs b/core/harness_derive/src/attrs.rs index 9c1018f31..d1e811025 100644 --- a/core/harness_derive/src/attrs.rs +++ b/core/harness_derive/src/attrs.rs @@ -637,8 +637,7 @@ mod tests { #[test] fn parse_server_executable_path() { - let attrs: IggyTestAttrs = - syn::parse_quote!(server(executable_path = "iggy-server-ng")); + let attrs: IggyTestAttrs = syn::parse_quote!(server(executable_path = "iggy-server-ng")); assert_eq!( attrs.server.executable_path.as_deref(), Some("iggy-server-ng") diff --git a/core/message_bus/src/client_listener/quic.rs b/core/message_bus/src/client_listener/quic.rs index 262c03075..a4e191a20 100644 --- a/core/message_bus/src/client_listener/quic.rs +++ b/core/message_bus/src/client_listener/quic.rs @@ -41,10 +41,12 @@ use crate::lifecycle::ShutdownToken; use crate::transports::quic::{QuicTransportListener, accept_handshake}; use crate::{AcceptedQuicClientFn, AcceptedQuicConn}; +use compio::net::UdpSocket; use compio::runtime::JoinHandle; -use compio_quic::{Endpoint, ServerConfig}; +use compio_quic::{Endpoint, EndpointConfig, ServerConfig}; use futures::FutureExt; use iggy_common::IggyError; +use socket2::{Domain, Protocol, Socket, Type}; use std::net::SocketAddr; use std::time::Duration; use tracing::{debug, error, info}; @@ -63,9 +65,31 @@ pub async fn bind( addr: SocketAddr, server_config: ServerConfig, ) -> Result<(Endpoint, SocketAddr), IggyError> { - let endpoint = Endpoint::server(addr, server_config) - .await + // QUIC remains shard-0 terminal; this only enables coexistence with the + // harness's placeholder UDP socket during process startup. + // + // TODO: remove `SO_REUSEPORT` again once the integration harness stops + // holding placeholder reservation sockets open across child startup. + let socket = Socket::new(Domain::for_address(addr), Type::DGRAM, Some(Protocol::UDP)) + .map_err(|e| IggyError::IoError(e.to_string()))?; + socket + .set_reuse_address(true) + .map_err(|e| IggyError::IoError(e.to_string()))?; + #[cfg(unix)] + socket + .set_reuse_port(true) + .map_err(|e| IggyError::IoError(e.to_string()))?; + socket + .bind(&addr.into()) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; + socket + .set_nonblocking(true) + .map_err(|e| IggyError::IoError(e.to_string()))?; + + let std_socket: std::net::UdpSocket = socket.into(); + let socket = UdpSocket::from_std(std_socket).map_err(|e| IggyError::IoError(e.to_string()))?; + let endpoint = Endpoint::new(socket, EndpointConfig::default(), Some(server_config), None) + .map_err(|e| IggyError::IoError(e.to_string()))?; let actual = endpoint .local_addr() .map_err(|e| IggyError::IoError(e.to_string()))?; diff --git a/core/message_bus/src/client_listener/tcp.rs b/core/message_bus/src/client_listener/tcp.rs index e8b3ab8ce..a79368c40 100644 --- a/core/message_bus/src/client_listener/tcp.rs +++ b/core/message_bus/src/client_listener/tcp.rs @@ -28,7 +28,8 @@ use crate::AcceptedClientFn; use crate::lifecycle::ShutdownToken; -use compio::net::{SocketOpts, TcpListener}; +use crate::socket_opts::bind_reusable_tcp_listener; +use compio::net::TcpListener; use futures::FutureExt; use iggy_common::IggyError; use std::net::SocketAddr; @@ -42,12 +43,7 @@ use tracing::{debug, error, info}; /// Returns [`IggyError::CannotBindToSocket`] if the bind fails. #[allow(clippy::future_not_send)] pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> { - // `SO_REUSEPORT` intentionally not set: only shard 0 binds the client - // listener. The shard-0 coordinator round-robins accepts to owning - // shards via `ShardFramePayload::ClientConnectionSetup`. - let opts = SocketOpts::new().nodelay(true); - let listener = TcpListener::bind_with_options(addr, &opts) - .await + let listener = bind_reusable_tcp_listener(addr) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; let actual = listener .local_addr() diff --git a/core/message_bus/src/client_listener/tcp_tls.rs b/core/message_bus/src/client_listener/tcp_tls.rs index 7e7004424..9fe199bab 100644 --- a/core/message_bus/src/client_listener/tcp_tls.rs +++ b/core/message_bus/src/client_listener/tcp_tls.rs @@ -39,8 +39,9 @@ use crate::AcceptedTlsClientFn; use crate::lifecycle::ShutdownToken; +use crate::socket_opts::bind_reusable_tcp_listener; use crate::transports::tls::{TlsServerCredentials, install_default_crypto_provider}; -use compio::net::{SocketOpts, TcpListener}; +use compio::net::TcpListener; use futures::FutureExt; use iggy_common::IggyError; use std::net::SocketAddr; @@ -85,9 +86,7 @@ pub async fn bind( // cannot enable it accidentally. cfg.max_early_data_size = 0; - let opts = SocketOpts::new().nodelay(true); - let listener = TcpListener::bind_with_options(addr, &opts) - .await + let listener = bind_reusable_tcp_listener(addr) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; let actual = listener .local_addr() diff --git a/core/message_bus/src/client_listener/ws.rs b/core/message_bus/src/client_listener/ws.rs index 90a6aa266..961bf8e7e 100644 --- a/core/message_bus/src/client_listener/ws.rs +++ b/core/message_bus/src/client_listener/ws.rs @@ -36,7 +36,8 @@ use crate::AcceptedWsClientFn; use crate::lifecycle::ShutdownToken; -use compio::net::{SocketOpts, TcpListener}; +use crate::socket_opts::bind_reusable_tcp_listener; +use compio::net::TcpListener; use futures::FutureExt; use iggy_common::IggyError; use std::net::SocketAddr; @@ -56,12 +57,7 @@ use tracing::{debug, error, info}; /// Returns [`IggyError::CannotBindToSocket`] if the bind fails. #[allow(clippy::future_not_send)] pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> { - // `SO_REUSEPORT` intentionally not set: only shard 0 binds the WS - // listener. The shard-0 coordinator round-robins accepts to owning - // shards via `ShardFramePayload::ClientWsConnectionSetup`. - let opts = SocketOpts::new().nodelay(true); - let listener = TcpListener::bind_with_options(addr, &opts) - .await + let listener = bind_reusable_tcp_listener(addr) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; let actual = listener .local_addr() diff --git a/core/message_bus/src/client_listener/wss.rs b/core/message_bus/src/client_listener/wss.rs index 26ad26d1c..96cfd5415 100644 --- a/core/message_bus/src/client_listener/wss.rs +++ b/core/message_bus/src/client_listener/wss.rs @@ -44,8 +44,9 @@ //! use crate::AcceptedWssClientFn; use crate::lifecycle::ShutdownToken; +use crate::socket_opts::bind_reusable_tcp_listener; use crate::transports::tls::{TlsServerCredentials, install_default_crypto_provider}; -use compio::net::{SocketOpts, TcpListener}; +use compio::net::TcpListener; use futures::FutureExt; use iggy_common::IggyError; use std::net::SocketAddr; @@ -76,9 +77,7 @@ pub async fn bind( .map_err(|e| IggyError::IoError(format!("WSS server config build failed: {e}")))?; cfg.max_early_data_size = 0; - let opts = SocketOpts::new().nodelay(true); - let listener = TcpListener::bind_with_options(addr, &opts) - .await + let listener = bind_reusable_tcp_listener(addr) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; let actual = listener .local_addr() diff --git a/core/message_bus/src/replica/listener.rs b/core/message_bus/src/replica/listener.rs index 854f0c998..6a125de74 100644 --- a/core/message_bus/src/replica/listener.rs +++ b/core/message_bus/src/replica/listener.rs @@ -83,8 +83,9 @@ use crate::framing; use crate::lifecycle::ShutdownToken; +use crate::socket_opts::bind_reusable_tcp_listener; use crate::{AcceptedReplicaFn, GenericHeader, Message}; -use compio::net::{SocketOpts, TcpListener, TcpStream}; +use compio::net::{TcpListener, TcpStream}; use compio::runtime::JoinHandle; use futures::FutureExt; use iggy_binary_protocol::Command2; @@ -108,12 +109,7 @@ pub type MessageHandler = Rc<dyn Fn(u8, Message<GenericHeader>)>; /// Returns [`IggyError::CannotBindToSocket`] if the bind fails. #[allow(clippy::future_not_send)] pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> { - // `SO_REUSEPORT` intentionally not set: only shard 0 binds the replica - // listener. Kernel-level accept distribution would fight the shard-0 - // coordinator's explicit round-robin allocation. - let opts = SocketOpts::new().nodelay(true); - let listener = TcpListener::bind_with_options(addr, &opts) - .await + let listener = bind_reusable_tcp_listener(addr) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; let actual = listener .local_addr() diff --git a/core/message_bus/src/socket_opts.rs b/core/message_bus/src/socket_opts.rs index d5736dbe3..8c8695212 100644 --- a/core/message_bus/src/socket_opts.rs +++ b/core/message_bus/src/socket_opts.rs @@ -23,9 +23,10 @@ //! than by `SO_KEEPALIVE`. Only `TCP_NODELAY` toggling lives in this //! module today. -use compio::net::TcpStream; -use socket2::SockRef; +use compio::net::{TcpListener, TcpStream}; +use socket2::{Domain, Protocol, SockRef, Socket, Type}; use std::io; +use std::net::SocketAddr; /// Disable Nagle on a per-connection socket. /// @@ -43,3 +44,67 @@ use std::io; pub fn apply_nodelay_for_connection(stream: &TcpStream) -> io::Result<()> { SockRef::from(stream).set_tcp_nodelay(true) } + +/// Build a TCP listener compatible with the harness's pre-bound reservation +/// sockets. +/// +/// Only shard 0 binds the real listener; `SO_REUSEPORT` is enabled here solely +/// so the server process can claim a port already held open by the integration +/// harness during startup. +/// +/// TODO: remove `SO_REUSEPORT` again once the integration harness stops +/// holding placeholder reservation sockets open across child startup. +pub fn bind_reusable_tcp_listener(addr: SocketAddr) -> io::Result<TcpListener> { + let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?; + socket.set_reuse_address(true)?; + #[cfg(all( + unix, + not(any(target_os = "illumos", target_os = "solaris", target_os = "cygwin")) + ))] + socket.set_reuse_port(true)?; + socket.bind(&addr.into())?; + socket.listen(128)?; + socket.set_nonblocking(true)?; + + let std_listener: std::net::TcpListener = socket.into(); + TcpListener::from_std(std_listener) +} + +#[cfg(test)] +mod tests { + use super::bind_reusable_tcp_listener; + use compio::runtime::Runtime; + use socket2::{Domain, Protocol, Socket, Type}; + use std::net::{Ipv4Addr, SocketAddr}; + + #[test] + fn reusable_tcp_listener_can_bind_over_reserved_port() { + let reserve_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + let socket = Socket::new( + Domain::for_address(reserve_addr), + Type::STREAM, + Some(Protocol::TCP), + ) + .expect("reserve socket"); + socket.set_reuse_address(true).expect("reserve reuseaddr"); + #[cfg(all( + unix, + not(any(target_os = "illumos", target_os = "solaris", target_os = "cygwin")) + ))] + socket.set_reuse_port(true).expect("reserve reuseport"); + socket.bind(&reserve_addr.into()).expect("reserve bind"); + socket.listen(1).expect("reserve listen"); + let addr = socket + .local_addr() + .expect("reserve local addr") + .as_socket() + .expect("socket addr"); + + let runtime = Runtime::new().expect("runtime"); + runtime.enter(|| { + let listener = bind_reusable_tcp_listener(addr) + .expect("second listener should bind on reserved port"); + assert_eq!(listener.local_addr().expect("listener addr"), addr); + }); + } +} diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 5a1a1502e..3d5d3c0b4 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -27,12 +27,12 @@ use crate::tcp::tcp_tls_connection_stream::TcpTlsConnectionStream; use async_broadcast::{Receiver, Sender, broadcast}; use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; +#[cfg(not(feature = "vsr"))] +use iggy_common::IggyErrorDiscriminants; use iggy_common::{ AutoLogin, ClientState, ConnectionString, ConnectionStringUtils, Credentials, DiagnosticEvent, IggyDuration, IggyError, IggyTimestamp, TcpConnectionStringOptions, TransportProtocol, }; -#[cfg(not(feature = "vsr"))] -use iggy_common::IggyErrorDiscriminants; use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, UserClient}; use rustls::pki_types::{CertificateDer, ServerName, pem::PemObject}; use secrecy::ExposeSecret; diff --git a/core/sdk/src/vsr.rs b/core/sdk/src/vsr.rs index c7fcb934e..1e4b19b33 100644 --- a/core/sdk/src/vsr.rs +++ b/core/sdk/src/vsr.rs @@ -47,7 +47,8 @@ pub(crate) fn encode_request( (Operation::Register, session.register_request_id(), 0) } _ => { - let operation = Operation::from_command_code(code).ok_or(IggyError::FeatureUnavailable)?; + let operation = + Operation::from_command_code(code).ok_or(IggyError::FeatureUnavailable)?; let session_id = session.session().ok_or(IggyError::Unauthenticated)?; (operation, session.next_request_id(), session_id) } @@ -117,19 +118,17 @@ fn namespace_for_request( if payload.len() < 4 { return Err(IggyError::InvalidCommand); } - let metadata_length = - u32::from_le_bytes(payload[..4].try_into().map_err(|_| IggyError::InvalidNumberEncoding)?) - as usize; + let metadata_length = u32::from_le_bytes( + payload[..4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ) as usize; if payload.len() < 4 + metadata_length { return Err(IggyError::InvalidCommand); } let header = SendMessagesHeader::decode_from(&payload[4..4 + metadata_length]) .map_err(|_| IggyError::InvalidCommand)?; - namespace_from_partitioning( - &header.stream_id, - &header.topic_id, - &header.partitioning, - )? + namespace_from_partitioning(&header.stream_id, &header.topic_id, &header.partitioning)? } STORE_CONSUMER_OFFSET_CODE => { let request = StoreConsumerOffsetRequest::decode_from(payload) diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 84a849066..9199d648b 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -28,12 +28,12 @@ use crate::prelude::Client; use async_broadcast::{Receiver, Sender, broadcast}; use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; +#[cfg(not(feature = "vsr"))] +use iggy_common::IggyErrorDiscriminants; use iggy_common::{ AutoLogin, ClientState, ConnectionString, Credentials, DiagnosticEvent, IggyDuration, IggyError, IggyTimestamp, WebSocketClientConfig, WebSocketConnectionStringOptions, }; -#[cfg(not(feature = "vsr"))] -use iggy_common::IggyErrorDiscriminants; use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, UserClient}; use secrecy::ExposeSecret; use std::net::SocketAddr; @@ -645,7 +645,8 @@ impl WebSocketClient { response.put_slice(&response_header); if response_size > iggy_binary_protocol::HEADER_SIZE { - let mut response_body = vec![0u8; response_size - iggy_binary_protocol::HEADER_SIZE]; + let mut response_body = + vec![0u8; response_size - iggy_binary_protocol::HEADER_SIZE]; stream.read(&mut response_body).await?; response.put_slice(&response_body); }
