This is an automated email from the ASF dual-hosted git repository.
numinnex pushed a commit to branch integration_tests
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/integration_tests by this push:
new 795edc6b3 review nits
795edc6b3 is described below
commit 795edc6b3b716a7fa56fdcdf5d528da82d00670d
Author: numinex <[email protected]>
AuthorDate: Fri May 15 17:58:15 2026 +0200
review nits
---
Cargo.lock | 10 ----
Cargo.toml | 1 -
core/integration-vsr/Cargo.toml | 30 ------------
core/integration/Cargo.toml | 1 +
core/integration/src/harness/handle/server.rs | 22 +++++++--
.../src/harness/orchestrator/builder.rs | 25 +++++++++-
.../src/harness/orchestrator/harness.rs | 53 ++++++++++++++++++++++
core/integration/src/harness/port_reserver.rs | 8 ++++
core/integration/tests/mod.rs | 8 ++++
.../tests => integration/tests/sdk}/hello_world.rs | 13 ++++--
core/integration/tests/sdk/mod.rs | 2 +
core/message_bus/src/client_listener/quic.rs | 2 +-
core/message_bus/src/client_listener/tcp.rs | 2 +-
core/message_bus/src/client_listener/tcp_tls.rs | 2 +-
core/message_bus/src/client_listener/ws.rs | 2 +-
core/message_bus/src/client_listener/wss.rs | 2 +-
core/message_bus/src/replica/io.rs | 10 ++--
core/message_bus/tests/graceful_shutdown.rs | 4 +-
core/message_bus/tests/quic_client_roundtrip.rs | 4 +-
core/message_bus/tests/tcp_client_roundtrip.rs | 4 +-
core/message_bus/tests/tcp_tls_client_listener.rs | 4 +-
core/message_bus/tests/ws_client_roundtrip.rs | 4 +-
core/message_bus/tests/wss_client_listener.rs | 4 +-
core/metadata/src/stm/user.rs | 5 ++
core/sdk/src/quic/quic_client.rs | 2 +-
core/sdk/src/websocket/websocket_client.rs | 2 +-
core/server-ng/src/bootstrap.rs | 21 +++++----
27 files changed, 165 insertions(+), 82 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index ffa4b93ec..5bf35b52d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7082,16 +7082,6 @@ dependencies = [
"zip 8.6.0",
]
-[[package]]
-name = "integration-vsr"
-version = "0.0.1"
-dependencies = [
- "iggy",
- "integration",
- "serial_test",
- "tokio",
-]
-
[[package]]
name = "interpolate_name"
version = "0.2.4"
diff --git a/Cargo.toml b/Cargo.toml
index 5b06d5f05..6c8cb8434 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -49,7 +49,6 @@ members = [
"core/consensus",
"core/harness_derive",
"core/integration",
- "core/integration-vsr",
"core/journal",
"core/message_bus",
"core/metadata",
diff --git a/core/integration-vsr/Cargo.toml b/core/integration-vsr/Cargo.toml
deleted file mode 100644
index 4f956b2b9..000000000
--- a/core/integration-vsr/Cargo.toml
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[package]
-name = "integration-vsr"
-version = "0.0.1"
-edition = "2024"
-license = "Apache-2.0"
-publish = false
-
-[dependencies]
-iggy = { workspace = true, features = ["vsr"] }
-integration = { path = "../integration" }
-serial_test = { workspace = true }
-tokio = { workspace = true, features = ["full", "test-util"] }
-
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 9795e41da..373b9268e 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -26,6 +26,7 @@ publish = false
# inside the docker containers. This is a temporary workaround (hopefully).
[features]
ci-qemu = []
+vsr = ["iggy/vsr"]
[dependencies]
assert_cmd = { workspace = true }
diff --git a/core/integration/src/harness/handle/server.rs
b/core/integration/src/harness/handle/server.rs
index 47839b84b..466d6808f 100644
--- a/core/integration/src/harness/handle/server.rs
+++ b/core/integration/src/harness/handle/server.rs
@@ -93,11 +93,23 @@ impl std::fmt::Debug for ServerHandle {
}
impl ServerHandle {
+ fn default_server_binary() -> &'static str {
+ #[cfg(feature = "vsr")]
+ {
+ "iggy-server-ng"
+ }
+
+ #[cfg(not(feature = "vsr"))]
+ {
+ "iggy-server"
+ }
+ }
+
fn launched_binary(&self) -> String {
if let Some(path) = &self.config.executable_path {
path.clone()
} else {
- "iggy-server".to_string()
+ Self::default_server_binary().to_string()
}
}
@@ -796,16 +808,20 @@ impl TestBinary for ServerHandle {
Command::new(path)
}
} else {
- Command::cargo_bin("iggy-server").map_err(|e|
TestBinaryError::ProcessSpawn {
+ Command::cargo_bin(Self::default_server_binary()).map_err(|e| {
+ TestBinaryError::ProcessSpawn {
binary: launched_binary.clone(),
source: std::io::Error::other(e.to_string()),
+ }
})?
};
command.env("IGGY_SYSTEM_PATH", data_path.display().to_string());
command.envs(&self.envs);
- // TODO(hubcio): Remove --follower flag when proper clustering is
implemented
+ // Legacy clustering elects node 0 externally and requires explicit
followers.
+ // VSR/server-ng elects its own primary and should see symmetric node
startup.
+ #[cfg(not(feature = "vsr"))]
if self.server_id > 0 {
command.arg("--follower");
}
diff --git a/core/integration/src/harness/orchestrator/builder.rs
b/core/integration/src/harness/orchestrator/builder.rs
index 058bd6c96..0326f5853 100644
--- a/core/integration/src/harness/orchestrator/builder.rs
+++ b/core/integration/src/harness/orchestrator/builder.rs
@@ -250,7 +250,7 @@ fn build_servers(
return Ok(Vec::new());
};
- let node_count = cluster_node_count.unwrap_or(1);
+ let node_count =
cluster_node_count.unwrap_or_else(default_cluster_node_count);
if node_count == 1 {
return Ok(vec![ServerHandle::with_config(config, context.clone())]);
@@ -296,6 +296,18 @@ fn build_servers(
Ok(servers)
}
+fn default_cluster_node_count() -> usize {
+ #[cfg(feature = "vsr")]
+ {
+ 3
+ }
+
+ #[cfg(not(feature = "vsr"))]
+ {
+ 1
+ }
+}
+
fn build_cluster_envs(
node_index: usize,
cluster_name: &str,
@@ -311,6 +323,11 @@ fn build_cluster_envs(
envs.insert("IGGY_CLUSTER_ENABLED".to_string(), "true".to_string());
envs.insert("IGGY_CLUSTER_NAME".to_string(), cluster_name.to_string());
+ #[cfg(feature = "vsr")]
+ envs.insert(
+ "IGGY_MESSAGE_BUS_RECONNECT_PERIOD".to_string(),
+ "100ms".to_string(),
+ );
// Node identity is supplied via `--replica-id` on the command line by
// ServerHandle::spawn; every cluster env var emitted here is identical
// across all spawned servers.
@@ -329,6 +346,12 @@ fn build_cluster_envs(
tcp.port().to_string(),
);
}
+ if let Some(tcp_replica) = addrs.tcp_replica {
+ envs.insert(
+ format!("IGGY_CLUSTER_NODES_{i}_PORTS_TCP_REPLICA"),
+ tcp_replica.port().to_string(),
+ );
+ }
if let Some(http) = addrs.http {
envs.insert(
format!("IGGY_CLUSTER_NODES_{i}_PORTS_HTTP"),
diff --git a/core/integration/src/harness/orchestrator/harness.rs
b/core/integration/src/harness/orchestrator/harness.rs
index 3018cb39f..5fc3ceb71 100644
--- a/core/integration/src/harness/orchestrator/harness.rs
+++ b/core/integration/src/harness/orchestrator/harness.rs
@@ -28,9 +28,15 @@ use crate::harness::handle::{
use crate::harness::traits::{Restartable, TestBinary};
use futures::executor::block_on;
use iggy::prelude::{ClientWrapper, IggyClient};
+#[cfg(feature = "vsr")]
+use iggy_common::Client;
use iggy_common::TransportProtocol;
use std::path::Path;
use std::sync::Arc;
+#[cfg(feature = "vsr")]
+use std::time::{Duration, Instant};
+#[cfg(feature = "vsr")]
+use tokio::time::{sleep, timeout};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
@@ -152,6 +158,8 @@ impl TestHarness {
server.start()?;
}
+ self.wait_for_cluster_ready().await?;
+
if let Some(seed_fn) = seed {
let client = self.tcp_root_client().await?;
seed_fn(client)
@@ -166,6 +174,51 @@ impl TestHarness {
Ok(())
}
+ async fn wait_for_cluster_ready(&self) -> Result<(), TestBinaryError> {
+ #[cfg(not(feature = "vsr"))]
+ {
+ Ok(())
+ }
+
+ #[cfg(feature = "vsr")]
+ {
+ if self.servers.len() <= 1 {
+ return Ok(());
+ }
+
+ const CLUSTER_READY_TIMEOUT: Duration = Duration::from_secs(15);
+ const CLUSTER_READY_RETRY_INTERVAL: Duration =
Duration::from_millis(200);
+ const LOGIN_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(750);
+
+ let deadline = Instant::now() + CLUSTER_READY_TIMEOUT;
+ let mut last_error = None;
+
+ while Instant::now() < deadline {
+ match timeout(LOGIN_ATTEMPT_TIMEOUT,
self.tcp_root_client()).await {
+ Ok(Ok(client)) => {
+ let _ = client.disconnect().await;
+ return Ok(());
+ }
+ Ok(Err(error)) => {
+ last_error = Some(error.to_string());
+ sleep(CLUSTER_READY_RETRY_INTERVAL).await;
+ }
+ Err(_) => {
+ last_error = Some("login attempt timed
out".to_string());
+ sleep(CLUSTER_READY_RETRY_INTERVAL).await;
+ }
+ }
+ }
+
+ Err(TestBinaryError::InvalidState {
+ message: format!(
+ "Timed out waiting for VSR cluster readiness: {}",
+ last_error.unwrap_or_else(|| "unknown error".to_string())
+ ),
+ })
+ }
+ }
+
async fn start_dependents(&mut self) -> Result<(), TestBinaryError> {
for server in &mut self.servers {
server.start_dependents().await?;
diff --git a/core/integration/src/harness/port_reserver.rs
b/core/integration/src/harness/port_reserver.rs
index cc59a6d02..8f658c179 100644
--- a/core/integration/src/harness/port_reserver.rs
+++ b/core/integration/src/harness/port_reserver.rs
@@ -155,6 +155,7 @@ impl ReservedPort {
/// Pre-allocated ports for all enabled protocols.
pub struct PortReserver {
tcp: Option<ReservedPort>,
+ tcp_replica: Option<ReservedPort>,
quic: Option<ReservedPort>,
http: Option<ReservedPort>,
websocket: Option<ReservedPort>,
@@ -184,6 +185,7 @@ impl SinglePortReserver {
#[derive(Debug, Clone)]
pub struct ProtocolAddresses {
pub tcp: Option<SocketAddr>,
+ pub tcp_replica: Option<SocketAddr>,
pub quic: Option<SocketAddr>,
pub http: Option<SocketAddr>,
pub websocket: Option<SocketAddr>,
@@ -226,6 +228,7 @@ impl PortReserver {
config: &TestServerConfig,
) -> Result<Self, TestBinaryError> {
let tcp = Some(ReservedPort::tcp(ip_kind)?);
+ let tcp_replica = Some(ReservedPort::tcp(ip_kind)?);
let quic = if config.quic_enabled {
Some(ReservedPort::udp(ip_kind)?)
@@ -247,6 +250,7 @@ impl PortReserver {
Ok(Self {
tcp,
+ tcp_replica,
quic,
http,
websocket,
@@ -257,6 +261,7 @@ impl PortReserver {
pub fn addresses(&self) -> ProtocolAddresses {
ProtocolAddresses {
tcp: self.tcp.as_ref().map(ReservedPort::addr),
+ tcp_replica: self.tcp_replica.as_ref().map(ReservedPort::addr),
quic: self.quic.as_ref().map(ReservedPort::addr),
http: self.http.as_ref().map(ReservedPort::addr),
websocket: self.websocket.as_ref().map(ReservedPort::addr),
@@ -268,6 +273,9 @@ impl PortReserver {
if let Some(tcp) = self.tcp {
tcp.release();
}
+ if let Some(tcp_replica) = self.tcp_replica {
+ tcp_replica.release();
+ }
if let Some(quic) = self.quic {
quic.release();
}
diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs
index 7c543a333..4163aa175 100644
--- a/core/integration/tests/mod.rs
+++ b/core/integration/tests/mod.rs
@@ -31,14 +31,22 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, fmt};
+#[cfg(not(feature = "vsr"))]
mod cli;
+#[cfg(not(feature = "vsr"))]
mod cluster;
+#[cfg(not(feature = "vsr"))]
mod config_provider;
+#[cfg(not(feature = "vsr"))]
mod connectors;
+#[cfg(not(feature = "vsr"))]
mod data_integrity;
+#[cfg(not(feature = "vsr"))]
mod mcp;
mod sdk;
+#[cfg(not(feature = "vsr"))]
mod server;
+#[cfg(not(feature = "vsr"))]
mod state;
lazy_static! {
diff --git a/core/integration-vsr/tests/hello_world.rs
b/core/integration/tests/sdk/hello_world.rs
similarity index 80%
rename from core/integration-vsr/tests/hello_world.rs
rename to core/integration/tests/sdk/hello_world.rs
index 4e2af7d56..0c3746c21 100644
--- a/core/integration-vsr/tests/hello_world.rs
+++ b/core/integration/tests/sdk/hello_world.rs
@@ -19,10 +19,15 @@
use iggy::prelude::*;
use integration::iggy_harness;
-#[iggy_harness(
- test_client_transport = [Tcp, WebSocket],
- server(executable_path = "iggy-server-ng")
-)]
+#[cfg(not(feature = "vsr"))]
+#[iggy_harness]
+async fn hello_world(harness: &TestHarness) {
+ let client = harness.root_client().await.unwrap();
+ client.ping().await.unwrap();
+}
+
+#[cfg(feature = "vsr")]
+#[iggy_harness(test_client_transport = [Tcp, WebSocket])]
async fn hello_world(harness: &TestHarness) {
let client = harness.new_client().await.unwrap();
client
diff --git a/core/integration/tests/sdk/mod.rs
b/core/integration/tests/sdk/mod.rs
index 6d94bfa61..09a6a8697 100644
--- a/core/integration/tests/sdk/mod.rs
+++ b/core/integration/tests/sdk/mod.rs
@@ -16,4 +16,6 @@
* under the License.
*/
+mod hello_world;
+#[cfg(not(feature = "vsr"))]
mod producer;
diff --git a/core/message_bus/src/client_listener/quic.rs
b/core/message_bus/src/client_listener/quic.rs
index a4e191a20..806ec502a 100644
--- a/core/message_bus/src/client_listener/quic.rs
+++ b/core/message_bus/src/client_listener/quic.rs
@@ -61,7 +61,7 @@ use tracing::{debug, error, info};
///
/// Returns [`IggyError::CannotBindToSocket`] if the bind fails.
#[allow(clippy::future_not_send)]
-pub async fn bind(
+pub fn bind(
addr: SocketAddr,
server_config: ServerConfig,
) -> Result<(Endpoint, SocketAddr), IggyError> {
diff --git a/core/message_bus/src/client_listener/tcp.rs
b/core/message_bus/src/client_listener/tcp.rs
index a79368c40..8edbc1c53 100644
--- a/core/message_bus/src/client_listener/tcp.rs
+++ b/core/message_bus/src/client_listener/tcp.rs
@@ -42,7 +42,7 @@ use tracing::{debug, error, info};
///
/// Returns [`IggyError::CannotBindToSocket`] if the bind fails.
#[allow(clippy::future_not_send)]
-pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr),
IggyError> {
+pub fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> {
let listener = bind_reusable_tcp_listener(addr)
.map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
let actual = listener
diff --git a/core/message_bus/src/client_listener/tcp_tls.rs
b/core/message_bus/src/client_listener/tcp_tls.rs
index 9fe199bab..fce2f8ca2 100644
--- a/core/message_bus/src/client_listener/tcp_tls.rs
+++ b/core/message_bus/src/client_listener/tcp_tls.rs
@@ -71,7 +71,7 @@ use tracing::{debug, error, info};
/// from `credentials` (cert / key mismatch).
/// - [`IggyError::CannotBindToSocket`] if the TCP bind fails.
#[allow(clippy::future_not_send)]
-pub async fn bind(
+pub fn bind(
addr: SocketAddr,
credentials: TlsServerCredentials,
) -> Result<(TcpListener, Arc<rustls::ServerConfig>, SocketAddr), IggyError> {
diff --git a/core/message_bus/src/client_listener/ws.rs
b/core/message_bus/src/client_listener/ws.rs
index 961bf8e7e..f6863e9e8 100644
--- a/core/message_bus/src/client_listener/ws.rs
+++ b/core/message_bus/src/client_listener/ws.rs
@@ -56,7 +56,7 @@ use tracing::{debug, error, info};
///
/// Returns [`IggyError::CannotBindToSocket`] if the bind fails.
#[allow(clippy::future_not_send)]
-pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr),
IggyError> {
+pub fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> {
let listener = bind_reusable_tcp_listener(addr)
.map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
let actual = listener
diff --git a/core/message_bus/src/client_listener/wss.rs
b/core/message_bus/src/client_listener/wss.rs
index 96cfd5415..f686c65f8 100644
--- a/core/message_bus/src/client_listener/wss.rs
+++ b/core/message_bus/src/client_listener/wss.rs
@@ -66,7 +66,7 @@ use tracing::{debug, error, info};
/// from `credentials` (cert / key mismatch).
/// - [`IggyError::CannotBindToSocket`] if the TCP bind fails.
#[allow(clippy::future_not_send)]
-pub async fn bind(
+pub fn bind(
addr: SocketAddr,
credentials: TlsServerCredentials,
) -> Result<(TcpListener, Arc<rustls::ServerConfig>, SocketAddr), IggyError> {
diff --git a/core/message_bus/src/replica/io.rs
b/core/message_bus/src/replica/io.rs
index 3e5da9fbc..0cde3bb12 100644
--- a/core/message_bus/src/replica/io.rs
+++ b/core/message_bus/src/replica/io.rs
@@ -207,7 +207,7 @@ pub async fn start_on_shard_zero(
);
let (replica_listener, replica_bound) =
bind_replica_listener(replica_listen_addr).await?;
- let (clients_listener, client_bound) =
client_listener::tcp::bind(client_listen_addr).await?;
+ let (clients_listener, client_bound) =
client_listener::tcp::bind(client_listen_addr)?;
let token_for_replica = bus.token();
let on_accepted_replica_for_listener = on_accepted_replica.clone();
@@ -236,7 +236,7 @@ pub async fn start_on_shard_zero(
let ws_bound = match (ws_listen_addr, on_accepted_ws_client) {
(Some(addr), Some(on_accepted_ws)) => {
- let (ws_listener, ws_bound) =
client_listener::ws::bind(addr).await?;
+ let (ws_listener, ws_bound) = client_listener::ws::bind(addr)?;
let token_for_ws = bus.token();
let ws_handle = compio::runtime::spawn(async move {
client_listener::ws::run(ws_listener, token_for_ws,
on_accepted_ws).await;
@@ -257,7 +257,7 @@ pub async fn start_on_shard_zero(
.map_err(|e| {
IggyError::IoError(format!("QUIC server config build
failed: {e}"))
})?;
- let (endpoint, quic_bound) = client_listener::quic::bind(addr,
server_config).await?;
+ let (endpoint, quic_bound) = client_listener::quic::bind(addr,
server_config)?;
let token_for_quic = bus.token();
let handshake_grace = bus.config().handshake_grace;
let quic_handle = compio::runtime::spawn(async move {
@@ -285,7 +285,7 @@ pub async fn start_on_shard_zero(
) {
(Some(addr), Some(creds), Some(on_accepted_tls)) => {
let (listener, server_config, tls_bound) =
- client_listener::tcp_tls::bind(addr, creds).await?;
+ client_listener::tcp_tls::bind(addr, creds)?;
let token_for_tls = bus.token();
let tls_handle = compio::runtime::spawn(async move {
client_listener::tcp_tls::run(
@@ -308,7 +308,7 @@ pub async fn start_on_shard_zero(
let wss_bound = match (wss_listen_addr, wss_credentials,
on_accepted_wss_client) {
(Some(addr), Some(creds), Some(on_accepted_wss)) => {
let (listener, server_config, wss_bound) =
- client_listener::wss::bind(addr, creds).await?;
+ client_listener::wss::bind(addr, creds)?;
let token_for_wss = bus.token();
let wss_handle = compio::runtime::spawn(async move {
client_listener::wss::run(listener, server_config,
token_for_wss, on_accepted_wss)
diff --git a/core/message_bus/tests/graceful_shutdown.rs
b/core/message_bus/tests/graceful_shutdown.rs
index 18038039c..6d9cf3b00 100644
--- a/core/message_bus/tests/graceful_shutdown.rs
+++ b/core/message_bus/tests/graceful_shutdown.rs
@@ -32,7 +32,7 @@ use std::time::Duration;
async fn drains_all_clients_within_timeout() {
let bus = Rc::new(IggyMessageBus::new(0));
let on_request: RequestHandler = Rc::new(|_, _| {});
- let (listener, addr) = bind(loopback()).await.unwrap();
+ let (listener, addr) = bind(loopback()).unwrap();
let token = bus.token();
let accept_delegate = install_clients_locally(bus.clone(), on_request);
@@ -87,7 +87,7 @@ async fn drains_all_clients_within_timeout() {
async fn connection_drain_precedes_slow_background() {
let bus = Rc::new(IggyMessageBus::new(0));
let on_request: RequestHandler = Rc::new(|_, _| {});
- let (listener, addr) = bind(loopback()).await.unwrap();
+ let (listener, addr) = bind(loopback()).unwrap();
let token = bus.token();
let accept_delegate = install_clients_locally(bus.clone(), on_request);
diff --git a/core/message_bus/tests/quic_client_roundtrip.rs
b/core/message_bus/tests/quic_client_roundtrip.rs
index 2a42ba040..1feee8d9f 100644
--- a/core/message_bus/tests/quic_client_roundtrip.rs
+++ b/core/message_bus/tests/quic_client_roundtrip.rs
@@ -83,7 +83,7 @@ async fn request_reply_round_trip() {
let (cert, key) = self_signed();
let server_cfg = server_config_with_cert(vec![cert.clone()], key,
&QuicTuning::default())
.expect("server config");
- let (endpoint, server_addr) = bind(loopback(),
server_cfg).await.expect("bind");
+ let (endpoint, server_addr) = bind(loopback(), server_cfg).expect("bind");
let token = bus.token();
let on_accepted = install_quic_clients_locally(bus.clone(), on_request);
@@ -160,7 +160,7 @@ async fn slow_handshake_does_not_block_subsequent_accept() {
let (cert, key) = self_signed();
let server_cfg = server_config_with_cert(vec![cert.clone()], key,
&QuicTuning::default())
.expect("server config");
- let (endpoint, server_addr) = bind(loopback(),
server_cfg).await.expect("bind");
+ let (endpoint, server_addr) = bind(loopback(), server_cfg).expect("bind");
let token = bus.token();
let on_accepted = install_quic_clients_locally(bus.clone(), on_request);
diff --git a/core/message_bus/tests/tcp_client_roundtrip.rs
b/core/message_bus/tests/tcp_client_roundtrip.rs
index 6c8cd5985..b6a22cdf0 100644
--- a/core/message_bus/tests/tcp_client_roundtrip.rs
+++ b/core/message_bus/tests/tcp_client_roundtrip.rs
@@ -49,7 +49,7 @@ async fn request_reply_round_trip() {
.detach();
});
- let (listener, addr) = bind(loopback()).await.expect("bind");
+ let (listener, addr) = bind(loopback()).expect("bind");
let token = bus.token();
let accept_delegate = install_clients_locally(bus.clone(), on_request);
let accept_handle = compio::runtime::spawn(async move {
@@ -88,7 +88,7 @@ async fn unexpected_command_is_ignored() {
let _ = tx.try_send(());
});
- let (listener, addr) = bind(loopback()).await.unwrap();
+ let (listener, addr) = bind(loopback()).unwrap();
let token = bus.token();
let accept_delegate = install_clients_locally(bus.clone(), on_request);
let accept_handle = compio::runtime::spawn(async move {
diff --git a/core/message_bus/tests/tcp_tls_client_listener.rs
b/core/message_bus/tests/tcp_tls_client_listener.rs
index 1621e9923..b217229b2 100644
--- a/core/message_bus/tests/tcp_tls_client_listener.rs
+++ b/core/message_bus/tests/tcp_tls_client_listener.rs
@@ -68,7 +68,7 @@ async fn tcp_tls_client_listener_accepts_and_round_trips() {
let creds = self_signed_for_loopback();
let cert_chain = creds.cert_chain.clone();
let (listener, server_cfg, server_addr) =
- bind(loopback(), creds).await.expect("tls listener bind");
+ bind(loopback(), creds).expect("tls listener bind");
let token = bus.token();
let on_accepted = install_tls_clients_locally(Rc::clone(&bus), on_request);
let accept_handle = compio::runtime::spawn(async move {
@@ -149,7 +149,7 @@ async fn slow_tls_handshake_evicts_registry() {
let creds = self_signed_for_loopback();
let (listener, server_cfg, server_addr) =
- bind(loopback(), creds).await.expect("tls listener bind");
+ bind(loopback(), creds).expect("tls listener bind");
let token = bus.token();
let on_accepted = install_tls_clients_locally(Rc::clone(&bus), on_request);
let accept_handle = compio::runtime::spawn(async move {
diff --git a/core/message_bus/tests/ws_client_roundtrip.rs
b/core/message_bus/tests/ws_client_roundtrip.rs
index e936e3394..951c109e9 100644
--- a/core/message_bus/tests/ws_client_roundtrip.rs
+++ b/core/message_bus/tests/ws_client_roundtrip.rs
@@ -67,7 +67,7 @@ async fn handshake_succeeds_and_round_trip_completes() {
.detach();
});
- let (listener, server_addr) = bind(loopback()).await.expect("bind");
+ let (listener, server_addr) = bind(loopback()).expect("bind");
let token = bus.token();
let on_accepted = install_ws_clients_locally(bus.clone(), on_request);
let accept_handle = compio::runtime::spawn(async move {
@@ -124,7 +124,7 @@ async fn handshake_succeeds_without_subprotocol_header() {
let bus = Rc::new(IggyMessageBus::new(0));
let on_request: RequestHandler = Rc::new(|_, _| {});
- let (listener, server_addr) = bind(loopback()).await.expect("bind");
+ let (listener, server_addr) = bind(loopback()).expect("bind");
let token = bus.token();
let on_accepted = install_ws_clients_locally(bus.clone(), on_request);
let accept_handle = compio::runtime::spawn(async move {
diff --git a/core/message_bus/tests/wss_client_listener.rs
b/core/message_bus/tests/wss_client_listener.rs
index cfed3e03a..01a25ef18 100644
--- a/core/message_bus/tests/wss_client_listener.rs
+++ b/core/message_bus/tests/wss_client_listener.rs
@@ -69,7 +69,7 @@ async fn wss_client_listener_accepts_and_round_trips() {
let creds = self_signed_for_loopback();
let cert_chain = creds.cert_chain.clone();
let (listener, server_cfg, server_addr) =
- bind(loopback(), creds).await.expect("wss listener bind");
+ bind(loopback(), creds).expect("wss listener bind");
let token = bus.token();
let on_accepted = install_wss_clients_locally(Rc::clone(&bus), on_request);
let accept_handle = compio::runtime::spawn(async move {
@@ -148,7 +148,7 @@ async fn slow_handshake_evicts_registry() {
let creds = self_signed_for_loopback();
let (listener, server_cfg, server_addr) =
- bind(loopback(), creds).await.expect("wss listener bind");
+ bind(loopback(), creds).expect("wss listener bind");
let token = bus.token();
let on_accepted = install_wss_clients_locally(Rc::clone(&bus), on_request);
let accept_handle = compio::runtime::spawn(async move {
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 7588e049a..705f48bdb 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -131,6 +131,11 @@ impl Users {
self.inner.read(f)
}
+ /// Ensures a root user exists in an empty user set.
+ ///
+ /// # Panics
+ ///
+ /// Panics if `username` is not a valid wire-format username.
pub fn ensure_root_user(&self, username: &str, password_hash: &str) {
if self.read(|users| !users.items.is_empty()) {
return;
diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs
index 7217bc319..a4d4a590b 100644
--- a/core/sdk/src/quic/quic_client.rs
+++ b/core/sdk/src/quic/quic_client.rs
@@ -270,7 +270,7 @@ impl QuicClient {
#[cfg(feature = "vsr")]
{
- return crate::vsr::decode_response(&buffer);
+ crate::vsr::decode_response(&buffer)
}
#[cfg(not(feature = "vsr"))]
diff --git a/core/sdk/src/websocket/websocket_client.rs
b/core/sdk/src/websocket/websocket_client.rs
index 9199d648b..395c76884 100644
--- a/core/sdk/src/websocket/websocket_client.rs
+++ b/core/sdk/src/websocket/websocket_client.rs
@@ -651,7 +651,7 @@ impl WebSocketClient {
response.put_slice(&response_body);
}
- return crate::vsr::decode_response(&response.freeze());
+ crate::vsr::decode_response(&response.freeze())
}
#[cfg(not(feature = "vsr"))]
diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs
index 0ad2e6b42..bedd73393 100644
--- a/core/server-ng/src/bootstrap.rs
+++ b/core/server-ng/src/bootstrap.rs
@@ -1223,7 +1223,7 @@ async fn start_manual_runtime(
None
};
- let bound_clients = start_client_listeners(shard, config, topology,
&accepted_clients).await?;
+ let bound_clients = start_client_listeners(shard, config, topology,
&accepted_clients)?;
write_current_config(
config,
Some(topology.self_replica_id),
@@ -1505,7 +1505,13 @@ async fn complete_login_register(
request_header: &RequestHeader,
user_id: u32,
) -> Result<(), LoginRegisterError> {
- if let Some((_, session)) =
sessions.borrow().get_session(transport_client_id) {
+ let existing_session = {
+ let sessions = sessions.borrow();
+ sessions
+ .get_session(transport_client_id)
+ .map(|(_, session)| session)
+ };
+ if let Some(session) = existing_session {
let response = LoginRegisterResponse { user_id, session }.to_bytes();
let reply = build_login_register_reply(request_header, vsr_client_id,
session, &response);
let _ = shard
@@ -1581,13 +1587,14 @@ fn build_login_register_reply(
) -> Message<ReplyHeader> {
let total_size = std::mem::size_of::<ReplyHeader>() + body.len();
let mut reply = Message::<ReplyHeader>::new(total_size);
+ let header_size = u32::try_from(total_size).expect("reply size must fit
into u32");
let header = bytemuck::checked::try_from_bytes_mut::<ReplyHeader>(
&mut reply.as_mut_slice()[..std::mem::size_of::<ReplyHeader>()],
)
.expect("zeroed bytes are valid");
*header = ReplyHeader {
cluster: request_header.cluster,
- size: total_size as u32,
+ size: header_size,
view: request_header.view,
release: request_header.release,
command: Command2::Reply,
@@ -1747,7 +1754,7 @@ fn mint_client_meta(
ClientConnMeta::new((shard_id << 112) | seq, peer_addr, transport)
}
-async fn start_client_listeners(
+fn start_client_listeners(
shard: &Rc<ServerNgShard>,
config: &ServerNgConfig,
topology: &TcpTopology,
@@ -1757,7 +1764,6 @@ async fn start_client_listeners(
if config.tcp.enabled && !config.tcp.tls.enabled {
let (listener, bound_addr) =
client_listener::tcp::bind(topology.client_listen_addr)
- .await
.map_err(|source| {
error!(
addr = %topology.client_listen_addr,
@@ -1776,8 +1782,7 @@ async fn start_client_listeners(
}
if let Some(ws_addr) = topology.ws_listen_addr {
- let (listener, bound_addr) =
- client_listener::ws::bind(ws_addr).await.map_err(|source| {
+ let (listener, bound_addr) =
client_listener::ws::bind(ws_addr).map_err(|source| {
error!(addr = %ws_addr, error = %source, "failed to bind
websocket listener");
source
})?;
@@ -1805,7 +1810,6 @@ async fn start_client_listeners(
source
})?;
let (endpoint, bound_addr) = client_listener::quic::bind(quic_addr,
server_config)
- .await
.map_err(|source| {
error!(addr = %quic_addr, error = %source, "failed to bind
QUIC listener");
source
@@ -1824,7 +1828,6 @@ async fn start_client_listeners(
let credentials = load_tcp_tls_server_credentials(config)?;
let (listener, tls_config, bound_addr) =
client_listener::tcp_tls::bind(topology.client_listen_addr,
credentials)
- .await
.map_err(|source| {
error!(
addr = %topology.client_listen_addr,