This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 702c9b3e5 feat(simulator): wire Network into tick loop with 
per-replica outboxes and replica_crash (#3049)
702c9b3e5 is described below

commit 702c9b3e55c215d9578a0858e90d982977e68208
Author: Krishna Vishal <[email protected]>
AuthorDate: Thu Apr 2 02:15:01 2026 +0530

    feat(simulator): wire Network into tick loop with per-replica outboxes and 
replica_crash (#3049)
    
    ## Summary
    
    - Replace shared `MemBus` FIFO queue with per-replica `SimOutbox`
    instances that stage outbound messages locally
    - Wire `PacketSimulator`-backed `Network` into the simulator tick loop
    for delivery with configurable delay, loss, and partitioning
    - Add `replica_crash()` which disables network links, discards outbox,
    tracks crashed nodes via `HashSet<u8>`
    
    Implements the outbox-only architecture from #3017. `replica_restart` is
    deferred since it requires consensus durability support that does not yet
    exist.
    
    Closes #3048
    
    ---------
    
    Co-authored-by: Piotr Gankiewicz <[email protected]>
---
 core/shard/src/lib.rs         |  27 +++++
 core/simulator/src/bus.rs     |  42 +++----
 core/simulator/src/lib.rs     | 259 ++++++++++++++++++++++++++++--------------
 core/simulator/src/main.rs    | 228 +++++++++++++++----------------------
 core/simulator/src/network.rs |  12 ++
 core/simulator/src/packet.rs  |  27 +++++
 core/simulator/src/replica.rs |  24 +---
 7 files changed, 361 insertions(+), 258 deletions(-)

diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index a38a79143..3e17f4ba2 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -141,6 +141,33 @@ where
         }
     }
 
