numinnex commented on code in PR #3049:
URL: https://github.com/apache/iggy/pull/3049#discussion_r3015915558
##########
core/simulator/src/lib.rs:
##########
@@ -23,122 +23,206 @@ pub mod packet;
pub mod ready_queue;
pub mod replica;
-use bus::MemBus;
+use bus::SimOutbox;
use consensus::PartitionsHandle;
use iggy_binary_protocol::{GenericHeader, Message, ReplyHeader};
use iggy_common::sharding::IggyNamespace;
use iggy_common::{IggyError, IggyMessagesBatchSet};
use message_bus::MessageBus;
+use network::Network;
+use packet::{PacketSimulatorOptions, ProcessId};
use partitions::{Partition, PartitionOffsets, PollingArgs, PollingConsumer};
use replica::{Replica, new_replica};
+use std::collections::HashSet;
use std::sync::Arc;
pub struct Simulator {
+ /// All replicas, indexed by replica id. Always fully populated — crashed
+ /// replicas are kept alive but skipped during dispatch.
pub replicas: Vec<Replica>,
- pub message_bus: Arc<MemBus>,
+ /// Per-replica outbox, indexed by replica id. Shared with consensus inside
+ /// each replica via [`SharedSimOutbox`](bus::SharedSimOutbox).
+ pub outboxes: Vec<Arc<SimOutbox>>,
+ /// Set of replica ids that are currently crashed. Dispatch and outbox
drain
+ /// are skipped for these ids.
+ pub crashed: HashSet<u8>,
+ pub network: Network,
+ pub replica_count: u8,
+ pub client_ids: Vec<u128>,
}
impl Simulator {
- /// Initialize a partition with its own consensus group on all replicas.
- pub fn init_partition(&mut self, namespace:
iggy_common::sharding::IggyNamespace) {
- for replica in &mut self.replicas {
- replica.init_partition(namespace);
- }
- }
-
+ /// Create a new simulator with per-replica outboxes routed through a
[`Network`].
#[allow(clippy::cast_possible_truncation)]
- pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) ->
Self {
- let mut message_bus = MemBus::new();
- for client in clients {
- message_bus.add_client(client, ());
- }
+ pub fn new(
+ replica_count: usize,
+ clients: impl Iterator<Item = u128>,
+ network_options: PacketSimulatorOptions,
+ ) -> Self {
+ let client_ids: Vec<u128> = clients.collect();
+ let mut network = Network::new(network_options);
- for i in 0..replica_count as u8 {
- message_bus.add_replica(i);
+ for &cid in &client_ids {
+ network.register_client(cid);
}
- let message_bus = Arc::new(message_bus);
- let replicas = (0..replica_count)
- .map(|i| {
- new_replica(
- i as u8,
- format!("replica-{i}"),
- &message_bus,
- replica_count as u8,
- )
- })
- .collect();
+ let rc = replica_count as u8;
+ let mut replicas = Vec::with_capacity(replica_count);
+ let mut outboxes = Vec::with_capacity(replica_count);
+
+ for i in 0..replica_count {
+ let id = i as u8;
+ let mut bus = SimOutbox::new(id);
+ for &cid in &client_ids {
+ bus.add_client(cid, ());
+ }
+ for j in 0..rc {
+ bus.add_replica(j);
+ }
+ let outbox = Arc::new(bus);
+ let replica = new_replica(id, format!("replica-{i}"), &outbox, rc);
+ replicas.push(replica);
+ outboxes.push(outbox);
+ }
Self {
replicas,
- message_bus,
+ outboxes,
+ crashed: HashSet::new(),
+ network,
+ replica_count: rc,
+ client_ids,
}
}
+ /// Initialize a partition with its own consensus group on all live
replicas.
#[allow(clippy::cast_possible_truncation)]
- pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) ->
Self {
- for i in 0..replica_count as u8 {
- message_bus.add_replica(i);
- }
-
- let message_bus = Arc::new(message_bus);
- let replicas = (0..replica_count)
- .map(|i| {
- new_replica(
- i as u8,
- format!("replica-{i}"),
- &message_bus,
- replica_count as u8,
- )
- })
- .collect();
-
- Self {
- replicas,
- message_bus,
+ pub fn init_partition(&mut self, namespace: IggyNamespace) {
+ for (i, replica) in self.replicas.iter_mut().enumerate() {
+ if !self.crashed.contains(&(i as u8)) {
+ replica.init_partition(namespace);
+ }
}
}
-}
-impl Simulator {
+ /// Advance the simulation by one tick.
+ ///
+ /// Returns all client replies delivered during this tick.
+ ///
+ /// The tick has three phases that never borrow replicas and network
+ /// simultaneously:
+ ///
+ /// 1. **Deliver**: `network.step()` returns ready packets; each is
+ /// dispatched to its target replica (or collected as a client reply).
+ /// 2. **Drain**: each live replica's outbox is drained and fed into
+ /// `network.submit()`.
+ /// 3. **Tick**: `network.tick()` advances network time (partitions,
+ /// clogging, etc.).
+ ///
/// # Panics
- /// Panics if a client response message has an invalid command type.
- #[allow(clippy::future_not_send)]
- pub async fn step(&self) -> Option<Message<ReplyHeader>> {
- if let Some(envelope) = self.message_bus.receive() {
- if let Some(_client_id) = envelope.to_client {
- let reply: Message<ReplyHeader> = envelope
- .message
- .try_into_typed()
- .expect("invalid message, wrong command type for an client
response");
- return Some(reply);
+ /// Panics if a packet addressed to a client cannot be converted to a
+ /// `ReplyHeader` message.
+ #[allow(clippy::future_not_send, clippy::cast_possible_truncation)]
+ pub async fn step(&mut self) -> Vec<Message<ReplyHeader>> {
+ let mut client_replies = Vec::new();
+
+ // Phase 1: Deliver ready packets from the network.
+ let packets = self.network.step();
+ for packet in &packets {
+ match packet.to {
+ ProcessId::Replica(id) => {
+ if !self.crashed.contains(&id)
+ && let Some(replica) = self.replicas.get(id as usize)
+ {
+ Self::dispatch_to_replica(replica,
packet.message.clone()).await;
Review Comment:
I think something that we could potentially do here is to create a
dedicated `compio` task per dispatch, to simulate multiple peers concurrently
sending messages to a replica.
##########
core/simulator/src/main.rs:
##########
@@ -16,32 +16,27 @@
// under the License.
use iggy_binary_protocol::{Message, ReplyHeader};
-use iggy_common::PollingStrategy;
use iggy_common::sharding::IggyNamespace;
-use iggy_common::{IggyByteSize, MemoryPool, MemoryPoolConfigOther};
-use message_bus::MessageBus;
+use iggy_common::{IggyByteSize, MemoryPool, MemoryPoolConfigOther,
PollingStrategy};
use partitions::{PollingArgs, PollingConsumer};
-use simulator::{Simulator, client::SimClient};
-use std::collections::VecDeque;
-use std::sync::{Arc, Mutex};
-
-/// Shared response queue for client replies
-#[derive(Default)]
-pub struct Responses {
- queue: VecDeque<Message<ReplyHeader>>,
-}
-
-impl Responses {
- pub fn push(&mut self, msg: Message<ReplyHeader>) {
- self.queue.push_back(msg);
- }
-
- pub fn pop(&mut self) -> Option<Message<ReplyHeader>> {
- self.queue.pop_front()
+use simulator::Simulator;
+use simulator::client::SimClient;
+use simulator::packet::PacketSimulatorOptions;
+
+/// Step the simulator until at least one client reply is received,
+/// or `max_ticks` is reached. Returns all collected replies.
+fn step_until_reply(sim: &mut Simulator, max_ticks: u64) ->
Vec<Message<ReplyHeader>> {
Review Comment:
We could probably move to start using `compio`, rather than the default
executor from `futures`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]