hubcio commented on code in PR #3272:
URL: https://github.com/apache/iggy/pull/3272#discussion_r3276362199


##########
core/simulator/src/workload/effect.rs:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Predicted server-state mutations emitted by op modules on commit.
+//!
+//! Name-keyed throughout. Server-ng emits empty reply bodies, so the
+//! workload cannot recover server-assigned numeric ids; shadow lookups
+//! address entities by name (`WireIdentifier::named`). Id-keyed effects
+//! return once reply-body parsing lands.
+
+use iggy_common::sharding::IggyNamespace;
+
+#[derive(Debug, Clone)]
+pub enum Effect {
+    None,
+    AddStream {
+        name: String,
+    },
+    RemoveStream {
+        name: String,
+    },
+    AddTopic {
+        stream: String,
+        name: String,
+        partitions: u32,
+    },
+    RemoveTopic {
+        stream: String,
+        name: String,
+    },
+    AddUser {
+        name: String,
+    },
+    RemoveUser {
+        name: String,
+    },
+    AddPat {
+        name: String,
+    },
+    RemovePat {
+        name: String,
+    },
+    AddConsumerGroup {
+        stream: String,
+        topic: String,
+        name: String,
+    },
+    RemoveConsumerGroup {
+        stream: String,
+        topic: String,
+        name: String,
+    },
+    SendCommitted {
+        ns: IggyNamespace,
+        count: u64,
+    },
+    OffsetStored {
+        key: (IggyNamespace, u8, u32),
+        value: u64,
+    },
+    OffsetDeleted {
+        key: (IggyNamespace, u8, u32),
+    },
+    RenameStream {
+        old: String,
+        new: String,
+    },
+    RenameTopic {
+        stream: String,
+        old: String,
+        new: String,
+    },
+    RenameUser {
+        old: String,

Review Comment:
   `RenameUser { old, new }` carries no pw field; combined with the `pw-{user}` 
reconstruction at `change_password.rs:52`, a `ChangePassword` issued after 
`RenameUser` derives the wrong password. add a pw payload to the `RenameUser` 
effect and propagate it through `shadow.rs:227-231`. bundle with the 
change_password fix.



##########
core/simulator/src/workload/auditor.rs:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Auditor: reply-order and per-client commit invariants, plus the
+//! in-flight expectation table.
+//!
+//! Entity-state tracking lives in [`crate::workload::shadow::Shadow`].
+//! Stays transport-agnostic so it can drive an Antithesis-style harness
+//! against a real server binary.
+//!
+//! Server-ng currently encodes every reply as success (`context` always 0).
+//! Once the wire protocol gains an error discriminant, expand `is_success`
+//! and validate `Failure` outcomes via 
[`crate::workload::ops::classify_reply`].
+
+use std::collections::HashMap;
+
+use iggy_binary_protocol::ReplyHeader;
+use strum::EnumCount;
+
+use crate::workload::actions::Action;
+use crate::workload::ops::InFlight;
+
+#[derive(Debug, Clone)]
+pub struct AuditorStats {
+    pub replies_seen: u64,
+    pub replies_unknown: u64,
+    /// Per-action committed counter, indexed by `Action as usize`.
+    pub commits_per_action: [u64; Action::COUNT],
+}
+
+impl Default for AuditorStats {
+    fn default() -> Self {
+        Self {
+            replies_seen: 0,
+            replies_unknown: 0,
+            commits_per_action: [0u64; Action::COUNT],
+        }
+    }
+}
+
+impl AuditorStats {
+    #[must_use]
+    pub const fn commits(&self, action: Action) -> u64 {
+        self.commits_per_action[action as usize]
+    }
+}
+
+pub struct ServerAuditor {
+    in_flight: HashMap<(u128, u64), InFlight>,
+    /// Highest `commit` op observed per `(client, namespace)`. Each VSR
+    /// group (one per partition, plus metadata at `namespace = 0`) has its
+    /// own op counter; monotonicity is per-namespace, not per-client.
+    last_commit_per_client_ns: HashMap<(u128, u64), u64>,
+    stats: AuditorStats,
+}
+
+impl ServerAuditor {
+    #[must_use]
+    pub fn new() -> Self {
+        Self {
+            in_flight: HashMap::new(),
+            last_commit_per_client_ns: HashMap::new(),
+            stats: AuditorStats::default(),
+        }
+    }
+
+    /// Record a new in-flight request keyed by `(client, request)`.
+    ///
+    /// # Panics
+    /// Panics if a request with the same key is already in flight.
+    /// `SimClient` must produce strictly monotonic request ids per client.
+    pub fn record_in_flight(&mut self, key: (u128, u64), entry: InFlight) {
+        let prev = self.in_flight.insert(key, entry);
+        assert!(
+            prev.is_none(),
+            "duplicate in-flight key {key:?}: request ids must be unique per 
client"
+        );
+    }
+
+    /// Match a reply to its in-flight entry and update the per-(client,
+    /// namespace) last-commit cursor. Returns the entry so the caller can
+    /// run outcome classification and effect application.
+    ///
+    /// Returns `None` (and bumps `replies_unknown`) when the reply has no
+    /// matching in-flight entry, duplicate cached reply on the metadata
+    /// plane, or a stale at-least-once re-execution on a partition plane
+    /// or when the reply's `header.namespace` does not match the
+    /// namespace the in-flight request was submitted to.
+    ///
+    /// The in-flight lookup runs before any watermark update so a stray
+    /// reply for an unknown key cannot advance the cursor and mask a
+    /// later legitimate regression.
+    ///
+    /// The previous strict-monotonic assert was removed: with parallel
+    /// in-flight requests across namespaces and at-least-once delivery,
+    /// replies can legitimately arrive out of `commit` order. The
+    /// in-flight cross-check already rejects unknown / misrouted
+    /// replies; cross-replica commit-order invariants belong in the
+    /// quiesce-time validator (v2.7-base).
+    pub fn on_reply(&mut self, key: (u128, u64), header: &ReplyHeader) -> 
Option<InFlight> {
+        self.stats.replies_seen += 1;
+
+        // Lookup first so a stray reply cannot advance the watermark.
+        let Some(entry) = self.in_flight.remove(&key) else {
+            self.stats.replies_unknown += 1;
+            return None;
+        };
+
+        // Reply's namespace must match the namespace the request was
+        // submitted to. A mismatch means the reply landed in the wrong
+        // VSR group's bookkeeping; refuse to apply effects against the
+        // wrong shadow bucket.
+        if entry.request_namespace != header.namespace {

Review Comment:
   ns-mismatch path removes the `in_flight` entry then returns `None`; the 
caller at `mod.rs:152-156` skips the `in_flight_per_client` decrement on that 
branch. unreachable today (server echoes request namespace verbatim, request 
ids are monotonic globally per `SimClient`), but a future routing or dedup bug 
would manifest as a silent single-client wedge at `CLIENT_REQUEST_QUEUE_MAX=1` 
instead of a clean `replies_unknown` count. cleanest fix is to return an enum 
`Match(InFlight) | NsMismatch | Unknown` so the caller decrements on 
`NsMismatch` without applying effects.



##########
core/simulator/src/workload/shadow.rs:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shadow state: the workload's prediction of server-side entity state.
+//!
+//! Name-keyed throughout. Server-ng does not yet ship reply bodies, so
+//! the workload cannot observe server-assigned numeric ids. Lookups are
+//! by name; requests route via `WireIdentifier::named(...)`. When typed
+//! response bodies land, id-keyed maps return as a parallel index;
+//! name-keyed `IndexSet`s stay the primary sampling source.
+//!
+//! `IndexSet` for state we sample randomly (O(1) `get_index`).
+//! `HashMap` for pure-lookup state.
+
+use indexmap::IndexSet;
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use std::collections::HashMap;
+
+use iggy_common::sharding::IggyNamespace;
+
+use crate::workload::effect::{ApplyResult, Effect};
+use crate::workload::ids::IdPermutation;
+
+pub struct Shadow {
+    pub namespaces_live: IndexSet<IggyNamespace>,
+
+    /// Live streams by name. `CreateStream` inserts; `DeleteStream` removes.
+    pub stream_names: IndexSet<String>,
+    /// Live topics by `(stream, topic)`. Only added if parent stream lives.
+    pub topic_names: IndexSet<(String, String)>,
+    pub user_names: IndexSet<String>,
+    pub pat_names: IndexSet<String>,
+    pub consumer_group_names: IndexSet<(String, String, String)>,
+
+    pub sends_committed: HashMap<IggyNamespace, u64>,
+    pub consumer_offsets: HashMap<(IggyNamespace, u8, u32), u64>,
+
+    /// Id permutation for fabricated-id paths (`Outcome::ResourceNotFound`).
+    /// Currently unused.
+    pub id_permutation: IdPermutation,
+    next_index: u64,
+}
+
+impl Shadow {
+    #[must_use]
+    pub fn new(namespaces: Vec<IggyNamespace>, id_permutation: IdPermutation) 
-> Self {
+        let namespaces_live: IndexSet<IggyNamespace> = 
namespaces.into_iter().collect();
+        Self {
+            namespaces_live,
+            stream_names: IndexSet::new(),
+            topic_names: IndexSet::new(),
+            user_names: IndexSet::new(),
+            pat_names: IndexSet::new(),
+            consumer_group_names: IndexSet::new(),
+            sends_committed: HashMap::new(),
+            consumer_offsets: HashMap::new(),
+            id_permutation,
+            next_index: 1,
+        }
+    }
+
+    /// Pick a live namespace uniformly. `IndexSet` preserves insertion
+    /// order, so for deduplicated input this matches `Vec::get(i)`.
+    pub fn pick_namespace(&self, prng: &mut Xoshiro256Plus) -> 
Option<IggyNamespace> {
+        let n = self.namespaces_live.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.namespaces_live.get_index(i).copied()
+    }
+
+    pub fn pick_stream_name(&self, prng: &mut Xoshiro256Plus) -> 
Option<String> {
+        let n = self.stream_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.stream_names.get_index(i).cloned()
+    }
+
+    pub fn pick_topic_pair(&self, prng: &mut Xoshiro256Plus) -> 
Option<(String, String)> {
+        let n = self.topic_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.topic_names.get_index(i).cloned()
+    }
+
+    pub fn pick_user_name(&self, prng: &mut Xoshiro256Plus) -> Option<String> {
+        let n = self.user_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.user_names.get_index(i).cloned()
+    }
+
+    pub fn pick_pat_name(&self, prng: &mut Xoshiro256Plus) -> Option<String> {
+        let n = self.pat_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.pat_names.get_index(i).cloned()
+    }
+
+    pub fn pick_consumer_group_triple(
+        &self,
+        prng: &mut Xoshiro256Plus,
+    ) -> Option<(String, String, String)> {
+        let n = self.consumer_group_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.consumer_group_names.get_index(i).cloned()
+    }
+
+    /// Fresh monotonically increasing stream name. Uses the raw counter;
+    /// will route through `id_permutation` once `ResourceNotFound` is wired.
+    pub fn fresh_stream_name(&mut self) -> String {
+        self.fresh_name("stream")
+    }
+
+    /// Fresh prefixed entity name. `next_index` is shared across entity
+    /// kinds so names stay distinct regardless of which op claims them.
+    pub fn fresh_name(&mut self, prefix: &str) -> String {
+        let index = self.next_index;
+        self.next_index += 1;
+        format!("wl-{prefix}-{index:08x}")
+    }
+
+    /// Apply a predicted effect to the shadow. Returns any
+    /// [`SimCommand`](crate::workload::effect::SimCommand)s the driver
+    /// must run against the simulator (e.g. `init_partition`).
+    ///
+    /// Cascades use `IndexSet::retain` for a single-pass O(n) walk that
+    /// preserves insertion order (`shift_remove`, not `swap_remove`):
+    /// `pick_*_name` returns `get_index`-based samples, so insertion
+    /// order is part of the determinism contract.
+    pub fn apply(&mut self, e: Effect) -> ApplyResult {
+        let sim_commands = Vec::new();
+        match e {
+            Effect::None => {}
+            Effect::AddStream { name } => {
+                self.stream_names.insert(name);
+            }
+            Effect::RemoveStream { name } => {
+                self.stream_names.shift_remove(&name);
+                self.topic_names.retain(|(s, _)| s != &name);
+                self.consumer_group_names.retain(|(s, _, _)| s != &name);
+            }
+            Effect::AddTopic {
+                stream,
+                name,
+                partitions: _,
+            } => {
+                if self.stream_names.contains(&stream) {
+                    self.topic_names.insert((stream, name));
+                }

Review Comment:
   `AddTopic` silently no-ops when the parent stream is gone; same pattern at 
`AddConsumerGroup` (lines 197-205). `auditor.note_committed` at `mod.rs:174` 
runs unconditionally, so `commits_per_action` diverges from the shadow under 
multi-client interleave. same root cause as the multi-client rename collision 
below. single fix: `Shadow::apply` returns `ApplyResult { applied: bool }` and 
`note_committed` only fires on `applied = true`.



##########
core/simulator/src/workload/shadow.rs:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shadow state: the workload's prediction of server-side entity state.
+//!
+//! Name-keyed throughout. Server-ng does not yet ship reply bodies, so
+//! the workload cannot observe server-assigned numeric ids. Lookups are
+//! by name; requests route via `WireIdentifier::named(...)`. When typed
+//! response bodies land, id-keyed maps return as a parallel index;
+//! name-keyed `IndexSet`s stay the primary sampling source.
+//!
+//! `IndexSet` for state we sample randomly (O(1) `get_index`).
+//! `HashMap` for pure-lookup state.
+
+use indexmap::IndexSet;
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use std::collections::HashMap;
+
+use iggy_common::sharding::IggyNamespace;
+
+use crate::workload::effect::{ApplyResult, Effect};
+use crate::workload::ids::IdPermutation;
+
+pub struct Shadow {
+    pub namespaces_live: IndexSet<IggyNamespace>,
+
+    /// Live streams by name. `CreateStream` inserts; `DeleteStream` removes.
+    pub stream_names: IndexSet<String>,
+    /// Live topics by `(stream, topic)`. Only added if parent stream lives.
+    pub topic_names: IndexSet<(String, String)>,
+    pub user_names: IndexSet<String>,
+    pub pat_names: IndexSet<String>,
+    pub consumer_group_names: IndexSet<(String, String, String)>,
+
+    pub sends_committed: HashMap<IggyNamespace, u64>,
+    pub consumer_offsets: HashMap<(IggyNamespace, u8, u32), u64>,
+
+    /// Id permutation for fabricated-id paths (`Outcome::ResourceNotFound`).
+    /// Currently unused.
+    pub id_permutation: IdPermutation,
+    next_index: u64,
+}
+
+impl Shadow {
+    #[must_use]
+    pub fn new(namespaces: Vec<IggyNamespace>, id_permutation: IdPermutation) 
-> Self {
+        let namespaces_live: IndexSet<IggyNamespace> = 
namespaces.into_iter().collect();
+        Self {
+            namespaces_live,
+            stream_names: IndexSet::new(),
+            topic_names: IndexSet::new(),
+            user_names: IndexSet::new(),
+            pat_names: IndexSet::new(),
+            consumer_group_names: IndexSet::new(),
+            sends_committed: HashMap::new(),
+            consumer_offsets: HashMap::new(),
+            id_permutation,
+            next_index: 1,
+        }
+    }
+
+    /// Pick a live namespace uniformly. `IndexSet` preserves insertion
+    /// order, so for deduplicated input this matches `Vec::get(i)`.
+    pub fn pick_namespace(&self, prng: &mut Xoshiro256Plus) -> 
Option<IggyNamespace> {
+        let n = self.namespaces_live.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.namespaces_live.get_index(i).copied()
+    }
+
+    pub fn pick_stream_name(&self, prng: &mut Xoshiro256Plus) -> 
Option<String> {
+        let n = self.stream_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.stream_names.get_index(i).cloned()
+    }
+
+    pub fn pick_topic_pair(&self, prng: &mut Xoshiro256Plus) -> 
Option<(String, String)> {
+        let n = self.topic_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.topic_names.get_index(i).cloned()
+    }
+
+    pub fn pick_user_name(&self, prng: &mut Xoshiro256Plus) -> Option<String> {
+        let n = self.user_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.user_names.get_index(i).cloned()
+    }
+
+    pub fn pick_pat_name(&self, prng: &mut Xoshiro256Plus) -> Option<String> {
+        let n = self.pat_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.pat_names.get_index(i).cloned()
+    }
+
+    pub fn pick_consumer_group_triple(
+        &self,
+        prng: &mut Xoshiro256Plus,
+    ) -> Option<(String, String, String)> {
+        let n = self.consumer_group_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.consumer_group_names.get_index(i).cloned()
+    }
+
+    /// Fresh monotonically increasing stream name. Uses the raw counter;
+    /// will route through `id_permutation` once `ResourceNotFound` is wired.
+    pub fn fresh_stream_name(&mut self) -> String {
+        self.fresh_name("stream")
+    }
+
+    /// Fresh prefixed entity name. `next_index` is shared across entity
+    /// kinds so names stay distinct regardless of which op claims them.
+    pub fn fresh_name(&mut self, prefix: &str) -> String {
+        let index = self.next_index;
+        self.next_index += 1;
+        format!("wl-{prefix}-{index:08x}")
+    }
+
+    /// Apply a predicted effect to the shadow. Returns any
+    /// [`SimCommand`](crate::workload::effect::SimCommand)s the driver
+    /// must run against the simulator (e.g. `init_partition`).
+    ///
+    /// Cascades use `IndexSet::retain` for a single-pass O(n) walk that
+    /// preserves insertion order (`shift_remove`, not `swap_remove`):
+    /// `pick_*_name` returns `get_index`-based samples, so insertion
+    /// order is part of the determinism contract.
+    pub fn apply(&mut self, e: Effect) -> ApplyResult {
+        let sim_commands = Vec::new();
+        match e {
+            Effect::None => {}
+            Effect::AddStream { name } => {
+                self.stream_names.insert(name);
+            }
+            Effect::RemoveStream { name } => {
+                self.stream_names.shift_remove(&name);
+                self.topic_names.retain(|(s, _)| s != &name);
+                self.consumer_group_names.retain(|(s, _, _)| s != &name);
+            }
+            Effect::AddTopic {
+                stream,
+                name,
+                partitions: _,
+            } => {
+                if self.stream_names.contains(&stream) {
+                    self.topic_names.insert((stream, name));
+                }
+            }
+            Effect::RemoveTopic { stream, name } => {
+                self.topic_names
+                    .shift_remove(&(stream.clone(), name.clone()));
+                self.consumer_group_names
+                    .retain(|(s, t, _)| !(s == &stream && t == &name));
+            }
+            Effect::AddUser { name } => {
+                self.user_names.insert(name);
+            }
+            Effect::RemoveUser { name } => {
+                self.user_names.shift_remove(&name);
+            }
+            Effect::AddPat { name } => {
+                self.pat_names.insert(name);
+            }
+            Effect::RemovePat { name } => {
+                self.pat_names.shift_remove(&name);
+            }
+            Effect::AddConsumerGroup {
+                stream,
+                topic,
+                name,
+            } => {
+                if self.topic_names.contains(&(stream.clone(), topic.clone())) 
{
+                    self.consumer_group_names.insert((stream, topic, name));
+                }
+            }
+            Effect::RemoveConsumerGroup {
+                stream,
+                topic,
+                name,
+            } => {
+                self.consumer_group_names
+                    .shift_remove(&(stream, topic, name));
+            }
+            Effect::SendCommitted { ns, count } => {
+                *self.sends_committed.entry(ns).or_insert(0) += count;
+            }
+            Effect::OffsetStored { key, value } => {
+                self.consumer_offsets.insert(key, value);
+            }
+            Effect::OffsetDeleted { key } => {
+                self.consumer_offsets.remove(&key);
+            }
+            Effect::RenameStream { old, new } => self.rename_stream(&old, 
&new),
+            Effect::RenameTopic { stream, old, new } => {
+                self.rename_topic(&stream, &old, &new);
+            }
+            Effect::RenameUser { old, new } => {
+                if self.user_names.shift_remove(&old) {
+                    self.user_names.insert(new);
+                }
+            }
+        }
+        ApplyResult { sim_commands }
+    }
+
+    fn rename_stream(&mut self, old: &str, new: &str) {
+        if !self.stream_names.shift_remove(old) {
+            return;

Review Comment:
   `rename_stream`'s `if !shift_remove(old) { return; }` silently drops the 
second concurrent rename when two clients picked the same `old`. same pattern 
at `rename_topic` (lines 271-277). `CLIENT_REQUEST_QUEUE_MAX=1` is per-client; 
the second client's `pick_*_name` reads the shadow before the first rename's 
reply lands, so collisions are possible across clients. dormant under default 
weights but closes via the same `ApplyResult` change as the AddTopic finding 
above.



##########
core/simulator/src/workload/mod.rs:
##########
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Deterministic seed-based workload generator.
+//!
+//! - `actions::Action`: server command variants.
+//! - `ops/<name>.rs`: per-op `sample`, `build_message`, `classify_reply`,
+//!   `predicted_effect`.
+//! - `shadow::Shadow`: predicted server entity state.
+//! - `auditor::ServerAuditor`: in-flight expectations and invariants.
+//! - `effect::Effect`: predicted shadow mutation per commit.
+
+pub mod actions;
+pub mod auditor;
+pub mod effect;
+pub mod ids;
+pub mod ops;
+pub mod options;
+pub mod shadow;
+
+use std::collections::HashMap;
+
+use iggy_binary_protocol::{Message, ReplyHeader, RequestHeader};
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use rand_xoshiro::rand_core::SeedableRng;
+
+use crate::Simulator;
+use crate::client::SimClient;
+use actions::Action;
+use auditor::ServerAuditor;
+use effect::SimCommand;
+use options::WorkloadOptions;
+use shadow::Shadow;
+
+use crate::workload::ops::InFlight;
+
+/// Max in-flight requests per client. Must stay under the consensus
+/// pipeline's queue limits.
+pub const CLIENT_REQUEST_QUEUE_MAX: usize = 1;
+
+pub struct Workload {
+    prng: Xoshiro256Plus,
+    pub auditor: ServerAuditor,
+    pub shadow: Shadow,
+    pub options: WorkloadOptions,
+    /// Number of in-flight requests per client.
+    in_flight_per_client: HashMap<u128, usize>,

Review Comment:
   `in_flight_per_client` (and `last_commit_per_client_ns` at `auditor.rs:67`) 
never reap entries for departed clients. bounded today by the fixed client set 
at `Simulator::new`; prune on disconnect when client churn lands.



##########
core/simulator/src/workload/ids.rs:
##########
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Bijective permutations from ascending indices to test ids. Lets the
+//! auditor recover the original index from any id present in a reply.
+//!
+//! Currently only `Identity` is used; other variants exist for ops that
+//! need pseudo-uuid ids without reshaping this module.
+
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use rand_xoshiro::rand_core::SeedableRng;
+
+#[derive(Debug, Clone, Copy)]
+pub enum IdPermutation {
+    /// `index → index`.
+    Identity,
+    /// `index → u64::MAX - index`.
+    Inversion,
+    /// `index → (index << 32) | rand32(seed ⊕ index)`. Decode discards
+    /// the random low 32 bits. Index must fit in `u32`.
+    Random(u64),

Review Comment:
   doc says `(index << 32) | rand32(seed XOR index)` but code at line 50 uses 
`seed.wrapping_add(data)`. align the doc, or replace the add with XOR.



##########
core/simulator/src/workload/actions.rs:
##########
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Workload-emittable server commands. Variant order is part of the
+//! determinism contract; the first three positions lock the hash baseline.
+//! Append only; never reorder or insert.
+
+use strum::{EnumCount, EnumIter};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumCount, EnumIter)]
+#[repr(u8)]
+pub enum Action {
+    // DO NOT REORDER (hash baseline depends on these indices).
+    CreateStream,
+    SendMessages,
+    StoreConsumerOffset2,

Review Comment:
   `Action::StoreConsumerOffset2` / `DeleteConsumerOffset2` mirror the 
`iggy_binary_protocol::Operation` enum, but the corresponding client methods 
and op files use the `_v2` suffix (`store_consumer_offset_v2`, 
`ops/store_consumer_offset_v2.rs`). pick one. renaming the Action variants 
without reordering preserves discriminants, so the locked PRNG/hash baseline 
stays stable.



##########
core/simulator/src/workload/mod.rs:
##########
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Deterministic seed-based workload generator.
+//!
+//! - `actions::Action`: server command variants.
+//! - `ops/<name>.rs`: per-op `sample`, `build_message`, `classify_reply`,
+//!   `predicted_effect`.
+//! - `shadow::Shadow`: predicted server entity state.
+//! - `auditor::ServerAuditor`: in-flight expectations and invariants.
+//! - `effect::Effect`: predicted shadow mutation per commit.
+
+pub mod actions;
+pub mod auditor;
+pub mod effect;
+pub mod ids;
+pub mod ops;
+pub mod options;
+pub mod shadow;
+
+use std::collections::HashMap;
+
+use iggy_binary_protocol::{Message, ReplyHeader, RequestHeader};
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use rand_xoshiro::rand_core::SeedableRng;
+
+use crate::Simulator;
+use crate::client::SimClient;
+use actions::Action;
+use auditor::ServerAuditor;
+use effect::SimCommand;
+use options::WorkloadOptions;
+use shadow::Shadow;
+
+use crate::workload::ops::InFlight;
+
+/// Max in-flight requests per client. Must stay under the consensus
+/// pipeline's queue limits.
+pub const CLIENT_REQUEST_QUEUE_MAX: usize = 1;
+
+pub struct Workload {
+    prng: Xoshiro256Plus,
+    pub auditor: ServerAuditor,
+    pub shadow: Shadow,
+    pub options: WorkloadOptions,
+    /// Number of in-flight requests per client.
+    in_flight_per_client: HashMap<u128, usize>,
+    /// Debug counter for `sample()` returning `None`. Useful when an
+    /// outcome-first generation lands and divergence in `apply` branches
+    /// could silently shift the PRNG trace.
+    samples_none: u64,
+}
+
+impl Workload {
+    #[must_use]
+    pub fn new(options: WorkloadOptions) -> Self {
+        let prng = Xoshiro256Plus::seed_from_u64(options.seed);
+        let shadow = Shadow::new(options.namespaces.clone(), 
ids::IdPermutation::Identity);
+        Self {
+            prng,
+            auditor: ServerAuditor::new(),
+            shadow,
+            options,
+            in_flight_per_client: HashMap::new(),
+            samples_none: 0,
+        }
+    }
+
+    /// True if the client has a free in-flight slot.
+    #[must_use]
+    pub fn client_idle(&self, client_id: u128) -> bool {
+        self.in_flight_per_client
+            .get(&client_id)
+            .copied()
+            .unwrap_or(0)
+            < CLIENT_REQUEST_QUEUE_MAX
+    }
+
+    /// Build the next request for `client`. Returns the message and target
+    /// replica index, or `None` if the client has no idle slot or
+    /// `ops::sample` could not synthesize an input.
+    ///
+    /// Note: `pick_action` and `pick_target_replica` advance the PRNG
+    /// even when `sample` returns `None` (e.g. an op needs a live stream
+    /// but the shadow is empty). Today this is harmless because every
+    /// op classifies as `Outcome::Success`. Once outcome-first generation
+    /// lands, a divergence in the Success/Failure split inside `sample`
+    /// could shift the PRNG trace; the `samples_none` counter exists to
+    /// flag that case during development.
+    pub fn build_request(&mut self, client: &SimClient) -> Option<(u8, 
Message<RequestHeader>)> {
+        if !self.client_idle(client.client_id()) {
+            return None;
+        }
+
+        let action = self.pick_action();
+        let target = self.pick_target_replica();
+
+        let Some((input, outcome)) =
+            ops::sample(action, &mut self.shadow, &mut self.prng, 
&self.options)
+        else {
+            self.samples_none += 1;

Review Comment:
   `samples_none` is incremented here but never asserted on, and 
`workload_hash_for_seed` at `lib.rs:980-1006` does not include it. the doc at 
lines 97-104 calls out the PRNG-shift risk this counter is supposed to catch. 
either feed it into the determinism hash or add a `debug_assert!(samples_none 
== 0)` at quiesce.



##########
core/simulator/src/workload/shadow.rs:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shadow state: the workload's prediction of server-side entity state.
+//!
+//! Name-keyed throughout. Server-ng does not yet ship reply bodies, so
+//! the workload cannot observe server-assigned numeric ids. Lookups are
+//! by name; requests route via `WireIdentifier::named(...)`. When typed
+//! response bodies land, id-keyed maps return as a parallel index;
+//! name-keyed `IndexSet`s stay the primary sampling source.
+//!
+//! `IndexSet` for state we sample randomly (O(1) `get_index`).
+//! `HashMap` for pure-lookup state.
+
+use indexmap::IndexSet;
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use std::collections::HashMap;
+
+use iggy_common::sharding::IggyNamespace;
+
+use crate::workload::effect::{ApplyResult, Effect};
+use crate::workload::ids::IdPermutation;
+
+pub struct Shadow {
+    pub namespaces_live: IndexSet<IggyNamespace>,
+
+    /// Live streams by name. `CreateStream` inserts; `DeleteStream` removes.
+    pub stream_names: IndexSet<String>,
+    /// Live topics by `(stream, topic)`. Only added if parent stream lives.
+    pub topic_names: IndexSet<(String, String)>,
+    pub user_names: IndexSet<String>,
+    pub pat_names: IndexSet<String>,
+    pub consumer_group_names: IndexSet<(String, String, String)>,
+
+    pub sends_committed: HashMap<IggyNamespace, u64>,
+    pub consumer_offsets: HashMap<(IggyNamespace, u8, u32), u64>,
+
+    /// Id permutation for fabricated-id paths (`Outcome::ResourceNotFound`).
+    /// Currently unused.
+    pub id_permutation: IdPermutation,
+    next_index: u64,
+}
+
+impl Shadow {
+    #[must_use]
+    pub fn new(namespaces: Vec<IggyNamespace>, id_permutation: IdPermutation) 
-> Self {
+        let namespaces_live: IndexSet<IggyNamespace> = 
namespaces.into_iter().collect();

Review Comment:
   `namespaces.into_iter().collect()` into `IndexSet` silently dedups duplicate 
`IggyNamespace`. user-error class. add `debug_assert_eq!(namespaces.len(), 
namespaces_live.len())` on construction.



##########
core/simulator/src/workload/ops/purge_topic.rs:
##########
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `PurgeTopic` op. Live (stream, topic) picked from shadow.
+
+use iggy_binary_protocol::{Message, ReplyHeader, RequestHeader};
+use rand_xoshiro::Xoshiro256Plus;
+
+use crate::client::SimClient;
+use crate::workload::effect::Effect;
+use crate::workload::options::WorkloadOptions;
+use crate::workload::shadow::Shadow;
+
+#[derive(Debug, Clone)]
+pub struct Input {
+    pub stream: String,
+    pub topic: String,
+}
+
+#[derive(Copy, Clone, Eq, PartialEq, Debug)]
+pub enum Outcome {
+    Success,
+}
+
+pub const OUTCOMES: &[Outcome] = &[Outcome::Success];
+
+pub fn sample(
+    shadow: &mut Shadow,
+    outcome: Outcome,
+    prng: &mut Xoshiro256Plus,
+    _options: &WorkloadOptions,
+) -> Option<Input> {
+    match outcome {
+        Outcome::Success => shadow
+            .pick_topic_pair(prng)
+            .map(|(stream, topic)| Input { stream, topic }),
+    }
+}
+
+#[must_use]
+pub fn build_message(client: &SimClient, input: &Input) -> 
Message<RequestHeader> {
+    client.purge_topic(&input.stream, &input.topic)
+}
+
+#[must_use]
+pub const fn classify_reply(_reply: &ReplyHeader) -> Outcome {
+    Outcome::Success
+}
+
+#[must_use]

Review Comment:
   same as `purge_stream.rs:62`. add a matching `TODO`.



##########
core/simulator/src/workload/ids.rs:
##########
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Bijective permutations from ascending indices to test ids. Lets the
+//! auditor recover the original index from any id present in a reply.
+//!
+//! Currently only `Identity` is used; other variants exist for ops that
+//! need pseudo-uuid ids without reshaping this module.
+
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use rand_xoshiro::rand_core::SeedableRng;
+
+#[derive(Debug, Clone, Copy)]
+pub enum IdPermutation {
+    /// `index → index`.
+    Identity,
+    /// `index → u64::MAX - index`.
+    Inversion,
+    /// `index → (index << 32) | rand32(seed ⊕ index)`. Decode discards
+    /// the random low 32 bits. Index must fit in `u32`.
+    Random(u64),
+}
+
+impl IdPermutation {
+    #[must_use]
+    pub fn encode(&self, data: u64) -> u64 {
+        match self {
+            Self::Identity => data,
+            Self::Inversion => u64::MAX - data,
+            Self::Random(seed) => {
+                debug_assert!(
+                    u32::try_from(data).is_ok(),
+                    "Random id permutation needs index <= u32::MAX, got {data}"
+                );
+                let mut prng = 
Xoshiro256Plus::seed_from_u64(seed.wrapping_add(data));

Review Comment:
   `data & 0xFFFF_FFFF` silently truncates u32+ indices in release; the 
`debug_assert!` above only fires in dev. the variant is dead today 
(`id_permutation: IdPermutation::Identity` at `shadow.rs:73`). either return 
`Result` or drop the unused variant.



##########
core/simulator/src/client.rs:
##########
@@ -200,7 +469,7 @@ impl SimClient {
         self.build_request_with_namespace(Operation::StoreConsumerOffset2, 
&payload, namespace)
     }
 
-    /// v2 of `delete_consumer_offset` carrying an explicit `AckLevel` byte.
+    /// Delete offset with explicit `AckLevel`.

Review Comment:
   same as above: `delete_consumer_offset_v2` is missing a `# Panics` block but 
panics the same way.



##########
core/simulator/src/workload/auditor.rs:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Auditor: reply-order and per-client commit invariants, plus the
+//! in-flight expectation table.
+//!
+//! Entity-state tracking lives in [`crate::workload::shadow::Shadow`].
+//! Stays transport-agnostic so it can drive an Antithesis-style harness
+//! against a real server binary.
+//!
+//! Server-ng currently encodes every reply as success (`context` always 0).
+//! Once the wire protocol gains an error discriminant, expand `is_success`
+//! and validate `Failure` outcomes via 
[`crate::workload::ops::classify_reply`].
+
+use std::collections::HashMap;
+
+use iggy_binary_protocol::ReplyHeader;
+use strum::EnumCount;
+
+use crate::workload::actions::Action;
+use crate::workload::ops::InFlight;
+
+#[derive(Debug, Clone)]
+pub struct AuditorStats {
+    pub replies_seen: u64,
+    pub replies_unknown: u64,
+    /// Per-action committed counter, indexed by `Action as usize`.
+    pub commits_per_action: [u64; Action::COUNT],
+}
+
+impl Default for AuditorStats {
+    fn default() -> Self {
+        Self {
+            replies_seen: 0,
+            replies_unknown: 0,
+            commits_per_action: [0u64; Action::COUNT],
+        }
+    }
+}
+
+impl AuditorStats {
+    #[must_use]
+    pub const fn commits(&self, action: Action) -> u64 {
+        self.commits_per_action[action as usize]
+    }
+}
+
+pub struct ServerAuditor {
+    in_flight: HashMap<(u128, u64), InFlight>,
+    /// Highest `commit` op observed per `(client, namespace)`. Each VSR
+    /// group (one per partition, plus metadata at `namespace = 0`) has its
+    /// own op counter; monotonicity is per-namespace, not per-client.
+    last_commit_per_client_ns: HashMap<(u128, u64), u64>,

Review Comment:
   field name `last_commit_per_client_ns` implies strict monotonicity but the 
code at lines 132-136 only advances a watermark; the relaxation rationale is 
documented at lines 108-113. rename to `last_commit_watermark_per_client_ns` 
and tighten the field doc at lines 64-66 so the name matches behavior.



##########
core/simulator/src/workload/shadow.rs:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shadow state: the workload's prediction of server-side entity state.
+//!
+//! Name-keyed throughout. Server-ng does not yet ship reply bodies, so
+//! the workload cannot observe server-assigned numeric ids. Lookups are
+//! by name; requests route via `WireIdentifier::named(...)`. When typed
+//! response bodies land, id-keyed maps return as a parallel index;
+//! name-keyed `IndexSet`s stay the primary sampling source.
+//!
+//! `IndexSet` for state we sample randomly (O(1) `get_index`).
+//! `HashMap` for pure-lookup state.
+
+use indexmap::IndexSet;
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use std::collections::HashMap;
+
+use iggy_common::sharding::IggyNamespace;
+
+use crate::workload::effect::{ApplyResult, Effect};
+use crate::workload::ids::IdPermutation;
+
+pub struct Shadow {
+    pub namespaces_live: IndexSet<IggyNamespace>,
+
+    /// Live streams by name. `CreateStream` inserts; `DeleteStream` removes.
+    pub stream_names: IndexSet<String>,
+    /// Live topics by `(stream, topic)`. Only added if parent stream lives.
+    pub topic_names: IndexSet<(String, String)>,
+    pub user_names: IndexSet<String>,
+    pub pat_names: IndexSet<String>,
+    pub consumer_group_names: IndexSet<(String, String, String)>,
+
+    pub sends_committed: HashMap<IggyNamespace, u64>,
+    pub consumer_offsets: HashMap<(IggyNamespace, u8, u32), u64>,
+
+    /// Id permutation for fabricated-id paths (`Outcome::ResourceNotFound`).
+    /// Currently unused.
+    pub id_permutation: IdPermutation,
+    next_index: u64,
+}
+
+impl Shadow {
+    #[must_use]
+    pub fn new(namespaces: Vec<IggyNamespace>, id_permutation: IdPermutation) 
-> Self {
+        let namespaces_live: IndexSet<IggyNamespace> = 
namespaces.into_iter().collect();
+        Self {
+            namespaces_live,
+            stream_names: IndexSet::new(),
+            topic_names: IndexSet::new(),
+            user_names: IndexSet::new(),
+            pat_names: IndexSet::new(),
+            consumer_group_names: IndexSet::new(),
+            sends_committed: HashMap::new(),
+            consumer_offsets: HashMap::new(),
+            id_permutation,
+            next_index: 1,
+        }
+    }
+
+    /// Pick a live namespace uniformly. `IndexSet` preserves insertion
+    /// order, so for deduplicated input this matches `Vec::get(i)`.
+    pub fn pick_namespace(&self, prng: &mut Xoshiro256Plus) -> 
Option<IggyNamespace> {
+        let n = self.namespaces_live.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.namespaces_live.get_index(i).copied()
+    }
+
+    pub fn pick_stream_name(&self, prng: &mut Xoshiro256Plus) -> 
Option<String> {
+        let n = self.stream_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.stream_names.get_index(i).cloned()
+    }
+
+    pub fn pick_topic_pair(&self, prng: &mut Xoshiro256Plus) -> 
Option<(String, String)> {
+        let n = self.topic_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.topic_names.get_index(i).cloned()
+    }
+
+    pub fn pick_user_name(&self, prng: &mut Xoshiro256Plus) -> Option<String> {
+        let n = self.user_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.user_names.get_index(i).cloned()
+    }
+
+    pub fn pick_pat_name(&self, prng: &mut Xoshiro256Plus) -> Option<String> {
+        let n = self.pat_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.pat_names.get_index(i).cloned()
+    }
+
+    pub fn pick_consumer_group_triple(
+        &self,
+        prng: &mut Xoshiro256Plus,
+    ) -> Option<(String, String, String)> {
+        let n = self.consumer_group_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.consumer_group_names.get_index(i).cloned()
+    }
+
+    /// Fresh monotonically increasing stream name. Uses the raw counter;
+    /// will route through `id_permutation` once `ResourceNotFound` is wired.
+    pub fn fresh_stream_name(&mut self) -> String {
+        self.fresh_name("stream")
+    }
+
+    /// Fresh prefixed entity name. `next_index` is shared across entity
+    /// kinds so names stay distinct regardless of which op claims them.
+    pub fn fresh_name(&mut self, prefix: &str) -> String {
+        let index = self.next_index;
+        self.next_index += 1;
+        format!("wl-{prefix}-{index:08x}")
+    }
+
+    /// Apply a predicted effect to the shadow. Returns any
+    /// [`SimCommand`](crate::workload::effect::SimCommand)s the driver
+    /// must run against the simulator (e.g. `init_partition`).
+    ///
+    /// Cascades use `IndexSet::retain` for a single-pass O(n) walk that

Review Comment:
   comment says "preserves insertion order" but the renames at lines 
240/252/267/278/293 do `shift_remove` + `insert`, which appends to the end of 
the `IndexSet`. determinism is intact (replay over the same op sequence is 
stable) but readers will conflate the two paths. tighten the wording, or use 
`shift_insert` at the original index.



##########
core/simulator/src/workload/options.rs:
##########
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::workload::actions::Action;
+use iggy_common::sharding::IggyNamespace;
+use strum::EnumCount;
+
+/// Per-action sampling weights as percentages. Unlisted variants default
+/// to 0 (never picked). Listed weights must sum to 100.
+#[derive(Debug, Clone, Copy)]
+pub struct ActionWeights {
+    weights: [u8; Action::COUNT],
+}
+
+impl ActionWeights {
+    /// # Panics
+    ///
+    /// Panics if `entries` contains a duplicate `Action`, or if the
+    /// listed weights do not sum to 100. Missing variants implicitly
+    /// weight 0.
+    #[must_use]
+    pub fn new(entries: &[(Action, u8)]) -> Self {
+        let mut weights = [0u8; Action::COUNT];
+        let mut seen = [false; Action::COUNT];
+        for &(action, w) in entries {
+            let idx = action as usize;
+            assert!(!seen[idx], "duplicate Action {action:?} in 
ActionWeights");
+            seen[idx] = true;
+            weights[idx] = w;
+        }
+        let total: u32 = weights.iter().map(|&w| u32::from(w)).sum();
+        assert!(total == 100, "ActionWeights must sum to 100, got {total}");
+        Self { weights }
+    }
+
+    #[must_use]
+    pub const fn weight(&self, action: Action) -> u8 {
+        self.weights[action as usize]
+    }
+}
+
+impl Default for ActionWeights {
+    fn default() -> Self {
+        Self::new(&[
+            (Action::CreateStream, 5),
+            (Action::SendMessages, 70),
+            (Action::StoreConsumerOffset2, 25),
+        ])
+    }
+}
+
+/// Workload generator knobs. Same `seed` reproduces the same action /
+/// payload sequence bit-for-bit.
+#[derive(Debug, Clone)]
+pub struct WorkloadOptions {
+    pub seed: u64,
+    pub replica_count: u8,
+    /// Number of clients registered with the simulator. Informational;
+    /// `Workload` is client-agnostic and tracks in-flight state per
+    /// `client_id`. Used by the CLI binary and multi-client tests.
+    pub client_count: u8,
+    /// Pre-seeded namespaces. Fixture must call `Simulator::init_partition`
+    /// for each before driving the workload.
+    pub namespaces: Vec<IggyNamespace>,
+    pub weights: ActionWeights,
+    /// Send-batch size = `batch_size_min + prng.range(batch_size_span)`.
+    pub batch_size_min: u32,
+    pub batch_size_span: u32,
+    /// Probability a `StoreConsumerOffset2` request uses `Quorum` vs `NoAck`.
+    pub ack_quorum_ratio: f32,
+    /// Probability a request targets a non-primary replica (exercises the
+    /// redirect / forward path).
+    pub target_non_primary_ratio: f32,
+    /// Probability that a request is intentionally constructed to fail

Review Comment:
   `invalid_request_ratio` is `pub` and self-documented as "Currently unused; 
reserved". drop it or land the consumer in this PR. same applies to 
`id_permutation: IdPermutation` at `shadow.rs:55`, which is only ever 
`Identity`.



##########
core/simulator/src/client.rs:
##########
@@ -181,8 +450,8 @@ impl SimClient {
         self.build_request_with_namespace(Operation::DeleteConsumerOffset, 
&payload, namespace)
     }
 
-    /// v2 of `store_consumer_offset` with an `AckLevel` byte. `NoAck` takes
-    /// the primary's fast path (no replication); `Quorum` goes through VSR.
+    /// Store offset with explicit `AckLevel`. `NoAck` takes the primary's
+    /// fast path (no replication); `Quorum` goes through VSR.
     pub fn store_consumer_offset_v2(

Review Comment:
   `store_consumer_offset_v2` lacks the `# Panics` section that every other pub 
fn in this file carries (compare line 100, 122). it panics via 
`Owned::copy_from_slice` (size assert) and 
`Message::try_from(...).expect(...)`. add a `# Panics` block.



##########
core/simulator/src/workload/ops/send_messages.rs:
##########
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `SendMessages` op. Samples only `Success`. PRNG draw order:
+//!
+//! 1. namespace pick (`pick_namespace`)
+//! 2. batch length (`prng.random_range(0..span)`)
+//! 3. one `prng.random()` per payload to disambiguate body bytes
+
+use iggy_binary_protocol::{Message, ReplyHeader, RequestHeader};
+use iggy_common::sharding::IggyNamespace;
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+
+use crate::client::SimClient;
+use crate::workload::effect::Effect;
+use crate::workload::options::WorkloadOptions;
+use crate::workload::shadow::Shadow;
+
+#[derive(Debug, Clone)]
+pub struct Input {
+    pub ns: IggyNamespace,
+    pub batch_len: u32,
+    pub payloads: Vec<Vec<u8>>,
+}
+
+#[derive(Copy, Clone, Eq, PartialEq, Debug)]
+pub enum Outcome {
+    Success,
+}
+
+pub const OUTCOMES: &[Outcome] = &[Outcome::Success];
+
+pub fn sample(
+    shadow: &mut Shadow,
+    outcome: Outcome,
+    prng: &mut Xoshiro256Plus,
+    options: &WorkloadOptions,
+) -> Option<Input> {
+    match outcome {
+        Outcome::Success => {
+            let ns = shadow.pick_namespace(prng)?;
+            let span = options.batch_size_span.max(1);
+            let batch_len = options.batch_size_min + 
prng.random_range(0..span);
+            let mut payloads: Vec<Vec<u8>> = Vec::with_capacity(batch_len as 
usize);
+            for i in 0..batch_len {
+                let r: u32 = prng.random();
+                payloads.push(format!("wl-msg-{i:04x}-{r:08x}").into_bytes());
+            }
+            Some(Input {
+                ns,
+                batch_len,
+                payloads,
+            })
+        }
+    }
+}
+
+#[must_use]
+pub fn build_message(client: &SimClient, input: &Input) -> 
Message<RequestHeader> {
+    let refs: Vec<&[u8]> = input.payloads.iter().map(Vec::as_slice).collect();

Review Comment:
   `Vec<&[u8]>` rebuilt in `build_message` after `sample` already owns 
`Vec<Vec<u8>>`. pass `&[Vec<u8>]` directly, or have `Input` own `Vec<Bytes>` so 
the client side at `client.rs:411` can `Bytes::clone` (refcount bump) instead 
of `copy_from_slice`.



##########
core/simulator/src/workload/ops/change_password.rs:
##########
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `ChangePassword` op. Live username picked from shadow.
+
+use iggy_binary_protocol::{Message, ReplyHeader, RequestHeader};
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+
+use crate::client::SimClient;
+use crate::workload::effect::Effect;
+use crate::workload::options::WorkloadOptions;
+use crate::workload::shadow::Shadow;
+
+#[derive(Debug, Clone)]
+pub struct Input {
+    pub user: String,
+    pub current_password: String,
+    pub new_password: String,
+}
+
+#[derive(Copy, Clone, Eq, PartialEq, Debug)]
+pub enum Outcome {
+    Success,
+}
+
+pub const OUTCOMES: &[Outcome] = &[Outcome::Success];
+
+pub fn sample(
+    shadow: &mut Shadow,
+    outcome: Outcome,
+    prng: &mut Xoshiro256Plus,
+    _options: &WorkloadOptions,
+) -> Option<Input> {
+    match outcome {
+        Outcome::Success => {
+            let user = shadow.pick_user_name(prng)?;
+            let current_password = format!("pw-{user}");

Review Comment:
   `format!("pw-{user}")` is correct only pre-first-commit. once the first 
`ChangePassword` succeeds server-side it stores `new_password`, but the next 
`sample()` rebuilds `pw-{user}` again, so a second `ChangePassword` on the same 
user mismatches the server. dormant under all-Success classification + weight=0 
default. fix: shadow tracks per-user current pw; seed `pw-{user}` on `AddUser`, 
update on `ChangePassword` effect and on `RenameUser`.



##########
core/simulator/src/workload/shadow.rs:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shadow state: the workload's prediction of server-side entity state.
+//!
+//! Name-keyed throughout. Server-ng does not yet ship reply bodies, so
+//! the workload cannot observe server-assigned numeric ids. Lookups are
+//! by name; requests route via `WireIdentifier::named(...)`. When typed
+//! response bodies land, id-keyed maps return as a parallel index;
+//! name-keyed `IndexSet`s stay the primary sampling source.
+//!
+//! `IndexSet` for state we sample randomly (O(1) `get_index`).
+//! `HashMap` for pure-lookup state.
+
+use indexmap::IndexSet;
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use std::collections::HashMap;
+
+use iggy_common::sharding::IggyNamespace;
+
+use crate::workload::effect::{ApplyResult, Effect};
+use crate::workload::ids::IdPermutation;
+
+pub struct Shadow {
+    pub namespaces_live: IndexSet<IggyNamespace>,
+
+    /// Live streams by name. `CreateStream` inserts; `DeleteStream` removes.
+    pub stream_names: IndexSet<String>,
+    /// Live topics by `(stream, topic)`. Only added if parent stream lives.
+    pub topic_names: IndexSet<(String, String)>,
+    pub user_names: IndexSet<String>,
+    pub pat_names: IndexSet<String>,
+    pub consumer_group_names: IndexSet<(String, String, String)>,
+
+    pub sends_committed: HashMap<IggyNamespace, u64>,
+    pub consumer_offsets: HashMap<(IggyNamespace, u8, u32), u64>,
+
+    /// Id permutation for fabricated-id paths (`Outcome::ResourceNotFound`).
+    /// Currently unused.
+    pub id_permutation: IdPermutation,
+    next_index: u64,
+}
+
+impl Shadow {
+    #[must_use]
+    pub fn new(namespaces: Vec<IggyNamespace>, id_permutation: IdPermutation) 
-> Self {
+        let namespaces_live: IndexSet<IggyNamespace> = 
namespaces.into_iter().collect();
+        Self {
+            namespaces_live,
+            stream_names: IndexSet::new(),
+            topic_names: IndexSet::new(),
+            user_names: IndexSet::new(),
+            pat_names: IndexSet::new(),
+            consumer_group_names: IndexSet::new(),
+            sends_committed: HashMap::new(),
+            consumer_offsets: HashMap::new(),
+            id_permutation,
+            next_index: 1,
+        }
+    }
+
+    /// Pick a live namespace uniformly. `IndexSet` preserves insertion
+    /// order, so for deduplicated input this matches `Vec::get(i)`.
+    pub fn pick_namespace(&self, prng: &mut Xoshiro256Plus) -> 
Option<IggyNamespace> {
+        let n = self.namespaces_live.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.namespaces_live.get_index(i).copied()
+    }
+
+    pub fn pick_stream_name(&self, prng: &mut Xoshiro256Plus) -> 
Option<String> {
+        let n = self.stream_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.stream_names.get_index(i).cloned()
+    }
+
+    pub fn pick_topic_pair(&self, prng: &mut Xoshiro256Plus) -> 
Option<(String, String)> {
+        let n = self.topic_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.topic_names.get_index(i).cloned()
+    }
+
+    pub fn pick_user_name(&self, prng: &mut Xoshiro256Plus) -> Option<String> {
+        let n = self.user_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.user_names.get_index(i).cloned()
+    }
+
+    pub fn pick_pat_name(&self, prng: &mut Xoshiro256Plus) -> Option<String> {
+        let n = self.pat_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.pat_names.get_index(i).cloned()
+    }
+
+    pub fn pick_consumer_group_triple(
+        &self,
+        prng: &mut Xoshiro256Plus,
+    ) -> Option<(String, String, String)> {
+        let n = self.consumer_group_names.len();
+        if n == 0 {
+            return None;
+        }
+        let i = prng.random_range(0..n);
+        self.consumer_group_names.get_index(i).cloned()
+    }
+
+    /// Fresh monotonically increasing stream name. Uses the raw counter;
+    /// will route through `id_permutation` once `ResourceNotFound` is wired.
+    pub fn fresh_stream_name(&mut self) -> String {
+        self.fresh_name("stream")
+    }
+
+    /// Fresh prefixed entity name. `next_index` is shared across entity
+    /// kinds so names stay distinct regardless of which op claims them.
+    pub fn fresh_name(&mut self, prefix: &str) -> String {
+        let index = self.next_index;
+        self.next_index += 1;
+        format!("wl-{prefix}-{index:08x}")
+    }
+
+    /// Apply a predicted effect to the shadow. Returns any
+    /// [`SimCommand`](crate::workload::effect::SimCommand)s the driver
+    /// must run against the simulator (e.g. `init_partition`).
+    ///
+    /// Cascades use `IndexSet::retain` for a single-pass O(n) walk that
+    /// preserves insertion order (`shift_remove`, not `swap_remove`):
+    /// `pick_*_name` returns `get_index`-based samples, so insertion
+    /// order is part of the determinism contract.
+    pub fn apply(&mut self, e: Effect) -> ApplyResult {
+        let sim_commands = Vec::new();
+        match e {
+            Effect::None => {}
+            Effect::AddStream { name } => {
+                self.stream_names.insert(name);
+            }
+            Effect::RemoveStream { name } => {
+                self.stream_names.shift_remove(&name);
+                self.topic_names.retain(|(s, _)| s != &name);
+                self.consumer_group_names.retain(|(s, _, _)| s != &name);
+            }
+            Effect::AddTopic {
+                stream,
+                name,
+                partitions: _,
+            } => {
+                if self.stream_names.contains(&stream) {
+                    self.topic_names.insert((stream, name));
+                }
+            }
+            Effect::RemoveTopic { stream, name } => {
+                self.topic_names
+                    .shift_remove(&(stream.clone(), name.clone()));
+                self.consumer_group_names
+                    .retain(|(s, t, _)| !(s == &stream && t == &name));
+            }
+            Effect::AddUser { name } => {
+                self.user_names.insert(name);
+            }
+            Effect::RemoveUser { name } => {
+                self.user_names.shift_remove(&name);
+            }
+            Effect::AddPat { name } => {
+                self.pat_names.insert(name);
+            }
+            Effect::RemovePat { name } => {
+                self.pat_names.shift_remove(&name);
+            }
+            Effect::AddConsumerGroup {
+                stream,
+                topic,
+                name,
+            } => {
+                if self.topic_names.contains(&(stream.clone(), topic.clone())) 
{
+                    self.consumer_group_names.insert((stream, topic, name));
+                }
+            }
+            Effect::RemoveConsumerGroup {
+                stream,
+                topic,
+                name,
+            } => {
+                self.consumer_group_names
+                    .shift_remove(&(stream, topic, name));
+            }
+            Effect::SendCommitted { ns, count } => {
+                *self.sends_committed.entry(ns).or_insert(0) += count;
+            }
+            Effect::OffsetStored { key, value } => {
+                self.consumer_offsets.insert(key, value);
+            }
+            Effect::OffsetDeleted { key } => {
+                self.consumer_offsets.remove(&key);
+            }
+            Effect::RenameStream { old, new } => self.rename_stream(&old, 
&new),
+            Effect::RenameTopic { stream, old, new } => {
+                self.rename_topic(&stream, &old, &new);
+            }
+            Effect::RenameUser { old, new } => {
+                if self.user_names.shift_remove(&old) {
+                    self.user_names.insert(new);
+                }
+            }
+        }
+        ApplyResult { sim_commands }
+    }
+
+    fn rename_stream(&mut self, old: &str, new: &str) {
+        if !self.stream_names.shift_remove(old) {
+            return;
+        }
+        self.stream_names.insert(new.to_string());
+        // Rename in (stream, topic) and (stream, topic, group):
+        // collect-then-rebuild keeps the loop borrow simple.
+        let old_topics: Vec<(String, String)> = self

Review Comment:
   `rename_stream` calls `shift_remove` inside the topic loop. each 
`shift_remove` is `O(N)`, total `O(N*M)`. cold path today (`UpdateStream` 
weight=0 default). single rebuild pass collapses it to `O(N)`.



##########
core/simulator/src/workload/shadow.rs:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shadow state: the workload's prediction of server-side entity state.
+//!
+//! Name-keyed throughout. Server-ng does not yet ship reply bodies, so
+//! the workload cannot observe server-assigned numeric ids. Lookups are
+//! by name; requests route via `WireIdentifier::named(...)`. When typed
+//! response bodies land, id-keyed maps return as a parallel index;
+//! name-keyed `IndexSet`s stay the primary sampling source.
+//!
+//! `IndexSet` for state we sample randomly (O(1) `get_index`).
+//! `HashMap` for pure-lookup state.
+
+use indexmap::IndexSet;
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use std::collections::HashMap;
+
+use iggy_common::sharding::IggyNamespace;
+
+use crate::workload::effect::{ApplyResult, Effect};
+use crate::workload::ids::IdPermutation;
+
+pub struct Shadow {
+    pub namespaces_live: IndexSet<IggyNamespace>,
+
+    /// Live streams by name. `CreateStream` inserts; `DeleteStream` removes.
+    pub stream_names: IndexSet<String>,

Review Comment:
   `IndexSet<String>` for entity names grows linearly with creates; 
`fresh_name` allocates a new `String` each time. bounded today by `tick_budget` 
per run. flag for `Arc<str>` interning when long-running harness lands.



##########
core/simulator/src/workload/ops/purge_stream.rs:
##########
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `PurgeStream` op. Live stream name picked from shadow.
+
+use iggy_binary_protocol::{Message, ReplyHeader, RequestHeader};
+use rand_xoshiro::Xoshiro256Plus;
+
+use crate::client::SimClient;
+use crate::workload::effect::Effect;
+use crate::workload::options::WorkloadOptions;
+use crate::workload::shadow::Shadow;
+
+#[derive(Debug, Clone)]
+pub struct Input {
+    pub stream: String,
+}
+
+#[derive(Copy, Clone, Eq, PartialEq, Debug)]
+pub enum Outcome {
+    Success,
+}
+
+pub const OUTCOMES: &[Outcome] = &[Outcome::Success];
+
+pub fn sample(
+    shadow: &mut Shadow,
+    outcome: Outcome,
+    prng: &mut Xoshiro256Plus,
+    _options: &WorkloadOptions,
+) -> Option<Input> {
+    match outcome {
+        Outcome::Success => shadow.pick_stream_name(prng).map(|stream| Input { 
stream }),
+    }
+}
+
+#[must_use]
+pub fn build_message(client: &SimClient, input: &Input) -> 
Message<RequestHeader> {
+    client.purge_stream(&input.stream)
+}
+
+#[must_use]
+pub const fn classify_reply(_reply: &ReplyHeader) -> Outcome {
+    Outcome::Success
+}
+
+#[must_use]
+pub const fn predicted_effect(_input: &Input, outcome: Outcome) -> Effect {

Review Comment:
   `predicted_effect = Effect::None` is acceptable today because 
`shadow.sends_committed` is keyed by `IggyNamespace` (packed 
stream/topic/partition) and the purge request carries only a name-based 
`WireIdentifier`. the shadow has no name -> ns reverse index, so it cannot zero 
the right keys. add a `TODO` so this surfaces when that reverse index lands and 
the offset clamp at `store_consumer_offset.rs:63-64` starts validating against 
post-purge state.



##########
core/simulator/src/workload/mod.rs:
##########
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Deterministic seed-based workload generator.
+//!
+//! - `actions::Action`: server command variants.
+//! - `ops/<name>.rs`: per-op `sample`, `build_message`, `classify_reply`,
+//!   `predicted_effect`.
+//! - `shadow::Shadow`: predicted server entity state.
+//! - `auditor::ServerAuditor`: in-flight expectations and invariants.
+//! - `effect::Effect`: predicted shadow mutation per commit.
+
+pub mod actions;
+pub mod auditor;
+pub mod effect;
+pub mod ids;
+pub mod ops;
+pub mod options;
+pub mod shadow;
+
+use std::collections::HashMap;
+
+use iggy_binary_protocol::{Message, ReplyHeader, RequestHeader};
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use rand_xoshiro::rand_core::SeedableRng;
+
+use crate::Simulator;
+use crate::client::SimClient;
+use actions::Action;
+use auditor::ServerAuditor;
+use effect::SimCommand;
+use options::WorkloadOptions;
+use shadow::Shadow;
+
+use crate::workload::ops::InFlight;
+
+/// Max in-flight requests per client. Must stay under the consensus
+/// pipeline's queue limits.
+pub const CLIENT_REQUEST_QUEUE_MAX: usize = 1;
+
+pub struct Workload {
+    prng: Xoshiro256Plus,
+    pub auditor: ServerAuditor,
+    pub shadow: Shadow,
+    pub options: WorkloadOptions,
+    /// Number of in-flight requests per client.
+    in_flight_per_client: HashMap<u128, usize>,
+    /// Debug counter for `sample()` returning `None`. Useful when an
+    /// outcome-first generation lands and divergence in `apply` branches
+    /// could silently shift the PRNG trace.
+    samples_none: u64,
+}
+
+impl Workload {
+    #[must_use]
+    pub fn new(options: WorkloadOptions) -> Self {
+        let prng = Xoshiro256Plus::seed_from_u64(options.seed);
+        let shadow = Shadow::new(options.namespaces.clone(), 
ids::IdPermutation::Identity);
+        Self {
+            prng,
+            auditor: ServerAuditor::new(),
+            shadow,
+            options,
+            in_flight_per_client: HashMap::new(),
+            samples_none: 0,
+        }
+    }
+
+    /// True if the client has a free in-flight slot.
+    #[must_use]
+    pub fn client_idle(&self, client_id: u128) -> bool {
+        self.in_flight_per_client
+            .get(&client_id)
+            .copied()
+            .unwrap_or(0)
+            < CLIENT_REQUEST_QUEUE_MAX
+    }
+
+    /// Build the next request for `client`. Returns the message and target
+    /// replica index, or `None` if the client has no idle slot or
+    /// `ops::sample` could not synthesize an input.
+    ///
+    /// Note: `pick_action` and `pick_target_replica` advance the PRNG
+    /// even when `sample` returns `None` (e.g. an op needs a live stream
+    /// but the shadow is empty). Today this is harmless because every
+    /// op classifies as `Outcome::Success`. Once outcome-first generation
+    /// lands, a divergence in the Success/Failure split inside `sample`
+    /// could shift the PRNG trace; the `samples_none` counter exists to
+    /// flag that case during development.
+    pub fn build_request(&mut self, client: &SimClient) -> Option<(u8, 
Message<RequestHeader>)> {
+        if !self.client_idle(client.client_id()) {
+            return None;
+        }
+
+        let action = self.pick_action();
+        let target = self.pick_target_replica();
+
+        let Some((input, outcome)) =
+            ops::sample(action, &mut self.shadow, &mut self.prng, 
&self.options)
+        else {
+            self.samples_none += 1;
+            return None;
+        };
+        let message = ops::build_message(client, &input);
+
+        let header = message.header();
+        let key = (client.client_id(), header.request);
+        self.auditor.record_in_flight(
+            key,
+            InFlight {
+                action,
+                input,
+                outcome,
+                request_namespace: header.namespace,
+            },
+        );
+        *self
+            .in_flight_per_client
+            .entry(client.client_id())
+            .or_insert(0) += 1;
+
+        Some((target, message))
+    }
+
+    /// Validate and apply a reply. Returns [`SimCommand`]s the driver
+    /// must run against the simulator (e.g. `init_partition`); the
+    /// auditor stays transport-agnostic.
+    ///
+    /// Returns an empty `Vec` when the reply has no matching in-flight
+    /// entry (duplicate cached reply or stale at-least-once
+    /// re-execution); see [`auditor::ServerAuditor::on_reply`] for the
+    /// at-least-once contract.
+    #[must_use = "returned SimCommands must be applied; call 
apply_sim_commands or use Workload::run"]
+    pub fn on_reply(&mut self, reply: &Message<ReplyHeader>) -> 
Vec<SimCommand> {
+        let header = reply.header();
+        let key = (header.client, header.request);
+        let Some(entry) = self.auditor.on_reply(key, header) else {
+            // Duplicate or otherwise unknown reply; do not double-apply
+            // effects or decrement the client's in-flight counter.
+            return Vec::new();
+        };
+
+        // v2.4: every op currently classifies as `Outcome::Success`
+        // because server-ng hardcodes `ReplyHeader.context = 0`. The
+        // `debug_assert_eq!` locks the contract from day one: once
+        // outcomes split, any classify_reply / sample mismatch will
+        // panic in debug builds before silently corrupting the audit.
+        let classified = ops::classify_reply(entry.action, header);
+        debug_assert_eq!(
+            classified, entry.outcome,
+            "classify_reply produced a different outcome than sample expected: 
\
+             action={:?} classified={classified:?} expected={:?}",
+            entry.action, entry.outcome,
+        );
+
+        let effect = ops::predicted_effect(&entry.input, &entry.outcome);
+        let result = self.shadow.apply(effect);
+
+        self.auditor.note_committed(entry.action);
+
+        if let Some(count) = self.in_flight_per_client.get_mut(&header.client) 
{

Review Comment:
   `count.saturating_sub(1)` will mask any future double-decrement once the 
auditor `on_reply` consumed-but-rejected hardening lands. switch to 
`checked_sub(1).expect("in_flight underflow")` so the invariant fails loud.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to