+    /// Create a shard without inter-shard channels.
+    ///
+    /// Useful for the simulator where inbound messages are delivered directly
+    /// via [`on_message`](Self::on_message) instead of through an inbox 
channel.
+    #[must_use]
+    pub fn without_inbox(
+        id: u16,
+        name: String,
+        metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
+        partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
+        shards_table: T,
+    ) -> Self {
+        // TODO: previously we used unbounded channel with flume,
+        // but this is not possible with crossfire without mangling types due 
to Flavor trait in crossfire.
+        // This needs to be revisited in the future.
+        let (_tx, inbox) = channel(1);
+        let plane = MuxPlane::new(variadic!(metadata, partitions));
+        Self {
+            id,
+            name,
+            plane,
+            senders: Vec::new(),
+            inbox,
+            shards_table,
+        }
+    }
+
     #[must_use]
     pub const fn shards_table(&self) -> &T {
         &self.shards_table
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
index 7781f5dbf..8dbd16f6e 100644
--- a/core/simulator/src/bus.rs
+++ b/core/simulator/src/bus.rs
@@ -37,37 +37,41 @@ pub struct Envelope {
     pub payload: EnvelopePayload,
 }
 
-// TODO: Proper bus with an `Network` component which would simulate sending 
packets.
-// Tigerbeetle handles this by having an list of "buses", and calling 
callbacks for clients when an response is send.
-// This requires self-referntial structs (as message_bus has to store 
collection of other buses), which is overcomplilcated.
-// I think the way we could handle that is by having an dedicated collection 
for client responses (clients_table).
-#[derive(Debug, Default)]
-pub struct MemBus {
+/// Per-replica outbox for staging outbound messages.
+///
+/// Consensus code calls `send_to_replica()` / `send_to_client()` which stage
+/// messages here. The simulator's tick loop drains each replica's outbox and
+/// feeds the messages into the [`Network`] for simulated delivery.
+#[derive(Debug)]
+pub struct SimOutbox {
+    /// Replica id that owns this outbox. Populated as `from_replica` on every 
envelope.
+    self_id: u8,
     clients: Mutex<HashSet<u128>>,
     replicas: Mutex<HashSet<u8>>,
     pending_messages: Mutex<VecDeque<Envelope>>,
 }
 
-impl MemBus {
+impl SimOutbox {
     #[must_use]
-    pub fn new() -> Self {
+    pub fn new(self_id: u8) -> Self {
         Self {
+            self_id,
             clients: Mutex::new(HashSet::new()),
             replicas: Mutex::new(HashSet::new()),
             pending_messages: Mutex::new(VecDeque::new()),
         }
     }
 
-    /// Get the next pending message from the bus
+    /// Drain all staged messages from this outbox.
     ///
     /// # Panics
     /// Panics if the internal mutex is poisoned.
-    pub fn receive(&self) -> Option<Envelope> {
-        self.pending_messages.lock().unwrap().pop_front()
+    pub fn drain(&self) -> Vec<Envelope> {
+        self.pending_messages.lock().unwrap().drain(..).collect()
     }
 }
 
-impl MessageBus for MemBus {
+impl MessageBus for SimOutbox {
     type Client = u128;
     type Replica = u8;
     type Data = Message<GenericHeader>;
@@ -108,7 +112,7 @@ impl MessageBus for MemBus {
         }
 
         self.pending_messages.lock().unwrap().push_back(Envelope {
-            from_replica: None,
+            from_replica: Some(self.self_id),
             to_replica: None,
             to_client: Some(client_id),
             payload: EnvelopePayload::Client(message.deep_copy()),
@@ -127,7 +131,7 @@ impl MessageBus for MemBus {
         }
 
         self.pending_messages.lock().unwrap().push_back(Envelope {
-            from_replica: None,
+            from_replica: Some(self.self_id),
             to_replica: Some(replica),
             to_client: None,
             payload: EnvelopePayload::Replica(message.deep_copy()),
@@ -137,18 +141,18 @@ impl MessageBus for MemBus {
     }
 }
 
-/// Newtype wrapper for shared [`MemBus`] that implements [`MessageBus`]
+/// Newtype wrapper for shared [`SimOutbox`] that implements [`MessageBus`]
 #[derive(Debug, Clone)]
-pub struct SharedMemBus(pub Arc<MemBus>);
+pub struct SharedSimOutbox(pub Arc<SimOutbox>);
 
-impl Deref for SharedMemBus {
-    type Target = MemBus;
+impl Deref for SharedSimOutbox {
+    type Target = SimOutbox;
     fn deref(&self) -> &Self::Target {
         &self.0
     }
 }
 
-impl MessageBus for SharedMemBus {
+impl MessageBus for SharedSimOutbox {
     type Client = u128;
     type Replica = u8;
     type Data = Message<GenericHeader>;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 95cdb36ec..a6a6b09cb 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -23,133 +23,214 @@ pub mod packet;
 pub mod ready_queue;
 pub mod replica;
 
-use bus::{EnvelopePayload, MemBus};
+use bus::SimOutbox;
 use consensus::PartitionsHandle;
 use iggy_binary_protocol::{GenericHeader, Message, ReplyHeader};
 use iggy_common::IggyError;
 use iggy_common::sharding::IggyNamespace;
 use message_bus::MessageBus;
+use network::Network;
+use packet::{PacketSimulatorOptions, ProcessId};
 use partitions::{Partition, PartitionOffsets, PollQueryResult, PollingArgs, 
PollingConsumer};
 use replica::{Replica, new_replica};
+use std::collections::HashSet;
 use std::sync::Arc;
 
 pub struct Simulator {
+    /// All replicas, indexed by replica id. Always fully populated — crashed
+    /// replicas are kept alive but skipped during dispatch.
     pub replicas: Vec<Replica>,
-    pub message_bus: Arc<MemBus>,
+    /// Per-replica outbox, indexed by replica id. Shared with consensus inside
+    /// each replica via [`SharedSimOutbox`](bus::SharedSimOutbox).
+    pub outboxes: Vec<Arc<SimOutbox>>,
+    /// Set of replica ids that are currently crashed. Dispatch and outbox 
drain
+    /// are skipped for these ids.
+    pub crashed: HashSet<u8>,
+    pub network: Network,
+    pub replica_count: u8,
+    pub client_ids: Vec<u128>,
 }
 
 impl Simulator {
-    /// Initialize a partition with its own consensus group on all replicas.
-    pub fn init_partition(&mut self, namespace: 
iggy_common::sharding::IggyNamespace) {
-        for replica in &mut self.replicas {
-            replica.init_partition(namespace);
-        }
-    }
-
+    /// Create a new simulator with per-replica outboxes routed through a 
[`Network`].
     #[allow(clippy::cast_possible_truncation)]
-    pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) -> 
Self {
-        let mut message_bus = MemBus::new();
-        for client in clients {
-            message_bus.add_client(client, ());
-        }
+    pub fn new(
+        replica_count: usize,
+        clients: impl Iterator<Item = u128>,
+        network_options: PacketSimulatorOptions,
+    ) -> Self {
+        let client_ids: Vec<u128> = clients.collect();
+        let mut network = Network::new(network_options);
 
-        for i in 0..replica_count as u8 {
-            message_bus.add_replica(i);
+        for &cid in &client_ids {
+            network.register_client(cid);
         }
 
-        let message_bus = Arc::new(message_bus);
-        let replicas = (0..replica_count)
-            .map(|i| {
-                new_replica(
-                    i as u8,
-                    format!("replica-{i}"),
-                    &message_bus,
-                    replica_count as u8,
-                )
-            })
-            .collect();
+        let rc = replica_count as u8;
+        let mut replicas = Vec::with_capacity(replica_count);
+        let mut outboxes = Vec::with_capacity(replica_count);
+
+        for i in 0..replica_count {
+            let id = i as u8;
+            let mut bus = SimOutbox::new(id);
+            for &cid in &client_ids {
+                bus.add_client(cid, ());
+            }
+            for j in 0..rc {
+                bus.add_replica(j);
+            }
+            let outbox = Arc::new(bus);
+            let replica = new_replica(id, format!("replica-{i}"), &outbox, rc);
+            replicas.push(replica);
+            outboxes.push(outbox);
+        }
 
         Self {
             replicas,
-            message_bus,
+            outboxes,
+            crashed: HashSet::new(),
+            network,
+            replica_count: rc,
+            client_ids,
         }
     }
 
+    /// Initialize a partition with its own consensus group on all live 
replicas.
     #[allow(clippy::cast_possible_truncation)]
-    pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) -> 
Self {
-        for i in 0..replica_count as u8 {
-            message_bus.add_replica(i);
-        }
-
-        let message_bus = Arc::new(message_bus);
-        let replicas = (0..replica_count)
-            .map(|i| {
-                new_replica(
-                    i as u8,
-                    format!("replica-{i}"),
-                    &message_bus,
-                    replica_count as u8,
-                )
-            })
-            .collect();
-
-        Self {
-            replicas,
-            message_bus,
+    pub fn init_partition(&mut self, namespace: IggyNamespace) {
+        for (i, replica) in self.replicas.iter_mut().enumerate() {
+            if !self.crashed.contains(&(i as u8)) {
+                replica.init_partition(namespace);
+            }
         }
     }
-}
 
-impl Simulator {
+    /// Advance the simulation by one tick.
+    ///
+    /// Returns all client replies delivered during this tick.
+    ///
+    /// The tick has three phases that never borrow replicas and network
+    /// simultaneously:
+    ///
+    /// 1. **Deliver**: `network.step()` returns ready packets; each is
+    ///    dispatched to its target replica (or collected as a client reply).
+    /// 2. **Drain**: each live replica's outbox is drained and fed into
+    ///    `network.submit()`.
+    /// 3. **Tick**: `network.tick()` advances network time (partitions,
+    ///    clogging, etc.).
+    ///
     /// # Panics
-    /// Panics if a client response message has an invalid command type.
-    #[allow(clippy::future_not_send)]
-    pub async fn step(&self) -> Option<Message<ReplyHeader>> {
-        if let Some(envelope) = self.message_bus.receive() {
-            if let Some(_client_id) = envelope.to_client {
-                let EnvelopePayload::Client(message) = envelope.payload else {
-                    panic!("client envelope must carry a reply message");
-                };
-                let reply: Message<ReplyHeader> = message
-                    .try_into_typed()
-                    .expect("invalid message, wrong command type for a client 
response");
-                return Some(reply);
+    /// Panics if a packet addressed to a client cannot be converted to a
+    /// `ReplyHeader` message.
+    #[allow(clippy::cast_possible_truncation)]
+    pub fn step(&mut self) -> Vec<Message<ReplyHeader>> {
+        let mut client_replies = Vec::new();
+
+        // Phase 1: Deliver ready packets from the network.
+        let packets = self.network.step();
+        for packet in &packets {
+            match packet.to {
+                ProcessId::Replica(id) => {
+                    if !self.crashed.contains(&id)
+                        && let Some(replica) = self.replicas.get(id as usize)
+                    {
+                        Self::dispatch_to_replica(replica, 
packet.message.deep_copy());
+                    }
+                    // Crashed or missing: packet silently dropped.
+                }
+                ProcessId::Client(_) => {
+                    let reply: Message<ReplyHeader> = packet
+                        .message
+                        .deep_copy()
+                        .try_into_typed()
+                        .expect("invalid message, wrong command type for a 
client response");
+                    client_replies.push(reply);
+                }
             }
+        }
+        self.network.recycle_buffer(packets);
 
-            if let Some(replica_id) = envelope.to_replica
-                && let Some(replica) = self.replicas.get(replica_id as usize)
-            {
-                let EnvelopePayload::Replica(message) = envelope.payload else {
-                    panic!("replica envelope must carry a replica message");
+        // Phase 2: Drain each replica's outbox into the network.
+        for (i, outbox) in self.outboxes.iter().enumerate() {
+            let envelopes = outbox.drain();
+            if self.crashed.contains(&(i as u8)) {
+                // Defensive: discard any messages from a crashed node's 
outbox.
+                continue;
+            }
+            for envelope in envelopes {
+                let from = ProcessId::Replica(i as u8);
+                let to = if let Some(rid) = envelope.to_replica {
+                    ProcessId::Replica(rid)
+                } else if let Some(cid) = envelope.to_client {
+                    ProcessId::Client(cid)
+                } else {
+                    continue;
+                };
+                let message = match envelope.payload {
+                    bus::EnvelopePayload::Replica(m) | 
bus::EnvelopePayload::Client(m) => m,
                 };
-                self.dispatch_to_replica(replica, message).await;
+                self.network.submit(from, to, message);
             }
         }
 
-        None
+        // Phase 3: Advance network time.
+        self.network.tick();
+
+        client_replies
     }
 
-    #[allow(clippy::future_not_send)]
-    async fn dispatch_to_replica(&self, replica: &Replica, message: 
Message<GenericHeader>) {
-        replica.on_message(message).await;
+    /// Submit a client request into the simulated network.
+    ///
+    /// This is the simulator equivalent of a client opening a TCP connection
+    /// and sending a message to a replica.
+    pub fn submit_request(
+        &mut self,
+        client_id: u128,
+        target_replica: u8,
+        message: Message<GenericHeader>,
+    ) {
+        self.network.submit(
+            ProcessId::Client(client_id),
+            ProcessId::Replica(target_replica),
+            message,
+        );
+    }
 
-        let mut buf = Vec::new();
-        replica.process_loopback(&mut buf).await;
-        debug_assert_eq!(
-            replica.process_loopback(&mut buf).await,
-            0,
-            "on_ack must not re-enqueue loopback messages"
+    /// Crash a replica: disable its network links and discard its outbox.
+    ///
+    /// The replica object is kept alive but will not receive any messages or
+    /// have its outbox drained until a future `replica_restart` (not yet
+    /// implemented and it requires consensus durability support).
+    ///
+    /// # Panics
+    /// Panics if the replica is already crashed.
+    pub fn replica_crash(&mut self, replica_index: u8) {
+        assert!(
+            !self.crashed.contains(&replica_index),
+            "cannot crash replica {replica_index}: already down"
         );
+
+        // Discard any unsent messages (never reached the wire).
+        self.outboxes[replica_index as usize].drain();
+
+        // Block all network links to/from this process.
+        self.network
+            .process_disable(ProcessId::Replica(replica_index));
+
+        self.crashed.insert(replica_index);
+    }
+
+    /// Returns `true` if the given replica is currently crashed.
+    #[must_use]
+    pub fn is_crashed(&self, replica_index: u8) -> bool {
+        self.crashed.contains(&replica_index)
     }
-}
 
-impl Simulator {
     /// Poll messages directly from a replica's partition.
     ///
     /// # Errors
     /// Returns `IggyError::ResourceNotFound` if the namespace does not exist 
on this replica.
-    #[allow(clippy::future_not_send)]
-    pub async fn poll_messages(
+    pub fn poll_messages(
         &self,
         replica_idx: usize,
         namespace: IggyNamespace,
@@ -165,7 +246,7 @@ impl Simulator {
                 .ok_or(IggyError::ResourceNotFound(format!(
                     "partition not found for namespace {namespace:?} on 
replica {replica_idx}"
                 )))?;
-        partition.poll_messages(consumer, args).await
+        futures::executor::block_on(partition.poll_messages(consumer, args))
     }
 
     /// Get partition offsets from a replica.
@@ -179,6 +260,18 @@ impl Simulator {
         let partition = replica.plane.partitions().get_by_ns(&namespace)?;
         Some(partition.offsets())
     }
+
+    fn dispatch_to_replica(replica: &Replica, message: Message<GenericHeader>) 
{
+        futures::executor::block_on(replica.on_message(message));
+
+        let mut buf = Vec::new();
+        futures::executor::block_on(replica.process_loopback(&mut buf));
+        let loopback_count = 
futures::executor::block_on(replica.process_loopback(&mut buf));
+        debug_assert_eq!(
+            loopback_count, 0,
+            "on_ack must not re-enqueue loopback messages"
+        );
+    }
 }
 
 // TODO(IGGY-66): Add acceptance test for per-partition consensus independence.
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
index fe91cda8e..dc34745ea 100644
--- a/core/simulator/src/main.rs
+++ b/core/simulator/src/main.rs
@@ -16,32 +16,26 @@
 // under the License.
 
 use iggy_binary_protocol::{Message, ReplyHeader};
-use iggy_common::PollingStrategy;
 use iggy_common::sharding::IggyNamespace;
-use iggy_common::{IggyByteSize, MemoryPool, MemoryPoolConfigOther};
-use message_bus::MessageBus;
+use iggy_common::{IggyByteSize, MemoryPool, MemoryPoolConfigOther, 
PollingStrategy};
 use partitions::{PollingArgs, PollingConsumer};
-use simulator::{Simulator, client::SimClient};
-use std::collections::VecDeque;
-use std::sync::{Arc, Mutex};
-
-/// Shared response queue for client replies
-#[derive(Default)]
-pub struct Responses {
-    queue: VecDeque<Message<ReplyHeader>>,
-}
-
-impl Responses {
-    pub fn push(&mut self, msg: Message<ReplyHeader>) {
-        self.queue.push_back(msg);
-    }
-
-    pub fn pop(&mut self) -> Option<Message<ReplyHeader>> {
-        self.queue.pop_front()
+use simulator::Simulator;
+use simulator::client::SimClient;
+use simulator::packet::PacketSimulatorOptions;
+
+/// Step the simulator until at least one client reply is received,
+/// or `max_ticks` is reached. Returns all collected replies.
+fn step_until_reply(sim: &mut Simulator, max_ticks: u64) -> 
Vec<Message<ReplyHeader>> {
+    let mut all_replies = Vec::new();
+    for _ in 0..max_ticks {
+        all_replies.extend(sim.step());
+        if !all_replies.is_empty() {
+            return all_replies;
+        }
     }
+    all_replies
 }
 
-#[allow(clippy::too_many_lines)]
 fn main() {
     // PooledBuffer::from (used by poll_messages) panics if the global pool is 
uninitialized.
     // Disabled pooling just falls through to the system allocator.
@@ -53,8 +47,16 @@ fn main() {
 
     let client_id: u128 = 1;
     let leader: u8 = 0;
-    let mut sim = Simulator::new(3, std::iter::once(client_id));
-    let bus = sim.message_bus.clone();
+
+    // Deterministic network: minimum delay, no loss, no partitions.
+    let network_opts = PacketSimulatorOptions {
+        node_count: 3,
+        client_count: 1,
+        ..PacketSimulatorOptions::default()
+    };
+
+    let mut sim = Simulator::new(3, std::iter::once(client_id), network_opts);
+    let client = SimClient::new(client_id);
 
     // Hardcoded partition for testing: stream_id=1, topic_id=1, partition_id=0
     let test_namespace = IggyNamespace::new(1, 1, 0);
@@ -63,125 +65,75 @@ fn main() {
     println!("[sim] Initializing test partition: {test_namespace:?}");
     sim.init_partition(test_namespace);
 
-    // Responses queue
-    let responses = Arc::new(Mutex::new(Responses::default()));
-    let responses_clone = responses.clone();
-
-    // TODO: Scuffed client/simulator setup.
-    // We need a better interface on simulator
-    let client_handle = std::thread::spawn(move || {
-        futures::executor::block_on(async {
-            let client = SimClient::new(client_id);
-
-            // Send some test messages to the partition
-            println!("[client] Sending messages to partition");
-            let test_messages = vec![
-                b"Hello, partition!".as_slice(),
-                b"Message 2".as_slice(),
-                b"Message 3".as_slice(),
-            ];
-
-            let send_msg = client.send_messages(test_namespace, 
&test_messages);
-            bus.send_to_replica(leader, send_msg.into_generic())
-                .await
-                .expect("failed to send messages");
-
-            loop {
-                let reply = responses_clone.lock().unwrap().pop();
-                if let Some(reply) = reply {
-                    println!("[client] Got send_messages reply: {:?}", 
reply.header());
-                    break;
-                }
-                std::thread::sleep(std::time::Duration::from_millis(1));
-            }
-
-            // Send metadata operations
-            let create_msg = client.create_stream("test-stream");
-            bus.send_to_replica(leader, create_msg.into_generic())
-                .await
-                .expect("failed to send create_stream");
-
-            loop {
-                let reply = responses_clone.lock().unwrap().pop();
-                if let Some(reply) = reply {
-                    println!("[client] Got create_stream reply: {:?}", 
reply.header());
-                    break;
-                }
-                std::thread::sleep(std::time::Duration::from_millis(1));
-            }
-
-            let delete_msg = client.delete_stream("test-stream");
-            bus.send_to_replica(leader, delete_msg.into_generic())
-                .await
-                .expect("failed to send delete_stream");
-
-            loop {
-                let reply = responses_clone.lock().unwrap().pop();
-                if let Some(reply) = reply {
-                    println!("[client] Got delete_stream reply: {:?}", 
reply.header());
-                    break;
-                }
-                std::thread::sleep(std::time::Duration::from_millis(1));
-            }
-        });
-    });
-
-    println!("[sim] Starting simulator loop");
-    futures::executor::block_on(async {
-        loop {
-            if let Some(reply) = sim.step().await {
-                responses.lock().unwrap().push(reply);
-            }
-
-            if client_handle.is_finished() {
-                break;
-            }
-        }
-
-        // Poll messages directly from the leader's partition (bypassing 
consensus)
-        let consumer = PollingConsumer::Consumer(1, 0);
-        let args = PollingArgs::new(PollingStrategy::first(), 10, false);
-        match sim
-            .poll_messages(leader as usize, test_namespace, consumer, args)
-            .await
-        {
-            Ok((fragments, _last_matching_offset)) => {
-                println!("[sim] Poll returned {} fragments", fragments.len());
-            }
-            Err(e) => {
-                println!("[sim] Poll failed: {e}");
-            }
-        }
-
-        let args_auto = PollingArgs::new(PollingStrategy::first(), 2, true);
-        if let Ok(batch) = sim
-            .poll_messages(leader as usize, test_namespace, consumer, 
args_auto)
-            .await
-        {
+    // 1. Send messages to a partition
+    println!("[sim] Sending messages to partition");
+    let test_messages = vec![
+        b"Hello, partition!".as_slice(),
+        b"Message 2".as_slice(),
+        b"Message 3".as_slice(),
+    ];
+    let send_msg = client.send_messages(test_namespace, &test_messages);
+    sim.submit_request(client_id, leader, send_msg.into_generic());
+
+    let replies = step_until_reply(&mut sim, 100);
+    assert!(!replies.is_empty(), "expected send_messages reply");
+    println!("[sim] Got send_messages reply: {:?}", replies[0].header());
+
+    // 2. Metadata operations (create + delete stream)
+    let create_msg = client.create_stream("test-stream");
+    sim.submit_request(client_id, leader, create_msg.into_generic());
+
+    let replies = step_until_reply(&mut sim, 100);
+    assert!(!replies.is_empty(), "expected create_stream reply");
+    println!("[sim] Got create_stream reply: {:?}", replies[0].header());
+
+    let delete_msg = client.delete_stream("test-stream");
+    sim.submit_request(client_id, leader, delete_msg.into_generic());
+
+    let replies = step_until_reply(&mut sim, 100);
+    assert!(!replies.is_empty(), "expected delete_stream reply");
+    println!("[sim] Got delete_stream reply: {:?}", replies[0].header());
+
+    // 3. Crash a follower and verify the cluster still commits
+    println!("\n[sim] === Crash demo ===");
+    println!("[sim] Crashing replica 2 (follower)");
+    sim.replica_crash(2);
+    assert!(sim.is_crashed(2));
+
+    let send_msg2 = client.send_messages(test_namespace, &[b"After 
crash".as_slice()]);
+    sim.submit_request(client_id, leader, send_msg2.into_generic());
+
+    let replies = step_until_reply(&mut sim, 100);
+    assert!(
+        !replies.is_empty(),
+        "expected reply even with one follower crashed"
+    );
+    println!(
+        "[sim] Got send_messages reply with replica 2 down: {:?}",
+        replies[0].header()
+    );
+
+    // 4. Poll messages and check offsets on the leader
+    let consumer = PollingConsumer::Consumer(1, 0);
+    let args = PollingArgs::new(PollingStrategy::first(), 10, false);
+    match sim.poll_messages(leader as usize, test_namespace, consumer, args) {
+        Ok((fragments, _last_matching_offset)) => {
             println!(
-                "[sim] Auto-commit poll returned {} fragments",
-                batch.0.len()
+                "[sim] Poll returned {} fragments (expected 4)",
+                fragments.len()
             );
         }
-
-        // Next poll should start from offset 2 (after auto-commit of 0,1)
-        let args_next = PollingArgs::new(PollingStrategy::next(), 10, false);
-        if let Ok(batch) = sim
-            .poll_messages(leader as usize, test_namespace, consumer, 
args_next)
-            .await
-        {
-            println!("[sim] Next poll returned {} fragments", batch.0.len());
+        Err(e) => {
+            println!("[sim] Poll failed: {e}");
         }
+    }
 
-        // Check offsets
-        if let Some(offsets) = sim.offsets(leader as usize, test_namespace) {
-            println!(
-                "[sim] Partition offsets: commit={}, write={}",
-                offsets.commit_offset, offsets.write_offset
-            );
-        }
-    });
+    if let Some(offsets) = sim.offsets(leader as usize, test_namespace) {
+        println!(
+            "[sim] Partition offsets: commit={}, write={}",
+            offsets.commit_offset, offsets.write_offset
+        );
+    }
 
-    client_handle.join().expect("client thread panicked");
-    println!("[sim] Simulator loop ended");
+    println!("[sim] Simulator finished successfully");
 }
diff --git a/core/simulator/src/network.rs b/core/simulator/src/network.rs
index 687add486..269541a10 100644
--- a/core/simulator/src/network.rs
+++ b/core/simulator/src/network.rs
@@ -93,6 +93,18 @@ impl Network {
         self.simulator.register_client(client_id);
     }
 
+    /// Disable all links to and from a process (crash simulation).
+    ///
+    /// Packets already in flight remain queued but are dropped at delivery 
time.
+    pub fn process_disable(&mut self, process: ProcessId) {
+        self.simulator.process_disable(process);
+    }
+
+    /// Re-enable all links to and from a process (restart simulation).
+    pub fn process_enable(&mut self, process: ProcessId) {
+        self.simulator.process_enable(process);
+    }
+
     /// Set the enabled/disabled state of a specific link.
     /// Maps `enabled = true` to [`ALLOW_ALL`] and `enabled = false` to 
[`BLOCK_ALL`].
     pub fn set_link_filter(&mut self, from: ProcessId, to: ProcessId, enabled: 
bool) {
diff --git a/core/simulator/src/packet.rs b/core/simulator/src/packet.rs
index 171e0d20a..1539df91a 100644
--- a/core/simulator/src/packet.rs
+++ b/core/simulator/src/packet.rs
@@ -466,6 +466,33 @@ impl PacketSimulator {
         &mut self.links[idx].drop_packet_fn
     }
 
+    /// Disable a process by blocking all links to and from it.
+    ///
+    /// Packets already queued on those links remain but will be dropped at
+    /// delivery time because the link filter is [`BLOCK_ALL`].
+    pub fn process_disable(&mut self, process: ProcessId) {
+        let all_processes: Vec<ProcessId> = 
self.process_indices.keys().copied().collect();
+        for other in all_processes {
+            if other == process {
+                continue;
+            }
+            *self.link_filter(process, other) = BLOCK_ALL;
+            *self.link_filter(other, process) = BLOCK_ALL;
+        }
+    }
+
+    /// Re-enable a process by allowing all links to and from it.
+    pub fn process_enable(&mut self, process: ProcessId) {
+        let all_processes: Vec<ProcessId> = 
self.process_indices.keys().copied().collect();
+        for other in all_processes {
+            if other == process {
+                continue;
+            }
+            *self.link_filter(process, other) = ALLOW_ALL;
+            *self.link_filter(other, process) = ALLOW_ALL;
+        }
+    }
+
     // TODO: implement record/replay_recorded for deterministic replay support.
 
     /// Deliver all packets that are ready at the current tick.
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index fd5891956..730e0af18 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::bus::{MemBus, SharedMemBus};
+use crate::bus::{SharedSimOutbox, SimOutbox};
 use crate::deps::{MemStorage, SimJournal, SimMuxStateMachine, SimSnapshot};
 use consensus::{LocalPipeline, NamespacedPipeline, VsrConsensus};
 use iggy_common::IggyByteSize;
@@ -34,9 +34,9 @@ const CLUSTER_ID: u128 = 1;
 // For now there is only one shard per replica,
 // we will add support for multiple shards per replica in the future.
 pub type Replica =
-    shard::IggyShard<SharedMemBus, SimJournal<MemStorage>, SimSnapshot, 
SimMuxStateMachine>;
+    shard::IggyShard<SharedSimOutbox, SimJournal<MemStorage>, SimSnapshot, 
SimMuxStateMachine>;
 
-pub fn new_replica(id: u8, name: String, bus: &Arc<MemBus>, replica_count: u8) 
-> Replica {
+pub fn new_replica(id: u8, name: String, bus: &Arc<SimOutbox>, replica_count: 
u8) -> Replica {
     let users: Users = UsersInner::new().into();
     let streams: Streams = StreamsInner::new().into();
     let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
@@ -48,7 +48,7 @@ pub fn new_replica(id: u8, name: String, bus: &Arc<MemBus>, 
replica_count: u8) -
         id,
         replica_count,
         0,
-        SharedMemBus(Arc::clone(bus)),
+        SharedSimOutbox(Arc::clone(bus)),
         LocalPipeline::new(),
     );
     metadata_consensus.init();
@@ -77,23 +77,11 @@ pub fn new_replica(id: u8, name: String, bus: &Arc<MemBus>, 
replica_count: u8) -
         id,
         replica_count,
         0,
-        SharedMemBus(Arc::clone(bus)),
+        SharedSimOutbox(Arc::clone(bus)),
         NamespacedPipeline::new(),
     );
     partition_consensus.init();
     partitions.set_consensus(partition_consensus);
 
-    // TODO: previously we used used unbounded channel with flume,
-    // but this is not possible with crossfire without mangling types due to 
Flavor trait in crossfire.
-    // This needs to be revisited in the future.
-    let (_tx, inbox) = shard::channel(1024);
-    shard::IggyShard::new(
-        u16::from(id),
-        name,
-        metadata,
-        partitions,
-        Vec::new(),
-        inbox,
-        (),
-    )
+    shard::IggyShard::without_inbox(u16::from(id), name, metadata, partitions, 
())
 }

Reply via email to