This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch bench-stress
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 89c27d7099dd64c995aa379806acaeba9da1a42b
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Feb 13 20:05:41 2026 +0100

    refactor
---
 Cargo.lock                                         |   2 +
 core/bench/Cargo.toml                              |   2 +
 core/bench/src/actors/stress/admin_exerciser.rs    |  16 +-
 .../src/actors/stress/control_plane_churner.rs     |  99 +++---
 core/bench/src/actors/stress/verifier.rs           | 157 +++++++---
 core/bench/src/args/kinds/stress/args.rs           |  76 ++++-
 .../src/benchmarks/balanced_consumer_group.rs      |   6 +-
 core/bench/src/benchmarks/balanced_producer.rs     |   4 +-
 .../balanced_producer_and_consumer_group.rs        |   8 +-
 core/bench/src/benchmarks/common.rs                |  35 ++-
 .../end_to_end_producing_consumer_group.rs         |   4 +-
 core/bench/src/benchmarks/mod.rs                   |   8 +
 core/bench/src/benchmarks/pinned_consumer.rs       |   4 +-
 core/bench/src/benchmarks/pinned_producer.rs       |   4 +-
 .../src/benchmarks/pinned_producer_and_consumer.rs |   6 +-
 core/bench/src/benchmarks/stress.rs                | 345 +++++++++++++++------
 core/bench/src/benchmarks/stress_report.rs         |  76 +++--
 core/bench/src/utils/finish_condition.rs           |  35 ++-
 18 files changed, 630 insertions(+), 257 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 06476ec28..27515acbd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4518,10 +4518,12 @@ dependencies = [
  "serde",
  "sysinfo 0.38.1",
  "tokio",
+ "toml 1.0.0+spec-1.1.0",
  "tracing",
  "tracing-appender",
  "tracing-subscriber",
  "uuid",
+ "zip",
 ]
 
 [[package]]
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index 9f020e73b..f29c64d96 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -50,10 +50,12 @@ rayon = { workspace = true }
 serde = { workspace = true }
 sysinfo = { workspace = true }
 tokio = { workspace = true }
+toml = { workspace = true }
 tracing = { workspace = true }
 tracing-appender = { workspace = true }
 tracing-subscriber = { workspace = true }
 uuid = { workspace = true }
+zip = { workspace = true }
 
 [lints.clippy]
 enum_glob_use = "deny"
diff --git a/core/bench/src/actors/stress/admin_exerciser.rs 
b/core/bench/src/actors/stress/admin_exerciser.rs
index 46fb25bba..902cd64c4 100644
--- a/core/bench/src/actors/stress/admin_exerciser.rs
+++ b/core/bench/src/actors/stress/admin_exerciser.rs
@@ -18,7 +18,7 @@
 
 use super::error_classifier;
 use super::stress_context::StressContext;
-use crate::benchmarks::{BENCH_STREAM_PREFIX, BENCH_TOPIC_NAME};
+use crate::benchmarks::BENCH_TOPIC_NAME;
 use crate::utils::{ClientFactory, login_root};
 use iggy::clients::client::IggyClient;
 use iggy::prelude::*;
