This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch bench-improvement
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit cfd6e529052f277a228203fbca7a6057b06eda45
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Mar 20 16:11:28 2026 +0100

    fix(bench): delete stale streams on re-run to prevent corrupted latency
    
    Consumers in ppc/bpcg/e2e/e2ecg read old messages left over from
    prior runs, producing garbage latency values computed from those
    messages' origin_timestamp. init_streams() now deletes existing
    streams before recreating them. Added a --reuse-streams flag to opt
    into the old append behavior (used by integration tests).
---
 core/bench/src/args/common.rs                      | 10 ++++
 core/bench/src/args/examples.rs                    |  1 +
 core/bench/src/benchmarks/benchmark.rs             | 64 +++++++++++++---------
 core/integration/src/bench_utils.rs                | 14 +++--
 .../data_integrity/verify_after_server_restart.rs  |  2 +-
 5 files changed, 60 insertions(+), 31 deletions(-)

diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index 8c1c535a5..7042f541f 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -91,6 +91,12 @@ pub struct IggyBenchArgs {
     /// Password for server authentication
     #[arg(long, short = 'p', default_value_t = 
DEFAULT_ROOT_PASSWORD.to_string())]
     pub password: String,
+
+    /// Reuse existing bench streams instead of deleting and recreating them.
+    /// Without this flag, existing bench streams are deleted to ensure
+    /// consumers start with fresh data and accurate latency measurements.
+    #[arg(long, default_value_t = false)]
+    pub reuse_streams: bool,
 }
 
 impl IggyBenchArgs {
@@ -314,6 +320,10 @@ impl IggyBenchArgs {
         self.high_level_api
     }
 
+    pub const fn reuse_streams(&self) -> bool {
+        self.reuse_streams
+    }
+
     pub fn username(&self) -> &str {
         &self.username
     }
diff --git a/core/bench/src/args/examples.rs b/core/bench/src/args/examples.rs
index ddf888dbc..80c77b47b 100644
--- a/core/bench/src/args/examples.rs
+++ b/core/bench/src/args/examples.rs
@@ -63,6 +63,7 @@ const EXAMPLES: &str = r#"EXAMPLES:
     --moving-average-window (-W): Window size for moving average [default: 20]
     --username (-u): Username for server authentication [default: iggy]
     --password (-p): Password for server authentication [default: iggy]
+    --reuse-streams: Reuse existing bench streams instead of deleting them
 
     Benchmark-specific options (after the benchmark command):
     --streams (-s): Number of streams
diff --git a/core/bench/src/benchmarks/benchmark.rs 
b/core/bench/src/benchmarks/benchmark.rs
index 7a00c6528..fca38b9c7 100644
--- a/core/bench/src/benchmarks/benchmark.rs
+++ b/core/bench/src/benchmarks/benchmark.rs
@@ -26,7 +26,7 @@ use iggy::clients::client::IggyClient;
 use iggy::prelude::*;
 use std::sync::Arc;
 use tokio::task::JoinSet;
-use tracing::info;
+use tracing::{info, warn};
 
 use super::balanced_consumer_group::BalancedConsumerGroupBenchmark;
 use super::balanced_producer::BalancedProducerBenchmark;
@@ -105,37 +105,49 @@ pub trait Benchmarkable: Send {
             self.client_factory().password(),
         )
         .await;
+        let reuse_streams = self.args().reuse_streams();
         let streams = client.get_streams().await?;
         for i in 1..=number_of_streams {
             let stream_name = format!("bench-stream-{i}");
             let stream_id: Identifier = stream_name.as_str().try_into()?;
-            if streams.iter().all(|s| s.name != stream_name) {
-                info!("Creating the test stream '{}'", stream_name);
-                client.create_stream(&stream_name).await?;
-                let topic_name = "topic-1".to_string();
-                let max_topic_size = self
-                    .args()
-                    .max_topic_size()
-                    .map_or(MaxTopicSize::Unlimited, MaxTopicSize::Custom);
-                let message_expiry = self.args().message_expiry();
-
-                info!(
-                    "Creating the test topic '{}' for stream '{}' with max 
topic size: {:?}, message expiry: {}",
-                    topic_name, stream_name, max_topic_size, message_expiry
+            if streams.iter().any(|s| s.name == stream_name) {
+                if reuse_streams {
+                    info!("Appending to existing stream '{}'", stream_name);
+                    continue;
+                }
+                warn!(
+                    "Deleting pre-existing stream '{}' - {:?} benchmark 
requires fresh streams to avoid stale data in consumers",
+                    stream_name,
+                    self.kind()
                 );
-
-                client
-                    .create_topic(
-                        &stream_id,
-                        &topic_name,
-                        partitions_count,
-                        CompressionAlgorithm::default(),
-                        None,
-                        message_expiry,
-                        max_topic_size,
-                    )
-                    .await?;
+                client.delete_stream(&stream_id).await?;
             }
+
+            info!("Creating the test stream '{}'", stream_name);
+            client.create_stream(&stream_name).await?;
+            let topic_name = "topic-1".to_string();
+            let max_topic_size = self
+                .args()
+                .max_topic_size()
+                .map_or(MaxTopicSize::Unlimited, MaxTopicSize::Custom);
+            let message_expiry = self.args().message_expiry();
+
+            info!(
+                "Creating the test topic '{}' for stream '{}' with max topic 
size: {:?}, message expiry: {}",
+                topic_name, stream_name, max_topic_size, message_expiry
+            );
+
+            client
+                .create_topic(
+                    &stream_id,
+                    &topic_name,
+                    partitions_count,
+                    CompressionAlgorithm::default(),
+                    None,
+                    message_expiry,
+                    max_topic_size,
+                )
+                .await?;
         }
         Ok(())
     }
diff --git a/core/integration/src/bench_utils.rs 
b/core/integration/src/bench_utils.rs
index d1d670c26..026242045 100644
--- a/core/integration/src/bench_utils.rs
+++ b/core/integration/src/bench_utils.rs
@@ -58,15 +58,21 @@ pub fn run_bench_and_wait_for_finish(
     let messages_total: u64 = MESSAGES_PER_BATCH * MESSAGE_BATCHES;
     let message_size = total_bytes_to_process_per_stream / messages_total;
 
+    let messages_per_batch_str = MESSAGES_PER_BATCH.to_string();
+    let message_batches_str = MESSAGE_BATCHES.to_string();
+    let message_size_str = message_size.to_string();
+    let transport_str = transport.to_string();
+
     command.args([
         "--messages-per-batch",
-        &MESSAGES_PER_BATCH.to_string(),
+        messages_per_batch_str.as_str(),
         "--message-batches",
-        &MESSAGE_BATCHES.to_string(),
+        message_batches_str.as_str(),
         "--message-size",
-        &message_size.to_string(),
+        message_size_str.as_str(),
+        "--reuse-streams",
         bench,
-        &transport.to_string(),
+        transport_str.as_str(),
         "--server-address",
         server_addr,
     ]);
diff --git 
a/core/integration/tests/data_integrity/verify_after_server_restart.rs 
b/core/integration/tests/data_integrity/verify_after_server_restart.rs
index 865153a19..4c62ba5b6 100644
--- a/core/integration/tests/data_integrity/verify_after_server_restart.rs
+++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs
@@ -174,7 +174,7 @@ async fn 
should_fill_data_and_verify_after_restart(cache_setting: &'static str)
         );
     }
 
-    // Run send bench again to add more data
+    // Run send bench again to add more data (--reuse-streams is always passed)
     run_bench_and_wait_for_finish(
         &server_addr,
         &TransportProtocol::Tcp,

Reply via email to