This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 702c9b3e5 feat(simulator): wire Network into tick loop with
per-replica outboxes and replica_crash (#3049)
702c9b3e5 is described below
commit 702c9b3e55c215d9578a0858e90d982977e68208
Author: Krishna Vishal <[email protected]>
AuthorDate: Thu Apr 2 02:15:01 2026 +0530
feat(simulator): wire Network into tick loop with per-replica outboxes and
replica_crash (#3049)
## Summary
- Replace shared `MemBus` FIFO queue with per-replica `SimOutbox`
instances that stage outbound messages locally
- Wire `PacketSimulator`-backed `Network` into the simulator tick loop
for delivery with configurable delay, loss, and partitioning
- Add `replica_crash()` which disables network links, discards outbox,
tracks crashed nodes via `HashSet<u8>`
Implements the outbox-only architecture from #3017. `replica_restart` is
deferred since it requires consensus durability support that does not yet
exist.
Closes #3048
---------
Co-authored-by: Piotr Gankiewicz <[email protected]>
---
core/shard/src/lib.rs | 27 +++++
core/simulator/src/bus.rs | 42 +++----
core/simulator/src/lib.rs | 259 ++++++++++++++++++++++++++++--------------
core/simulator/src/main.rs | 228 +++++++++++++++----------------------
core/simulator/src/network.rs | 12 ++
core/simulator/src/packet.rs | 27 +++++
core/simulator/src/replica.rs | 24 +---
7 files changed, 361 insertions(+), 258 deletions(-)
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index a38a79143..3e17f4ba2 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -141,6 +141,33 @@ where
}
}
+ /// Create a shard without inter-shard channels.
+ ///
+ /// Useful for the simulator where inbound messages are delivered directly
+ /// via [`on_message`](Self::on_message) instead of through an inbox
channel.
+ #[must_use]
+ pub fn without_inbox(
+ id: u16,
+ name: String,
+ metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
+ partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
+ shards_table: T,
+ ) -> Self {
+ // TODO: previously we used unbounded channel with flume,
+ // but this is not possible with crossfire without mangling types due
to Flavor trait in crossfire.
+ // This needs to be revisited in the future.
+ let (_tx, inbox) = channel(1);
+ let plane = MuxPlane::new(variadic!(metadata, partitions));
+ Self {
+ id,
+ name,
+ plane,
+ senders: Vec::new(),
+ inbox,
+ shards_table,
+ }
+ }
+
#[must_use]
pub const fn shards_table(&self) -> &T {
&self.shards_table
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
index 7781f5dbf..8dbd16f6e 100644
--- a/core/simulator/src/bus.rs
+++ b/core/simulator/src/bus.rs
@@ -37,37 +37,41 @@ pub struct Envelope {
pub payload: EnvelopePayload,
}
-// TODO: Proper bus with an `Network` component which would simulate sending
packets.
-// Tigerbeetle handles this by having an list of "buses", and calling
callbacks for clients when an response is send.
-// This requires self-referntial structs (as message_bus has to store
collection of other buses), which is overcomplilcated.
-// I think the way we could handle that is by having an dedicated collection
for client responses (clients_table).
-#[derive(Debug, Default)]
-pub struct MemBus {
+/// Per-replica outbox for staging outbound messages.
+///
+/// Consensus code calls `send_to_replica()` / `send_to_client()` which stage
+/// messages here. The simulator's tick loop drains each replica's outbox and
+/// feeds the messages into the [`Network`] for simulated delivery.
+#[derive(Debug)]
+pub struct SimOutbox {
+ /// Replica id that owns this outbox. Populated as `from_replica` on every
envelope.
+ self_id: u8,
clients: Mutex<HashSet<u128>>,
replicas: Mutex<HashSet<u8>>,
pending_messages: Mutex<VecDeque<Envelope>>,
}
-impl MemBus {
+impl SimOutbox {
#[must_use]
- pub fn new() -> Self {
+ pub fn new(self_id: u8) -> Self {
Self {
+ self_id,
clients: Mutex::new(HashSet::new()),
replicas: Mutex::new(HashSet::new()),
pending_messages: Mutex::new(VecDeque::new()),
}
}
- /// Get the next pending message from the bus
+ /// Drain all staged messages from this outbox.
///
/// # Panics
/// Panics if the internal mutex is poisoned.
- pub fn receive(&self) -> Option<Envelope> {
- self.pending_messages.lock().unwrap().pop_front()
+ pub fn drain(&self) -> Vec<Envelope> {
+ self.pending_messages.lock().unwrap().drain(..).collect()
}
}
-impl MessageBus for MemBus {
+impl MessageBus for SimOutbox {
type Client = u128;
type Replica = u8;
type Data = Message<GenericHeader>;
@@ -108,7 +112,7 @@ impl MessageBus for MemBus {
}
self.pending_messages.lock().unwrap().push_back(Envelope {
- from_replica: None,
+ from_replica: Some(self.self_id),
to_replica: None,
to_client: Some(client_id),
payload: EnvelopePayload::Client(message.deep_copy()),
@@ -127,7 +131,7 @@ impl MessageBus for MemBus {
}
self.pending_messages.lock().unwrap().push_back(Envelope {
- from_replica: None,
+ from_replica: Some(self.self_id),
to_replica: Some(replica),
to_client: None,
payload: EnvelopePayload::Replica(message.deep_copy()),
@@ -137,18 +141,18 @@ impl MessageBus for MemBus {
}
}
-/// Newtype wrapper for shared [`MemBus`] that implements [`MessageBus`]
+/// Newtype wrapper for shared [`SimOutbox`] that implements [`MessageBus`]
#[derive(Debug, Clone)]
-pub struct SharedMemBus(pub Arc<MemBus>);
+pub struct SharedSimOutbox(pub Arc<SimOutbox>);
-impl Deref for SharedMemBus {
- type Target = MemBus;
+impl Deref for SharedSimOutbox {
+ type Target = SimOutbox;
fn deref(&self) -> &Self::Target {
&self.0
}
}
-impl MessageBus for SharedMemBus {
+impl MessageBus for SharedSimOutbox {
type Client = u128;
type Replica = u8;
type Data = Message<GenericHeader>;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 95cdb36ec..a6a6b09cb 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -23,133 +23,214 @@ pub mod packet;
pub mod ready_queue;
pub mod replica;
-use bus::{EnvelopePayload, MemBus};
+use bus::SimOutbox;
use consensus::PartitionsHandle;
use iggy_binary_protocol::{GenericHeader, Message, ReplyHeader};
use iggy_common::IggyError;
use iggy_common::sharding::IggyNamespace;
use message_bus::MessageBus;
+use network::Network;
+use packet::{PacketSimulatorOptions, ProcessId};
use partitions::{Partition, PartitionOffsets, PollQueryResult, PollingArgs,
PollingConsumer};
use replica::{Replica, new_replica};
+use std::collections::HashSet;
use std::sync::Arc;
pub struct Simulator {
+ /// All replicas, indexed by replica id. Always fully populated — crashed
+ /// replicas are kept alive but skipped during dispatch.
pub replicas: Vec<Replica>,
- pub message_bus: Arc<MemBus>,
+ /// Per-replica outbox, indexed by replica id. Shared with consensus inside
+ /// each replica via [`SharedSimOutbox`](bus::SharedSimOutbox).
+ pub outboxes: Vec<Arc<SimOutbox>>,
+ /// Set of replica ids that are currently crashed. Dispatch and outbox
drain
+ /// are skipped for these ids.
+ pub crashed: HashSet<u8>,
+ pub network: Network,
+ pub replica_count: u8,
+ pub client_ids: Vec<u128>,
}
impl Simulator {
- /// Initialize a partition with its own consensus group on all replicas.
- pub fn init_partition(&mut self, namespace:
iggy_common::sharding::IggyNamespace) {
- for replica in &mut self.replicas {
- replica.init_partition(namespace);
- }
- }
-
+ /// Create a new simulator with per-replica outboxes routed through a
[`Network`].
#[allow(clippy::cast_possible_truncation)]
- pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) ->
Self {
- let mut message_bus = MemBus::new();
- for client in clients {
- message_bus.add_client(client, ());
- }
+ pub fn new(
+ replica_count: usize,
+ clients: impl Iterator<Item = u128>,
+ network_options: PacketSimulatorOptions,
+ ) -> Self {
+ let client_ids: Vec<u128> = clients.collect();
+ let mut network = Network::new(network_options);
- for i in 0..replica_count as u8 {
- message_bus.add_replica(i);
+ for &cid in &client_ids {
+ network.register_client(cid);
}
- let message_bus = Arc::new(message_bus);
- let replicas = (0..replica_count)
- .map(|i| {
- new_replica(
- i as u8,
- format!("replica-{i}"),
- &message_bus,
- replica_count as u8,
- )
- })
- .collect();
+ let rc = replica_count as u8;
+ let mut replicas = Vec::with_capacity(replica_count);
+ let mut outboxes = Vec::with_capacity(replica_count);
+
+ for i in 0..replica_count {
+ let id = i as u8;
+ let mut bus = SimOutbox::new(id);
+ for &cid in &client_ids {
+ bus.add_client(cid, ());
+ }
+ for j in 0..rc {
+ bus.add_replica(j);
+ }
+ let outbox = Arc::new(bus);
+ let replica = new_replica(id, format!("replica-{i}"), &outbox, rc);
+ replicas.push(replica);
+ outboxes.push(outbox);
+ }
Self {
replicas,
- message_bus,
+ outboxes,
+ crashed: HashSet::new(),
+ network,
+ replica_count: rc,
+ client_ids,
}
}
+ /// Initialize a partition with its own consensus group on all live
replicas.
#[allow(clippy::cast_possible_truncation)]
- pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) ->
Self {
- for i in 0..replica_count as u8 {
- message_bus.add_replica(i);
- }
-
- let message_bus = Arc::new(message_bus);
- let replicas = (0..replica_count)
- .map(|i| {
- new_replica(
- i as u8,
- format!("replica-{i}"),
- &message_bus,
- replica_count as u8,
- )
- })
- .collect();
-
- Self {
- replicas,
- message_bus,
+ pub fn init_partition(&mut self, namespace: IggyNamespace) {
+ for (i, replica) in self.replicas.iter_mut().enumerate() {
+ if !self.crashed.contains(&(i as u8)) {
+ replica.init_partition(namespace);
+ }
}
}
-}
-impl Simulator {
+ /// Advance the simulation by one tick.
+ ///
+ /// Returns all client replies delivered during this tick.
+ ///
+ /// The tick has three phases that never borrow replicas and network
+ /// simultaneously:
+ ///
+ /// 1. **Deliver**: `network.step()` returns ready packets; each is
+ /// dispatched to its target replica (or collected as a client reply).
+ /// 2. **Drain**: each live replica's outbox is drained and fed into
+ /// `network.submit()`.
+ /// 3. **Tick**: `network.tick()` advances network time (partitions,
+ /// clogging, etc.).
+ ///
/// # Panics
- /// Panics if a client response message has an invalid command type.
- #[allow(clippy::future_not_send)]
- pub async fn step(&self) -> Option<Message<ReplyHeader>> {
- if let Some(envelope) = self.message_bus.receive() {
- if let Some(_client_id) = envelope.to_client {
- let EnvelopePayload::Client(message) = envelope.payload else {
- panic!("client envelope must carry a reply message");
- };
- let reply: Message<ReplyHeader> = message
- .try_into_typed()
- .expect("invalid message, wrong command type for a client
response");
- return Some(reply);
+ /// Panics if a packet addressed to a client cannot be converted to a
+ /// `ReplyHeader` message.
+ #[allow(clippy::cast_possible_truncation)]
+ pub fn step(&mut self) -> Vec<Message<ReplyHeader>> {
+ let mut client_replies = Vec::new();
+
+ // Phase 1: Deliver ready packets from the network.
+ let packets = self.network.step();
+ for packet in &packets {
+ match packet.to {
+ ProcessId::Replica(id) => {
+ if !self.crashed.contains(&id)
+ && let Some(replica) = self.replicas.get(id as usize)
+ {
+ Self::dispatch_to_replica(replica,
packet.message.deep_copy());
+ }
+ // Crashed or missing: packet silently dropped.
+ }
+ ProcessId::Client(_) => {
+ let reply: Message<ReplyHeader> = packet
+ .message
+ .deep_copy()
+ .try_into_typed()
+ .expect("invalid message, wrong command type for a
client response");
+ client_replies.push(reply);
+ }
}
+ }
+ self.network.recycle_buffer(packets);
- if let Some(replica_id) = envelope.to_replica
- && let Some(replica) = self.replicas.get(replica_id as usize)
- {
- let EnvelopePayload::Replica(message) = envelope.payload else {
- panic!("replica envelope must carry a replica message");
+ // Phase 2: Drain each replica's outbox into the network.
+ for (i, outbox) in self.outboxes.iter().enumerate() {
+ let envelopes = outbox.drain();
+ if self.crashed.contains(&(i as u8)) {
+ // Defensive: discard any messages from a crashed node's
outbox.
+ continue;
+ }
+ for envelope in envelopes {
+ let from = ProcessId::Replica(i as u8);
+ let to = if let Some(rid) = envelope.to_replica {
+ ProcessId::Replica(rid)
+ } else if let Some(cid) = envelope.to_client {
+ ProcessId::Client(cid)
+ } else {
+ continue;
+ };
+ let message = match envelope.payload {
+ bus::EnvelopePayload::Replica(m) |
bus::EnvelopePayload::Client(m) => m,
};
- self.dispatch_to_replica(replica, message).await;
+ self.network.submit(from, to, message);
}
}
- None
+ // Phase 3: Advance network time.
+ self.network.tick();
+
+ client_replies
}
- #[allow(clippy::future_not_send)]
- async fn dispatch_to_replica(&self, replica: &Replica, message:
Message<GenericHeader>) {
- replica.on_message(message).await;
+ /// Submit a client request into the simulated network.
+ ///
+ /// This is the simulator equivalent of a client opening a TCP connection
+ /// and sending a message to a replica.
+ pub fn submit_request(
+ &mut self,
+ client_id: u128,
+ target_replica: u8,
+ message: Message<GenericHeader>,
+ ) {
+ self.network.submit(
+ ProcessId::Client(client_id),
+ ProcessId::Replica(target_replica),
+ message,
+ );
+ }
- let mut buf = Vec::new();
- replica.process_loopback(&mut buf).await;
- debug_assert_eq!(
- replica.process_loopback(&mut buf).await,
- 0,
- "on_ack must not re-enqueue loopback messages"
+ /// Crash a replica: disable its network links and discard its outbox.
+ ///
+ /// The replica object is kept alive but will not receive any messages or
+ /// have its outbox drained until a future `replica_restart` (not yet
+ /// implemented and it requires consensus durability support).
+ ///
+ /// # Panics
+ /// Panics if the replica is already crashed.
+ pub fn replica_crash(&mut self, replica_index: u8) {
+ assert!(
+ !self.crashed.contains(&replica_index),
+ "cannot crash replica {replica_index}: already down"
);
+
+ // Discard any unsent messages (never reached the wire).
+ self.outboxes[replica_index as usize].drain();
+
+ // Block all network links to/from this process.
+ self.network
+ .process_disable(ProcessId::Replica(replica_index));
+
+ self.crashed.insert(replica_index);
+ }
+
+ /// Returns `true` if the given replica is currently crashed.
+ #[must_use]
+ pub fn is_crashed(&self, replica_index: u8) -> bool {
+ self.crashed.contains(&replica_index)
}
-}
-impl Simulator {
/// Poll messages directly from a replica's partition.
///
/// # Errors
/// Returns `IggyError::ResourceNotFound` if the namespace does not exist
on this replica.
- #[allow(clippy::future_not_send)]
- pub async fn poll_messages(
+ pub fn poll_messages(
&self,
replica_idx: usize,
namespace: IggyNamespace,
@@ -165,7 +246,7 @@ impl Simulator {
.ok_or(IggyError::ResourceNotFound(format!(
"partition not found for namespace {namespace:?} on
replica {replica_idx}"
)))?;
- partition.poll_messages(consumer, args).await
+ futures::executor::block_on(partition.poll_messages(consumer, args))
}
/// Get partition offsets from a replica.
@@ -179,6 +260,18 @@ impl Simulator {
let partition = replica.plane.partitions().get_by_ns(&namespace)?;
Some(partition.offsets())
}
+
+ fn dispatch_to_replica(replica: &Replica, message: Message<GenericHeader>)
{
+ futures::executor::block_on(replica.on_message(message));
+
+ let mut buf = Vec::new();
+ futures::executor::block_on(replica.process_loopback(&mut buf));
+ let loopback_count =
futures::executor::block_on(replica.process_loopback(&mut buf));
+ debug_assert_eq!(
+ loopback_count, 0,
+ "on_ack must not re-enqueue loopback messages"
+ );
+ }
}
// TODO(IGGY-66): Add acceptance test for per-partition consensus independence.
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
index fe91cda8e..dc34745ea 100644
--- a/core/simulator/src/main.rs
+++ b/core/simulator/src/main.rs
@@ -16,32 +16,26 @@
// under the License.
use iggy_binary_protocol::{Message, ReplyHeader};
-use iggy_common::PollingStrategy;
use iggy_common::sharding::IggyNamespace;
-use iggy_common::{IggyByteSize, MemoryPool, MemoryPoolConfigOther};
-use message_bus::MessageBus;
+use iggy_common::{IggyByteSize, MemoryPool, MemoryPoolConfigOther,
PollingStrategy};
use partitions::{PollingArgs, PollingConsumer};
-use simulator::{Simulator, client::SimClient};
-use std::collections::VecDeque;
-use std::sync::{Arc, Mutex};
-
-/// Shared response queue for client replies
-#[derive(Default)]
-pub struct Responses {
- queue: VecDeque<Message<ReplyHeader>>,
-}
-
-impl Responses {
- pub fn push(&mut self, msg: Message<ReplyHeader>) {
- self.queue.push_back(msg);
- }
-
- pub fn pop(&mut self) -> Option<Message<ReplyHeader>> {
- self.queue.pop_front()
+use simulator::Simulator;
+use simulator::client::SimClient;
+use simulator::packet::PacketSimulatorOptions;
+
+/// Step the simulator until at least one client reply is received,
+/// or `max_ticks` is reached. Returns all collected replies.
+fn step_until_reply(sim: &mut Simulator, max_ticks: u64) ->
Vec<Message<ReplyHeader>> {
+ let mut all_replies = Vec::new();
+ for _ in 0..max_ticks {
+ all_replies.extend(sim.step());
+ if !all_replies.is_empty() {
+ return all_replies;
+ }
}
+ all_replies
}
-#[allow(clippy::too_many_lines)]
fn main() {
// PooledBuffer::from (used by poll_messages) panics if the global pool is
uninitialized.
// Disabled pooling just falls through to the system allocator.
@@ -53,8 +47,16 @@ fn main() {
let client_id: u128 = 1;
let leader: u8 = 0;
- let mut sim = Simulator::new(3, std::iter::once(client_id));
- let bus = sim.message_bus.clone();
+
+ // Deterministic network: minimum delay, no loss, no partitions.
+ let network_opts = PacketSimulatorOptions {
+ node_count: 3,
+ client_count: 1,
+ ..PacketSimulatorOptions::default()
+ };
+
+ let mut sim = Simulator::new(3, std::iter::once(client_id), network_opts);
+ let client = SimClient::new(client_id);
// Hardcoded partition for testing: stream_id=1, topic_id=1, partition_id=0
let test_namespace = IggyNamespace::new(1, 1, 0);
@@ -63,125 +65,75 @@ fn main() {
println!("[sim] Initializing test partition: {test_namespace:?}");
sim.init_partition(test_namespace);
- // Responses queue
- let responses = Arc::new(Mutex::new(Responses::default()));
- let responses_clone = responses.clone();
-
- // TODO: Scuffed client/simulator setup.
- // We need a better interface on simulator
- let client_handle = std::thread::spawn(move || {
- futures::executor::block_on(async {
- let client = SimClient::new(client_id);
-
- // Send some test messages to the partition
- println!("[client] Sending messages to partition");
- let test_messages = vec![
- b"Hello, partition!".as_slice(),
- b"Message 2".as_slice(),
- b"Message 3".as_slice(),
- ];
-
- let send_msg = client.send_messages(test_namespace,
&test_messages);
- bus.send_to_replica(leader, send_msg.into_generic())
- .await
- .expect("failed to send messages");
-
- loop {
- let reply = responses_clone.lock().unwrap().pop();
- if let Some(reply) = reply {
- println!("[client] Got send_messages reply: {:?}",
reply.header());
- break;
- }
- std::thread::sleep(std::time::Duration::from_millis(1));
- }
-
- // Send metadata operations
- let create_msg = client.create_stream("test-stream");
- bus.send_to_replica(leader, create_msg.into_generic())
- .await
- .expect("failed to send create_stream");
-
- loop {
- let reply = responses_clone.lock().unwrap().pop();
- if let Some(reply) = reply {
- println!("[client] Got create_stream reply: {:?}",
reply.header());
- break;
- }
- std::thread::sleep(std::time::Duration::from_millis(1));
- }
-
- let delete_msg = client.delete_stream("test-stream");
- bus.send_to_replica(leader, delete_msg.into_generic())
- .await
- .expect("failed to send delete_stream");
-
- loop {
- let reply = responses_clone.lock().unwrap().pop();
- if let Some(reply) = reply {
- println!("[client] Got delete_stream reply: {:?}",
reply.header());
- break;
- }
- std::thread::sleep(std::time::Duration::from_millis(1));
- }
- });
- });
-
- println!("[sim] Starting simulator loop");
- futures::executor::block_on(async {
- loop {
- if let Some(reply) = sim.step().await {
- responses.lock().unwrap().push(reply);
- }
-
- if client_handle.is_finished() {
- break;
- }
- }
-
- // Poll messages directly from the leader's partition (bypassing
consensus)
- let consumer = PollingConsumer::Consumer(1, 0);
- let args = PollingArgs::new(PollingStrategy::first(), 10, false);
- match sim
- .poll_messages(leader as usize, test_namespace, consumer, args)
- .await
- {
- Ok((fragments, _last_matching_offset)) => {
- println!("[sim] Poll returned {} fragments", fragments.len());
- }
- Err(e) => {
- println!("[sim] Poll failed: {e}");
- }
- }
-
- let args_auto = PollingArgs::new(PollingStrategy::first(), 2, true);
- if let Ok(batch) = sim
- .poll_messages(leader as usize, test_namespace, consumer,
args_auto)
- .await
- {
+ // 1. Send messages to a partition
+ println!("[sim] Sending messages to partition");
+ let test_messages = vec![
+ b"Hello, partition!".as_slice(),
+ b"Message 2".as_slice(),
+ b"Message 3".as_slice(),
+ ];
+ let send_msg = client.send_messages(test_namespace, &test_messages);
+ sim.submit_request(client_id, leader, send_msg.into_generic());
+
+ let replies = step_until_reply(&mut sim, 100);
+ assert!(!replies.is_empty(), "expected send_messages reply");
+ println!("[sim] Got send_messages reply: {:?}", replies[0].header());
+
+ // 2. Metadata operations (create + delete stream)
+ let create_msg = client.create_stream("test-stream");
+ sim.submit_request(client_id, leader, create_msg.into_generic());
+
+ let replies = step_until_reply(&mut sim, 100);
+ assert!(!replies.is_empty(), "expected create_stream reply");
+ println!("[sim] Got create_stream reply: {:?}", replies[0].header());
+
+ let delete_msg = client.delete_stream("test-stream");
+ sim.submit_request(client_id, leader, delete_msg.into_generic());
+
+ let replies = step_until_reply(&mut sim, 100);
+ assert!(!replies.is_empty(), "expected delete_stream reply");
+ println!("[sim] Got delete_stream reply: {:?}", replies[0].header());
+
+ // 3. Crash a follower and verify the cluster still commits
+ println!("\n[sim] === Crash demo ===");
+ println!("[sim] Crashing replica 2 (follower)");
+ sim.replica_crash(2);
+ assert!(sim.is_crashed(2));
+
+ let send_msg2 = client.send_messages(test_namespace, &[b"After
crash".as_slice()]);
+ sim.submit_request(client_id, leader, send_msg2.into_generic());
+
+ let replies = step_until_reply(&mut sim, 100);
+ assert!(
+ !replies.is_empty(),
+ "expected reply even with one follower crashed"
+ );
+ println!(
+ "[sim] Got send_messages reply with replica 2 down: {:?}",
+ replies[0].header()
+ );
+
+ // 4. Poll messages and check offsets on the leader
+ let consumer = PollingConsumer::Consumer(1, 0);
+ let args = PollingArgs::new(PollingStrategy::first(), 10, false);
+ match sim.poll_messages(leader as usize, test_namespace, consumer, args) {
+ Ok((fragments, _last_matching_offset)) => {
println!(
- "[sim] Auto-commit poll returned {} fragments",
- batch.0.len()
+ "[sim] Poll returned {} fragments (expected 4)",
+ fragments.len()
);
}
-
- // Next poll should start from offset 2 (after auto-commit of 0,1)
- let args_next = PollingArgs::new(PollingStrategy::next(), 10, false);
- if let Ok(batch) = sim
- .poll_messages(leader as usize, test_namespace, consumer,
args_next)
- .await
- {
- println!("[sim] Next poll returned {} fragments", batch.0.len());
+ Err(e) => {
+ println!("[sim] Poll failed: {e}");
}
+ }
- // Check offsets
- if let Some(offsets) = sim.offsets(leader as usize, test_namespace) {
- println!(
- "[sim] Partition offsets: commit={}, write={}",
- offsets.commit_offset, offsets.write_offset
- );
- }
- });
+ if let Some(offsets) = sim.offsets(leader as usize, test_namespace) {
+ println!(
+ "[sim] Partition offsets: commit={}, write={}",
+ offsets.commit_offset, offsets.write_offset
+ );
+ }
- client_handle.join().expect("client thread panicked");
- println!("[sim] Simulator loop ended");
+ println!("[sim] Simulator finished successfully");
}
diff --git a/core/simulator/src/network.rs b/core/simulator/src/network.rs
index 687add486..269541a10 100644
--- a/core/simulator/src/network.rs
+++ b/core/simulator/src/network.rs
@@ -93,6 +93,18 @@ impl Network {
self.simulator.register_client(client_id);
}
+ /// Disable all links to and from a process (crash simulation).
+ ///
+ /// Packets already in flight remain queued but are dropped at delivery
time.
+ pub fn process_disable(&mut self, process: ProcessId) {
+ self.simulator.process_disable(process);
+ }
+
+ /// Re-enable all links to and from a process (restart simulation).
+ pub fn process_enable(&mut self, process: ProcessId) {
+ self.simulator.process_enable(process);
+ }
+
/// Set the enabled/disabled state of a specific link.
/// Maps `enabled = true` to [`ALLOW_ALL`] and `enabled = false` to
[`BLOCK_ALL`].
pub fn set_link_filter(&mut self, from: ProcessId, to: ProcessId, enabled:
bool) {
diff --git a/core/simulator/src/packet.rs b/core/simulator/src/packet.rs
index 171e0d20a..1539df91a 100644
--- a/core/simulator/src/packet.rs
+++ b/core/simulator/src/packet.rs
@@ -466,6 +466,33 @@ impl PacketSimulator {
&mut self.links[idx].drop_packet_fn
}
+ /// Disable a process by blocking all links to and from it.
+ ///
+ /// Packets already queued on those links remain but will be dropped at
+ /// delivery time because the link filter is [`BLOCK_ALL`].
+ pub fn process_disable(&mut self, process: ProcessId) {
+ let all_processes: Vec<ProcessId> =
self.process_indices.keys().copied().collect();
+ for other in all_processes {
+ if other == process {
+ continue;
+ }
+ *self.link_filter(process, other) = BLOCK_ALL;
+ *self.link_filter(other, process) = BLOCK_ALL;
+ }
+ }
+
+ /// Re-enable a process by allowing all links to and from it.
+ pub fn process_enable(&mut self, process: ProcessId) {
+ let all_processes: Vec<ProcessId> =
self.process_indices.keys().copied().collect();
+ for other in all_processes {
+ if other == process {
+ continue;
+ }
+ *self.link_filter(process, other) = ALLOW_ALL;
+ *self.link_filter(other, process) = ALLOW_ALL;
+ }
+ }
+
// TODO: implement record/replay_recorded for deterministic replay support.
/// Deliver all packets that are ready at the current tick.
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index fd5891956..730e0af18 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::bus::{MemBus, SharedMemBus};
+use crate::bus::{SharedSimOutbox, SimOutbox};
use crate::deps::{MemStorage, SimJournal, SimMuxStateMachine, SimSnapshot};
use consensus::{LocalPipeline, NamespacedPipeline, VsrConsensus};
use iggy_common::IggyByteSize;
@@ -34,9 +34,9 @@ const CLUSTER_ID: u128 = 1;
// For now there is only one shard per replica,
// we will add support for multiple shards per replica in the future.
pub type Replica =
- shard::IggyShard<SharedMemBus, SimJournal<MemStorage>, SimSnapshot,
SimMuxStateMachine>;
+ shard::IggyShard<SharedSimOutbox, SimJournal<MemStorage>, SimSnapshot,
SimMuxStateMachine>;
-pub fn new_replica(id: u8, name: String, bus: &Arc<MemBus>, replica_count: u8)
-> Replica {
+pub fn new_replica(id: u8, name: String, bus: &Arc<SimOutbox>, replica_count:
u8) -> Replica {
let users: Users = UsersInner::new().into();
let streams: Streams = StreamsInner::new().into();
let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
@@ -48,7 +48,7 @@ pub fn new_replica(id: u8, name: String, bus: &Arc<MemBus>,
replica_count: u8) -
id,
replica_count,
0,
- SharedMemBus(Arc::clone(bus)),
+ SharedSimOutbox(Arc::clone(bus)),
LocalPipeline::new(),
);
metadata_consensus.init();
@@ -77,23 +77,11 @@ pub fn new_replica(id: u8, name: String, bus: &Arc<MemBus>,
replica_count: u8) -
id,
replica_count,
0,
- SharedMemBus(Arc::clone(bus)),
+ SharedSimOutbox(Arc::clone(bus)),
NamespacedPipeline::new(),
);
partition_consensus.init();
partitions.set_consensus(partition_consensus);
- // TODO: previously we used used unbounded channel with flume,
- // but this is not possible with crossfire without mangling types due to
Flavor trait in crossfire.
- // This needs to be revisited in the future.
- let (_tx, inbox) = shard::channel(1024);
- shard::IggyShard::new(
- u16::from(id),
- name,
- metadata,
- partitions,
- Vec::new(),
- inbox,
- (),
- )
+ shard::IggyShard::without_inbox(u16::from(id), name, metadata, partitions,
())
}