@@ -35,13 +35,19 @@ const ADMIN_CYCLE_INTERVAL_SECS: u64 = 15;
 pub struct AdminExerciser {
     client_factory: Arc<dyn ClientFactory>,
     ctx: Arc<StressContext>,
+    stable_stream_name: String,
 }
 
 impl AdminExerciser {
-    pub fn new(client_factory: Arc<dyn ClientFactory>, ctx: 
Arc<StressContext>) -> Self {
+    pub fn new(
+        client_factory: Arc<dyn ClientFactory>,
+        ctx: Arc<StressContext>,
+        stable_stream_name: String,
+    ) -> Self {
         Self {
             client_factory,
             ctx,
+            stable_stream_name,
         }
     }
 
@@ -142,7 +148,8 @@ impl AdminExerciser {
     }
 
     async fn offset_lifecycle(&self, client: &IggyClient, cycle: u64) {
-        let stream_id: Identifier = format!("{BENCH_STREAM_PREFIX}-1")
+        let stream_id: Identifier = self
+            .stable_stream_name
             .as_str()
             .try_into()
             .expect("valid identifier");
@@ -190,7 +197,8 @@ impl AdminExerciser {
     }
 
     async fn flush_buffers(&self, client: &IggyClient) {
-        let stream_id: Identifier = format!("{BENCH_STREAM_PREFIX}-1")
+        let stream_id: Identifier = self
+            .stable_stream_name
             .as_str()
             .try_into()
             .expect("valid identifier");
diff --git a/core/bench/src/actors/stress/control_plane_churner.rs 
b/core/bench/src/actors/stress/control_plane_churner.rs
index e9e1e930c..1eede90af 100644
--- a/core/bench/src/actors/stress/control_plane_churner.rs
+++ b/core/bench/src/actors/stress/control_plane_churner.rs
@@ -20,13 +20,14 @@ use super::error_classifier;
 use super::stress_context::StressContext;
 use crate::args::kinds::stress::args::ApiMix;
 use crate::benchmarks::{
-    BENCH_STREAM_PREFIX, BENCH_TOPIC_NAME, CONSUMER_GROUP_BASE_ID, 
CONSUMER_GROUP_NAME_PREFIX,
+    BENCH_TOPIC_NAME, CHAOS_STREAM_PREFIX, CONSUMER_GROUP_BASE_ID, 
CONSUMER_GROUP_NAME_PREFIX,
+    STABLE_STREAM_PREFIX,
 };
 use crate::utils::{ClientFactory, login_root};
 use iggy::clients::client::IggyClient;
 use iggy::prelude::*;
 use rand::rngs::StdRng;
-use rand::{Rng, SeedableRng};
+use rand::{RngExt, SeedableRng};
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
 use tracing::{debug, warn};
@@ -72,14 +73,11 @@ const ALL_OPS: [ChurnOp; 9] = [
 /// so that `DeleteAndRecreateTopic` can recreate with the same parameters.
 pub struct ChurnerConfig {
     pub api_mix: ApiMix,
-    pub streams: u32,
+    pub stable_streams: u32,
+    pub chaos_streams: u32,
     pub partitions: u32,
-    pub consumer_groups: u32,
     pub message_expiry: IggyExpiry,
     pub max_topic_size: MaxTopicSize,
-    /// Deadline after which data-destructive ops (`PurgeTopic`, `PurgeStream`,
-    /// `DeleteAndRecreateTopic`) are suppressed so messages survive for 
verification.
-    pub purge_cutoff: std::time::Instant,
 }
 
 /// Periodically executes CRUD lifecycle operations against the server.
@@ -95,12 +93,11 @@ pub struct ControlPlaneChurner {
     churn_interval: std::time::Duration,
     rng: StdRng,
     api_mix: ApiMix,
-    streams: u32,
+    stable_streams: u32,
+    chaos_streams: u32,
     partitions: u32,
-    consumer_groups: u32,
     message_expiry: IggyExpiry,
     max_topic_size: MaxTopicSize,
-    purge_cutoff: std::time::Instant,
 }
 
 impl ControlPlaneChurner {
@@ -120,12 +117,11 @@ impl ControlPlaneChurner {
             churn_interval: churn_interval.get_duration(),
             rng,
             api_mix: config.api_mix,
-            streams: config.streams,
+            stable_streams: config.stable_streams,
+            chaos_streams: config.chaos_streams,
             partitions: config.partitions,
-            consumer_groups: config.consumer_groups,
             message_expiry: config.message_expiry,
             max_topic_size: config.max_topic_size,
-            purge_cutoff: config.purge_cutoff,
         }
     }
 
@@ -151,8 +147,6 @@ impl ControlPlaneChurner {
                 self.churner_id, op
             );
 
-            let past_purge_cutoff = std::time::Instant::now() > 
self.purge_cutoff;
-
             match op {
                 ChurnOp::CreateDeleteTopic => {
                     self.create_delete_topic(&client, cycle).await;
@@ -163,12 +157,6 @@ impl ControlPlaneChurner {
                 ChurnOp::ConsumerGroupJoinLeave => {
                     self.consumer_group_join_leave(&client).await;
                 }
-                ChurnOp::PurgeTopic if past_purge_cutoff => {
-                    debug!(
-                        "Churner #{}: skipping PurgeTopic (past purge cutoff)",
-                        self.churner_id
-                    );
-                }
                 ChurnOp::PurgeTopic => {
                     self.purge_random_topic(&client).await;
                 }
@@ -181,21 +169,9 @@ impl ControlPlaneChurner {
                 ChurnOp::StressPoll => {
                     self.stress_poll(&client).await;
                 }
-                ChurnOp::DeleteAndRecreateTopic if past_purge_cutoff => {
-                    debug!(
-                        "Churner #{}: skipping DeleteAndRecreateTopic (past 
purge cutoff)",
-                        self.churner_id
-                    );
-                }
                 ChurnOp::DeleteAndRecreateTopic => {
                     self.delete_and_recreate_topic(&client).await;
                 }
-                ChurnOp::PurgeStream if past_purge_cutoff => {
-                    debug!(
-                        "Churner #{}: skipping PurgeStream (past purge 
cutoff)",
-                        self.churner_id
-                    );
-                }
                 ChurnOp::PurgeStream => {
                     self.purge_stream(&client).await;
                 }
@@ -206,25 +182,39 @@ impl ControlPlaneChurner {
         }
     }
 
-    fn random_stream_id(&mut self) -> Identifier {
-        let stream_idx = self.rng.random_range(1..=self.streams);
-        format!("{BENCH_STREAM_PREFIX}-{stream_idx}")
+    fn random_chaos_stream_id(&mut self) -> Identifier {
+        let idx = self.rng.random_range(1..=self.chaos_streams);
+        format!("{CHAOS_STREAM_PREFIX}-{idx}")
             .as_str()
             .try_into()
             .expect("valid identifier")
     }
 
+    fn random_any_stream_id(&mut self) -> Identifier {
+        let total = self.stable_streams + self.chaos_streams;
+        let idx = self.rng.random_range(1..=total);
+        if idx <= self.stable_streams {
+            format!("{STABLE_STREAM_PREFIX}-{idx}")
+                .as_str()
+                .try_into()
+                .expect("valid identifier")
+        } else {
+            let chaos_idx = idx - self.stable_streams;
+            format!("{CHAOS_STREAM_PREFIX}-{chaos_idx}")
+                .as_str()
+                .try_into()
+                .expect("valid identifier")
+        }
+    }
+
     fn topic_id() -> Identifier {
         BENCH_TOPIC_NAME.try_into().expect("valid identifier")
     }
 
     // --- Original ops ---
 
-    async fn create_delete_topic(&self, client: &IggyClient, cycle: u64) {
-        let stream_id: Identifier = format!("{BENCH_STREAM_PREFIX}-1")
-            .as_str()
-            .try_into()
-            .expect("valid identifier");
+    async fn create_delete_topic(&mut self, client: &IggyClient, cycle: u64) {
+        let stream_id = self.random_chaos_stream_id();
         let topic_name = format!("churn-{}-{cycle}", self.churner_id);
 
         match client
@@ -234,8 +224,8 @@ impl ControlPlaneChurner {
                 1,
                 CompressionAlgorithm::default(),
                 None,
-                IggyExpiry::NeverExpire,
-                MaxTopicSize::ServerDefault,
+                self.message_expiry,
+                self.max_topic_size,
             )
             .await
         {
@@ -285,11 +275,8 @@ impl ControlPlaneChurner {
         }
     }
 
-    async fn add_remove_partitions(&self, client: &IggyClient) {
-        let stream_id: Identifier = format!("{BENCH_STREAM_PREFIX}-1")
-            .as_str()
-            .try_into()
-            .expect("valid identifier");
+    async fn add_remove_partitions(&mut self, client: &IggyClient) {
+        let stream_id = self.random_chaos_stream_id();
         let topic_id = Self::topic_id();
 
         match client.create_partitions(&stream_id, &topic_id, 1).await {
@@ -338,8 +325,8 @@ impl ControlPlaneChurner {
     /// Joins and leaves a bench consumer group, forcing a rebalance
     /// mid-poll that races against active data-plane consumers.
     async fn consumer_group_join_leave(&mut self, client: &IggyClient) {
-        let stream_idx = self.rng.random_range(1..=self.consumer_groups);
-        let stream_id: Identifier = 
format!("{BENCH_STREAM_PREFIX}-{stream_idx}")
+        let stream_idx = self.rng.random_range(1..=self.stable_streams);
+        let stream_id: Identifier = 
format!("{STABLE_STREAM_PREFIX}-{stream_idx}")
             .as_str()
             .try_into()
             .expect("valid identifier");
@@ -391,7 +378,7 @@ impl ControlPlaneChurner {
     }
 
     async fn purge_random_topic(&mut self, client: &IggyClient) {
-        let stream_id = self.random_stream_id();
+        let stream_id = self.random_chaos_stream_id();
         let topic_id = Self::topic_id();
 
         match client.purge_topic(&stream_id, &topic_id).await {
@@ -415,7 +402,7 @@ impl ControlPlaneChurner {
     // --- New safe ops ---
 
     async fn delete_segments(&mut self, client: &IggyClient) {
-        let stream_id = self.random_stream_id();
+        let stream_id = self.random_chaos_stream_id();
         let topic_id = Self::topic_id();
         let partition_id = self.rng.random_range(0..self.partitions);
 
@@ -441,7 +428,7 @@ impl ControlPlaneChurner {
     }
 
     async fn update_topic(&mut self, client: &IggyClient) {
-        let stream_id = self.random_stream_id();
+        let stream_id = self.random_chaos_stream_id();
         let topic_id = Self::topic_id();
 
         let compression = if self.rng.random_bool(0.5) {
@@ -482,7 +469,7 @@ impl ControlPlaneChurner {
     /// One-off polls with First/Last/Timestamp strategies — these are "victim"
     /// operations that race against concurrent segment mutations.
     async fn stress_poll(&mut self, client: &IggyClient) {
-        let stream_id = self.random_stream_id();
+        let stream_id = self.random_any_stream_id();
         let topic_id = Self::topic_id();
         let partition_id = self.rng.random_range(0..self.partitions);
         let consumer = Consumer::new(Identifier::numeric(8888).expect("valid 
consumer id"));
@@ -528,7 +515,7 @@ impl ControlPlaneChurner {
     /// Exercises 23+ `expect()` panic sites reachable when consumers
     /// hold stale topic references during deletion.
     async fn delete_and_recreate_topic(&mut self, client: &IggyClient) {
-        let stream_id = self.random_stream_id();
+        let stream_id = self.random_chaos_stream_id();
         let topic_id = Self::topic_id();
 
         match client.delete_topic(&stream_id, &topic_id).await {
@@ -583,7 +570,7 @@ impl ControlPlaneChurner {
     }
 
     async fn purge_stream(&mut self, client: &IggyClient) {
-        let stream_id = self.random_stream_id();
+        let stream_id = self.random_chaos_stream_id();
 
         match client.purge_stream(&stream_id).await {
             Ok(()) => {
diff --git a/core/bench/src/actors/stress/verifier.rs 
b/core/bench/src/actors/stress/verifier.rs
index 825a63c1c..bed914690 100644
--- a/core/bench/src/actors/stress/verifier.rs
+++ b/core/bench/src/actors/stress/verifier.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::benchmarks::{BENCH_STREAM_PREFIX, BENCH_TOPIC_NAME};
+use crate::benchmarks::{BENCH_TOPIC_NAME, CHAOS_STREAM_PREFIX, 
STABLE_STREAM_PREFIX};
 use crate::utils::{ClientFactory, login_root};
 use iggy::clients::client::IggyClient;
 use iggy::prelude::*;
@@ -25,7 +25,7 @@ use std::collections::BTreeSet;
 use std::sync::Arc;
 use tracing::{info, warn};
 
-/// Post-test verification results.
+/// Post-test verification results for a single namespace.
 #[derive(Debug, Default)]
 pub struct VerificationResult {
     pub partitions_checked: u32,
@@ -38,35 +38,127 @@ pub struct VerificationResult {
     pub passed: bool,
 }
 
-/// Runs drain-phase verification: polls all partitions and checks offset 
continuity.
-///
-/// During the stress test, messages may expire, so we verify that within each
-/// partition the offsets we can still poll are monotonically increasing with 
no
-/// gaps in the remaining range.
+/// Combined verification across stable and chaos namespaces.
+#[derive(Debug)]
+pub struct StressVerificationResult {
+    pub stable: VerificationResult,
+    pub chaos: VerificationResult,
+    pub passed: bool,
+}
+
+/// Runs drain-phase verification with namespace-aware checking:
+/// - Stable streams: strict (msgs > 0, no gaps, no dups, checksums OK)
+/// - Chaos streams: integrity-only (checksums + payload length OK; gaps, 
dups, 0 msgs acceptable)
 pub struct StressVerifier {
     client_factory: Arc<dyn ClientFactory>,
-    streams: u32,
+    stable_streams: u32,
+    chaos_streams: u32,
     partitions: u32,
 }
 
 impl StressVerifier {
-    pub fn new(client_factory: Arc<dyn ClientFactory>, streams: u32, 
partitions: u32) -> Self {
+    pub fn new(
+        client_factory: Arc<dyn ClientFactory>,
+        stable_streams: u32,
+        chaos_streams: u32,
+        partitions: u32,
+    ) -> Self {
         Self {
             client_factory,
-            streams,
+            stable_streams,
+            chaos_streams,
             partitions,
         }
     }
 
-    pub async fn verify(&self) -> VerificationResult {
+    pub async fn verify(&self) -> StressVerificationResult {
         let client = self.client_factory.create_client().await;
         let client = IggyClient::create(client, None, None);
         login_root(&client).await;
 
+        let stable = self
+            .verify_namespace(&client, STABLE_STREAM_PREFIX, 
self.stable_streams)
+            .await;
+        let chaos = self
+            .verify_namespace(&client, CHAOS_STREAM_PREFIX, self.chaos_streams)
+            .await;
+
+        // Stable: strict verification
+        let stable_passed = stable.total_messages > 0
+            && stable.gaps_found == 0
+            && stable.duplicates_found == 0
+            && stable.checksum_mismatches == 0
+            && stable.payload_length_mismatches == 0;
+
+        // Chaos: integrity-only (gaps, dups, 0 msgs are acceptable)
+        let chaos_passed = chaos.checksum_mismatches == 0 && 
chaos.payload_length_mismatches == 0;
+
+        let stable = VerificationResult {
+            passed: stable_passed,
+            ..stable
+        };
+        let chaos = VerificationResult {
+            passed: chaos_passed,
+            ..chaos
+        };
+
+        let passed = stable.passed && chaos.passed;
+
+        if stable.passed {
+            info!(
+                "Stable verification PASSED: {} partitions, {} msgs, 0 gaps, 0 
dups, 0 checksum",
+                stable.partitions_checked, stable.total_messages
+            );
+        } else {
+            warn!(
+                "Stable verification FAILED: {} partitions, {} msgs, {} gaps, 
{} dups, {} checksum, {} len",
+                stable.partitions_checked,
+                stable.total_messages,
+                stable.gaps_found,
+                stable.duplicates_found,
+                stable.checksum_mismatches,
+                stable.payload_length_mismatches,
+            );
+        }
+
+        if chaos.passed {
+            info!(
+                "Chaos verification PASSED: {} partitions, {} msgs, integrity 
OK",
+                chaos.partitions_checked, chaos.total_messages
+            );
+        } else {
+            warn!(
+                "Chaos verification FAILED: {} partitions, {} checksum, {} 
len",
+                chaos.partitions_checked,
+                chaos.checksum_mismatches,
+                chaos.payload_length_mismatches,
+            );
+        }
+
+        if stable.id_missing_fingerprint + chaos.id_missing_fingerprint > 0 {
+            warn!(
+                "Verification: {} messages missing producer ID fingerprint 
(server-assigned IDs)",
+                stable.id_missing_fingerprint + chaos.id_missing_fingerprint
+            );
+        }
+
+        StressVerificationResult {
+            stable,
+            chaos,
+            passed,
+        }
+    }
+
+    async fn verify_namespace(
+        &self,
+        client: &IggyClient,
+        prefix: &str,
+        stream_count: u32,
+    ) -> VerificationResult {
         let mut result = VerificationResult::default();
 
-        for stream_idx in 1..=self.streams {
-            let stream_id: Identifier = 
format!("{BENCH_STREAM_PREFIX}-{stream_idx}")
+        for stream_idx in 1..=stream_count {
+            let stream_id: Identifier = format!("{prefix}-{stream_idx}")
                 .as_str()
                 .try_into()
                 .expect("valid identifier");
@@ -74,7 +166,7 @@ impl StressVerifier {
 
             for partition_id in 0..self.partitions {
                 let partition_result = self
-                    .verify_partition(&client, &stream_id, &topic_id, 
partition_id)
+                    .verify_partition(client, &stream_id, &topic_id, 
partition_id)
                     .await;
                 result.partitions_checked += 1;
                 result.total_messages += partition_result.total_messages;
@@ -86,41 +178,6 @@ impl StressVerifier {
             }
         }
 
-        result.passed = result.total_messages > 0
-            && result.gaps_found == 0
-            && result.duplicates_found == 0
-            && result.checksum_mismatches == 0
-            && result.payload_length_mismatches == 0;
-
-        if result.total_messages == 0 {
-            warn!(
-                "Verification FAILED: 0 messages found across {} partitions — 
topic may have been destroyed by chaos",
-                result.partitions_checked
-            );
-        } else if result.passed {
-            info!(
-                "Verification PASSED: {} partitions, {} msgs, 0 gaps, 0 dups, 
0 checksum, 0 len",
-                result.partitions_checked, result.total_messages
-            );
-        } else {
-            warn!(
-                "Verification FAILED: {} partitions, {} msgs, {} gaps, {} 
dups, {} checksum, {} len",
-                result.partitions_checked,
-                result.total_messages,
-                result.gaps_found,
-                result.duplicates_found,
-                result.checksum_mismatches,
-                result.payload_length_mismatches,
-            );
-        }
-
-        if result.id_missing_fingerprint > 0 {
-            warn!(
-                "Verification: {} messages missing producer ID fingerprint 
(server-assigned IDs)",
-                result.id_missing_fingerprint
-            );
-        }
-
         result
     }
 
@@ -162,7 +219,6 @@ impl StressVerifier {
                             result.duplicates_found += 1;
                         }
 
-                        // Checksum re-verification: serialize and hash 
everything after the checksum field
                         let raw = msg.to_bytes();
                         let recomputed = calculate_checksum(&raw[8..]);
                         if msg.header.checksum != recomputed {
@@ -191,7 +247,6 @@ impl StressVerifier {
             }
         }
 
-        // Check for gaps in the seen offsets
         if let (Some(&min), Some(&max)) = (seen_offsets.first(), 
seen_offsets.last()) {
             let expected_count = max - min + 1;
             let actual_count = seen_offsets.len() as u64;
diff --git a/core/bench/src/args/kinds/stress/args.rs 
b/core/bench/src/args/kinds/stress/args.rs
index ad17bb86f..ebac49648 100644
--- a/core/bench/src/args/kinds/stress/args.rs
+++ b/core/bench/src/args/kinds/stress/args.rs
@@ -25,9 +25,9 @@ use std::str::FromStr;
 const DEFAULT_PRODUCERS: NonZeroU32 = nonzero_lit::u32!(4);
 const DEFAULT_CONSUMERS: NonZeroU32 = nonzero_lit::u32!(4);
 const DEFAULT_CHURN_CONCURRENCY: NonZeroU32 = nonzero_lit::u32!(1);
-const DEFAULT_STREAMS: u32 = 2;
+const DEFAULT_STABLE_STREAMS: u32 = 2;
+const DEFAULT_CHAOS_STREAMS: u32 = 1;
 const DEFAULT_PARTITIONS: u32 = 4;
-const DEFAULT_CONSUMER_GROUPS: u32 = 2;
 
 /// Determines the mix of API operations exercised during the stress test.
 #[derive(Debug, Clone, Copy, ValueEnum, Default)]
@@ -81,14 +81,30 @@ pub struct StressArgs {
     #[arg(long, value_enum, default_value_t = ApiMix::All)]
     pub api_mix: ApiMix,
 
+    /// Number of stable streams (golden logs, strict verification)
+    #[arg(long, default_value_t = DEFAULT_STABLE_STREAMS)]
+    pub stable_streams: u32,
+
+    /// Number of chaos streams (full destruction, integrity-only verification)
+    #[arg(long, default_value_t = DEFAULT_CHAOS_STREAMS)]
+    pub chaos_streams: u32,
+
     /// RNG seed for reproducible chaos operations
     #[arg(long)]
     pub chaos_seed: Option<u64>,
+
+    /// Fixed duration of the baseline phase (data-plane only, no chaos)
+    #[arg(long, default_value = "15s", value_parser = IggyDuration::from_str)]
+    pub baseline_duration: IggyDuration,
+
+    /// Fixed duration of the quiesce phase (graceful shutdown window after 
cancellation)
+    #[arg(long, default_value = "10s", value_parser = IggyDuration::from_str)]
+    pub quiesce_duration: IggyDuration,
 }
 
 impl BenchmarkKindProps for StressArgs {
     fn streams(&self) -> u32 {
-        DEFAULT_STREAMS
+        self.stable_streams + self.chaos_streams
     }
 
     fn partitions(&self) -> u32 {
@@ -107,16 +123,19 @@ impl BenchmarkKindProps for StressArgs {
         &self.transport
     }
 
+    /// CGs only on stable streams
     fn number_of_consumer_groups(&self) -> u32 {
-        DEFAULT_CONSUMER_GROUPS
+        self.stable_streams
     }
 
+    /// Stable streams use `NeverExpire`; chaos uses the CLI field directly
     fn max_topic_size(&self) -> Option<IggyByteSize> {
-        Some(self.max_topic_size)
+        None
     }
 
+    /// Stable streams use `NeverExpire`; chaos uses the CLI field directly
     fn message_expiry(&self) -> IggyExpiry {
-        self.message_expiry
+        IggyExpiry::NeverExpire
     }
 
     fn validate(&self) {
@@ -128,6 +147,35 @@ impl BenchmarkKindProps for StressArgs {
                 )
                 .exit();
         }
+        if self.stable_streams < 1 {
+            crate::args::common::IggyBenchArgs::command()
+                .error(
+                    ErrorKind::ValueValidation,
+                    "At least 1 stable stream is required",
+                )
+                .exit();
+        }
+        if self.chaos_streams < 1 {
+            crate::args::common::IggyBenchArgs::command()
+                .error(
+                    ErrorKind::ValueValidation,
+                    "At least 1 chaos stream is required",
+                )
+                .exit();
+        }
+        let baseline_plus_quiesce =
+            self.baseline_duration.get_duration() + 
self.quiesce_duration.get_duration();
+        if baseline_plus_quiesce >= self.duration.get_duration() {
+            crate::args::common::IggyBenchArgs::command()
+                .error(
+                    ErrorKind::ValueValidation,
+                    format!(
+                        "baseline_duration ({}) + quiesce_duration ({}) must 
be less than total duration ({})",
+                        self.baseline_duration, self.quiesce_duration, 
self.duration
+                    ),
+                )
+                .exit();
+        }
     }
 }
 
@@ -157,4 +205,20 @@ impl StressArgs {
                 .as_nanos() as u64
         })
     }
+
+    pub const fn stable_streams(&self) -> u32 {
+        self.stable_streams
+    }
+
+    pub const fn chaos_streams(&self) -> u32 {
+        self.chaos_streams
+    }
+
+    pub const fn baseline_duration(&self) -> IggyDuration {
+        self.baseline_duration
+    }
+
+    pub const fn quiesce_duration(&self) -> IggyDuration {
+        self.quiesce_duration
+    }
 }
diff --git a/core/bench/src/benchmarks/balanced_consumer_group.rs 
b/core/bench/src/benchmarks/balanced_consumer_group.rs
index 5bf6026e3..bf49c565e 100644
--- a/core/bench/src/benchmarks/balanced_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_consumer_group.rs
@@ -17,6 +17,7 @@
  */
 
 use super::benchmark::Benchmarkable;
+use crate::benchmarks::bench_stream_names;
 use crate::utils::ClientFactory;
 use crate::{
     args::common::IggyBenchArgs,
@@ -52,10 +53,11 @@ impl Benchmarkable for BalancedConsumerGroupBenchmark {
         let cf = &self.client_factory;
         let args = self.args.clone();
         let mut tasks: JoinSet<_> = JoinSet::new();
+        let names = bench_stream_names(args.number_of_consumer_groups());
 
-        init_consumer_groups(cf, &args).await?;
+        init_consumer_groups(cf, &names).await?;
 
-        let consumer_futures = build_consumer_futures(cf, &args);
+        let consumer_futures = build_consumer_futures(cf, &args, &names, None);
         for fut in consumer_futures {
             tasks.spawn(fut);
         }
diff --git a/core/bench/src/benchmarks/balanced_producer.rs 
b/core/bench/src/benchmarks/balanced_producer.rs
index b9e7aa892..53de2c05a 100644
--- a/core/bench/src/benchmarks/balanced_producer.rs
+++ b/core/bench/src/benchmarks/balanced_producer.rs
@@ -18,6 +18,7 @@
 
 use super::benchmark::Benchmarkable;
 use crate::args::common::IggyBenchArgs;
+use crate::benchmarks::bench_stream_names;
 use crate::benchmarks::common::build_producer_futures;
 use crate::utils::ClientFactory;
 use async_trait::async_trait;
@@ -50,7 +51,8 @@ impl Benchmarkable for BalancedProducerBenchmark {
         self.init_streams().await?;
         let cf = &self.client_factory;
         let args = self.args.clone();
-        let producer_futures = build_producer_futures(cf, &args);
+        let names = bench_stream_names(args.streams());
+        let producer_futures = build_producer_futures(cf, &args, &names, None);
         let mut tasks = JoinSet::new();
 
         for fut in producer_futures {
diff --git a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs 
b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
index 10ed9bc4b..e8d0b0e6e 100644
--- a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
@@ -17,6 +17,7 @@
  */
 
 use super::benchmark::Benchmarkable;
+use crate::benchmarks::bench_stream_names;
 use crate::utils::ClientFactory;
 use crate::{
     args::common::IggyBenchArgs,
@@ -52,11 +53,12 @@ impl Benchmarkable for 
BalancedProducerAndConsumerGroupBenchmark {
         let cf = &self.client_factory;
         let args = self.args.clone();
         let mut tasks: JoinSet<_> = JoinSet::new();
+        let names = bench_stream_names(args.streams());
 
-        init_consumer_groups(cf, &args).await?;
+        init_consumer_groups(cf, &names).await?;
 
-        let producer_futures = build_producer_futures(cf, &args);
-        let consumer_futures = build_consumer_futures(cf, &args);
+        let producer_futures = build_producer_futures(cf, &args, &names, None);
+        let consumer_futures = build_consumer_futures(cf, &args, &names, None);
 
         for fut in producer_futures {
             tasks.spawn(fut);
diff --git a/core/bench/src/benchmarks/common.rs 
b/core/bench/src/benchmarks/common.rs
index 856a6a90c..6d8d4a910 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -70,16 +70,15 @@ pub fn rate_limit_per_actor(total_rate: 
Option<IggyByteSize>, actors: u32) -> Op
 #[allow(clippy::cognitive_complexity)]
 pub async fn init_consumer_groups(
     client_factory: &Arc<dyn ClientFactory>,
-    args: &IggyBenchArgs,
+    stream_names: &[String],
 ) -> Result<(), IggyError> {
     let client = client_factory.create_client().await;
     let client = IggyClient::create(client, None, None);
-    let cg_count = args.number_of_consumer_groups();
 
     login_root(&client).await;
-    for i in 0..cg_count {
-        let consumer_group_id = CONSUMER_GROUP_BASE_ID + i;
-        let stream_name = format!("bench-stream-{}", i + 1);
+    for (i, stream_name) in stream_names.iter().enumerate() {
+        #[allow(clippy::cast_possible_truncation)]
+        let consumer_group_id = CONSUMER_GROUP_BASE_ID + i as u32;
         let stream_id: Identifier = stream_name.as_str().try_into()?;
         let topic_id: Identifier = "topic-1".try_into()?;
         let consumer_group_name = 
format!("{CONSUMER_GROUP_NAME_PREFIX}-{consumer_group_id}");
@@ -100,8 +99,10 @@ pub async fn init_consumer_groups(
 pub fn build_producer_futures(
     client_factory: &Arc<dyn ClientFactory>,
     args: &IggyBenchArgs,
+    stream_names: &[String],
+    finish_condition_override: Option<Arc<BenchmarkFinishCondition>>,
 ) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send + use<>> {
-    let streams = args.streams();
+    let stream_count = u32::try_from(stream_names.len()).expect("too many 
streams");
     let partitions = args.number_of_partitions();
     let producers = args.producers();
     let actors = args.producers() + args.consumers();
@@ -111,8 +112,9 @@ pub fn build_producer_futures(
     let sampling_time = args.sampling_time();
     let moving_average_window = args.moving_average_window();
     let kind = args.kind();
-    let shared_finish_condition =
-        BenchmarkFinishCondition::new(args, 
BenchmarkFinishConditionMode::Shared);
+    let shared_finish_condition = finish_condition_override.unwrap_or_else(|| {
+        BenchmarkFinishCondition::new(args, 
BenchmarkFinishConditionMode::Shared)
+    });
     let rate_limit = rate_limit_per_actor(args.rate_limit(), actors);
     let use_high_level_api = args.high_level_api();
 
@@ -126,8 +128,7 @@ pub fn build_producer_futures(
                 BenchmarkFinishCondition::new(args, 
BenchmarkFinishConditionMode::PerProducer)
             };
 
-            let stream_idx = 1 + ((producer_id - 1) % streams);
-            let stream_id = format!("bench-stream-{stream_idx}");
+            let stream_id = stream_names[((producer_id - 1) % stream_count) as 
usize].clone();
 
             async move {
                 let producer = TypedBenchmarkProducer::new(
@@ -154,8 +155,11 @@ pub fn build_producer_futures(
 pub fn build_consumer_futures(
     client_factory: &Arc<dyn ClientFactory>,
     args: &IggyBenchArgs,
+    stream_names: &[String],
+    finish_condition_override: Option<Arc<BenchmarkFinishCondition>>,
 ) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send + use<>> {
     let cg_count = args.number_of_consumer_groups();
+    let stream_count = stream_names.len();
     let consumers = args.consumers();
     let actors = args.producers() + args.consumers();
     let warmup_time = args.warmup_time();
@@ -176,8 +180,9 @@ pub fn build_consumer_futures(
         _ => unreachable!(),
     };
 
-    let global_finish_condition =
-        BenchmarkFinishCondition::new(args, 
BenchmarkFinishConditionMode::Shared);
+    let global_finish_condition = finish_condition_override.unwrap_or_else(|| {
+        BenchmarkFinishCondition::new(args, 
BenchmarkFinishConditionMode::Shared)
+    });
     // When measuring true E2E latency, apply read_amplification multiplier to 
consumer rate
     // to ensure they can keep up with producers and not inflate latency due 
to queue buildup
     let read_amplification = args.read_amplification().unwrap_or(1.0);
@@ -205,11 +210,11 @@ pub fn build_consumer_futures(
                 BenchmarkFinishCondition::new(args, 
BenchmarkFinishConditionMode::PerConsumer)
             };
             let stream_idx = if cg_count > 0 {
-                1 + ((consumer_id - 1) % cg_count)
+                ((consumer_id - 1) % cg_count) as usize
             } else {
-                consumer_id
+                (consumer_id - 1) as usize
             };
-            let stream_id = format!("bench-stream-{stream_idx}");
+            let stream_id = stream_names[stream_idx % stream_count].clone();
             // Each stream has exactly one CG, server assigns IDs starting 
from 0
             let consumer_group_id = if cg_count > 0 {
                 Some(CONSUMER_GROUP_BASE_ID)
diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs 
b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
index 2ef5cd190..27d879b57 100644
--- a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
+++ b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
@@ -17,6 +17,7 @@
  */
 
 use crate::args::common::IggyBenchArgs;
+use crate::benchmarks::bench_stream_names;
 use crate::benchmarks::common::{build_producing_consumer_groups_futures, 
init_consumer_groups};
 use crate::utils::ClientFactory;
 use async_trait::async_trait;
@@ -52,8 +53,9 @@ impl Benchmarkable for 
EndToEndProducingConsumerGroupBenchmark {
         let cf = self.client_factory.clone();
         let args = self.args.clone();
         let mut tasks = JoinSet::new();
+        let names = bench_stream_names(args.number_of_consumer_groups());
 
-        init_consumer_groups(&cf, &args).await?;
+        init_consumer_groups(&cf, &names).await?;
 
         let futures = build_producing_consumer_groups_futures(cf, args);
         for fut in futures {
diff --git a/core/bench/src/benchmarks/mod.rs b/core/bench/src/benchmarks/mod.rs
index b97ac1f74..f6d70ea0d 100644
--- a/core/bench/src/benchmarks/mod.rs
+++ b/core/bench/src/benchmarks/mod.rs
@@ -33,4 +33,12 @@ pub mod stress_report;
 pub const CONSUMER_GROUP_BASE_ID: u32 = 0;
 pub const CONSUMER_GROUP_NAME_PREFIX: &str = "cg";
 pub const BENCH_STREAM_PREFIX: &str = "bench-stream";
+pub const STABLE_STREAM_PREFIX: &str = "stable";
+pub const CHAOS_STREAM_PREFIX: &str = "chaos";
 pub const BENCH_TOPIC_NAME: &str = "topic-1";
+
+pub fn bench_stream_names(count: u32) -> Vec<String> {
+    (1..=count)
+        .map(|i| format!("{BENCH_STREAM_PREFIX}-{i}"))
+        .collect()
+}
diff --git a/core/bench/src/benchmarks/pinned_consumer.rs 
b/core/bench/src/benchmarks/pinned_consumer.rs
index 4c44b9fec..3d5c59ac5 100644
--- a/core/bench/src/benchmarks/pinned_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_consumer.rs
@@ -17,6 +17,7 @@
  */
 
 use crate::args::common::IggyBenchArgs;
+use crate::benchmarks::bench_stream_names;
 use crate::benchmarks::benchmark::Benchmarkable;
 use crate::benchmarks::common::build_consumer_futures;
 use crate::utils::ClientFactory;
@@ -51,8 +52,9 @@ impl Benchmarkable for PinnedConsumerBenchmark {
         let cf = &self.client_factory;
         let args = self.args.clone();
         let mut tasks: JoinSet<_> = JoinSet::new();
+        let names = bench_stream_names(args.streams());
 
-        let futures = build_consumer_futures(cf, &args);
+        let futures = build_consumer_futures(cf, &args, &names, None);
         for fut in futures {
             tasks.spawn(fut);
         }
diff --git a/core/bench/src/benchmarks/pinned_producer.rs 
b/core/bench/src/benchmarks/pinned_producer.rs
index 8630eb573..b4c605c53 100644
--- a/core/bench/src/benchmarks/pinned_producer.rs
+++ b/core/bench/src/benchmarks/pinned_producer.rs
@@ -17,6 +17,7 @@
  */
 
 use crate::args::common::IggyBenchArgs;
+use crate::benchmarks::bench_stream_names;
 use crate::benchmarks::benchmark::Benchmarkable;
 use crate::benchmarks::common::build_producer_futures;
 use crate::utils::ClientFactory;
@@ -51,8 +52,9 @@ impl Benchmarkable for PinnedProducerBenchmark {
         let client_factory = &self.client_factory;
         let args = self.args.clone();
         let mut tasks = JoinSet::new();
+        let names = bench_stream_names(args.streams());
 
-        let producer_futures = build_producer_futures(client_factory, &args);
+        let producer_futures = build_producer_futures(client_factory, &args, 
&names, None);
 
         for fut in producer_futures {
             tasks.spawn(fut);
diff --git a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs 
b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
index 8f2630396..f2fa95317 100644
--- a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::args::common::IggyBenchArgs;
+use crate::benchmarks::bench_stream_names;
 use crate::benchmarks::benchmark::Benchmarkable;
 use crate::benchmarks::common::{build_consumer_futures, 
build_producer_futures};
 use crate::utils::ClientFactory;
@@ -50,9 +51,10 @@ impl Benchmarkable for PinnedProducerAndConsumerBenchmark {
         let cf = &self.client_factory;
         let args = self.args.clone();
         let mut tasks = JoinSet::new();
+        let names = bench_stream_names(args.streams());
 
-        let producer_futures = build_producer_futures(cf, &args);
-        let consumer_futures = build_consumer_futures(cf, &args);
+        let producer_futures = build_producer_futures(cf, &args, &names, None);
+        let consumer_futures = build_consumer_futures(cf, &args, &names, None);
 
         for fut in producer_futures {
             tasks.spawn(fut);
diff --git a/core/bench/src/benchmarks/stress.rs 
b/core/bench/src/benchmarks/stress.rs
index 62bf35df8..734e2cd98 100644
--- a/core/bench/src/benchmarks/stress.rs
+++ b/core/bench/src/benchmarks/stress.rs
@@ -19,6 +19,7 @@
 use super::benchmark::Benchmarkable;
 use super::common::{build_consumer_futures, build_producer_futures, 
init_consumer_groups};
 use super::stress_report::StressReport;
+use super::{BENCH_TOPIC_NAME, CHAOS_STREAM_PREFIX, STABLE_STREAM_PREFIX};
 use crate::actors::stress::admin_exerciser::AdminExerciser;
 use crate::actors::stress::control_plane_churner::{ChurnerConfig, 
ControlPlaneChurner};
 use crate::actors::stress::error_classifier;
@@ -28,25 +29,22 @@ use crate::actors::stress::verifier::StressVerifier;
 use crate::args::common::IggyBenchArgs;
 use crate::args::kind::BenchmarkKindCommand;
 use crate::args::kinds::stress::args::ApiMix;
-use crate::utils::ClientFactory;
+use crate::utils::finish_condition::BenchmarkFinishCondition;
+use crate::utils::{ClientFactory, login_root};
 use async_trait::async_trait;
 use bench_report::{
     actor_kind::ActorKind, benchmark_kind::BenchmarkKind,
     individual_metrics::BenchmarkIndividualMetrics,
 };
 use iggy::prelude::*;
+use std::io::{Cursor, Read as _};
+use std::str::FromStr;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
 use std::time::{Duration, Instant};
 use tokio::task::JoinSet;
 use tracing::{info, warn};
 
-/// Phase durations as fractions of total test time.
-const BASELINE_FRACTION: f64 = 0.15;
-const CHAOS_FRACTION: f64 = 0.65;
-/// Max drain phase duration.
-const MAX_DRAIN_SECS: u64 = 300;
-
 pub struct StressBenchmark {
     args: Arc<IggyBenchArgs>,
     client_factory: Arc<dyn ClientFactory>,
@@ -69,15 +67,10 @@ impl StressBenchmark {
 
     fn compute_phase_durations(&self) -> (Duration, Duration, Duration) {
         let total = self.stress_args().duration().get_duration();
-        let total_secs = total.as_secs_f64();
-
-        let baseline = Duration::from_secs_f64(total_secs * BASELINE_FRACTION);
-        let chaos = Duration::from_secs_f64(total_secs * CHAOS_FRACTION);
-        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
-        let drain_secs = (total_secs * (1.0 - BASELINE_FRACTION - 
CHAOS_FRACTION)) as u64;
-        let drain = Duration::from_secs(drain_secs.min(MAX_DRAIN_SECS));
-
-        (baseline, chaos, drain)
+        let baseline = self.stress_args().baseline_duration().get_duration();
+        let quiesce = self.stress_args().quiesce_duration().get_duration();
+        let chaos = total.saturating_sub(baseline + quiesce);
+        (baseline, chaos, quiesce)
     }
 
     /// Spawns chaos actors: control-plane churner, admin exerciser, health 
monitor.
@@ -85,34 +78,28 @@ impl StressBenchmark {
         &self,
         tasks: &mut JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>,
         ctx: &Arc<StressContext>,
-        chaos_duration: Duration,
     ) {
         let stress_args = self.stress_args();
         let api_mix = stress_args.api_mix();
         let chaos_seed = stress_args.chaos_seed();
 
-        // Health monitor always runs
         let monitor = HealthMonitor::new(self.client_factory.clone(), 
ctx.clone());
         tasks.spawn(async move {
             monitor.run().await;
             Ok(BenchmarkIndividualMetrics::placeholder("health_monitor"))
         });
 
-        // Control-plane churner(s) unless data-plane-only
         if !matches!(api_mix, ApiMix::DataPlaneOnly) {
             let churn_concurrency = stress_args.churn_concurrency().get();
             let churn_interval = stress_args.churn_interval();
 
-            let purge_cutoff = Instant::now() + chaos_duration / 2;
-
             let churner_config = ChurnerConfig {
                 api_mix,
-                streams: self.args.streams(),
+                stable_streams: stress_args.stable_streams(),
+                chaos_streams: stress_args.chaos_streams(),
                 partitions: self.args.number_of_partitions(),
-                consumer_groups: self.args.number_of_consumer_groups(),
                 message_expiry: stress_args.message_expiry,
                 max_topic_size: 
MaxTopicSize::Custom(stress_args.max_topic_size),
-                purge_cutoff,
             };
 
             for i in 0..churn_concurrency {
@@ -133,84 +120,123 @@ impl StressBenchmark {
             }
         }
 
-        // Admin exerciser for mixed/all modes
         if matches!(api_mix, ApiMix::Mixed | ApiMix::All) {
-            let admin = AdminExerciser::new(self.client_factory.clone(), 
ctx.clone());
+            let admin = AdminExerciser::new(
+                self.client_factory.clone(),
+                ctx.clone(),
+                format!("{STABLE_STREAM_PREFIX}-1"),
+            );
             tasks.spawn(async move {
                 admin.run().await;
                 Ok(BenchmarkIndividualMetrics::placeholder("admin_exerciser"))
             });
         }
     }
-}
-
-#[async_trait]
-impl Benchmarkable for StressBenchmark {
-    async fn run(
-        &mut self,
-    ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, 
IggyError> {
-        let overall_start = Instant::now();
-        let (baseline_duration, chaos_duration, drain_max) = 
self.compute_phase_durations();
 
-        let total = self.stress_args().duration().get_duration();
-        let remaining_after_chaos = total.saturating_sub(baseline_duration + 
chaos_duration);
-        if drain_max < remaining_after_chaos {
-            warn!(
-                "Drain cap ({drain_max:?}) is shorter than remaining duration 
({remaining_after_chaos:?}); \
-                 data-plane actors may be aborted before finishing. Consider 
shorter --duration or raising MAX_DRAIN_SECS."
-            );
+    /// Best-effort validation of server config via snapshot API.
+    /// Warns about common misconfigurations but never blocks the test.
+    async fn validate_server_config(&self) {
+        let client = self.client_factory.create_client().await;
+        let client = IggyClient::create(client, None, None);
+        login_root(&client).await;
+
+        let snapshot = client
+            .snapshot(
+                SnapshotCompression::Stored,
+                vec![SystemSnapshotType::ServerConfig],
+            )
+            .await;
+
+        let Ok(snapshot) = snapshot else {
+            warn!("Could not fetch server config snapshot for validation; 
skipping checks");
+            return;
+        };
+
+        let cursor = Cursor::new(snapshot.0);
+        let Ok(mut zip) = zip::ZipArchive::new(cursor) else {
+            warn!("Could not parse server config snapshot as ZIP; skipping 
checks");
+            return;
+        };
+
+        let Ok(mut entry) = zip.by_name("server_config.txt") else {
+            warn!("server_config.txt not found in snapshot; skipping checks");
+            return;
+        };
+
+        let mut config_str = String::new();
+        if entry.read_to_string(&mut config_str).is_err() {
+            warn!("Could not read server_config.txt from snapshot; skipping 
checks");
+            return;
         }
+        drop(entry);
 
-        info!(
-            "Stress test starting: baseline={baseline_duration:?}, 
chaos={chaos_duration:?}, drain_max={drain_max:?}"
-        );
-
-        self.init_streams().await?;
-        init_consumer_groups(&self.client_factory, &self.args).await?;
-
-        let ctx = Arc::new(StressContext::new());
-
-        let mut tasks: JoinSet<Result<BenchmarkIndividualMetrics, IggyError>> 
= JoinSet::new();
-
-        // === PHASE 1: Baseline / Produce (data-plane only) ===
-        info!("=== Phase 1: Baseline ({baseline_duration:?}) ===");
+        let Ok(config) = config_str.parse::<toml::Value>() else {
+            warn!("Could not parse server config as TOML; skipping checks");
+            return;
+        };
 
-        let producer_futures = build_producer_futures(&self.client_factory, 
&self.args);
-        let consumer_futures = build_consumer_futures(&self.client_factory, 
&self.args);
+        let stress_args = self.stress_args();
 
-        for fut in producer_futures {
-            tasks.spawn(fut);
-        }
-        for fut in consumer_futures {
-            tasks.spawn(fut);
+        // Check if message cleaner is enabled when message_expiry is set
+        if !matches!(stress_args.message_expiry, IggyExpiry::NeverExpire) {
+            let cleaner_enabled = config
+                .get("data_maintenance")
+                .and_then(|dm| dm.get("messages"))
+                .and_then(|m| m.get("cleaner_enabled"))
+                .and_then(toml::Value::as_bool)
+                .unwrap_or(false);
+
+            if !cleaner_enabled {
+                warn!(
+                    "Server has 
data_maintenance.messages.cleaner_enabled=false but stress test \
+                     uses message_expiry={}. Messages will not be cleaned up. \
+                     Set IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED=true 
to enable cleanup.",
+                    stress_args.message_expiry
+                );
+            }
         }
 
-        tokio::time::sleep(baseline_duration).await;
-        let baseline_elapsed = overall_start.elapsed();
-        info!("Baseline phase completed in {baseline_elapsed:?}");
-
-        // === PHASE 2: Chaos (add churners + admin + health alongside ongoing 
data-plane) ===
-        info!("=== Phase 2: Chaos ({chaos_duration:?}) ===");
-
-        self.spawn_chaos_actors(&mut tasks, &ctx, chaos_duration);
-
-        tokio::time::sleep(chaos_duration).await;
-        let chaos_elapsed = overall_start.elapsed();
-        info!("Chaos phase completed in {chaos_elapsed:?}");
+        // Check segment size vs max_topic_size
+        let segment_size = config
+            .get("system")
+            .and_then(|s| s.get("segment"))
+            .and_then(|s| s.get("size"))
+            .and_then(toml::Value::as_str)
+            .and_then(|s| IggyByteSize::from_str(s).ok());
+
+        if let Some(seg_size) = segment_size {
+            let max_reasonable = IggyByteSize::from_str("64MiB").expect("valid 
constant");
+            if seg_size > max_reasonable {
+                warn!(
+                    "Server segment size ({seg_size}) is large for stress 
testing. \
+                     Consider IGGY_SYSTEM_SEGMENT_SIZE=\"16MiB\" for faster 
segment \
+                     rotation and more race surface."
+                );
+            }
+        }
 
-        // === PHASE 3: Drain ===
-        info!("=== Phase 3: Drain (max {drain_max:?}) ===");
-        ctx.cancel();
+        info!("Server config validation complete");
+    }
 
-        let drain_start = Instant::now();
-        let drain_deadline = drain_start + drain_max;
+    /// Drains the `JoinSet` within the quiesce window, collecting data-plane 
batch
+    /// counts and recording errors. Aborts remaining actors if the deadline 
expires.
+    async fn drain_actors(
+        tasks: &mut JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>,
+        ctx: &StressContext,
+        quiesce_duration: Duration,
+    ) -> Duration {
+        let quiesce_start = Instant::now();
+        let quiesce_deadline = quiesce_start + quiesce_duration;
         let mut producer_batches: u64 = 0;
         let mut consumer_batches: u64 = 0;
 
         while !tasks.is_empty() {
-            let remaining = 
drain_deadline.saturating_duration_since(Instant::now());
+            let remaining = 
quiesce_deadline.saturating_duration_since(Instant::now());
             if remaining.is_zero() {
-                warn!("Drain phase timed out, aborting remaining tasks");
+                warn!(
+                    "{} actors didn't finish within quiesce window, aborting",
+                    tasks.len()
+                );
                 tasks.abort_all();
                 break;
             }
@@ -230,11 +256,14 @@ impl Benchmarkable for StressBenchmark {
                     error_classifier::record_error(&ctx.stats, &e);
                 }
                 Ok(Some(Err(e))) => {
-                    warn!("Actor join failed: {e}");
+                    warn!("Actor join error: {e}");
                 }
                 Ok(None) => break,
                 Err(_) => {
-                    warn!("Drain phase timed out");
+                    warn!(
+                        "{} actors didn't finish within quiesce window, 
aborting",
+                        tasks.len()
+                    );
                     tasks.abort_all();
                     break;
                 }
@@ -248,13 +277,94 @@ impl Benchmarkable for StressBenchmark {
             .poll_messages_ok
             .fetch_add(consumer_batches, Ordering::Relaxed);
 
-        let drain_elapsed = drain_start.elapsed();
+        quiesce_start.elapsed()
+    }
+}
+
+#[async_trait]
+impl Benchmarkable for StressBenchmark {
+    async fn run(
+        &mut self,
+    ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, 
IggyError> {
+        let overall_start = Instant::now();
+        let stress_args = self.stress_args();
+        let stable_count = stress_args.stable_streams();
+        let chaos_count = stress_args.chaos_streams();
+        let (baseline_duration, chaos_duration, quiesce_duration) = 
self.compute_phase_durations();
+
+        info!(
+            "Stress test starting: baseline={baseline_duration:?}, 
chaos={chaos_duration:?}, quiesce={quiesce_duration:?}"
+        );
+
+        self.validate_server_config().await;
+
+        self.init_streams().await?;
+
+        let stable_names: Vec<String> = (1..=stable_count)
+            .map(|i| format!("{STABLE_STREAM_PREFIX}-{i}"))
+            .collect();
+        let chaos_names: Vec<String> = (1..=chaos_count)
+            .map(|i| format!("{CHAOS_STREAM_PREFIX}-{i}"))
+            .collect();
+        let all_stream_names: Vec<String> =
+            stable_names.iter().chain(&chaos_names).cloned().collect();
+
+        init_consumer_groups(&self.client_factory, &stable_names).await?;
+
+        let ctx = Arc::new(StressContext::new());
+
+        let finish_condition = 
BenchmarkFinishCondition::new_cancellable(ctx.cancelled.clone());
+
+        let mut tasks: JoinSet<Result<BenchmarkIndividualMetrics, IggyError>> 
= JoinSet::new();
+
+        // === PHASE 1: Baseline (data-plane only) ===
+        info!("=== Phase 1: Baseline ({baseline_duration:?}) ===");
+
+        let producer_futures = build_producer_futures(
+            &self.client_factory,
+            &self.args,
+            &all_stream_names,
+            Some(finish_condition.clone()),
+        );
+        let consumer_futures = build_consumer_futures(
+            &self.client_factory,
+            &self.args,
+            &stable_names,
+            Some(finish_condition),
+        );
+
+        for fut in producer_futures {
+            tasks.spawn(fut);
+        }
+        for fut in consumer_futures {
+            tasks.spawn(fut);
+        }
+
+        tokio::time::sleep(baseline_duration).await;
+        let baseline_elapsed = overall_start.elapsed();
+        info!("Baseline phase completed in {baseline_elapsed:?}");
+
+        // === PHASE 2: Chaos (add churners + admin + health alongside ongoing 
data-plane) ===
+        info!("=== Phase 2: Chaos ({chaos_duration:?}) ===");
+
+        self.spawn_chaos_actors(&mut tasks, &ctx);
+
+        tokio::time::sleep(chaos_duration).await;
+        let chaos_elapsed = overall_start.elapsed();
+        info!("Chaos phase completed in {chaos_elapsed:?}");
+
+        // === PHASE 3: Quiesce ===
+        info!("=== Phase 3: Quiesce ({quiesce_duration:?}) ===");
+        ctx.cancel();
+
+        let quiesce_elapsed = Self::drain_actors(&mut tasks, &ctx, 
quiesce_duration).await;
 
         // === Verification ===
         info!("Running post-test verification...");
         let verifier = StressVerifier::new(
             self.client_factory.clone(),
-            self.args.streams(),
+            stable_count,
+            chaos_count,
             self.args.number_of_partitions(),
         );
         let verification = verifier.verify().await;
@@ -265,7 +375,7 @@ impl Benchmarkable for StressBenchmark {
             overall_start.elapsed(),
             baseline_duration,
             chaos_duration,
-            drain_elapsed,
+            quiesce_elapsed,
         );
         report.print_summary();
 
@@ -276,6 +386,63 @@ impl Benchmarkable for StressBenchmark {
         Ok(JoinSet::new())
     }
 
+    async fn init_streams(&self) -> Result<(), IggyError> {
+        let stress_args = self.stress_args();
+        let stable_count = stress_args.stable_streams();
+        let chaos_count = stress_args.chaos_streams();
+        let partitions_count = self.args.number_of_partitions();
+
+        let client = self.client_factory.create_client().await;
+        let client = IggyClient::create(client, None, None);
+        login_root(&client).await;
+
+        let streams = client.get_streams().await?;
+
+        // Stable streams: NeverExpire, ServerDefault size
+        for i in 1..=stable_count {
+            let stream_name = format!("{STABLE_STREAM_PREFIX}-{i}");
+            if streams.iter().all(|s| s.name != stream_name) {
+                info!("Creating stable stream '{stream_name}'");
+                client.create_stream(&stream_name).await?;
+                let stream_id: Identifier = stream_name.as_str().try_into()?;
+                client
+                    .create_topic(
+                        &stream_id,
+                        BENCH_TOPIC_NAME,
+                        partitions_count,
+                        CompressionAlgorithm::default(),
+                        None,
+                        IggyExpiry::NeverExpire,
+                        MaxTopicSize::ServerDefault,
+                    )
+                    .await?;
+            }
+        }
+
+        // Chaos streams: user-configured expiry and max_topic_size
+        for i in 1..=chaos_count {
+            let stream_name = format!("{CHAOS_STREAM_PREFIX}-{i}");
+            if streams.iter().all(|s| s.name != stream_name) {
+                info!("Creating chaos stream '{stream_name}'");
+                client.create_stream(&stream_name).await?;
+                let stream_id: Identifier = stream_name.as_str().try_into()?;
+                client
+                    .create_topic(
+                        &stream_id,
+                        BENCH_TOPIC_NAME,
+                        partitions_count,
+                        CompressionAlgorithm::default(),
+                        None,
+                        stress_args.message_expiry,
+                        MaxTopicSize::Custom(stress_args.max_topic_size),
+                    )
+                    .await?;
+            }
+        }
+
+        Ok(())
+    }
+
     fn kind(&self) -> BenchmarkKind {
         BenchmarkKind::Stress
     }
@@ -291,14 +458,18 @@ impl Benchmarkable for StressBenchmark {
     fn print_info(&self) {
         let stress_args = self.stress_args();
         info!(
-            "Starting Stress benchmark: duration={}, producers={}, 
consumers={}, churn_concurrency={}, churn_interval={}, api_mix={:?}, 
chaos_seed={}",
+            "Starting Stress benchmark: duration={}, producers={}, 
consumers={}, stable_streams={}, chaos_streams={}, churn_concurrency={}, 
churn_interval={}, api_mix={:?}, chaos_seed={}, baseline={}, quiesce={}",
             stress_args.duration(),
             stress_args.producers.get(),
             stress_args.consumers.get(),
+            stress_args.stable_streams(),
+            stress_args.chaos_streams(),
             stress_args.churn_concurrency().get(),
             stress_args.churn_interval(),
             stress_args.api_mix(),
             stress_args.chaos_seed(),
+            stress_args.baseline_duration(),
+            stress_args.quiesce_duration(),
         );
     }
 }
diff --git a/core/bench/src/benchmarks/stress_report.rs 
b/core/bench/src/benchmarks/stress_report.rs
index 6ff549f0c..129d5a42c 100644
--- a/core/bench/src/benchmarks/stress_report.rs
+++ b/core/bench/src/benchmarks/stress_report.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::actors::stress::{stress_context::StressStats, 
verifier::VerificationResult};
+use crate::actors::stress::{stress_context::StressStats, 
verifier::StressVerificationResult};
 use serde::Serialize;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::Duration;
@@ -27,7 +27,7 @@ pub struct StressReport {
     pub total_duration_secs: f64,
     pub baseline_duration_secs: f64,
     pub chaos_duration_secs: f64,
-    pub drain_duration_secs: f64,
+    pub quiesce_duration_secs: f64,
     pub api_calls: ApiCallSummary,
     pub error_tiers: ErrorTierSummary,
     pub verification: VerificationSummary,
@@ -88,6 +88,13 @@ pub struct ErrorTierSummary {
 
 #[derive(Debug, Serialize)]
 pub struct VerificationSummary {
+    pub stable: NamespaceVerification,
+    pub chaos: NamespaceVerification,
+    pub passed: bool,
+}
+
+#[derive(Debug, Serialize)]
+pub struct NamespaceVerification {
     pub partitions_checked: u32,
     pub total_messages: u64,
     pub gaps_found: u64,
@@ -101,31 +108,26 @@ pub struct VerificationSummary {
 impl StressReport {
     pub fn build(
         stats: &StressStats,
-        verification: &VerificationResult,
+        verification: &StressVerificationResult,
         total_duration: Duration,
         baseline_duration: Duration,
         chaos_duration: Duration,
-        drain_duration: Duration,
+        quiesce_duration: Duration,
     ) -> Self {
         let api_calls = ApiCallSummary::from_stats(stats);
         Self {
             total_duration_secs: total_duration.as_secs_f64(),
             baseline_duration_secs: baseline_duration.as_secs_f64(),
             chaos_duration_secs: chaos_duration.as_secs_f64(),
-            drain_duration_secs: drain_duration.as_secs_f64(),
+            quiesce_duration_secs: quiesce_duration.as_secs_f64(),
             error_tiers: ErrorTierSummary {
                 expected: stats.expected_errors.load(Ordering::Relaxed),
                 unexpected: stats.unexpected_errors.load(Ordering::Relaxed),
             },
             api_calls,
             verification: VerificationSummary {
-                partitions_checked: verification.partitions_checked,
-                total_messages: verification.total_messages,
-                gaps_found: verification.gaps_found,
-                duplicates_found: verification.duplicates_found,
-                checksum_mismatches: verification.checksum_mismatches,
-                payload_length_mismatches: 
verification.payload_length_mismatches,
-                id_missing_fingerprint: verification.id_missing_fingerprint,
+                stable: 
NamespaceVerification::from_result(&verification.stable),
+                chaos: NamespaceVerification::from_result(&verification.chaos),
                 passed: verification.passed,
             },
         }
@@ -134,11 +136,11 @@ impl StressReport {
     pub fn print_summary(&self) {
         println!("\n=== STRESS TEST REPORT ===");
         println!(
-            "Duration: {:.1}s (baseline: {:.1}s, chaos: {:.1}s, drain: {:.1}s)",
+            "Duration: {:.1}s (baseline: {:.1}s, chaos: {:.1}s, quiesce: {:.1}s)",
             self.total_duration_secs,
             self.baseline_duration_secs,
             self.chaos_duration_secs,
-            self.drain_duration_secs
+            self.quiesce_duration_secs
         );
         println!(
             "API calls: {} ok, {} err",
@@ -148,25 +150,47 @@ impl StressReport {
             "Errors: {} expected, {} unexpected",
             self.error_tiers.expected, self.error_tiers.unexpected
         );
+        let s = &self.verification.stable;
+        println!(
+            "Stable: {} partitions, {} msgs, {} gaps, {} dups, {} checksum -> {}",
+            s.partitions_checked,
+            s.total_messages,
+            s.gaps_found,
+            s.duplicates_found,
+            s.checksum_mismatches,
+            if s.passed { "PASSED" } else { "FAILED" }
+        );
+        let c = &self.verification.chaos;
         println!(
-            "Verification: {} partitions, {} msgs, {} gaps, {} dups, {} checksum, {} len, {} id -> {}",
-            self.verification.partitions_checked,
-            self.verification.total_messages,
-            self.verification.gaps_found,
-            self.verification.duplicates_found,
-            self.verification.checksum_mismatches,
-            self.verification.payload_length_mismatches,
-            self.verification.id_missing_fingerprint,
-            if self.verification.passed {
-                "PASSED"
+            "Chaos:  {} partitions, {} msgs, integrity {} -> {}",
+            c.partitions_checked,
+            c.total_messages,
+            if c.checksum_mismatches == 0 && c.payload_length_mismatches == 0 {
+                "OK"
             } else {
-                "FAILED"
-            }
+                "FAIL"
+            },
+            if c.passed { "PASSED" } else { "FAILED" }
         );
         println!("==========================\n");
     }
 }
 
+impl NamespaceVerification {
+    const fn from_result(r: &crate::actors::stress::verifier::VerificationResult) -> Self {
+        Self {
+            partitions_checked: r.partitions_checked,
+            total_messages: r.total_messages,
+            gaps_found: r.gaps_found,
+            duplicates_found: r.duplicates_found,
+            checksum_mismatches: r.checksum_mismatches,
+            payload_length_mismatches: r.payload_length_mismatches,
+            id_missing_fingerprint: r.id_missing_fingerprint,
+            passed: r.passed,
+        }
+    }
+}
+
 impl ApiCallSummary {
     fn from_stats(s: &StressStats) -> Self {
         Self {
diff --git a/core/bench/src/utils/finish_condition.rs b/core/bench/src/utils/finish_condition.rs
index 5789008a9..ede3bd35b 100644
--- a/core/bench/src/utils/finish_condition.rs
+++ b/core/bench/src/utils/finish_condition.rs
@@ -22,7 +22,7 @@ use std::{
     fmt::Display,
     sync::{
         Arc, OnceLock,
-        atomic::{AtomicI64, Ordering},
+        atomic::{AtomicBool, AtomicI64, Ordering},
     },
     time::Instant,
 };
@@ -63,6 +63,9 @@ pub struct BenchmarkFinishCondition {
     /// Lazily initialized on first `is_elapsed()` call so the timer starts
     /// when the actor begins polling, not when the condition is constructed.
     start_time: OnceLock<Instant>,
+    /// External cancellation signal. When set to `true`, `is_done()` returns
+    /// immediately. Used by the stress test to propagate shutdown to data-plane actors.
+    cancelled: Option<Arc<AtomicBool>>,
 }
 
 impl BenchmarkFinishCondition {
@@ -114,6 +117,7 @@ impl BenchmarkFinishCondition {
+                    left_total: Arc::new(AtomicI64::new(i64::from(count_per_actor))),
                     mode,
                     start_time: OnceLock::new(),
+                    cancelled: None,
                 })
             }
             (Some(size), None) => {
@@ -127,6 +131,7 @@ impl BenchmarkFinishCondition {
                     )),
                     mode,
                     start_time: OnceLock::new(),
+                    cancelled: None,
                 })
             }
             (None, None) => {
@@ -152,6 +157,7 @@ impl BenchmarkFinishCondition {
             left_total: Arc::new(AtomicI64::new(0)),
             mode: BenchmarkFinishConditionMode::Shared,
             start_time: OnceLock::new(),
+            cancelled: None,
         })
     }
 
@@ -163,10 +169,28 @@ impl BenchmarkFinishCondition {
             left_total: Arc::new(AtomicI64::new(i64::MAX)),
             mode: BenchmarkFinishConditionMode::Shared,
             start_time: OnceLock::new(),
+            cancelled: None,
+        })
+    }
+
+    /// Creates a finish condition that never expires on its own — it only completes
+    /// when the external `cancelled` flag is set. Used by the stress test so
+    /// data-plane actors stop in sync with chaos actors on cancellation.
+    pub fn new_cancellable(cancelled: Arc<AtomicBool>) -> Arc<Self> {
+        Arc::new(Self {
+            kind: BenchmarkFinishConditionType::Duration,
+            total: u64::MAX,
+            left_total: Arc::new(AtomicI64::new(i64::MAX)),
+            mode: BenchmarkFinishConditionMode::Shared,
+            start_time: OnceLock::new(),
+            cancelled: Some(cancelled),
         })
     }
 
     pub fn account_and_check(&self, size_to_subtract: u64) -> bool {
+        if self.is_cancelled() {
+            return true;
+        }
         match self.kind {
             BenchmarkFinishConditionType::TotalData => {
                 self.left_total.fetch_sub(
@@ -184,12 +208,21 @@ impl BenchmarkFinishCondition {
     }
 
     pub fn is_done(&self) -> bool {
+        if self.is_cancelled() {
+            return true;
+        }
         match self.kind {
             BenchmarkFinishConditionType::Duration => self.is_elapsed(),
             _ => self.left() <= 0,
         }
     }
 
+    fn is_cancelled(&self) -> bool {
+        self.cancelled
+            .as_ref()
+            .is_some_and(|c| c.load(Ordering::Acquire))
+    }
+
     fn is_elapsed(&self) -> bool {
         let start = self.start_time.get_or_init(Instant::now);
         start.elapsed() >= std::time::Duration::from_micros(self.total)

Reply via email to