This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch integration_tests
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 6c339b10893883e9e097d55822c34621971feaa2
Author: numinex <[email protected]>
AuthorDate: Thu May 14 08:54:42 2026 +0200

    test
---
 .../traits/binary_impls/personal_access_tokens.rs  | 17 +++---
 core/common/src/traits/binary_impls/users.rs       |  8 +--
 core/harness_derive/src/attrs.rs                   |  3 +-
 core/message_bus/src/client_listener/quic.rs       | 30 +++++++++-
 core/message_bus/src/client_listener/tcp.rs        | 10 +---
 core/message_bus/src/client_listener/tcp_tls.rs    |  7 +--
 core/message_bus/src/client_listener/ws.rs         | 10 +---
 core/message_bus/src/client_listener/wss.rs        |  7 +--
 core/message_bus/src/replica/listener.rs           | 10 +---
 core/message_bus/src/socket_opts.rs                | 69 +++++++++++++++++++++-
 core/sdk/src/tcp/tcp_client.rs                     |  4 +-
 core/sdk/src/vsr.rs                                | 17 +++---
 core/sdk/src/websocket/websocket_client.rs         |  7 ++-
 13 files changed, 137 insertions(+), 62 deletions(-)

diff --git a/core/common/src/traits/binary_impls/personal_access_tokens.rs b/core/common/src/traits/binary_impls/personal_access_tokens.rs
index 4c740d79d..b7a46d84f 100644
--- a/core/common/src/traits/binary_impls/personal_access_tokens.rs
+++ b/core/common/src/traits/binary_impls/personal_access_tokens.rs
@@ -25,27 +25,28 @@ use crate::{
 };
 use iggy_binary_protocol::WireName;
 use iggy_binary_protocol::codec::WireEncode;
-use iggy_binary_protocol::codes::{
-    CREATE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN_CODE,
-    GET_PERSONAL_ACCESS_TOKENS_CODE,
-};
 #[cfg(feature = "vsr")]
 use iggy_binary_protocol::codes::LOGIN_REGISTER_WITH_PAT_CODE;
 #[cfg(not(feature = "vsr"))]
 use iggy_binary_protocol::codes::LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE;
