This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch bench-stress in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 89c27d7099dd64c995aa379806acaeba9da1a42b Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Feb 13 20:05:41 2026 +0100 refactor --- Cargo.lock | 2 + core/bench/Cargo.toml | 2 + core/bench/src/actors/stress/admin_exerciser.rs | 16 +- .../src/actors/stress/control_plane_churner.rs | 99 +++--- core/bench/src/actors/stress/verifier.rs | 157 +++++++--- core/bench/src/args/kinds/stress/args.rs | 76 ++++- .../src/benchmarks/balanced_consumer_group.rs | 6 +- core/bench/src/benchmarks/balanced_producer.rs | 4 +- .../balanced_producer_and_consumer_group.rs | 8 +- core/bench/src/benchmarks/common.rs | 35 ++- .../end_to_end_producing_consumer_group.rs | 4 +- core/bench/src/benchmarks/mod.rs | 8 + core/bench/src/benchmarks/pinned_consumer.rs | 4 +- core/bench/src/benchmarks/pinned_producer.rs | 4 +- .../src/benchmarks/pinned_producer_and_consumer.rs | 6 +- core/bench/src/benchmarks/stress.rs | 345 +++++++++++++++------ core/bench/src/benchmarks/stress_report.rs | 76 +++-- core/bench/src/utils/finish_condition.rs | 35 ++- 18 files changed, 630 insertions(+), 257 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 06476ec28..27515acbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4518,10 +4518,12 @@ dependencies = [ "serde", "sysinfo 0.38.1", "tokio", + "toml 1.0.0+spec-1.1.0", "tracing", "tracing-appender", "tracing-subscriber", "uuid", + "zip", ] [[package]] diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml index 9f020e73b..f29c64d96 100644 --- a/core/bench/Cargo.toml +++ b/core/bench/Cargo.toml @@ -50,10 +50,12 @@ rayon = { workspace = true } serde = { workspace = true } sysinfo = { workspace = true } tokio = { workspace = true } +toml = { workspace = true } tracing = { workspace = true } tracing-appender = { workspace = true } tracing-subscriber = { workspace = true } uuid = { workspace = true } +zip = { workspace = true } [lints.clippy] enum_glob_use = "deny" diff --git a/core/bench/src/actors/stress/admin_exerciser.rs b/core/bench/src/actors/stress/admin_exerciser.rs index 46fb25bba..902cd64c4 100644 --- a/core/bench/src/actors/stress/admin_exerciser.rs +++ b/core/bench/src/actors/stress/admin_exerciser.rs @@ -18,7 +18,7 @@ use super::error_classifier; use super::stress_context::StressContext; -use crate::benchmarks::{BENCH_STREAM_PREFIX, BENCH_TOPIC_NAME}; +use crate::benchmarks::BENCH_TOPIC_NAME; use crate::utils::{ClientFactory, login_root}; use iggy::clients::client::IggyClient; use iggy::prelude::*; @@ -35,13 +35,19 @@ const ADMIN_CYCLE_INTERVAL_SECS: u64 = 15; pub struct AdminExerciser { client_factory: Arc<dyn ClientFactory>, ctx: Arc<StressContext>, + stable_stream_name: String, } impl AdminExerciser { - pub fn new(client_factory: Arc<dyn ClientFactory>, ctx: Arc<StressContext>) -> Self { + pub fn new( + client_factory: Arc<dyn ClientFactory>, + ctx: Arc<StressContext>, + stable_stream_name: String, + ) -> Self { Self { client_factory, ctx, + stable_stream_name, } } @@ -142,7 +148,8 @@ impl AdminExerciser { } async fn offset_lifecycle(&self, client: &IggyClient, cycle: u64) { - let stream_id: Identifier = format!("{BENCH_STREAM_PREFIX}-1") + let stream_id: Identifier = self + .stable_stream_name .as_str() .try_into() .expect("valid identifier"); @@ -190,7 +197,8 @@ impl AdminExerciser { } async fn flush_buffers(&self, client: &IggyClient) { - let stream_id: Identifier = format!("{BENCH_STREAM_PREFIX}-1") + let stream_id: Identifier = self + .stable_stream_name .as_str() .try_into() .expect("valid identifier"); diff --git a/core/bench/src/actors/stress/control_plane_churner.rs b/core/bench/src/actors/stress/control_plane_churner.rs index e9e1e930c..1eede90af 100644 --- a/core/bench/src/actors/stress/control_plane_churner.rs +++ b/core/bench/src/actors/stress/control_plane_churner.rs @@ -20,13 +20,14 @@ use super::error_classifier; use super::stress_context::StressContext; use crate::args::kinds::stress::args::ApiMix; use crate::benchmarks::{ - BENCH_STREAM_PREFIX, BENCH_TOPIC_NAME, CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX, + BENCH_TOPIC_NAME, CHAOS_STREAM_PREFIX, CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX, + STABLE_STREAM_PREFIX, }; use crate::utils::{ClientFactory, login_root}; use iggy::clients::client::IggyClient; use iggy::prelude::*; use rand::rngs::StdRng; -use rand::{Rng, SeedableRng}; +use rand::{RngExt, SeedableRng}; use std::sync::Arc; use std::sync::atomic::Ordering; use tracing::{debug, warn}; @@ -72,14 +73,11 @@ const ALL_OPS: [ChurnOp; 9] = [ /// so that `DeleteAndRecreateTopic` can recreate with the same parameters. pub struct ChurnerConfig { pub api_mix: ApiMix, - pub streams: u32, + pub stable_streams: u32, + pub chaos_streams: u32, pub partitions: u32, - pub consumer_groups: u32, pub message_expiry: IggyExpiry, pub max_topic_size: MaxTopicSize, - /// Deadline after which data-destructive ops (`PurgeTopic`, `PurgeStream`, - /// `DeleteAndRecreateTopic`) are suppressed so messages survive for verification. - pub purge_cutoff: std::time::Instant, } /// Periodically executes CRUD lifecycle operations against the server. @@ -95,12 +93,11 @@ pub struct ControlPlaneChurner { churn_interval: std::time::Duration, rng: StdRng, api_mix: ApiMix, - streams: u32, + stable_streams: u32, + chaos_streams: u32, partitions: u32, - consumer_groups: u32, message_expiry: IggyExpiry, max_topic_size: MaxTopicSize, - purge_cutoff: std::time::Instant, } impl ControlPlaneChurner { @@ -120,12 +117,11 @@ impl ControlPlaneChurner { churn_interval: churn_interval.get_duration(), rng, api_mix: config.api_mix, - streams: config.streams, + stable_streams: config.stable_streams, + chaos_streams: config.chaos_streams, partitions: config.partitions, - consumer_groups: config.consumer_groups, message_expiry: config.message_expiry, max_topic_size: config.max_topic_size, - purge_cutoff: config.purge_cutoff, } } @@ -151,8 +147,6 @@ impl ControlPlaneChurner { self.churner_id, op ); - let past_purge_cutoff = std::time::Instant::now() > self.purge_cutoff; - match op { ChurnOp::CreateDeleteTopic => { self.create_delete_topic(&client, cycle).await; @@ -163,12 +157,6 @@ impl ControlPlaneChurner { ChurnOp::ConsumerGroupJoinLeave => { self.consumer_group_join_leave(&client).await; } - ChurnOp::PurgeTopic if past_purge_cutoff => { - debug!( - "Churner #{}: skipping PurgeTopic (past purge cutoff)", - self.churner_id - ); - } ChurnOp::PurgeTopic => { self.purge_random_topic(&client).await; } @@ -181,21 +169,9 @@ impl ControlPlaneChurner { ChurnOp::StressPoll => { self.stress_poll(&client).await; } - ChurnOp::DeleteAndRecreateTopic if past_purge_cutoff => { - debug!( - "Churner #{}: skipping DeleteAndRecreateTopic (past purge cutoff)", - self.churner_id - ); - } ChurnOp::DeleteAndRecreateTopic => { self.delete_and_recreate_topic(&client).await; } - ChurnOp::PurgeStream if past_purge_cutoff => { - debug!( - "Churner #{}: skipping PurgeStream (past purge cutoff)", - self.churner_id - ); - } ChurnOp::PurgeStream => { self.purge_stream(&client).await; } @@ -206,25 +182,39 @@ impl ControlPlaneChurner { } } - fn random_stream_id(&mut self) -> Identifier { - let stream_idx = self.rng.random_range(1..=self.streams); - format!("{BENCH_STREAM_PREFIX}-{stream_idx}") + fn random_chaos_stream_id(&mut self) -> Identifier { + let idx = self.rng.random_range(1..=self.chaos_streams); + format!("{CHAOS_STREAM_PREFIX}-{idx}") .as_str() .try_into() .expect("valid identifier") } + fn random_any_stream_id(&mut self) -> Identifier { + let total = self.stable_streams + self.chaos_streams; + let idx = self.rng.random_range(1..=total); + if idx <= self.stable_streams { + format!("{STABLE_STREAM_PREFIX}-{idx}") + .as_str() + .try_into() + .expect("valid identifier") + } else { + let chaos_idx = idx - self.stable_streams; + format!("{CHAOS_STREAM_PREFIX}-{chaos_idx}") + .as_str() + .try_into() + .expect("valid identifier") + } + } + fn topic_id() -> Identifier { BENCH_TOPIC_NAME.try_into().expect("valid identifier") } // --- Original ops --- - async fn create_delete_topic(&self, client: &IggyClient, cycle: u64) { - let stream_id: Identifier = format!("{BENCH_STREAM_PREFIX}-1") - .as_str() - .try_into() - .expect("valid identifier"); + async fn create_delete_topic(&mut self, client: &IggyClient, cycle: u64) { + let stream_id = self.random_chaos_stream_id(); let topic_name = format!("churn-{}-{cycle}", self.churner_id); match client @@ -234,8 +224,8 @@ impl ControlPlaneChurner { 1, CompressionAlgorithm::default(), None, - IggyExpiry::NeverExpire, - MaxTopicSize::ServerDefault, + self.message_expiry, + self.max_topic_size, ) .await { @@ -285,11 +275,8 @@ impl ControlPlaneChurner { } } - async fn add_remove_partitions(&self, client: &IggyClient) { - let stream_id: Identifier = format!("{BENCH_STREAM_PREFIX}-1") - .as_str() - .try_into() - .expect("valid identifier"); + async fn add_remove_partitions(&mut self, client: &IggyClient) { + let stream_id = self.random_chaos_stream_id(); let topic_id = Self::topic_id(); match client.create_partitions(&stream_id, &topic_id, 1).await { @@ -338,8 +325,8 @@ impl ControlPlaneChurner { /// Joins and leaves a bench consumer group, forcing a rebalance /// mid-poll that races against active data-plane consumers. async fn consumer_group_join_leave(&mut self, client: &IggyClient) { - let stream_idx = self.rng.random_range(1..=self.consumer_groups); - let stream_id: Identifier = format!("{BENCH_STREAM_PREFIX}-{stream_idx}") + let stream_idx = self.rng.random_range(1..=self.stable_streams); + let stream_id: Identifier = format!("{STABLE_STREAM_PREFIX}-{stream_idx}") .as_str() .try_into() .expect("valid identifier"); @@ -391,7 +378,7 @@ impl ControlPlaneChurner { } async fn purge_random_topic(&mut self, client: &IggyClient) { - let stream_id = self.random_stream_id(); + let stream_id = self.random_chaos_stream_id(); let topic_id = Self::topic_id(); match client.purge_topic(&stream_id, &topic_id).await { @@ -415,7 +402,7 @@ impl ControlPlaneChurner { // --- New safe ops --- async fn delete_segments(&mut self, client: &IggyClient) { - let stream_id = self.random_stream_id(); + let stream_id = self.random_chaos_stream_id(); let topic_id = Self::topic_id(); let partition_id = self.rng.random_range(0..self.partitions); @@ -441,7 +428,7 @@ impl ControlPlaneChurner { } async fn update_topic(&mut self, client: &IggyClient) { - let stream_id = self.random_stream_id(); + let stream_id = self.random_chaos_stream_id(); let topic_id = Self::topic_id(); let compression = if self.rng.random_bool(0.5) { @@ -482,7 +469,7 @@ impl ControlPlaneChurner { /// One-off polls with First/Last/Timestamp strategies — these are "victim" /// operations that race against concurrent segment mutations. async fn stress_poll(&mut self, client: &IggyClient) { - let stream_id = self.random_stream_id(); + let stream_id = self.random_any_stream_id(); let topic_id = Self::topic_id(); let partition_id = self.rng.random_range(0..self.partitions); let consumer = Consumer::new(Identifier::numeric(8888).expect("valid consumer id")); @@ -528,7 +515,7 @@ impl ControlPlaneChurner { /// Exercises 23+ `expect()` panic sites reachable when consumers /// hold stale topic references during deletion. async fn delete_and_recreate_topic(&mut self, client: &IggyClient) { - let stream_id = self.random_stream_id(); + let stream_id = self.random_chaos_stream_id(); let topic_id = Self::topic_id(); match client.delete_topic(&stream_id, &topic_id).await { @@ -583,7 +570,7 @@ impl ControlPlaneChurner { } async fn purge_stream(&mut self, client: &IggyClient) { - let stream_id = self.random_stream_id(); + let stream_id = self.random_chaos_stream_id(); match client.purge_stream(&stream_id).await { Ok(()) => { diff --git a/core/bench/src/actors/stress/verifier.rs b/core/bench/src/actors/stress/verifier.rs index 825a63c1c..bed914690 100644 --- a/core/bench/src/actors/stress/verifier.rs +++ b/core/bench/src/actors/stress/verifier.rs @@ -16,7 +16,7 @@ * under the License. */ -use crate::benchmarks::{BENCH_STREAM_PREFIX, BENCH_TOPIC_NAME}; +use crate::benchmarks::{BENCH_TOPIC_NAME, CHAOS_STREAM_PREFIX, STABLE_STREAM_PREFIX}; use crate::utils::{ClientFactory, login_root}; use iggy::clients::client::IggyClient; use iggy::prelude::*; @@ -25,7 +25,7 @@ use std::collections::BTreeSet; use std::sync::Arc; use tracing::{info, warn}; -/// Post-test verification results. +/// Post-test verification results for a single namespace. #[derive(Debug, Default)] pub struct VerificationResult { pub partitions_checked: u32, @@ -38,35 +38,127 @@ pub struct VerificationResult { pub passed: bool, } -/// Runs drain-phase verification: polls all partitions and checks offset continuity. -/// -/// During the stress test, messages may expire, so we verify that within each -/// partition the offsets we can still poll are monotonically increasing with no -/// gaps in the remaining range. +/// Combined verification across stable and chaos namespaces. +#[derive(Debug)] +pub struct StressVerificationResult { + pub stable: VerificationResult, + pub chaos: VerificationResult, + pub passed: bool, +} + +/// Runs drain-phase verification with namespace-aware checking: +/// - Stable streams: strict (msgs > 0, no gaps, no dups, checksums OK) +/// - Chaos streams: integrity-only (checksums + payload length OK; gaps, dups, 0 msgs acceptable) pub struct StressVerifier { client_factory: Arc<dyn ClientFactory>, - streams: u32, + stable_streams: u32, + chaos_streams: u32, partitions: u32, } impl StressVerifier { - pub fn new(client_factory: Arc<dyn ClientFactory>, streams: u32, partitions: u32) -> Self { + pub fn new( + client_factory: Arc<dyn ClientFactory>, + stable_streams: u32, + chaos_streams: u32, + partitions: u32, + ) -> Self { Self { client_factory, - streams, + stable_streams, + chaos_streams, partitions, } } - pub async fn verify(&self) -> VerificationResult { + pub async fn verify(&self) -> StressVerificationResult { let client = self.client_factory.create_client().await; let client = IggyClient::create(client, None, None); login_root(&client).await; + let stable = self + .verify_namespace(&client, STABLE_STREAM_PREFIX, self.stable_streams) + .await; + let chaos = self + .verify_namespace(&client, CHAOS_STREAM_PREFIX, self.chaos_streams) + .await; + + // Stable: strict verification + let stable_passed = stable.total_messages > 0 + && stable.gaps_found == 0 + && stable.duplicates_found == 0 + && stable.checksum_mismatches == 0 + && stable.payload_length_mismatches == 0; + + // Chaos: integrity-only (gaps, dups, 0 msgs are acceptable) + let chaos_passed = chaos.checksum_mismatches == 0 && chaos.payload_length_mismatches == 0; + + let stable = VerificationResult { + passed: stable_passed, + ..stable + }; + let chaos = VerificationResult { + passed: chaos_passed, + ..chaos + }; + + let passed = stable.passed && chaos.passed; + + if stable.passed { + info!( + "Stable verification PASSED: {} partitions, {} msgs, 0 gaps, 0 dups, 0 checksum", + stable.partitions_checked, stable.total_messages + ); + } else { + warn!( + "Stable verification FAILED: {} partitions, {} msgs, {} gaps, {} dups, {} checksum, {} len", + stable.partitions_checked, + stable.total_messages, + stable.gaps_found, + stable.duplicates_found, + stable.checksum_mismatches, + stable.payload_length_mismatches, + ); + } + + if chaos.passed { + info!( + "Chaos verification PASSED: {} partitions, {} msgs, integrity OK", + chaos.partitions_checked, chaos.total_messages + ); + } else { + warn!( + "Chaos verification FAILED: {} partitions, {} checksum, {} len", + chaos.partitions_checked, + chaos.checksum_mismatches, + chaos.payload_length_mismatches, + ); + } + + if stable.id_missing_fingerprint + chaos.id_missing_fingerprint > 0 { + warn!( + "Verification: {} messages missing producer ID fingerprint (server-assigned IDs)", + stable.id_missing_fingerprint + chaos.id_missing_fingerprint + ); + } + + StressVerificationResult { + stable, + chaos, + passed, + } + } + + async fn verify_namespace( + &self, + client: &IggyClient, + prefix: &str, + stream_count: u32, + ) -> VerificationResult { let mut result = VerificationResult::default(); - for stream_idx in 1..=self.streams { - let stream_id: Identifier = format!("{BENCH_STREAM_PREFIX}-{stream_idx}") + for stream_idx in 1..=stream_count { + let stream_id: Identifier = format!("{prefix}-{stream_idx}") .as_str() .try_into() .expect("valid identifier"); @@ -74,7 +166,7 @@ impl StressVerifier { for partition_id in 0..self.partitions { let partition_result = self - .verify_partition(&client, &stream_id, &topic_id, partition_id) + .verify_partition(client, &stream_id, &topic_id, partition_id) .await; result.partitions_checked += 1; result.total_messages += partition_result.total_messages; @@ -86,41 +178,6 @@ impl StressVerifier { } } - result.passed = result.total_messages > 0 - && result.gaps_found == 0 - && result.duplicates_found == 0 - && result.checksum_mismatches == 0 - && result.payload_length_mismatches == 0; - - if result.total_messages == 0 { - warn!( - "Verification FAILED: 0 messages found across {} partitions — topic may have been destroyed by chaos", - result.partitions_checked - ); - } else if result.passed { - info!( - "Verification PASSED: {} partitions, {} msgs, 0 gaps, 0 dups, 0 checksum, 0 len", - result.partitions_checked, result.total_messages - ); - } else { - warn!( - "Verification FAILED: {} partitions, {} msgs, {} gaps, {} dups, {} checksum, {} len", - result.partitions_checked, - result.total_messages, - result.gaps_found, - result.duplicates_found, - result.checksum_mismatches, - result.payload_length_mismatches, - ); - } - - if result.id_missing_fingerprint > 0 { - warn!( - "Verification: {} messages missing producer ID fingerprint (server-assigned IDs)", - result.id_missing_fingerprint - ); - } - result } @@ -162,7 +219,6 @@ impl StressVerifier { result.duplicates_found += 1; } - // Checksum re-verification: serialize and hash everything after the checksum field let raw = msg.to_bytes(); let recomputed = calculate_checksum(&raw[8..]); if msg.header.checksum != recomputed { @@ -191,7 +247,6 @@ impl StressVerifier { } } - // Check for gaps in the seen offsets if let (Some(&min), Some(&max)) = (seen_offsets.first(), seen_offsets.last()) { let expected_count = max - min + 1; let actual_count = seen_offsets.len() as u64; diff --git a/core/bench/src/args/kinds/stress/args.rs b/core/bench/src/args/kinds/stress/args.rs index ad17bb86f..ebac49648 100644 --- a/core/bench/src/args/kinds/stress/args.rs +++ b/core/bench/src/args/kinds/stress/args.rs @@ -25,9 +25,9 @@ use std::str::FromStr; const DEFAULT_PRODUCERS: NonZeroU32 = nonzero_lit::u32!(4); const DEFAULT_CONSUMERS: NonZeroU32 = nonzero_lit::u32!(4); const DEFAULT_CHURN_CONCURRENCY: NonZeroU32 = nonzero_lit::u32!(1); -const DEFAULT_STREAMS: u32 = 2; +const DEFAULT_STABLE_STREAMS: u32 = 2; +const DEFAULT_CHAOS_STREAMS: u32 = 1; const DEFAULT_PARTITIONS: u32 = 4; -const DEFAULT_CONSUMER_GROUPS: u32 = 2; /// Determines the mix of API operations exercised during the stress test. #[derive(Debug, Clone, Copy, ValueEnum, Default)] @@ -81,14 +81,30 @@ pub struct StressArgs { #[arg(long, value_enum, default_value_t = ApiMix::All)] pub api_mix: ApiMix, + /// Number of stable streams (golden logs, strict verification) + #[arg(long, default_value_t = DEFAULT_STABLE_STREAMS)] + pub stable_streams: u32, + + /// Number of chaos streams (full destruction, integrity-only verification) + #[arg(long, default_value_t = DEFAULT_CHAOS_STREAMS)] + pub chaos_streams: u32, + /// RNG seed for reproducible chaos operations #[arg(long)] pub chaos_seed: Option<u64>, + + /// Fixed duration of the baseline phase (data-plane only, no chaos) + #[arg(long, default_value = "15s", value_parser = IggyDuration::from_str)] + pub baseline_duration: IggyDuration, + + /// Fixed duration of the quiesce phase (graceful shutdown window after cancellation) + #[arg(long, default_value = "10s", value_parser = IggyDuration::from_str)] + pub quiesce_duration: IggyDuration, } impl BenchmarkKindProps for StressArgs { fn streams(&self) -> u32 { - DEFAULT_STREAMS + self.stable_streams + self.chaos_streams } fn partitions(&self) -> u32 { @@ -107,16 +123,19 @@ impl BenchmarkKindProps for StressArgs { &self.transport } + /// CGs only on stable streams fn number_of_consumer_groups(&self) -> u32 { - DEFAULT_CONSUMER_GROUPS + self.stable_streams } + /// Stable streams use `NeverExpire`; chaos uses the CLI field directly fn max_topic_size(&self) -> Option<IggyByteSize> { - Some(self.max_topic_size) + None } + /// Stable streams use `NeverExpire`; chaos uses the CLI field directly fn message_expiry(&self) -> IggyExpiry { - self.message_expiry + IggyExpiry::NeverExpire } fn validate(&self) { @@ -128,6 +147,35 @@ impl BenchmarkKindProps for StressArgs { ) .exit(); } + if self.stable_streams < 1 { + crate::args::common::IggyBenchArgs::command() + .error( + ErrorKind::ValueValidation, + "At least 1 stable stream is required", + ) + .exit(); + } + if self.chaos_streams < 1 { + crate::args::common::IggyBenchArgs::command() + .error( + ErrorKind::ValueValidation, + "At least 1 chaos stream is required", + ) + .exit(); + } + let baseline_plus_quiesce = + self.baseline_duration.get_duration() + self.quiesce_duration.get_duration(); + if baseline_plus_quiesce >= self.duration.get_duration() { + crate::args::common::IggyBenchArgs::command() + .error( + ErrorKind::ValueValidation, + format!( + "baseline_duration ({}) + quiesce_duration ({}) must be less than total duration ({})", + self.baseline_duration, self.quiesce_duration, self.duration + ), + ) + .exit(); + } } } @@ -157,4 +205,20 @@ impl StressArgs { .as_nanos() as u64 }) } + + pub const fn stable_streams(&self) -> u32 { + self.stable_streams + } + + pub const fn chaos_streams(&self) -> u32 { + self.chaos_streams + } + + pub const fn baseline_duration(&self) -> IggyDuration { + self.baseline_duration + } + + pub const fn quiesce_duration(&self) -> IggyDuration { + self.quiesce_duration + } } diff --git a/core/bench/src/benchmarks/balanced_consumer_group.rs b/core/bench/src/benchmarks/balanced_consumer_group.rs index 5bf6026e3..bf49c565e 100644 --- a/core/bench/src/benchmarks/balanced_consumer_group.rs +++ b/core/bench/src/benchmarks/balanced_consumer_group.rs @@ -17,6 +17,7 @@ */ use super::benchmark::Benchmarkable; +use crate::benchmarks::bench_stream_names; use crate::utils::ClientFactory; use crate::{ args::common::IggyBenchArgs, @@ -52,10 +53,11 @@ impl Benchmarkable for BalancedConsumerGroupBenchmark { let cf = &self.client_factory; let args = self.args.clone(); let mut tasks: JoinSet<_> = JoinSet::new(); + let names = bench_stream_names(args.number_of_consumer_groups()); - init_consumer_groups(cf, &args).await?; + init_consumer_groups(cf, &names).await?; - let consumer_futures = build_consumer_futures(cf, &args); + let consumer_futures = build_consumer_futures(cf, &args, &names, None); for fut in consumer_futures { tasks.spawn(fut); } diff --git a/core/bench/src/benchmarks/balanced_producer.rs b/core/bench/src/benchmarks/balanced_producer.rs index b9e7aa892..53de2c05a 100644 --- a/core/bench/src/benchmarks/balanced_producer.rs +++ b/core/bench/src/benchmarks/balanced_producer.rs @@ -18,6 +18,7 @@ use super::benchmark::Benchmarkable; use crate::args::common::IggyBenchArgs; +use crate::benchmarks::bench_stream_names; use crate::benchmarks::common::build_producer_futures; use crate::utils::ClientFactory; use async_trait::async_trait; @@ -50,7 +51,8 @@ impl Benchmarkable for BalancedProducerBenchmark { self.init_streams().await?; let cf = &self.client_factory; let args = self.args.clone(); - let producer_futures = build_producer_futures(cf, &args); + let names = bench_stream_names(args.streams()); + let producer_futures = build_producer_futures(cf, &args, &names, None); let mut tasks = JoinSet::new(); for fut in producer_futures { diff --git a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs index 10ed9bc4b..e8d0b0e6e 100644 --- a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs +++ b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs @@ -17,6 +17,7 @@ */ use super::benchmark::Benchmarkable; +use crate::benchmarks::bench_stream_names; use crate::utils::ClientFactory; use crate::{ args::common::IggyBenchArgs, @@ -52,11 +53,12 @@ impl Benchmarkable for BalancedProducerAndConsumerGroupBenchmark { let cf = &self.client_factory; let args = self.args.clone(); let mut tasks: JoinSet<_> = JoinSet::new(); + let names = bench_stream_names(args.streams()); - init_consumer_groups(cf, &args).await?; + init_consumer_groups(cf, &names).await?; - let producer_futures = build_producer_futures(cf, &args); - let consumer_futures = build_consumer_futures(cf, &args); + let producer_futures = build_producer_futures(cf, &args, &names, None); + let consumer_futures = build_consumer_futures(cf, &args, &names, None); for fut in producer_futures { tasks.spawn(fut); diff --git a/core/bench/src/benchmarks/common.rs b/core/bench/src/benchmarks/common.rs index 856a6a90c..6d8d4a910 100644 --- a/core/bench/src/benchmarks/common.rs +++ b/core/bench/src/benchmarks/common.rs @@ -70,16 +70,15 @@ pub fn rate_limit_per_actor(total_rate: Option<IggyByteSize>, actors: u32) -> Op #[allow(clippy::cognitive_complexity)] pub async fn init_consumer_groups( client_factory: &Arc<dyn ClientFactory>, - args: &IggyBenchArgs, + stream_names: &[String], ) -> Result<(), IggyError> { let client = client_factory.create_client().await; let client = IggyClient::create(client, None, None); - let cg_count = args.number_of_consumer_groups(); login_root(&client).await; - for i in 0..cg_count { - let consumer_group_id = CONSUMER_GROUP_BASE_ID + i; - let stream_name = format!("bench-stream-{}", i + 1); + for (i, stream_name) in stream_names.iter().enumerate() { + #[allow(clippy::cast_possible_truncation)] + let consumer_group_id = CONSUMER_GROUP_BASE_ID + i as u32; let stream_id: Identifier = stream_name.as_str().try_into()?; let topic_id: Identifier = "topic-1".try_into()?; let consumer_group_name = format!("{CONSUMER_GROUP_NAME_PREFIX}-{consumer_group_id}"); @@ -100,8 +99,10 @@ pub async fn init_consumer_groups( pub fn build_producer_futures( client_factory: &Arc<dyn ClientFactory>, args: &IggyBenchArgs, + stream_names: &[String], + finish_condition_override: Option<Arc<BenchmarkFinishCondition>>, ) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + Send + use<>> { - let streams = args.streams(); + let stream_count = u32::try_from(stream_names.len()).expect("too many streams"); let partitions = args.number_of_partitions(); let producers = args.producers(); let actors = args.producers() + args.consumers(); @@ -111,8 +112,9 @@ pub fn build_producer_futures( let sampling_time = args.sampling_time(); let moving_average_window = args.moving_average_window(); let kind = args.kind(); - let shared_finish_condition = - BenchmarkFinishCondition::new(args, BenchmarkFinishConditionMode::Shared); + let shared_finish_condition = finish_condition_override.unwrap_or_else(|| { + BenchmarkFinishCondition::new(args, BenchmarkFinishConditionMode::Shared) + }); let rate_limit = rate_limit_per_actor(args.rate_limit(), actors); let use_high_level_api = args.high_level_api(); @@ -126,8 +128,7 @@ pub fn build_producer_futures( BenchmarkFinishCondition::new(args, BenchmarkFinishConditionMode::PerProducer) }; - let stream_idx = 1 + ((producer_id - 1) % streams); - let stream_id = format!("bench-stream-{stream_idx}"); + let stream_id = stream_names[((producer_id - 1) % stream_count) as usize].clone(); async move { let producer = TypedBenchmarkProducer::new( @@ -154,8 +155,11 @@ pub fn build_producer_futures( pub fn build_consumer_futures( client_factory: &Arc<dyn ClientFactory>, args: &IggyBenchArgs, + stream_names: &[String], + finish_condition_override: Option<Arc<BenchmarkFinishCondition>>, ) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + Send + use<>> { let cg_count = args.number_of_consumer_groups(); + let stream_count = stream_names.len(); let consumers = args.consumers(); let actors = args.producers() + args.consumers(); let warmup_time = args.warmup_time(); @@ -176,8 +180,9 @@ pub fn build_consumer_futures( _ => unreachable!(), }; - let global_finish_condition = - BenchmarkFinishCondition::new(args, BenchmarkFinishConditionMode::Shared); + let global_finish_condition = finish_condition_override.unwrap_or_else(|| { + BenchmarkFinishCondition::new(args, BenchmarkFinishConditionMode::Shared) + }); // When measuring true E2E latency, apply read_amplification multiplier to consumer rate // to ensure they can keep up with producers and not inflate latency due to queue buildup let read_amplification = args.read_amplification().unwrap_or(1.0); @@ -205,11 +210,11 @@ pub fn build_consumer_futures( BenchmarkFinishCondition::new(args, BenchmarkFinishConditionMode::PerConsumer) }; let stream_idx = if cg_count > 0 { - 1 + ((consumer_id - 1) % cg_count) + ((consumer_id - 1) % cg_count) as usize } else { - consumer_id + (consumer_id - 1) as usize }; - let stream_id = format!("bench-stream-{stream_idx}"); + let stream_id = stream_names[stream_idx % stream_count].clone(); // Each stream has exactly one CG, server assigns IDs starting from 0 let consumer_group_id = if cg_count > 0 { Some(CONSUMER_GROUP_BASE_ID) diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs index 2ef5cd190..27d879b57 100644 --- a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs +++ b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs @@ -17,6 +17,7 @@ */ use crate::args::common::IggyBenchArgs; +use crate::benchmarks::bench_stream_names; use crate::benchmarks::common::{build_producing_consumer_groups_futures, init_consumer_groups}; use crate::utils::ClientFactory; use async_trait::async_trait; @@ -52,8 +53,9 @@ impl Benchmarkable for EndToEndProducingConsumerGroupBenchmark { let cf = self.client_factory.clone(); let args = self.args.clone(); let mut tasks = JoinSet::new(); + let names = bench_stream_names(args.number_of_consumer_groups()); - init_consumer_groups(&cf, &args).await?; + init_consumer_groups(&cf, &names).await?; let futures = build_producing_consumer_groups_futures(cf, args); for fut in futures { diff --git a/core/bench/src/benchmarks/mod.rs b/core/bench/src/benchmarks/mod.rs index b97ac1f74..f6d70ea0d 100644 --- a/core/bench/src/benchmarks/mod.rs +++ b/core/bench/src/benchmarks/mod.rs @@ -33,4 +33,12 @@ pub mod stress_report; pub const CONSUMER_GROUP_BASE_ID: u32 = 0; pub const CONSUMER_GROUP_NAME_PREFIX: &str = "cg"; pub const BENCH_STREAM_PREFIX: &str = "bench-stream"; +pub const STABLE_STREAM_PREFIX: &str = "stable"; +pub const CHAOS_STREAM_PREFIX: &str = "chaos"; pub const BENCH_TOPIC_NAME: &str = "topic-1"; + +pub fn bench_stream_names(count: u32) -> Vec<String> { + (1..=count) + .map(|i| format!("{BENCH_STREAM_PREFIX}-{i}")) + .collect() +} diff --git a/core/bench/src/benchmarks/pinned_consumer.rs b/core/bench/src/benchmarks/pinned_consumer.rs index 4c44b9fec..3d5c59ac5 100644 --- a/core/bench/src/benchmarks/pinned_consumer.rs +++ b/core/bench/src/benchmarks/pinned_consumer.rs @@ -17,6 +17,7 @@ */ use crate::args::common::IggyBenchArgs; +use crate::benchmarks::bench_stream_names; use crate::benchmarks::benchmark::Benchmarkable; use crate::benchmarks::common::build_consumer_futures; use crate::utils::ClientFactory; @@ -51,8 +52,9 @@ impl Benchmarkable for PinnedConsumerBenchmark { let cf = &self.client_factory; let args = self.args.clone(); let mut tasks: JoinSet<_> = JoinSet::new(); + let names = bench_stream_names(args.streams()); - let futures = build_consumer_futures(cf, &args); + let futures = build_consumer_futures(cf, &args, &names, None); for fut in futures { tasks.spawn(fut); } diff --git a/core/bench/src/benchmarks/pinned_producer.rs b/core/bench/src/benchmarks/pinned_producer.rs index 8630eb573..b4c605c53 100644 --- a/core/bench/src/benchmarks/pinned_producer.rs +++ b/core/bench/src/benchmarks/pinned_producer.rs @@ -17,6 +17,7 @@ */ use crate::args::common::IggyBenchArgs; +use crate::benchmarks::bench_stream_names; use crate::benchmarks::benchmark::Benchmarkable; use crate::benchmarks::common::build_producer_futures; use crate::utils::ClientFactory; @@ -51,8 +52,9 @@ impl Benchmarkable for PinnedProducerBenchmark { let client_factory = &self.client_factory; let args = self.args.clone(); let mut tasks = JoinSet::new(); + let names = bench_stream_names(args.streams()); - let producer_futures = build_producer_futures(client_factory, &args); + let producer_futures = build_producer_futures(client_factory, &args, &names, None); for fut in producer_futures { tasks.spawn(fut); diff --git a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs index 8f2630396..f2fa95317 100644 --- a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs +++ b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs @@ -16,6 +16,7 @@ // under the License. use crate::args::common::IggyBenchArgs; +use crate::benchmarks::bench_stream_names; use crate::benchmarks::benchmark::Benchmarkable; use crate::benchmarks::common::{build_consumer_futures, build_producer_futures}; use crate::utils::ClientFactory; @@ -50,9 +51,10 @@ impl Benchmarkable for PinnedProducerAndConsumerBenchmark { let cf = &self.client_factory; let args = self.args.clone(); let mut tasks = JoinSet::new(); + let names = bench_stream_names(args.streams()); - let producer_futures = build_producer_futures(cf, &args); - let consumer_futures = build_consumer_futures(cf, &args); + let producer_futures = build_producer_futures(cf, &args, &names, None); + let consumer_futures = build_consumer_futures(cf, &args, &names, None); for fut in producer_futures { tasks.spawn(fut); diff --git a/core/bench/src/benchmarks/stress.rs b/core/bench/src/benchmarks/stress.rs index 62bf35df8..734e2cd98 100644 --- a/core/bench/src/benchmarks/stress.rs +++ b/core/bench/src/benchmarks/stress.rs @@ -19,6 +19,7 @@ use super::benchmark::Benchmarkable; use super::common::{build_consumer_futures, build_producer_futures, init_consumer_groups}; use super::stress_report::StressReport; +use super::{BENCH_TOPIC_NAME, CHAOS_STREAM_PREFIX, STABLE_STREAM_PREFIX}; use crate::actors::stress::admin_exerciser::AdminExerciser; use crate::actors::stress::control_plane_churner::{ChurnerConfig, ControlPlaneChurner}; use crate::actors::stress::error_classifier; @@ -28,25 +29,22 @@ use crate::actors::stress::verifier::StressVerifier; use crate::args::common::IggyBenchArgs; use crate::args::kind::BenchmarkKindCommand; use crate::args::kinds::stress::args::ApiMix; -use crate::utils::ClientFactory; +use crate::utils::finish_condition::BenchmarkFinishCondition; +use crate::utils::{ClientFactory, login_root}; use async_trait::async_trait; use bench_report::{ actor_kind::ActorKind, benchmark_kind::BenchmarkKind, individual_metrics::BenchmarkIndividualMetrics, }; use iggy::prelude::*; +use std::io::{Cursor, Read as _}; +use std::str::FromStr; use std::sync::Arc; use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; use tokio::task::JoinSet; use tracing::{info, warn}; -/// Phase durations as fractions of total test time. -const BASELINE_FRACTION: f64 = 0.15; -const CHAOS_FRACTION: f64 = 0.65; -/// Max drain phase duration. -const MAX_DRAIN_SECS: u64 = 300; - pub struct StressBenchmark { args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>, @@ -69,15 +67,10 @@ impl StressBenchmark { fn compute_phase_durations(&self) -> (Duration, Duration, Duration) { let total = self.stress_args().duration().get_duration(); - let total_secs = total.as_secs_f64(); - - let baseline = Duration::from_secs_f64(total_secs * BASELINE_FRACTION); - let chaos = Duration::from_secs_f64(total_secs * CHAOS_FRACTION); - #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)] - let drain_secs = (total_secs * (1.0 - BASELINE_FRACTION - CHAOS_FRACTION)) as u64; - let drain = Duration::from_secs(drain_secs.min(MAX_DRAIN_SECS)); - - (baseline, chaos, drain) + let baseline = self.stress_args().baseline_duration().get_duration(); + let quiesce = self.stress_args().quiesce_duration().get_duration(); + let chaos = total.saturating_sub(baseline + quiesce); + (baseline, chaos, quiesce) } /// Spawns chaos actors: control-plane churner, admin exerciser, health monitor. @@ -85,34 +78,28 @@ impl StressBenchmark { &self, tasks: &mut JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, ctx: &Arc<StressContext>, - chaos_duration: Duration, ) { let stress_args = self.stress_args(); let api_mix = stress_args.api_mix(); let chaos_seed = stress_args.chaos_seed(); - // Health monitor always runs let monitor = HealthMonitor::new(self.client_factory.clone(), ctx.clone()); tasks.spawn(async move { monitor.run().await; Ok(BenchmarkIndividualMetrics::placeholder("health_monitor")) }); - // Control-plane churner(s) unless data-plane-only if !matches!(api_mix, ApiMix::DataPlaneOnly) { let churn_concurrency = stress_args.churn_concurrency().get(); let churn_interval = stress_args.churn_interval(); - let purge_cutoff = Instant::now() + chaos_duration / 2; - let churner_config = ChurnerConfig { api_mix, - streams: self.args.streams(), + stable_streams: stress_args.stable_streams(), + chaos_streams: stress_args.chaos_streams(), partitions: self.args.number_of_partitions(), - consumer_groups: self.args.number_of_consumer_groups(), message_expiry: stress_args.message_expiry, max_topic_size: MaxTopicSize::Custom(stress_args.max_topic_size), - purge_cutoff, }; for i in 0..churn_concurrency { @@ -133,84 +120,123 @@ impl StressBenchmark { } } - // Admin exerciser for mixed/all modes if matches!(api_mix, ApiMix::Mixed | ApiMix::All) { - let admin = AdminExerciser::new(self.client_factory.clone(), ctx.clone()); + let admin = AdminExerciser::new( + self.client_factory.clone(), + ctx.clone(), + format!("{STABLE_STREAM_PREFIX}-1"), + ); tasks.spawn(async move { admin.run().await; Ok(BenchmarkIndividualMetrics::placeholder("admin_exerciser")) }); } } -} - -#[async_trait] -impl Benchmarkable for StressBenchmark { - async fn run( - &mut self, - ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError> { - let overall_start = Instant::now(); - let (baseline_duration, chaos_duration, drain_max) = self.compute_phase_durations(); - let total = self.stress_args().duration().get_duration(); - let remaining_after_chaos = total.saturating_sub(baseline_duration + chaos_duration); - if drain_max < remaining_after_chaos { - warn!( - "Drain cap ({drain_max:?}) is shorter than remaining duration ({remaining_after_chaos:?}); \ - data-plane actors may be aborted before finishing. Consider shorter --duration or raising MAX_DRAIN_SECS." - ); + /// Best-effort validation of server config via snapshot API. + /// Warns about common misconfigurations but never blocks the test. + async fn validate_server_config(&self) { + let client = self.client_factory.create_client().await; + let client = IggyClient::create(client, None, None); + login_root(&client).await; + + let snapshot = client + .snapshot( + SnapshotCompression::Stored, + vec![SystemSnapshotType::ServerConfig], + ) + .await; + + let Ok(snapshot) = snapshot else { + warn!("Could not fetch server config snapshot for validation; skipping checks"); + return; + }; + + let cursor = Cursor::new(snapshot.0); + let Ok(mut zip) = zip::ZipArchive::new(cursor) else { + warn!("Could not parse server config snapshot as ZIP; skipping checks"); + return; + }; + + let Ok(mut entry) = zip.by_name("server_config.txt") else { + warn!("server_config.txt not found in snapshot; skipping checks"); + return; + }; + + let mut config_str = String::new(); + if entry.read_to_string(&mut config_str).is_err() { + warn!("Could not read server_config.txt from snapshot; skipping checks"); + return; } + drop(entry); - info!( - "Stress test starting: baseline={baseline_duration:?}, chaos={chaos_duration:?}, drain_max={drain_max:?}" - ); - - self.init_streams().await?; - init_consumer_groups(&self.client_factory, &self.args).await?; - - let ctx = Arc::new(StressContext::new()); - - let mut tasks: JoinSet<Result<BenchmarkIndividualMetrics, IggyError>> = JoinSet::new(); - - // === PHASE 1: Baseline / Produce (data-plane only) === - info!("=== Phase 1: Baseline ({baseline_duration:?}) ==="); + let Ok(config) = config_str.parse::<toml::Value>() else { + warn!("Could not parse server config as TOML; skipping checks"); + return; + }; - let producer_futures = build_producer_futures(&self.client_factory, &self.args); - let consumer_futures = build_consumer_futures(&self.client_factory, &self.args); + let stress_args = self.stress_args(); - for fut in producer_futures { - tasks.spawn(fut); - } - for fut in consumer_futures { - tasks.spawn(fut); + // Check if message cleaner is enabled when message_expiry is set + if !matches!(stress_args.message_expiry, IggyExpiry::NeverExpire) { + let cleaner_enabled = config + .get("data_maintenance") + .and_then(|dm| dm.get("messages")) + .and_then(|m| m.get("cleaner_enabled")) + .and_then(toml::Value::as_bool) + .unwrap_or(false); + + if !cleaner_enabled { + warn!( + "Server has data_maintenance.messages.cleaner_enabled=false but stress test \ + uses message_expiry={}. Messages will not be cleaned up. \ + Set IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED=true to enable cleanup.", + stress_args.message_expiry + ); + } } - tokio::time::sleep(baseline_duration).await; - let baseline_elapsed = overall_start.elapsed(); - info!("Baseline phase completed in {baseline_elapsed:?}"); - - // === PHASE 2: Chaos (add churners + admin + health alongside ongoing data-plane) === - info!("=== Phase 2: Chaos ({chaos_duration:?}) ==="); - - self.spawn_chaos_actors(&mut tasks, &ctx, chaos_duration); - - tokio::time::sleep(chaos_duration).await; - let chaos_elapsed = overall_start.elapsed(); - info!("Chaos phase completed in {chaos_elapsed:?}"); + // Check segment size vs max_topic_size + let segment_size = config + .get("system") + .and_then(|s| s.get("segment")) + .and_then(|s| s.get("size")) + .and_then(toml::Value::as_str) + .and_then(|s| IggyByteSize::from_str(s).ok()); + + if let Some(seg_size) = segment_size { + let max_reasonable = IggyByteSize::from_str("64MiB").expect("valid constant"); + if seg_size > max_reasonable { + warn!( + "Server segment size ({seg_size}) is large for stress testing. \ + Consider IGGY_SYSTEM_SEGMENT_SIZE=\"16MiB\" for faster segment \ + rotation and more race surface." + ); + } + } - // === PHASE 3: Drain === - info!("=== Phase 3: Drain (max {drain_max:?}) ==="); - ctx.cancel(); + info!("Server config validation complete"); + } - let drain_start = Instant::now(); - let drain_deadline = drain_start + drain_max; + /// Drains the `JoinSet` within the quiesce window, collecting data-plane batch + /// counts and recording errors. Aborts remaining actors if the deadline expires. + async fn drain_actors( + tasks: &mut JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, + ctx: &StressContext, + quiesce_duration: Duration, + ) -> Duration { + let quiesce_start = Instant::now(); + let quiesce_deadline = quiesce_start + quiesce_duration; let mut producer_batches: u64 = 0; let mut consumer_batches: u64 = 0; while !tasks.is_empty() { - let remaining = drain_deadline.saturating_duration_since(Instant::now()); + let remaining = quiesce_deadline.saturating_duration_since(Instant::now()); if remaining.is_zero() { - warn!("Drain phase timed out, aborting remaining tasks"); + warn!( + "{} actors didn't finish within quiesce window, aborting", + tasks.len() + ); tasks.abort_all(); break; } @@ -230,11 +256,14 @@ impl Benchmarkable for StressBenchmark { error_classifier::record_error(&ctx.stats, &e); } Ok(Some(Err(e))) => { - warn!("Actor join failed: {e}"); + warn!("Actor join error: {e}"); } Ok(None) => break, Err(_) => { - warn!("Drain phase timed out"); + warn!( + "{} actors didn't finish within quiesce window, aborting", + tasks.len() + ); tasks.abort_all(); break; } @@ -248,13 +277,94 @@ impl Benchmarkable for StressBenchmark { .poll_messages_ok .fetch_add(consumer_batches, Ordering::Relaxed); - let drain_elapsed = drain_start.elapsed(); + quiesce_start.elapsed() + } +} + +#[async_trait] +impl Benchmarkable for StressBenchmark { + async fn run( + &mut self, + ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError> { + let overall_start = Instant::now(); + let stress_args = self.stress_args(); + let stable_count = stress_args.stable_streams(); + let chaos_count = stress_args.chaos_streams(); + let (baseline_duration, chaos_duration, quiesce_duration) = self.compute_phase_durations(); + + info!( + "Stress test starting: baseline={baseline_duration:?}, chaos={chaos_duration:?}, quiesce={quiesce_duration:?}" + ); + + self.validate_server_config().await; + + self.init_streams().await?; + + let stable_names: Vec<String> = (1..=stable_count) + .map(|i| format!("{STABLE_STREAM_PREFIX}-{i}")) + .collect(); + let chaos_names: Vec<String> = (1..=chaos_count) + .map(|i| format!("{CHAOS_STREAM_PREFIX}-{i}")) + .collect(); + let all_stream_names: Vec<String> = + stable_names.iter().chain(&chaos_names).cloned().collect(); + + init_consumer_groups(&self.client_factory, &stable_names).await?; + + let ctx = Arc::new(StressContext::new()); + + let finish_condition = BenchmarkFinishCondition::new_cancellable(ctx.cancelled.clone()); + + let mut tasks: JoinSet<Result<BenchmarkIndividualMetrics, IggyError>> = JoinSet::new(); + + // === PHASE 1: Baseline (data-plane only) === + info!("=== Phase 1: Baseline ({baseline_duration:?}) ==="); + + let producer_futures = build_producer_futures( + &self.client_factory, + &self.args, + &all_stream_names, + Some(finish_condition.clone()), + ); + let consumer_futures = build_consumer_futures( + &self.client_factory, + &self.args, + &stable_names, + Some(finish_condition), + ); + + for fut in producer_futures { + tasks.spawn(fut); + } + for fut in consumer_futures { + tasks.spawn(fut); + } + + tokio::time::sleep(baseline_duration).await; + let baseline_elapsed = overall_start.elapsed(); + info!("Baseline phase completed in {baseline_elapsed:?}"); + + // === PHASE 2: Chaos (add churners + admin + health alongside ongoing data-plane) === + info!("=== Phase 2: Chaos ({chaos_duration:?}) ==="); + + self.spawn_chaos_actors(&mut tasks, &ctx); + + tokio::time::sleep(chaos_duration).await; + let chaos_elapsed = overall_start.elapsed(); + info!("Chaos phase completed in {chaos_elapsed:?}"); + + // === PHASE 3: Quiesce === + info!("=== Phase 3: Quiesce ({quiesce_duration:?}) ==="); + ctx.cancel(); + + let quiesce_elapsed = Self::drain_actors(&mut tasks, &ctx, quiesce_duration).await; // === Verification === info!("Running post-test verification..."); let verifier = StressVerifier::new( self.client_factory.clone(), - self.args.streams(), + stable_count, + chaos_count, self.args.number_of_partitions(), ); let verification = verifier.verify().await; @@ -265,7 +375,7 @@ impl Benchmarkable for StressBenchmark { overall_start.elapsed(), baseline_duration, chaos_duration, - drain_elapsed, + quiesce_elapsed, ); report.print_summary(); @@ -276,6 +386,63 @@ impl Benchmarkable for StressBenchmark { Ok(JoinSet::new()) } + async fn init_streams(&self) -> Result<(), IggyError> { + let stress_args = self.stress_args(); + let stable_count = stress_args.stable_streams(); + let chaos_count = stress_args.chaos_streams(); + let partitions_count = self.args.number_of_partitions(); + + let client = self.client_factory.create_client().await; + let client = IggyClient::create(client, None, None); + login_root(&client).await; + + let streams = client.get_streams().await?; + + // Stable streams: NeverExpire, ServerDefault size + for i in 1..=stable_count { + let stream_name = format!("{STABLE_STREAM_PREFIX}-{i}"); + if streams.iter().all(|s| s.name != stream_name) { + info!("Creating stable stream '{stream_name}'"); + client.create_stream(&stream_name).await?; + let stream_id: Identifier = stream_name.as_str().try_into()?; + client + .create_topic( + &stream_id, + BENCH_TOPIC_NAME, + partitions_count, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await?; + } + } + + // Chaos streams: user-configured expiry and max_topic_size + for i in 1..=chaos_count { + let stream_name = format!("{CHAOS_STREAM_PREFIX}-{i}"); + if streams.iter().all(|s| s.name != stream_name) { + info!("Creating chaos stream '{stream_name}'"); + client.create_stream(&stream_name).await?; + let stream_id: Identifier = stream_name.as_str().try_into()?; + client + .create_topic( + &stream_id, + BENCH_TOPIC_NAME, + partitions_count, + CompressionAlgorithm::default(), + None, + stress_args.message_expiry, + MaxTopicSize::Custom(stress_args.max_topic_size), + ) + .await?; + } + } + + Ok(()) + } + fn kind(&self) -> BenchmarkKind { BenchmarkKind::Stress } @@ -291,14 +458,18 @@ impl Benchmarkable for StressBenchmark { fn print_info(&self) { let stress_args = self.stress_args(); info!( - "Starting Stress benchmark: duration={}, producers={}, consumers={}, churn_concurrency={}, churn_interval={}, api_mix={:?}, chaos_seed={}", + "Starting Stress benchmark: duration={}, producers={}, consumers={}, stable_streams={}, chaos_streams={}, churn_concurrency={}, churn_interval={}, api_mix={:?}, chaos_seed={}, baseline={}, quiesce={}", stress_args.duration(), stress_args.producers.get(), stress_args.consumers.get(), + stress_args.stable_streams(), + stress_args.chaos_streams(), stress_args.churn_concurrency().get(), stress_args.churn_interval(), stress_args.api_mix(), stress_args.chaos_seed(), + stress_args.baseline_duration(), + stress_args.quiesce_duration(), ); } } diff --git a/core/bench/src/benchmarks/stress_report.rs b/core/bench/src/benchmarks/stress_report.rs index 6ff549f0c..129d5a42c 100644 --- a/core/bench/src/benchmarks/stress_report.rs +++ b/core/bench/src/benchmarks/stress_report.rs @@ -16,7 +16,7 @@ * under the License. */ -use crate::actors::stress::{stress_context::StressStats, verifier::VerificationResult}; +use crate::actors::stress::{stress_context::StressStats, verifier::StressVerificationResult}; use serde::Serialize; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; @@ -27,7 +27,7 @@ pub struct StressReport { pub total_duration_secs: f64, pub baseline_duration_secs: f64, pub chaos_duration_secs: f64, - pub drain_duration_secs: f64, + pub quiesce_duration_secs: f64, pub api_calls: ApiCallSummary, pub error_tiers: ErrorTierSummary, pub verification: VerificationSummary, @@ -88,6 +88,13 @@ pub struct ErrorTierSummary { #[derive(Debug, Serialize)] pub struct VerificationSummary { + pub stable: NamespaceVerification, + pub chaos: NamespaceVerification, + pub passed: bool, +} + +#[derive(Debug, Serialize)] +pub struct NamespaceVerification { pub partitions_checked: u32, pub total_messages: u64, pub gaps_found: u64, @@ -101,31 +108,26 @@ pub struct VerificationSummary { impl StressReport { pub fn build( stats: &StressStats, - verification: &VerificationResult, + verification: &StressVerificationResult, total_duration: Duration, baseline_duration: Duration, chaos_duration: Duration, - drain_duration: Duration, + quiesce_duration: Duration, ) -> Self { let api_calls = ApiCallSummary::from_stats(stats); Self { total_duration_secs: total_duration.as_secs_f64(), baseline_duration_secs: baseline_duration.as_secs_f64(), chaos_duration_secs: chaos_duration.as_secs_f64(), - drain_duration_secs: drain_duration.as_secs_f64(), + quiesce_duration_secs: quiesce_duration.as_secs_f64(), error_tiers: ErrorTierSummary { expected: stats.expected_errors.load(Ordering::Relaxed), unexpected: stats.unexpected_errors.load(Ordering::Relaxed), }, api_calls, verification: VerificationSummary { - partitions_checked: verification.partitions_checked, - total_messages: verification.total_messages, - gaps_found: verification.gaps_found, - duplicates_found: verification.duplicates_found, - checksum_mismatches: verification.checksum_mismatches, - payload_length_mismatches: verification.payload_length_mismatches, - id_missing_fingerprint: verification.id_missing_fingerprint, + stable: NamespaceVerification::from_result(&verification.stable), + chaos: NamespaceVerification::from_result(&verification.chaos), passed: verification.passed, }, } @@ -134,11 +136,11 @@ impl StressReport { pub fn print_summary(&self) { println!("\n=== STRESS TEST REPORT ==="); println!( - "Duration: {:.1}s (baseline: {:.1}s, chaos: {:.1}s, drain: {:.1}s)", + "Duration: {:.1}s (baseline: {:.1}s, chaos: {:.1}s, quiesce: {:.1}s)", self.total_duration_secs, self.baseline_duration_secs, self.chaos_duration_secs, - self.drain_duration_secs + self.quiesce_duration_secs ); println!( "API calls: {} ok, {} err", @@ -148,25 +150,47 @@ impl StressReport { "Errors: {} expected, {} unexpected", self.error_tiers.expected, self.error_tiers.unexpected ); + let s = &self.verification.stable; + println!( + "Stable: {} partitions, {} msgs, {} gaps, {} dups, {} checksum -> {}", + s.partitions_checked, + s.total_messages, + s.gaps_found, + s.duplicates_found, + s.checksum_mismatches, + if s.passed { "PASSED" } else { "FAILED" } + ); + let c = &self.verification.chaos; println!( - "Verification: {} partitions, {} msgs, {} gaps, {} dups, {} checksum, {} len, {} id -> {}", - self.verification.partitions_checked, - self.verification.total_messages, - self.verification.gaps_found, - self.verification.duplicates_found, - self.verification.checksum_mismatches, - self.verification.payload_length_mismatches, - self.verification.id_missing_fingerprint, - if self.verification.passed { - "PASSED" + "Chaos: {} partitions, {} msgs, integrity {} -> {}", + c.partitions_checked, + c.total_messages, + if c.checksum_mismatches == 0 && c.payload_length_mismatches == 0 { + "OK" } else { - "FAILED" - } + "FAIL" + }, + if c.passed { "PASSED" } else { "FAILED" } ); println!("==========================\n"); } } +impl NamespaceVerification { + const fn from_result(r: &crate::actors::stress::verifier::VerificationResult) -> Self { + Self { + partitions_checked: r.partitions_checked, + total_messages: r.total_messages, + gaps_found: r.gaps_found, + duplicates_found: r.duplicates_found, + checksum_mismatches: r.checksum_mismatches, + payload_length_mismatches: r.payload_length_mismatches, + id_missing_fingerprint: r.id_missing_fingerprint, + passed: r.passed, + } + } +} + impl ApiCallSummary { fn from_stats(s: &StressStats) -> Self { Self { diff --git a/core/bench/src/utils/finish_condition.rs b/core/bench/src/utils/finish_condition.rs index 5789008a9..ede3bd35b 100644 --- a/core/bench/src/utils/finish_condition.rs +++ b/core/bench/src/utils/finish_condition.rs @@ -22,7 +22,7 @@ use std::{ fmt::Display, sync::{ Arc, OnceLock, - atomic::{AtomicI64, Ordering}, + atomic::{AtomicBool, AtomicI64, Ordering}, }, time::Instant, }; @@ -63,6 +63,9 @@ pub struct BenchmarkFinishCondition { /// Lazily initialized on first `is_elapsed()` call so the timer starts /// when the actor begins polling, not when the condition is constructed. start_time: OnceLock<Instant>, + /// External cancellation signal. When set to `true`, `is_done()` returns + /// immediately. Used by the stress test to propagate shutdown to data-plane actors. + cancelled: Option<Arc<AtomicBool>>, } impl BenchmarkFinishCondition { @@ -114,6 +117,7 @@ impl BenchmarkFinishCondition { left_total: Arc::new(AtomicI64::new(i64::from(count_per_actor))), mode, start_time: OnceLock::new(), + cancelled: None, }) } (Some(size), None) => { @@ -127,6 +131,7 @@ impl BenchmarkFinishCondition { )), mode, start_time: OnceLock::new(), + cancelled: None, }) } (None, None) => { @@ -152,6 +157,7 @@ impl BenchmarkFinishCondition { left_total: Arc::new(AtomicI64::new(0)), mode: BenchmarkFinishConditionMode::Shared, start_time: OnceLock::new(), + cancelled: None, }) } @@ -163,10 +169,28 @@ impl BenchmarkFinishCondition { left_total: Arc::new(AtomicI64::new(i64::MAX)), mode: BenchmarkFinishConditionMode::Shared, start_time: OnceLock::new(), + cancelled: None, + }) + } + + /// Creates a finish condition that never expires on its own — it only completes + /// when the external `cancelled` flag is set. Used by the stress test so + /// data-plane actors stop in sync with chaos actors on cancellation. + pub fn new_cancellable(cancelled: Arc<AtomicBool>) -> Arc<Self> { + Arc::new(Self { + kind: BenchmarkFinishConditionType::Duration, + total: u64::MAX, + left_total: Arc::new(AtomicI64::new(i64::MAX)), + mode: BenchmarkFinishConditionMode::Shared, + start_time: OnceLock::new(), + cancelled: Some(cancelled), }) } pub fn account_and_check(&self, size_to_subtract: u64) -> bool { + if self.is_cancelled() { + return true; + } match self.kind { BenchmarkFinishConditionType::TotalData => { self.left_total.fetch_sub( @@ -184,12 +208,21 @@ impl BenchmarkFinishCondition { } pub fn is_done(&self) -> bool { + if self.is_cancelled() { + return true; + } match self.kind { BenchmarkFinishConditionType::Duration => self.is_elapsed(), _ => self.left() <= 0, } } + fn is_cancelled(&self) -> bool { + self.cancelled + .as_ref() + .is_some_and(|c| c.load(Ordering::Acquire)) + } + fn is_elapsed(&self) -> bool { let start = self.start_time.get_or_init(Instant::now); start.elapsed() >= std::time::Duration::from_micros(self.total)
