This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch bench-improvement in repository https://gitbox.apache.org/repos/asf/iggy.git
commit cfd6e529052f277a228203fbca7a6057b06eda45 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Mar 20 16:11:28 2026 +0100 fix(bench): delete stale streams on re-run to prevent corrupted latency Consumers in ppc/bpcg/e2e/e2ecg read old messages from prior runs, producing garbage origin_timestamp latency. init_streams() now deletes existing streams before recreating them. Added --reuse-streams flag to opt into the old append behavior (used by integration tests). --- core/bench/src/args/common.rs | 10 ++++ core/bench/src/args/examples.rs | 1 + core/bench/src/benchmarks/benchmark.rs | 64 +++++++++++++--------- core/integration/src/bench_utils.rs | 14 +++-- .../data_integrity/verify_after_server_restart.rs | 2 +- 5 files changed, 60 insertions(+), 31 deletions(-) diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs index 8c1c535a5..7042f541f 100644 --- a/core/bench/src/args/common.rs +++ b/core/bench/src/args/common.rs @@ -91,6 +91,12 @@ pub struct IggyBenchArgs { /// Password for server authentication #[arg(long, short = 'p', default_value_t = DEFAULT_ROOT_PASSWORD.to_string())] pub password: String, + + /// Reuse existing bench streams instead of deleting and recreating them. + /// Without this flag, existing bench streams are deleted to ensure + /// consumers start with fresh data and accurate latency measurements. + #[arg(long, default_value_t = false)] + pub reuse_streams: bool, } impl IggyBenchArgs { @@ -314,6 +320,10 @@ impl IggyBenchArgs { self.high_level_api } + pub const fn reuse_streams(&self) -> bool { + self.reuse_streams + } + pub fn username(&self) -> &str { &self.username } diff --git a/core/bench/src/args/examples.rs b/core/bench/src/args/examples.rs index ddf888dbc..80c77b47b 100644 --- a/core/bench/src/args/examples.rs +++ b/core/bench/src/args/examples.rs @@ -63,6 +63,7 @@ const EXAMPLES: &str = r#"EXAMPLES: --moving-average-window (-W): Window size for moving average [default: 20] --username (-u): Username for server authentication [default: iggy] --password (-p): Password for server authentication [default: iggy] + --reuse-streams: Reuse existing bench streams instead of deleting them Benchmark-specific options (after the benchmark command): --streams (-s): Number of streams diff --git a/core/bench/src/benchmarks/benchmark.rs b/core/bench/src/benchmarks/benchmark.rs index 7a00c6528..fca38b9c7 100644 --- a/core/bench/src/benchmarks/benchmark.rs +++ b/core/bench/src/benchmarks/benchmark.rs @@ -26,7 +26,7 @@ use iggy::clients::client::IggyClient; use iggy::prelude::*; use std::sync::Arc; use tokio::task::JoinSet; -use tracing::info; +use tracing::{info, warn}; use super::balanced_consumer_group::BalancedConsumerGroupBenchmark; use super::balanced_producer::BalancedProducerBenchmark; @@ -105,37 +105,49 @@ pub trait Benchmarkable: Send { self.client_factory().password(), ) .await; + let reuse_streams = self.args().reuse_streams(); let streams = client.get_streams().await?; for i in 1..=number_of_streams { let stream_name = format!("bench-stream-{i}"); let stream_id: Identifier = stream_name.as_str().try_into()?; - if streams.iter().all(|s| s.name != stream_name) { - info!("Creating the test stream '{}'", stream_name); - client.create_stream(&stream_name).await?; - let topic_name = "topic-1".to_string(); - let max_topic_size = self - .args() - .max_topic_size() - .map_or(MaxTopicSize::Unlimited, MaxTopicSize::Custom); - let message_expiry = self.args().message_expiry(); - - info!( - "Creating the test topic '{}' for stream '{}' with max topic size: {:?}, message expiry: {}", - topic_name, stream_name, max_topic_size, message_expiry + if streams.iter().any(|s| s.name == stream_name) { + if reuse_streams { + info!("Appending to existing stream '{}'", stream_name); + continue; + } + warn!( + "Deleting pre-existing stream '{}' - {:?} benchmark requires fresh streams to avoid stale data in consumers", + stream_name, + self.kind() ); - - client - .create_topic( - &stream_id, - &topic_name, - partitions_count, - CompressionAlgorithm::default(), - None, - message_expiry, - max_topic_size, - ) - .await?; + client.delete_stream(&stream_id).await?; } + + info!("Creating the test stream '{}'", stream_name); + client.create_stream(&stream_name).await?; + let topic_name = "topic-1".to_string(); + let max_topic_size = self + .args() + .max_topic_size() + .map_or(MaxTopicSize::Unlimited, MaxTopicSize::Custom); + let message_expiry = self.args().message_expiry(); + + info!( + "Creating the test topic '{}' for stream '{}' with max topic size: {:?}, message expiry: {}", + topic_name, stream_name, max_topic_size, message_expiry + ); + + client + .create_topic( + &stream_id, + &topic_name, + partitions_count, + CompressionAlgorithm::default(), + None, + message_expiry, + max_topic_size, + ) + .await?; } Ok(()) } diff --git a/core/integration/src/bench_utils.rs b/core/integration/src/bench_utils.rs index d1d670c26..026242045 100644 --- a/core/integration/src/bench_utils.rs +++ b/core/integration/src/bench_utils.rs @@ -58,15 +58,21 @@ pub fn run_bench_and_wait_for_finish( let messages_total: u64 = MESSAGES_PER_BATCH * MESSAGE_BATCHES; let message_size = total_bytes_to_process_per_stream / messages_total; + let messages_per_batch_str = MESSAGES_PER_BATCH.to_string(); + let message_batches_str = MESSAGE_BATCHES.to_string(); + let message_size_str = message_size.to_string(); + let transport_str = transport.to_string(); + command.args([ "--messages-per-batch", - &MESSAGES_PER_BATCH.to_string(), + messages_per_batch_str.as_str(), "--message-batches", - &MESSAGE_BATCHES.to_string(), + message_batches_str.as_str(), "--message-size", - &message_size.to_string(), + message_size_str.as_str(), + "--reuse-streams", bench, - &transport.to_string(), + transport_str.as_str(), "--server-address", server_addr, ]); diff --git a/core/integration/tests/data_integrity/verify_after_server_restart.rs b/core/integration/tests/data_integrity/verify_after_server_restart.rs index 865153a19..4c62ba5b6 100644 --- a/core/integration/tests/data_integrity/verify_after_server_restart.rs +++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs @@ -174,7 +174,7 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) ); } - // Run send bench again to add more data + // Run send bench again to add more data (--reuse-streams is always passed) run_bench_and_wait_for_finish( &server_addr, &TransportProtocol::Tcp,