-use iggy_binary_protocol::requests::personal_access_tokens::{
-    CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest, GetPersonalAccessTokensRequest,
+use iggy_binary_protocol::codes::{
+    CREATE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN_CODE,
+    GET_PERSONAL_ACCESS_TOKENS_CODE,
 };
 #[cfg(not(feature = "vsr"))]
 use iggy_binary_protocol::requests::personal_access_tokens::LoginWithPersonalAccessTokenRequest;
+use iggy_binary_protocol::requests::personal_access_tokens::{
+    CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest,
+    GetPersonalAccessTokensRequest,
+};
 #[cfg(feature = "vsr")]
 use iggy_binary_protocol::requests::users::LoginRegisterWithPatRequest;
 use iggy_binary_protocol::responses::personal_access_tokens::create_personal_access_token::RawPersonalAccessTokenResponse;
 use iggy_binary_protocol::responses::personal_access_tokens::get_personal_access_tokens::GetPersonalAccessTokensResponse;
-#[cfg(not(feature = "vsr"))]
-use iggy_binary_protocol::responses::users::login_user::IdentityResponse;
 #[cfg(feature = "vsr")]
 use iggy_binary_protocol::responses::users::LoginRegisterResponse;
+#[cfg(not(feature = "vsr"))]
+use iggy_binary_protocol::responses::users::login_user::IdentityResponse;
 #[cfg(feature = "vsr")]
 use secrecy::SecretString;
 
diff --git a/core/common/src/traits/binary_impls/users.rs b/core/common/src/traits/binary_impls/users.rs
index 183d942ca..06ec4a7bc 100644
--- a/core/common/src/traits/binary_impls/users.rs
+++ b/core/common/src/traits/binary_impls/users.rs
@@ -24,20 +24,20 @@ use crate::{
 };
 use iggy_binary_protocol::WireName;
 use iggy_binary_protocol::codec::WireEncode;
+#[cfg(feature = "vsr")]
+use iggy_binary_protocol::codes::LOGIN_REGISTER_CODE;
 use iggy_binary_protocol::codes::{
     CHANGE_PASSWORD_CODE, CREATE_USER_CODE, DELETE_USER_CODE, GET_USER_CODE, GET_USERS_CODE,
     UPDATE_PERMISSIONS_CODE, UPDATE_USER_CODE,
 };
-#[cfg(feature = "vsr")]
-use iggy_binary_protocol::codes::LOGIN_REGISTER_CODE;
 #[cfg(not(feature = "vsr"))]
 use iggy_binary_protocol::codes::{LOGIN_USER_CODE, LOGOUT_USER_CODE};
+#[cfg(feature = "vsr")]
+use iggy_binary_protocol::requests::users::LoginRegisterRequest;
 use iggy_binary_protocol::requests::users::{
     ChangePasswordRequest, CreateUserRequest, DeleteUserRequest, GetUserRequest, GetUsersRequest,
     UpdatePermissionsRequest, UpdateUserRequest,
 };
-#[cfg(feature = "vsr")]
-use iggy_binary_protocol::requests::users::LoginRegisterRequest;
 #[cfg(not(feature = "vsr"))]
 use iggy_binary_protocol::requests::users::{LoginUserRequest, LogoutUserRequest};
 #[cfg(feature = "vsr")]
diff --git a/core/harness_derive/src/attrs.rs b/core/harness_derive/src/attrs.rs
index 9c1018f31..d1e811025 100644
--- a/core/harness_derive/src/attrs.rs
+++ b/core/harness_derive/src/attrs.rs
@@ -637,8 +637,7 @@ mod tests {
 
     #[test]
     fn parse_server_executable_path() {
-        let attrs: IggyTestAttrs =
-            syn::parse_quote!(server(executable_path = "iggy-server-ng"));
+        let attrs: IggyTestAttrs = syn::parse_quote!(server(executable_path = "iggy-server-ng"));
         assert_eq!(
             attrs.server.executable_path.as_deref(),
             Some("iggy-server-ng")
diff --git a/core/message_bus/src/client_listener/quic.rs b/core/message_bus/src/client_listener/quic.rs
index 262c03075..a4e191a20 100644
--- a/core/message_bus/src/client_listener/quic.rs
+++ b/core/message_bus/src/client_listener/quic.rs
@@ -41,10 +41,12 @@
 use crate::lifecycle::ShutdownToken;
 use crate::transports::quic::{QuicTransportListener, accept_handshake};
 use crate::{AcceptedQuicClientFn, AcceptedQuicConn};
+use compio::net::UdpSocket;
 use compio::runtime::JoinHandle;
-use compio_quic::{Endpoint, ServerConfig};
+use compio_quic::{Endpoint, EndpointConfig, ServerConfig};
 use futures::FutureExt;
 use iggy_common::IggyError;
+use socket2::{Domain, Protocol, Socket, Type};
 use std::net::SocketAddr;
 use std::time::Duration;
 use tracing::{debug, error, info};
@@ -63,9 +65,31 @@ pub async fn bind(
     addr: SocketAddr,
     server_config: ServerConfig,
 ) -> Result<(Endpoint, SocketAddr), IggyError> {
-    let endpoint = Endpoint::server(addr, server_config)
-        .await
+    // QUIC remains shard-0 terminal; this only enables coexistence with the
+    // harness's placeholder UDP socket during process startup.
+    //
+    // TODO: remove `SO_REUSEPORT` again once the integration harness stops
+    // holding placeholder reservation sockets open across child startup.
+    let socket = Socket::new(Domain::for_address(addr), Type::DGRAM, Some(Protocol::UDP))
+        .map_err(|e| IggyError::IoError(e.to_string()))?;
+    socket
+        .set_reuse_address(true)
+        .map_err(|e| IggyError::IoError(e.to_string()))?;
+    #[cfg(unix)]
+    socket
+        .set_reuse_port(true)
+        .map_err(|e| IggyError::IoError(e.to_string()))?;
+    socket
+        .bind(&addr.into())
         .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
+    socket
+        .set_nonblocking(true)
+        .map_err(|e| IggyError::IoError(e.to_string()))?;
+
+    let std_socket: std::net::UdpSocket = socket.into();
+    let socket = UdpSocket::from_std(std_socket).map_err(|e| IggyError::IoError(e.to_string()))?;
+    let endpoint = Endpoint::new(socket, EndpointConfig::default(), Some(server_config), None)
+        .map_err(|e| IggyError::IoError(e.to_string()))?;
     let actual = endpoint
         .local_addr()
         .map_err(|e| IggyError::IoError(e.to_string()))?;
diff --git a/core/message_bus/src/client_listener/tcp.rs b/core/message_bus/src/client_listener/tcp.rs
index e8b3ab8ce..a79368c40 100644
--- a/core/message_bus/src/client_listener/tcp.rs
+++ b/core/message_bus/src/client_listener/tcp.rs
@@ -28,7 +28,8 @@
 
 use crate::AcceptedClientFn;
 use crate::lifecycle::ShutdownToken;
-use compio::net::{SocketOpts, TcpListener};
+use crate::socket_opts::bind_reusable_tcp_listener;
+use compio::net::TcpListener;
 use futures::FutureExt;
 use iggy_common::IggyError;
 use std::net::SocketAddr;
@@ -42,12 +43,7 @@ use tracing::{debug, error, info};
 /// Returns [`IggyError::CannotBindToSocket`] if the bind fails.
 #[allow(clippy::future_not_send)]
 pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> {
-    // `SO_REUSEPORT` intentionally not set: only shard 0 binds the client
-    // listener. The shard-0 coordinator round-robins accepts to owning
-    // shards via `ShardFramePayload::ClientConnectionSetup`.
-    let opts = SocketOpts::new().nodelay(true);
-    let listener = TcpListener::bind_with_options(addr, &opts)
-        .await
+    let listener = bind_reusable_tcp_listener(addr)
         .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
     let actual = listener
         .local_addr()
diff --git a/core/message_bus/src/client_listener/tcp_tls.rs b/core/message_bus/src/client_listener/tcp_tls.rs
index 7e7004424..9fe199bab 100644
--- a/core/message_bus/src/client_listener/tcp_tls.rs
+++ b/core/message_bus/src/client_listener/tcp_tls.rs
@@ -39,8 +39,9 @@
 
 use crate::AcceptedTlsClientFn;
 use crate::lifecycle::ShutdownToken;
+use crate::socket_opts::bind_reusable_tcp_listener;
 use crate::transports::tls::{TlsServerCredentials, install_default_crypto_provider};
-use compio::net::{SocketOpts, TcpListener};
+use compio::net::TcpListener;
 use futures::FutureExt;
 use iggy_common::IggyError;
 use std::net::SocketAddr;
@@ -85,9 +86,7 @@ pub async fn bind(
     // cannot enable it accidentally.
     cfg.max_early_data_size = 0;
 
-    let opts = SocketOpts::new().nodelay(true);
-    let listener = TcpListener::bind_with_options(addr, &opts)
-        .await
+    let listener = bind_reusable_tcp_listener(addr)
         .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
     let actual = listener
         .local_addr()
diff --git a/core/message_bus/src/client_listener/ws.rs b/core/message_bus/src/client_listener/ws.rs
index 90a6aa266..961bf8e7e 100644
--- a/core/message_bus/src/client_listener/ws.rs
+++ b/core/message_bus/src/client_listener/ws.rs
@@ -36,7 +36,8 @@
 
 use crate::AcceptedWsClientFn;
 use crate::lifecycle::ShutdownToken;
-use compio::net::{SocketOpts, TcpListener};
+use crate::socket_opts::bind_reusable_tcp_listener;
+use compio::net::TcpListener;
 use futures::FutureExt;
 use iggy_common::IggyError;
 use std::net::SocketAddr;
@@ -56,12 +57,7 @@ use tracing::{debug, error, info};
 /// Returns [`IggyError::CannotBindToSocket`] if the bind fails.
 #[allow(clippy::future_not_send)]
 pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> {
-    // `SO_REUSEPORT` intentionally not set: only shard 0 binds the WS
-    // listener. The shard-0 coordinator round-robins accepts to owning
-    // shards via `ShardFramePayload::ClientWsConnectionSetup`.
-    let opts = SocketOpts::new().nodelay(true);
-    let listener = TcpListener::bind_with_options(addr, &opts)
-        .await
+    let listener = bind_reusable_tcp_listener(addr)
         .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
     let actual = listener
         .local_addr()
diff --git a/core/message_bus/src/client_listener/wss.rs b/core/message_bus/src/client_listener/wss.rs
index 26ad26d1c..96cfd5415 100644
--- a/core/message_bus/src/client_listener/wss.rs
+++ b/core/message_bus/src/client_listener/wss.rs
@@ -44,8 +44,9 @@
 //!
 use crate::AcceptedWssClientFn;
 use crate::lifecycle::ShutdownToken;
+use crate::socket_opts::bind_reusable_tcp_listener;
 use crate::transports::tls::{TlsServerCredentials, install_default_crypto_provider};
-use compio::net::{SocketOpts, TcpListener};
+use compio::net::TcpListener;
 use futures::FutureExt;
 use iggy_common::IggyError;
 use std::net::SocketAddr;
@@ -76,9 +77,7 @@ pub async fn bind(
         .map_err(|e| IggyError::IoError(format!("WSS server config build failed: {e}")))?;
     cfg.max_early_data_size = 0;
 
-    let opts = SocketOpts::new().nodelay(true);
-    let listener = TcpListener::bind_with_options(addr, &opts)
-        .await
+    let listener = bind_reusable_tcp_listener(addr)
         .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
     let actual = listener
         .local_addr()
diff --git a/core/message_bus/src/replica/listener.rs b/core/message_bus/src/replica/listener.rs
index 854f0c998..6a125de74 100644
--- a/core/message_bus/src/replica/listener.rs
+++ b/core/message_bus/src/replica/listener.rs
@@ -83,8 +83,9 @@
 
 use crate::framing;
 use crate::lifecycle::ShutdownToken;
+use crate::socket_opts::bind_reusable_tcp_listener;
 use crate::{AcceptedReplicaFn, GenericHeader, Message};
-use compio::net::{SocketOpts, TcpListener, TcpStream};
+use compio::net::{TcpListener, TcpStream};
 use compio::runtime::JoinHandle;
 use futures::FutureExt;
 use iggy_binary_protocol::Command2;
@@ -108,12 +109,7 @@ pub type MessageHandler = Rc<dyn Fn(u8, Message<GenericHeader>)>;
 /// Returns [`IggyError::CannotBindToSocket`] if the bind fails.
 #[allow(clippy::future_not_send)]
 pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> {
-    // `SO_REUSEPORT` intentionally not set: only shard 0 binds the replica
-    // listener. Kernel-level accept distribution would fight the shard-0
-    // coordinator's explicit round-robin allocation.
-    let opts = SocketOpts::new().nodelay(true);
-    let listener = TcpListener::bind_with_options(addr, &opts)
-        .await
+    let listener = bind_reusable_tcp_listener(addr)
         .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
     let actual = listener
         .local_addr()
diff --git a/core/message_bus/src/socket_opts.rs b/core/message_bus/src/socket_opts.rs
index d5736dbe3..8c8695212 100644
--- a/core/message_bus/src/socket_opts.rs
+++ b/core/message_bus/src/socket_opts.rs
@@ -23,9 +23,10 @@
 //! than by `SO_KEEPALIVE`. Only `TCP_NODELAY` toggling lives in this
 //! module today.
 
-use compio::net::TcpStream;
-use socket2::SockRef;
+use compio::net::{TcpListener, TcpStream};
+use socket2::{Domain, Protocol, SockRef, Socket, Type};
 use std::io;
+use std::net::SocketAddr;
 
 /// Disable Nagle on a per-connection socket.
 ///
@@ -43,3 +44,67 @@ use std::io;
 pub fn apply_nodelay_for_connection(stream: &TcpStream) -> io::Result<()> {
     SockRef::from(stream).set_tcp_nodelay(true)
 }
+
+/// Build a TCP listener compatible with the harness's pre-bound reservation
+/// sockets.
+///
+/// Only shard 0 binds the real listener; `SO_REUSEPORT` is enabled here solely
+/// so the server process can claim a port already held open by the integration
+/// harness during startup.
+///
+/// TODO: remove `SO_REUSEPORT` again once the integration harness stops
+/// holding placeholder reservation sockets open across child startup.
+pub fn bind_reusable_tcp_listener(addr: SocketAddr) -> io::Result<TcpListener> {
+    let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?;
+    socket.set_reuse_address(true)?;
+    #[cfg(all(
+        unix,
+        not(any(target_os = "illumos", target_os = "solaris", target_os = "cygwin"))
+    ))]
+    socket.set_reuse_port(true)?;
+    socket.bind(&addr.into())?;
+    socket.listen(128)?;
+    socket.set_nonblocking(true)?;
+
+    let std_listener: std::net::TcpListener = socket.into();
+    TcpListener::from_std(std_listener)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::bind_reusable_tcp_listener;
+    use compio::runtime::Runtime;
+    use socket2::{Domain, Protocol, Socket, Type};
+    use std::net::{Ipv4Addr, SocketAddr};
+
+    #[test]
+    fn reusable_tcp_listener_can_bind_over_reserved_port() {
+        let reserve_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0);
+        let socket = Socket::new(
+            Domain::for_address(reserve_addr),
+            Type::STREAM,
+            Some(Protocol::TCP),
+        )
+        .expect("reserve socket");
+        socket.set_reuse_address(true).expect("reserve reuseaddr");
+        #[cfg(all(
+            unix,
+            not(any(target_os = "illumos", target_os = "solaris", target_os = "cygwin"))
+        ))]
+        socket.set_reuse_port(true).expect("reserve reuseport");
+        socket.bind(&reserve_addr.into()).expect("reserve bind");
+        socket.listen(1).expect("reserve listen");
+        let addr = socket
+            .local_addr()
+            .expect("reserve local addr")
+            .as_socket()
+            .expect("socket addr");
+
+        let runtime = Runtime::new().expect("runtime");
+        runtime.enter(|| {
+            let listener = bind_reusable_tcp_listener(addr)
+                .expect("second listener should bind on reserved port");
+            assert_eq!(listener.local_addr().expect("listener addr"), addr);
+        });
+    }
+}
diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs
index 5a1a1502e..3d5d3c0b4 100644
--- a/core/sdk/src/tcp/tcp_client.rs
+++ b/core/sdk/src/tcp/tcp_client.rs
@@ -27,12 +27,12 @@ use crate::tcp::tcp_tls_connection_stream::TcpTlsConnectionStream;
 use async_broadcast::{Receiver, Sender, broadcast};
 use async_trait::async_trait;
 use bytes::{BufMut, Bytes, BytesMut};
+#[cfg(not(feature = "vsr"))]
+use iggy_common::IggyErrorDiscriminants;
 use iggy_common::{
     AutoLogin, ClientState, ConnectionString, ConnectionStringUtils, Credentials, DiagnosticEvent,
     IggyDuration, IggyError, IggyTimestamp, TcpConnectionStringOptions, TransportProtocol,
 };
-#[cfg(not(feature = "vsr"))]
-use iggy_common::IggyErrorDiscriminants;
 use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, UserClient};
 use rustls::pki_types::{CertificateDer, ServerName, pem::PemObject};
 use secrecy::ExposeSecret;
diff --git a/core/sdk/src/vsr.rs b/core/sdk/src/vsr.rs
index c7fcb934e..1e4b19b33 100644
--- a/core/sdk/src/vsr.rs
+++ b/core/sdk/src/vsr.rs
@@ -47,7 +47,8 @@ pub(crate) fn encode_request(
             (Operation::Register, session.register_request_id(), 0)
         }
         _ => {
-            let operation = Operation::from_command_code(code).ok_or(IggyError::FeatureUnavailable)?;
+            let operation =
+                Operation::from_command_code(code).ok_or(IggyError::FeatureUnavailable)?;
             let session_id = session.session().ok_or(IggyError::Unauthenticated)?;
             (operation, session.next_request_id(), session_id)
         }
@@ -117,19 +118,17 @@ fn namespace_for_request(
             if payload.len() < 4 {
                 return Err(IggyError::InvalidCommand);
             }
-            let metadata_length =
-                u32::from_le_bytes(payload[..4].try_into().map_err(|_| IggyError::InvalidNumberEncoding)?)
-                    as usize;
+            let metadata_length = u32::from_le_bytes(
+                payload[..4]
+                    .try_into()
+                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
+            ) as usize;
             if payload.len() < 4 + metadata_length {
                 return Err(IggyError::InvalidCommand);
             }
             let header = SendMessagesHeader::decode_from(&payload[4..4 + metadata_length])
                 .map_err(|_| IggyError::InvalidCommand)?;
-            namespace_from_partitioning(
-                &header.stream_id,
-                &header.topic_id,
-                &header.partitioning,
-            )?
+            namespace_from_partitioning(&header.stream_id, &header.topic_id, &header.partitioning)?
         }
         STORE_CONSUMER_OFFSET_CODE => {
             let request = StoreConsumerOffsetRequest::decode_from(payload)
diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs
index 84a849066..9199d648b 100644
--- a/core/sdk/src/websocket/websocket_client.rs
+++ b/core/sdk/src/websocket/websocket_client.rs
@@ -28,12 +28,12 @@ use crate::prelude::Client;
 use async_broadcast::{Receiver, Sender, broadcast};
 use async_trait::async_trait;
 use bytes::{BufMut, Bytes, BytesMut};
+#[cfg(not(feature = "vsr"))]
+use iggy_common::IggyErrorDiscriminants;
 use iggy_common::{
     AutoLogin, ClientState, ConnectionString, Credentials, DiagnosticEvent, IggyDuration,
     IggyError, IggyTimestamp, WebSocketClientConfig, WebSocketConnectionStringOptions,
 };
-#[cfg(not(feature = "vsr"))]
-use iggy_common::IggyErrorDiscriminants;
 use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, UserClient};
 use secrecy::ExposeSecret;
 use std::net::SocketAddr;
@@ -645,7 +645,8 @@ impl WebSocketClient {
             response.put_slice(&response_header);
 
             if response_size > iggy_binary_protocol::HEADER_SIZE {
-                let mut response_body = vec![0u8; response_size - iggy_binary_protocol::HEADER_SIZE];
+                let mut response_body =
+                    vec![0u8; response_size - iggy_binary_protocol::HEADER_SIZE];
                 stream.read(&mut response_body).await?;
                 response.put_slice(&response_body);
             }

Reply via email to