This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 50557dcef fix(bench): delete stale streams on re-run to prevent corrupted latency (#2991)
50557dcef is described below
commit 50557dcefc7cae301c1e8616d7836c387b0aa287
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sat Mar 21 11:11:00 2026 +0100
fix(bench): delete stale streams on re-run to prevent corrupted latency (#2991)
---
core/bench/src/args/common.rs | 10 ++++
core/bench/src/args/examples.rs | 1 +
core/bench/src/benchmarks/benchmark.rs | 64 +++++++++++++---------
core/integration/src/bench_utils.rs | 14 +++--
.../data_integrity/verify_after_server_restart.rs | 2 +-
5 files changed, 60 insertions(+), 31 deletions(-)
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index 8c1c535a5..7042f541f 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -91,6 +91,12 @@ pub struct IggyBenchArgs {
/// Password for server authentication
#[arg(long, short = 'p', default_value_t = DEFAULT_ROOT_PASSWORD.to_string())]
pub password: String,
+
+ /// Reuse existing bench streams instead of deleting and recreating them.
+ /// Without this flag, existing bench streams are deleted to ensure
+ /// consumers start with fresh data and accurate latency measurements.
+ #[arg(long, default_value_t = false)]
+ pub reuse_streams: bool,
}
impl IggyBenchArgs {
@@ -314,6 +320,10 @@ impl IggyBenchArgs {
self.high_level_api
}
+ pub const fn reuse_streams(&self) -> bool {
+ self.reuse_streams
+ }
+
pub fn username(&self) -> &str {
&self.username
}
diff --git a/core/bench/src/args/examples.rs b/core/bench/src/args/examples.rs
index ddf888dbc..80c77b47b 100644
--- a/core/bench/src/args/examples.rs
+++ b/core/bench/src/args/examples.rs
@@ -63,6 +63,7 @@ const EXAMPLES: &str = r#"EXAMPLES:
--moving-average-window (-W): Window size for moving average [default: 20]
--username (-u): Username for server authentication [default: iggy]
--password (-p): Password for server authentication [default: iggy]
+ --reuse-streams: Reuse existing bench streams instead of deleting them
Benchmark-specific options (after the benchmark command):
--streams (-s): Number of streams
diff --git a/core/bench/src/benchmarks/benchmark.rs b/core/bench/src/benchmarks/benchmark.rs
index 7a00c6528..fca38b9c7 100644
--- a/core/bench/src/benchmarks/benchmark.rs
+++ b/core/bench/src/benchmarks/benchmark.rs
@@ -26,7 +26,7 @@ use iggy::clients::client::IggyClient;
use iggy::prelude::*;
use std::sync::Arc;
use tokio::task::JoinSet;
-use tracing::info;
+use tracing::{info, warn};
use super::balanced_consumer_group::BalancedConsumerGroupBenchmark;
use super::balanced_producer::BalancedProducerBenchmark;
@@ -105,37 +105,49 @@ pub trait Benchmarkable: Send {
self.client_factory().password(),
)
.await;
+ let reuse_streams = self.args().reuse_streams();
let streams = client.get_streams().await?;
for i in 1..=number_of_streams {
let stream_name = format!("bench-stream-{i}");
let stream_id: Identifier = stream_name.as_str().try_into()?;
- if streams.iter().all(|s| s.name != stream_name) {
- info!("Creating the test stream '{}'", stream_name);
- client.create_stream(&stream_name).await?;
- let topic_name = "topic-1".to_string();
- let max_topic_size = self
- .args()
- .max_topic_size()
- .map_or(MaxTopicSize::Unlimited, MaxTopicSize::Custom);
- let message_expiry = self.args().message_expiry();
-
- info!(
- "Creating the test topic '{}' for stream '{}' with max topic size: {:?}, message expiry: {}",
- topic_name, stream_name, max_topic_size, message_expiry
+ if streams.iter().any(|s| s.name == stream_name) {
+ if reuse_streams {
+ info!("Appending to existing stream '{}'", stream_name);
+ continue;
+ }
+ warn!(
+ "Deleting pre-existing stream '{}' - {:?} benchmark requires fresh streams to avoid stale data in consumers",
+ stream_name,
+ self.kind()
);
-
- client
- .create_topic(
- &stream_id,
- &topic_name,
- partitions_count,
- CompressionAlgorithm::default(),
- None,
- message_expiry,
- max_topic_size,
- )
- .await?;
+ client.delete_stream(&stream_id).await?;
}
+
+ info!("Creating the test stream '{}'", stream_name);
+ client.create_stream(&stream_name).await?;
+ let topic_name = "topic-1".to_string();
+ let max_topic_size = self
+ .args()
+ .max_topic_size()
+ .map_or(MaxTopicSize::Unlimited, MaxTopicSize::Custom);
+ let message_expiry = self.args().message_expiry();
+
+ info!(
+ "Creating the test topic '{}' for stream '{}' with max topic size: {:?}, message expiry: {}",
+ topic_name, stream_name, max_topic_size, message_expiry
+ );
+
+ client
+ .create_topic(
+ &stream_id,
+ &topic_name,
+ partitions_count,
+ CompressionAlgorithm::default(),
+ None,
+ message_expiry,
+ max_topic_size,
+ )
+ .await?;
}
Ok(())
}
diff --git a/core/integration/src/bench_utils.rs b/core/integration/src/bench_utils.rs
index d1d670c26..026242045 100644
--- a/core/integration/src/bench_utils.rs
+++ b/core/integration/src/bench_utils.rs
@@ -58,15 +58,21 @@ pub fn run_bench_and_wait_for_finish(
let messages_total: u64 = MESSAGES_PER_BATCH * MESSAGE_BATCHES;
let message_size = total_bytes_to_process_per_stream / messages_total;
+ let messages_per_batch_str = MESSAGES_PER_BATCH.to_string();
+ let message_batches_str = MESSAGE_BATCHES.to_string();
+ let message_size_str = message_size.to_string();
+ let transport_str = transport.to_string();
+
command.args([
"--messages-per-batch",
- &MESSAGES_PER_BATCH.to_string(),
+ messages_per_batch_str.as_str(),
"--message-batches",
- &MESSAGE_BATCHES.to_string(),
+ message_batches_str.as_str(),
"--message-size",
- &message_size.to_string(),
+ message_size_str.as_str(),
+ "--reuse-streams",
bench,
- &transport.to_string(),
+ transport_str.as_str(),
"--server-address",
server_addr,
]);
diff --git a/core/integration/tests/data_integrity/verify_after_server_restart.rs b/core/integration/tests/data_integrity/verify_after_server_restart.rs
index 865153a19..4c62ba5b6 100644
--- a/core/integration/tests/data_integrity/verify_after_server_restart.rs
+++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs
@@ -174,7 +174,7 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
);
}
- // Run send bench again to add more data
+ // Run send bench again to add more data (--reuse-streams is always passed)
run_bench_and_wait_for_finish(
&server_addr,
&TransportProtocol::Tcp,