This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 27f0f11c6 feat(consensus): wire up view change protocol in simulator (#3092)
27f0f11c6 is described below
commit 27f0f11c65f5289bda2f67d9cf95526a87121aa3
Author: Krishna Vishal <[email protected]>
AuthorDate: Fri Apr 10 16:38:24 2026 +0530
feat(consensus): wire up view change protocol in simulator (#3092)
- Dispatch all VsrAction variants (SVC/DVC/SV/PrepareOk/Commit) from the tick
and incoming message handlers.
- Add CommitMessage heartbeat: the primary periodically sends its commit point
to backups (a sketch of the heartbeat contract follows this list).
- Integrate consensus tick into Simulator::step().
- Add RebuildPipeline action with primary self-ack so quorum can be reached
after view change.
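
As a rough illustration of the CommitMessage heartbeat added below (see
handle_commit_message_timeout and handle_commit in core/consensus/src/impls.rs),
here is a minimal, self-contained Rust sketch with hypothetical Heartbeat and
Backup types that are NOT the crate's API: the primary stamps every Commit
heartbeat with a monotonically increasing timestamp, and a backup only resets
its view-change timer for a strictly newer timestamp, so a replayed heartbeat
cannot suppress a legitimate view change while the commit point still advances.

    // Hypothetical sketch only; field and type names are illustrative.
    struct Heartbeat {
        view: u32,
        commit: u64,
        timestamp_monotonic: u64,
    }

    struct Backup {
        view: u32,
        commit_max: u64,
        last_heartbeat_ts: u64,
        // A view change starts when this exceeds some limit.
        ticks_since_heartbeat: u64,
    }

    impl Backup {
        /// Returns true when commit_max advanced, i.e. the caller should now
        /// apply newly committable ops from its journal.
        fn handle_heartbeat(&mut self, hb: &Heartbeat) -> bool {
            if hb.view != self.view {
                return false; // wrong view: ignore
            }
            // Only a strictly newer timestamp proves the primary is alive;
            // an old or replayed heartbeat must not reset the timer.
            if hb.timestamp_monotonic > self.last_heartbeat_ts {
                self.last_heartbeat_ts = hb.timestamp_monotonic;
                self.ticks_since_heartbeat = 0;
            }
            let before = self.commit_max;
            self.commit_max = self.commit_max.max(hb.commit);
            self.commit_max > before
        }
    }

    fn main() {
        let mut b = Backup { view: 3, commit_max: 10, last_heartbeat_ts: 0, ticks_since_heartbeat: 7 };
        // Fresh heartbeat: advances commit_max and resets the timer.
        assert!(b.handle_heartbeat(&Heartbeat { view: 3, commit: 12, timestamp_monotonic: 1 }));
        assert_eq!(b.ticks_since_heartbeat, 0);
        // Replayed heartbeat: nothing advances, timer stays untouched.
        b.ticks_since_heartbeat = 5;
        assert!(!b.handle_heartbeat(&Heartbeat { view: 3, commit: 12, timestamp_monotonic: 1 }));
        assert_eq!(b.ticks_since_heartbeat, 5);
    }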
---
core/binary_protocol/src/consensus/message.rs | 38 ++-
core/consensus/src/impls.rs | 143 +++++++-
core/consensus/src/observability.rs | 27 ++
core/metadata/src/impls/metadata.rs | 4 +-
core/partitions/src/iggy_partitions.rs | 4 +-
core/shard/src/lib.rs | 468 +++++++++++++++++++++++++-
core/shard/src/router.rs | 65 +++-
core/simulator/src/lib.rs | 188 ++++++++++-
core/simulator/src/replica.rs | 10 +-
9 files changed, 911 insertions(+), 36 deletions(-)
diff --git a/core/binary_protocol/src/consensus/message.rs b/core/binary_protocol/src/consensus/message.rs
index 50ae07ac5..db85a7229 100644
--- a/core/binary_protocol/src/consensus/message.rs
+++ b/core/binary_protocol/src/consensus/message.rs
@@ -16,8 +16,9 @@
// under the License.
use crate::consensus::{
- self, Command2, ConsensusError, ConsensusHeader, GenericHeader, PrepareHeader, PrepareOkHeader,
- RequestHeader,
+ self, Command2, CommitHeader, ConsensusError, ConsensusHeader, DoViewChangeHeader,
+ GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader, StartViewChangeHeader,
+ StartViewHeader,
};
use iobuf::{Frozen, Owned};
use smallvec::SmallVec;
@@ -458,6 +459,10 @@ pub enum MessageBag {
Request(Message<RequestHeader>),
Prepare(Message<PrepareHeader>),
PrepareOk(Message<PrepareOkHeader>),
+ StartViewChange(Message<StartViewChangeHeader>),
+ DoViewChange(Message<DoViewChangeHeader>),
+ StartView(Message<StartViewHeader>),
+ Commit(Message<CommitHeader>),
}
impl MessageBag {
@@ -467,6 +472,10 @@ impl MessageBag {
Self::Request(message) => message.header().command,
Self::Prepare(message) => message.header().command,
Self::PrepareOk(message) => message.header().command,
+ Self::StartViewChange(message) => message.header().command,
+ Self::DoViewChange(message) => message.header().command,
+ Self::StartView(message) => message.header().command,
+ Self::Commit(message) => message.header().command,
}
}
@@ -476,6 +485,10 @@ impl MessageBag {
Self::Request(message) => message.header().size(),
Self::Prepare(message) => message.header().size(),
Self::PrepareOk(message) => message.header().size(),
+ Self::StartViewChange(message) => message.header().size(),
+ Self::DoViewChange(message) => message.header().size(),
+ Self::StartView(message) => message.header().size(),
+ Self::Commit(message) => message.header().size(),
}
}
@@ -485,6 +498,10 @@ impl MessageBag {
Self::Request(message) => message.header().operation,
Self::Prepare(message) => message.header().operation,
Self::PrepareOk(message) => message.header().operation,
+ Self::StartViewChange(message) => message.header().operation(),
+ Self::DoViewChange(message) => message.header().operation(),
+ Self::StartView(message) => message.header().operation(),
+ Self::Commit(message) => message.header().operation(),
}
}
}
@@ -512,6 +529,23 @@ where
let msg = unsafe { Message::<PrepareOkHeader>::from_backing_unchecked(backing) };
Ok(Self::PrepareOk(msg))
}
+ Command2::StartViewChange => {
+ let msg =
+ unsafe { Message::<StartViewChangeHeader>::from_backing_unchecked(backing) };
+ Ok(Self::StartViewChange(msg))
+ }
+ Command2::DoViewChange => {
+ let msg = unsafe { Message::<DoViewChangeHeader>::from_backing_unchecked(backing) };
+ Ok(Self::DoViewChange(msg))
+ }
+ Command2::StartView => {
+ let msg = unsafe { Message::<StartViewHeader>::from_backing_unchecked(backing) };
+ Ok(Self::StartView(msg))
+ }
+ Command2::Commit => {
+ let msg = unsafe { Message::<CommitHeader>::from_backing_unchecked(backing) };
+ Ok(Self::Commit(msg))
+ }
other => Err(ConsensusError::InvalidCommand {
expected: Command2::Reserved,
found: other,
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 55101854c..d0932afa0 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -448,6 +448,26 @@ pub enum VsrAction {
RetransmitPrepares {
targets: Vec<(PrepareHeader, Vec<u8>)>,
},
+ /// Rebuild the pipeline from the journal after a view change.
+ ///
+ /// The new primary must re-populate its pipeline with uncommitted ops
+ /// from the WAL so that incoming `PrepareOk` messages can be matched
+ /// and commits can proceed.
+ RebuildPipeline { from_op: u64, to_op: u64 },
+ /// Catch up `commit_min` to `commit_max` by applying committed ops from the
+ /// journal. Emitted during view change completion so the new primary
+ /// is fully caught up before accepting new requests.
+ CommitJournal,
+ /// Primary heartbeat: send current commit point to all backups.
+ ///
+ /// Emitted when the `CommitMessage` timeout fires. Prevents backups
+ /// from starting a view change during idle periods.
+ SendCommit {
+ view: u32,
+ commit: u64,
+ namespace: u64,
+ timestamp_monotonic: u64,
+ },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -530,6 +550,10 @@ where
timeouts: RefCell<TimeoutManager>,
+ /// Monotonic timestamp from the most recent accepted commit heartbeat.
+ /// Old/replayed commit messages with a lower timestamp are ignored.
+ heartbeat_timestamp: Cell<u64>,
+
/// VSR client-table for duplicate detection and reply caching.
client_table: RefCell<ClientTable>,
}
@@ -577,13 +601,20 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
sent_own_start_view_change: Cell::new(false),
sent_own_do_view_change: Cell::new(false),
timeouts: RefCell::new(TimeoutManager::new(timeout_seed)),
+ heartbeat_timestamp: Cell::new(0),
client_table: RefCell::new(ClientTable::new(CLIENTS_TABLE_MAX)),
}
}
- // TODO: More init logic.
pub fn init(&self) {
self.status.set(Status::Normal);
+ let mut timeouts = self.timeouts.borrow_mut();
+ if self.is_primary() {
+ timeouts.start(TimeoutKind::Prepare);
+ timeouts.start(TimeoutKind::CommitMessage);
+ } else {
+ timeouts.start(TimeoutKind::NormalHeartbeat);
+ }
}
#[must_use]
@@ -817,6 +848,12 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
timeouts = self.timeouts.borrow_mut();
}
+ if timeouts.fired(TimeoutKind::CommitMessage) {
+ drop(timeouts);
+ actions.extend(self.handle_commit_message_timeout());
+ timeouts = self.timeouts.borrow_mut();
+ }
+
if timeouts.fired(TimeoutKind::ViewChangeStatus) {
drop(timeouts);
actions.extend(self.handle_view_change_status_timeout(plane));
@@ -1043,6 +1080,30 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
vec![VsrAction::RetransmitPrepares { targets }]
}
+ /// Primary heartbeat: send commit point to all backups so they know
+ /// the primary is alive and can advance their own `commit_max`.
+ fn handle_commit_message_timeout(&self) -> Vec<VsrAction> {
+ if !self.is_primary() || self.status.get() != Status::Normal {
+ return Vec::new();
+ }
+
+ self.timeouts.borrow_mut().reset(TimeoutKind::CommitMessage);
+
+ // Don't advertise a commit point we haven't locally executed yet.
+ // After view change the new primary may have commit_min < commit_max
+ // until commit_journal catches up. Send commit_min (what we've
+ // actually applied) so backups don't advance past us.
+ let ts = self.heartbeat_timestamp.get() + 1;
+ self.heartbeat_timestamp.set(ts);
+
+ vec![VsrAction::SendCommit {
+ view: self.view.get(),
+ commit: self.commit_min.get(),
+ namespace: self.namespace,
+ timestamp_monotonic: ts,
+ }]
+ }
+
/// Handle a received `StartViewChange` message.
///
/// "When replica i receives STARTVIEWCHANGE messages for its view-number
@@ -1397,6 +1458,56 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
}
}
+ /// Handle a `Commit` (heartbeat) message from the primary.
+ ///
+ /// Advances `commit_max` and resets the backup's `NormalHeartbeat` timeout
+ /// so it doesn't start a spurious view change. Returns `true` if
+ /// `commit_max` advanced, signalling the caller to run `commit_journal`.
+ ///
+ /// Only accepts heartbeats with a strictly newer monotonic timestamp
+ /// to prevent old/replayed messages from suppressing view changes.
+ ///
+ /// # Panics
+ /// If `header.namespace` does not match this replica's namespace.
+ pub fn handle_commit(&self, header: &iggy_binary_protocol::CommitHeader) -> bool {
+ assert_eq!(
+ header.namespace, self.namespace,
+ "Commit routed to wrong group"
+ );
+
+ if self.is_primary() {
+ return false;
+ }
+
+ if self.status.get() != Status::Normal {
+ return false;
+ }
+
+ if header.view != self.view.get() {
+ return false;
+ }
+
+ // TODO: Once connection-level peer verification is added, promote
+ // this to an assert — the network layer would guarantee the sender
+ // matches header.replica.
+ if header.replica != self.primary_index(header.view) {
+ return false;
+ }
+
+ // Only accept heartbeats with a strictly newer timestamp to prevent
+ // old/replayed commit messages from resetting the timeout.
+ if self.heartbeat_timestamp.get() < header.timestamp_monotonic {
+ self.heartbeat_timestamp.set(header.timestamp_monotonic);
+ self.timeouts
+ .borrow_mut()
+ .reset(TimeoutKind::NormalHeartbeat);
+ }
+
+ let old_commit_max = self.commit_max.get();
+ self.advance_commit_max(header.commit);
+ self.commit_max.get() > old_commit_max
+ }
+
/// Complete view change as the new primary after collecting DVC quorum.
///
/// # Client-table maintenance
@@ -1439,6 +1550,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
timeouts.stop(TimeoutKind::DoViewChangeMessage);
timeouts.stop(TimeoutKind::StartViewChangeMessage);
timeouts.start(TimeoutKind::CommitMessage);
+ // If there are uncommitted ops in the rebuilt pipeline, start the
+ // Prepare timeout so that lost PrepareOks trigger retransmission.
+ if max_commit < new_op {
+ timeouts.start(TimeoutKind::Prepare);
+ }
}
let state = ReplicaLogContext::from_consensus(self, plane);
@@ -1458,7 +1574,30 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
&action,
),
);
- vec![action]
+
+ let mut actions = vec![action];
+ // Catch up commit_min to commit_max before rebuilding the pipeline.
+ // Without this, a behind backup (commit_min < max_commit) that becomes
+ // primary would have unapplied committed ops.
+ actions.push(VsrAction::CommitJournal);
+ // The new primary must rebuild its pipeline from the journal so that
+ // incoming PrepareOk messages can be matched and commits can proceed.
+ if max_commit < new_op {
+ assert!(
+ (new_op - max_commit) <= PIPELINE_PREPARE_QUEUE_MAX as u64,
+ "view change: uncommitted range {}..={} ({} ops) exceeds
pipeline capacity ({}); \
+ DVC winner claims more in-flight ops than the pipeline can
hold",
+ max_commit + 1,
+ new_op,
+ new_op - max_commit,
+ PIPELINE_PREPARE_QUEUE_MAX,
+ );
+ actions.push(VsrAction::RebuildPipeline {
+ from_op: max_commit + 1,
+ to_op: new_op,
+ });
+ }
+ actions
}
/// Handle a `PrepareOk` message from a replica.
diff --git a/core/consensus/src/observability.rs b/core/consensus/src/observability.rs
index c4eaed89c..190e0af22 100644
--- a/core/consensus/src/observability.rs
+++ b/core/consensus/src/observability.rs
@@ -115,6 +115,9 @@ pub enum ControlActionKind {
SendStartView,
SendPrepareOk,
SendPrepare,
+ RebuildPipeline,
+ CommitJournal,
+ SendCommit,
}
impl ControlActionKind {
@@ -126,6 +129,9 @@ impl ControlActionKind {
Self::SendStartView => "send_start_view",
Self::SendPrepareOk => "send_prepare_ok",
Self::SendPrepare => "send_prepare",
+ Self::RebuildPipeline => "rebuild_pipeline",
+ Self::CommitJournal => "commit_journal",
+ Self::SendCommit => "send_commit",
}
}
}
@@ -340,6 +346,27 @@ impl ControlActionLogEvent {
op: None,
commit: None,
},
+ VsrAction::RebuildPipeline { from_op, to_op, .. } => Self {
+ replica,
+ action: ControlActionKind::RebuildPipeline,
+ target_replica: None,
+ op: Some(from_op),
+ commit: Some(to_op),
+ },
+ VsrAction::CommitJournal => Self {
+ replica,
+ action: ControlActionKind::CommitJournal,
+ target_replica: None,
+ op: None,
+ commit: None,
+ },
+ VsrAction::SendCommit { commit, .. } => Self {
+ replica,
+ action: ControlActionKind::SendCommit,
+ target_replica: None,
+ op: None,
+ commit: Some(commit),
+ },
}
}
}
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index e0f65fc0d..79b3d98e1 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -646,9 +646,9 @@ where
/// and updating the client table for each.
///
/// The backup does NOT send replies to clients, only the primary does that.
- #[allow(clippy::cast_possible_truncation)]
+ #[allow(clippy::cast_possible_truncation, clippy::missing_panics_doc)]
#[allow(clippy::future_not_send)]
- async fn commit_journal(&self) {
+ pub async fn commit_journal(&self) {
let consensus = self.consensus.as_ref().unwrap();
let journal = self.journal.as_ref().unwrap();
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index 7134c84ff..88661ce3b 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -881,8 +881,8 @@ where
/// updating the client table for each.
///
/// The backup does NOT send replies to clients, only the primary does that.
- #[allow(clippy::cast_possible_truncation)]
- async fn commit_journal(&self) {
+ #[allow(clippy::cast_possible_truncation, clippy::missing_panics_doc)]
+ pub async fn commit_journal(&self) {
let consensus = self
.consensus()
.expect("commit_journal: consensus not initialized");
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index a77fd3742..a0c2174ce 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -19,10 +19,12 @@ mod router;
pub mod shards_table;
use consensus::{
- MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, PlaneKind, Sequencer, VsrConsensus,
+ MetadataHandle, MuxPlane, NamespacedPipeline, PartitionsHandle, Pipeline, Plane, PlaneKind,
+ Sequencer, VsrAction, VsrConsensus,
};
use iggy_binary_protocol::{
- GenericHeader, Message, MessageBag, PrepareHeader, PrepareOkHeader, RequestHeader,
+ Command2, CommitHeader, DoViewChangeHeader, GenericHeader, Message, MessageBag, PrepareHeader,
+ PrepareOkHeader, RequestHeader, StartViewChangeHeader, StartViewHeader,
};
use iggy_common::sharding::IggyNamespace;
use iggy_common::variadic;
@@ -214,6 +216,10 @@ where
Ok(MessageBag::Request(request)) => self.on_request(request).await,
Ok(MessageBag::Prepare(prepare)) => self.on_replicate(prepare).await,
Ok(MessageBag::PrepareOk(prepare_ok)) => self.on_ack(prepare_ok).await,
+ Ok(MessageBag::StartViewChange(msg)) => self.on_start_view_change(msg).await,
+ Ok(MessageBag::DoViewChange(msg)) => self.on_do_view_change(msg).await,
+ Ok(MessageBag::StartView(msg)) => self.on_start_view(msg).await,
+ Ok(MessageBag::Commit(ref msg)) => self.on_commit(msg).await,
Err(e) => {
tracing::warn!(shard = self.id, error = %e, "dropping message with invalid command");
}
@@ -379,10 +385,218 @@ where
partitions.register_namespace_in_pipeline(namespace.inner());
}
- /// Tick the partitions consensus and dispatch any resulting actions.
+ /// Handle an incoming view change message by routing it to the correct
+ /// consensus group (metadata or partitions) based on the message namespace.
+ #[allow(clippy::future_not_send)]
+ async fn on_start_view_change(&self, msg: Message<StartViewChangeHeader>)
+ where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ {
+ let header = *msg.header();
+ let planes = self.plane.inner();
+
+ if let Some(ref consensus) = planes.0.consensus
+ && consensus.namespace() == header.namespace
+ {
+ let actions = consensus.handle_start_view_change(PlaneKind::Metadata, &header);
+ dispatch_vsr_actions(consensus, planes.0.journal.as_ref(), &actions).await;
+ return;
+ }
+
+ if let Some(consensus) = planes.1.0.consensus()
+ && consensus.namespace() == header.namespace
+ {
+ let actions = consensus.handle_start_view_change(PlaneKind::Partitions, &header);
+ dispatch_vsr_actions(consensus, planes.1.0.journal(), &actions).await;
+ return;
+ }
+
+ tracing::warn!(
+ shard = self.id,
+ namespace = header.namespace,
+ view = header.view,
+ replica = header.replica,
+ "dropping StartViewChange: namespace matches neither metadata nor
partitions consensus"
+ );
+ }
+
+ #[allow(clippy::future_not_send)]
+ async fn on_do_view_change(&self, msg: Message<DoViewChangeHeader>)
+ where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ M: StateMachine<
+ Input = Message<PrepareHeader>,
+ Output = bytes::Bytes,
+ Error = iggy_common::IggyError,
+ >,
+ {
+ let header = *msg.header();
+ let planes = self.plane.inner();
+
+ if let Some(ref consensus) = planes.0.consensus
+ && consensus.namespace() == header.namespace
+ {
+ let actions = consensus.handle_do_view_change(PlaneKind::Metadata, &header);
+ dispatch_vsr_actions(consensus, planes.0.journal.as_ref(), &actions).await;
+ if actions
+ .iter()
+ .any(|a| matches!(a, VsrAction::CommitJournal))
+ {
+ planes.0.commit_journal().await;
+ }
+ return;
+ }
+
+ if let Some(consensus) = planes.1.0.consensus()
+ && consensus.namespace() == header.namespace
+ {
+ let actions = consensus.handle_do_view_change(PlaneKind::Partitions, &header);
+ dispatch_vsr_actions(consensus, planes.1.0.journal(), &actions).await;
+ if actions
+ .iter()
+ .any(|a| matches!(a, VsrAction::CommitJournal))
+ {
+ planes.1.0.commit_journal().await;
+ }
+ return;
+ }
+
+ tracing::warn!(
+ shard = self.id,
+ namespace = header.namespace,
+ view = header.view,
+ replica = header.replica,
+ "dropping DoViewChange: namespace matches neither metadata nor
partitions consensus"
+ );
+ }
+
+ #[allow(clippy::future_not_send)]
+ async fn on_start_view(&self, msg: Message<StartViewHeader>)
+ where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ {
+ let header = *msg.header();
+ let planes = self.plane.inner();
+
+ if let Some(ref consensus) = planes.0.consensus
+ && consensus.namespace() == header.namespace
+ {
+ let actions = consensus.handle_start_view(PlaneKind::Metadata, &header);
+ dispatch_vsr_actions(consensus, planes.0.journal.as_ref(), &actions).await;
+ return;
+ }
+
+ if let Some(consensus) = planes.1.0.consensus()
+ && consensus.namespace() == header.namespace
+ {
+ let actions = consensus.handle_start_view(PlaneKind::Partitions, &header);
+ dispatch_vsr_actions(consensus, planes.1.0.journal(), &actions).await;
+ return;
+ }
+
+ tracing::warn!(
+ shard = self.id,
+ namespace = header.namespace,
+ view = header.view,
+ replica = header.replica,
+ "dropping StartView: namespace matches neither metadata nor
partitions consensus"
+ );
+ }
+
+ /// Handle an incoming `Commit` (primary heartbeat) message.
///
- /// Currently handles `RetransmitPrepares` (primary retransmits
- /// uncommitted prepares when the prepare timeout fires).
+ /// Routes to the correct consensus by namespace. The backup advances
+ /// `commit_max`, resets its `NormalHeartbeat` timeout, and commits
+ /// any newly committable ops from the journal.
+ #[allow(clippy::future_not_send)]
+ async fn on_commit(&self, msg: &Message<CommitHeader>)
+ where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ M: StateMachine<
+ Input = Message<PrepareHeader>,
+ Output = bytes::Bytes,
+ Error = iggy_common::IggyError,
+ >,
+ {
+ let header = *msg.header();
+ let planes = self.plane.inner();
+
+ if let Some(ref consensus) = planes.0.consensus
+ && consensus.namespace() == header.namespace
+ {
+ if consensus.handle_commit(&header) {
+ planes.0.commit_journal().await;
+ }
+ return;
+ }
+
+ if let Some(consensus) = planes.1.0.consensus()
+ && consensus.namespace() == header.namespace
+ {
+ if consensus.handle_commit(&header) {
+ planes.1.0.commit_journal().await;
+ }
+ return;
+ }
+
+ tracing::warn!(
+ shard = self.id,
+ namespace = header.namespace,
+ view = header.view,
+ replica = header.replica,
+ "dropping Commit: namespace matches neither metadata nor
partitions consensus"
+ );
+ }
+
+ /// Tick the partitions consensus and dispatch any resulting actions.
#[allow(clippy::future_not_send)]
pub async fn tick_partitions(&self)
where
@@ -398,29 +612,253 @@ where
let Some(consensus) = partitions.consensus() else {
return;
};
- let Some(journal) = partitions.journal() else {
+
+ let current_op = consensus.sequencer().current_sequence();
+ let current_commit = consensus.commit_min();
+ let actions = consensus.tick(PlaneKind::Partitions, current_op,
current_commit);
+
+ dispatch_vsr_actions(consensus, partitions.journal(), &actions).await;
+ }
+
+ /// Tick the metadata consensus and dispatch any resulting actions.
+ #[allow(clippy::future_not_send)]
+ pub async fn tick_metadata(&self)
+ where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ {
+ let metadata = self.plane.metadata();
+ let Some(ref consensus) = metadata.consensus else {
return;
};
let current_op = consensus.sequencer().current_sequence();
let current_commit = consensus.commit_min();
- let actions = consensus.tick(PlaneKind::Partitions, current_op, current_commit);
+ let actions = consensus.tick(PlaneKind::Metadata, current_op, current_commit);
+
+ dispatch_vsr_actions(consensus, metadata.journal.as_ref(), &actions).await;
+ }
+}
+
+/// Dispatch a list of `VsrAction`s by constructing the appropriate
+/// protocol messages and sending them via the consensus message bus.
+#[allow(
+ clippy::future_not_send,
+ clippy::too_many_lines,
+ clippy::cast_possible_truncation
+)]
+async fn dispatch_vsr_actions<B, P, J>(
+ consensus: &VsrConsensus<B, P>,
+ journal: Option<&J>,
+ actions: &[VsrAction],
+) where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Entry = consensus::PipelineEntry>,
+ J: JournalHandle,
+ <J as JournalHandle>::Target: Journal<
+ <J as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+{
+ use std::mem::size_of;
- for action in actions {
- if let consensus::VsrAction::RetransmitPrepares { targets } = action {
+ let bus = consensus.message_bus();
+ let self_id = consensus.replica();
+ let cluster = consensus.cluster();
+ let replica_count = consensus.replica_count();
+
+ let send = |target: u8, msg: Message<GenericHeader>| async move {
+ if let Err(e) = bus.send_to_replica(target, msg).await {
+ tracing::debug!(replica = self_id, target, "bus send failed: {e}");
+ }
+ };
+
+ for action in actions {
+ match action {
+ VsrAction::SendStartViewChange { view, namespace } => {
+ let msg = Message::<StartViewChangeHeader>::new(size_of::<StartViewChangeHeader>())
+ .transmute_header(|_, h: &mut StartViewChangeHeader| {
+ h.command = Command2::StartViewChange;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.namespace = *namespace;
+ h.size = size_of::<StartViewChangeHeader>() as u32;
+ });
+ for target in 0..replica_count {
+ if target != self_id {
+ send(target, msg.deep_copy().into_generic()).await;
+ }
+ }
+ }
+ VsrAction::SendDoViewChange {
+ view,
+ target,
+ log_view,
+ op,
+ commit,
+ namespace,
+ } => {
+ let msg = Message::<DoViewChangeHeader>::new(size_of::<DoViewChangeHeader>())
+ .transmute_header(|_, h: &mut DoViewChangeHeader| {
+ h.command = Command2::DoViewChange;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.log_view = *log_view;
+ h.op = *op;
+ h.commit = *commit;
+ h.namespace = *namespace;
+ h.size = size_of::<DoViewChangeHeader>() as u32;
+ });
+ send(*target, msg.into_generic()).await;
+ }
+ VsrAction::SendStartView {
+ view,
+ op,
+ commit,
+ namespace,
+ } => {
+ let msg = Message::<StartViewHeader>::new(size_of::<StartViewHeader>())
+ .transmute_header(|_, h: &mut StartViewHeader| {
+ h.command = Command2::StartView;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.op = *op;
+ h.commit = *commit;
+ h.namespace = *namespace;
+ h.size = size_of::<StartViewHeader>() as u32;
+ });
+ for target in 0..replica_count {
+ if target != self_id {
+ send(target, msg.deep_copy().into_generic()).await;
+ }
+ }
+ }
+ VsrAction::SendPrepareOk {
+ view,
+ from_op,
+ to_op,
+ target,
+ namespace,
+ } => {
+ let Some(journal) = journal else {
+ continue;
+ };
+ for op in *from_op..=*to_op {
+ let Some(prepare_header) = journal.handle().header(op as usize) else {
+ continue;
+ };
+ let prepare_header = *prepare_header;
+ let msg = Message::<PrepareOkHeader>::new(size_of::<PrepareOkHeader>())
+ .transmute_header(|_, h: &mut PrepareOkHeader| {
+ h.command = Command2::PrepareOk;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.op = op;
+ h.commit = consensus.commit_max();
+ h.timestamp = prepare_header.timestamp;
+ h.parent = prepare_header.parent;
+ h.prepare_checksum = prepare_header.checksum;
+ h.request = prepare_header.request;
+ h.operation = prepare_header.operation;
+ h.namespace = *namespace;
+ h.size = size_of::<PrepareOkHeader>() as u32;
+ });
+ send(*target, msg.into_generic()).await;
+ }
+ }
+ VsrAction::RetransmitPrepares { targets } => {
+ let Some(journal) = journal else {
+ continue;
+ };
for (header, replicas) in targets {
- let Some(prepare) = journal.handle().entry(&header).await else {
+ let Some(prepare) = journal.handle().entry(header).await else {
continue;
};
for replica in replicas {
- let _ = consensus
- .message_bus()
- .send_to_replica(replica, prepare.clone().into_generic())
- .await;
+ send(*replica, prepare.clone().into_generic()).await;
+ }
+ }
+ }
+ VsrAction::RebuildPipeline { from_op, to_op } => {
+ let Some(journal) = journal else {
+ continue;
+ };
+ // Collect headers before borrowing the pipeline to avoid
+ // holding borrow_mut() across journal reads.
+ let mut gap_at = None;
+ let entries: Vec<_> = (*from_op..=*to_op)
+ .map_while(|op| {
+ let Some(header) = journal.handle().header(op as usize) else {
+ gap_at = Some(op);
+ return None;
+ };
+ let mut entry = consensus::PipelineEntry::new(*header);
+ entry.add_ack(self_id);
+ Some(entry)
+ })
+ .collect();
+ if let Some(missing_op) = gap_at {
+ // Journal repair is not yet implemented. Truncate the sequencer
+ // to the last op we could rebuild so the next client prepare
+ // chains correctly. Ops above the gap are lost until journal
+ // repair is added.
+ let rebuilt_up_to = missing_op.saturating_sub(1);
+ tracing::warn!(
+ replica = self_id,
+ missing_op,
+ range_start = from_op,
+ range_end = to_op,
+ rebuilt = entries.len(),
+ "RebuildPipeline: journal gap at op {missing_op}, \
+ truncating sequencer from {to_op} to {rebuilt_up_to} \
+ ({}/{} ops rebuilt)",
+ entries.len(),
+ to_op - from_op + 1,
+ );
+ consensus.sequencer().set_sequence(rebuilt_up_to);
+ }
+ let mut pipeline = consensus.pipeline().borrow_mut();
+ for entry in entries {
+ pipeline.push(entry);
+ }
+ }
+ // Handled by the caller (shard view change handlers) since it
+ // requires access to the plane's commit_journal method.
+ VsrAction::CommitJournal => {}
+ VsrAction::SendCommit {
+ view,
+ commit,
+ namespace,
+ timestamp_monotonic,
+ } => {
+ let msg = Message::<CommitHeader>::new(size_of::<CommitHeader>()).transmute_header(
+ |_, h: &mut CommitHeader| {
+ h.command = Command2::Commit;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.commit = *commit;
+ h.namespace = *namespace;
+ h.timestamp_monotonic = *timestamp_monotonic;
+ h.size = size_of::<CommitHeader>() as u32;
+ },
+ );
+ for target in 0..replica_count {
+ if target != self_id {
+ send(target, msg.deep_copy().into_generic()).await;
}
}
}
- // TODO: Dispatch other VsrActions (view change messages, etc.)
}
}
}
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 177cddaa0..70e23e40e 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -18,7 +18,9 @@
use crate::shards_table::ShardsTable;
use crate::{IggyShard, Receiver, ShardFrame};
use futures::FutureExt;
-use iggy_binary_protocol::{ConsensusError, GenericHeader, Message, MessageBag, PrepareHeader};
+use iggy_binary_protocol::{
+ ConsensusError, ConsensusHeader, GenericHeader, Message, MessageBag, PrepareHeader,
+};
use iggy_common::sharding::IggyNamespace;
use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
@@ -63,6 +65,22 @@ where
let h = *p.header();
(h.operation, h.namespace, p.into_generic())
}
+ MessageBag::StartViewChange(m) => {
+ let h = *m.header();
+ (h.operation(), h.namespace, m.into_generic())
+ }
+ MessageBag::DoViewChange(m) => {
+ let h = *m.header();
+ (h.operation(), h.namespace, m.into_generic())
+ }
+ MessageBag::StartView(m) => {
+ let h = *m.header();
+ (h.operation(), h.namespace, m.into_generic())
+ }
+ MessageBag::Commit(m) => {
+ let h = *m.header();
+ (h.operation(), h.namespace, m.into_generic())
+ }
};
let namespace = IggyNamespace::from_raw(namespace);
let target = if operation.is_metadata() {
@@ -79,9 +97,26 @@ where
0
})
} else {
+ // TODO: View change messages (StartViewChange, DoViewChange, StartView) and
+ // Commit heartbeats return Operation::Reserved, so they always land here and
+ // route to shard 0. This is correct only in single-shard deployments. In
+ // multi-shard, partition-plane messages must reach the shard owning that
+ // consensus group. Fixing this requires routing by Command2 + consensus
+ // namespace (a u64) rather than by Operation + IggyNamespace, since these
+ // headers don't carry an IggyNamespace.
0
};
- let _ = self.senders[target as usize].send(ShardFrame::fire_and_forget(generic));
+ if self.senders[target as usize]
+ .send(ShardFrame::fire_and_forget(generic))
+ .is_err()
+ {
+ tracing::warn!(
+ shard = self.id,
+ target,
+ ?operation,
+ "dispatch: shard channel full or closed, message dropped"
+ );
+ }
}
/// Dispatch a message and return a receiver that resolves when the target
@@ -107,6 +142,22 @@ where
let h = *p.header();
(h.operation, h.namespace, p.into_generic())
}
+ MessageBag::StartViewChange(m) => {
+ let h = *m.header();
+ (h.operation(), h.namespace, m.into_generic())
+ }
+ MessageBag::DoViewChange(m) => {
+ let h = *m.header();
+ (h.operation(), h.namespace, m.into_generic())
+ }
+ MessageBag::StartView(m) => {
+ let h = *m.header();
+ (h.operation(), h.namespace, m.into_generic())
+ }
+ MessageBag::Commit(m) => {
+ let h = *m.header();
+ (h.operation(), h.namespace, m.into_generic())
+ }
};
let namespace = IggyNamespace::from_raw(namespace);
@@ -131,11 +182,19 @@ where
0
})
} else {
+ // TODO: Same view-change and Commit routing issue as dispatch() above.
0
};
// Create a frame and send it to the target shard.
let (frame, rx) = ShardFrame::<R>::with_response(generic);
- let _ = self.senders[target as usize].send(frame);
+ if self.senders[target as usize].send(frame).is_err() {
+ tracing::warn!(
+ shard = self.id,
+ target,
+ ?operation,
+ "dispatch_request: shard channel full or closed, message
dropped"
+ );
+ }
Ok(rx)
}
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index d9d9029a3..193a5697d 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -109,9 +109,11 @@ impl Simulator {
///
/// Returns all client replies delivered during this tick.
///
- /// The tick has three phases that never borrow replicas and network
+ /// The tick has four phases that never borrow replicas and network
/// simultaneously:
///
+ /// 0. **Consensus tick**: advance consensus timeouts on live replicas,
+ /// dispatching any resulting actions (view change messages, retransmissions).
/// 1. **Deliver**: `network.step()` returns ready packets; each is
/// dispatched to its target replica (or collected as a client reply).
/// 2. **Drain**: each live replica's outbox is drained and fed into
@@ -126,6 +128,14 @@ impl Simulator {
pub fn step(&mut self) -> Vec<Message<ReplyHeader>> {
let mut client_replies = Vec::new();
+ // Phase 0: Advance consensus timeouts (skip crashed replicas).
+ for (i, replica) in self.replicas.iter().enumerate() {
+ if !self.crashed.contains(&(i as u8)) {
+ futures::executor::block_on(replica.tick_partitions());
+ futures::executor::block_on(replica.tick_metadata());
+ }
+ }
+
// Phase 1: Deliver ready packets from the network.
let packets = self.network.step();
for packet in &packets {
@@ -226,14 +236,17 @@ impl Simulator {
self.crashed.contains(&replica_index)
}
- /// Advance consensus timeouts and dispatch actions on every replica.
+ /// Advance consensus timeouts and dispatch actions on every live replica.
///
- /// Call this once per simulation tick. Handles prepare retransmission
- /// (and eventually view-change messages) via the timeout system.
- #[allow(clippy::future_not_send)]
+ /// Note: `step()` already calls this internally. This method is provided
+ /// for callers that need to tick consensus without a full step cycle.
+ #[allow(clippy::future_not_send, clippy::cast_possible_truncation)]
pub async fn tick(&self) {
- for replica in &self.replicas {
- replica.tick_partitions().await;
+ for (i, replica) in self.replicas.iter().enumerate() {
+ if !self.crashed.contains(&(i as u8)) {
+ replica.tick_partitions().await;
+ replica.tick_metadata().await;
+ }
}
}
@@ -291,3 +304,164 @@ impl Simulator {
// 2. Send a request to ns_b, step until ns_b reply arrives.
// 3. Assert ns_b committed while ns_a pipeline is still full.
// Requires namespace-aware stepping (filter bus by namespace) or two-phase delivery.
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::client::SimClient;
+ use consensus::Status;
+ use iggy_common::sharding::IggyNamespace;
+
+ /// After crashing the primary in a 5-node cluster, the 4 remaining
+ /// backups should detect the failure via heartbeat timeout and elect
+ /// a new primary through the view change protocol.
+ #[test]
+ fn view_change_after_primary_crash() {
+ iggy_common::MemoryPool::init_pool(&iggy_common::MemoryPoolConfigOther {
+ enabled: false,
+ size: iggy_common::IggyByteSize::from(0u64),
+ bucket_capacity: 1,
+ });
+
+ let replica_count: u8 = 5;
+ let client_id: u128 = 1;
+ let network_opts = packet::PacketSimulatorOptions {
+ node_count: replica_count,
+ client_count: 1,
+ ..packet::PacketSimulatorOptions::default()
+ };
+
+ let mut sim = Simulator::new(
+ replica_count as usize,
+ std::iter::once(client_id),
+ network_opts,
+ );
+ let client = SimClient::new(client_id);
+ let ns = IggyNamespace::new(1, 1, 0);
+ sim.init_partition(ns);
+
+ // Send a message through the primary (replica 0) to verify normal operation.
+ let msg = client.send_messages(ns, &[b"before crash"]);
+ sim.submit_request(client_id, 0, msg.into_generic());
+ let mut got_reply = false;
+ for _ in 0..100 {
+ if !sim.step().is_empty() {
+ got_reply = true;
+ break;
+ }
+ }
+ assert!(got_reply, "expected reply before crash");
+
+ // Crash the primary.
+ sim.replica_crash(0);
+
+ // Run enough steps for the heartbeat timeout to fire
+ // and the view change to complete across 4 surviving replicas.
+ for _ in 0..800 {
+ sim.step();
+ }
+
+ // Verify that a new primary was elected in a higher view.
+ let mut new_primary_found = false;
+ for replica_idx in 1..replica_count {
+ let replica = &sim.replicas[replica_idx as usize];
+ let partitions = replica.plane.partitions();
+ let consensus = partitions.consensus().unwrap();
+ if consensus.view() > 0
+ && consensus.status() == Status::Normal
+ && consensus.is_primary()
+ {
+ new_primary_found = true;
+ }
+ }
+ assert!(
+ new_primary_found,
+ "expected a new primary after crashing replica 0"
+ );
+
+ // Submit a request to the new primary and verify it commits.
+ let c = sim.replicas[1].plane.partitions().consensus().unwrap();
+ let new_primary_idx = c.primary_index(c.view());
+
+ let msg2 = client.send_messages(ns, &[b"after view change"]);
+ sim.submit_request(client_id, new_primary_idx, msg2.into_generic());
+ let mut got_reply_after = false;
+ for _ in 0..200 {
+ if !sim.step().is_empty() {
+ got_reply_after = true;
+ break;
+ }
+ }
+ assert!(
+ got_reply_after,
+ "expected reply from new primary after view change"
+ );
+ }
+
+ /// Regression: a behind backup (`commit_min < commit_max`) becoming the new
+ /// primary must not panic during the `CommitMessage` heartbeat timeout.
+ /// Previously, `handle_commit_message_timeout` asserted `commit_min == commit_max`,
+ /// which fails when the new primary hasn't caught up yet.
+ #[test]
+ fn view_change_behind_backup_becomes_primary() {
+ iggy_common::MemoryPool::init_pool(&iggy_common::MemoryPoolConfigOther {
+ enabled: false,
+ size: iggy_common::IggyByteSize::from(0u64),
+ bucket_capacity: 1,
+ });
+
+ let replica_count: u8 = 3;
+ let client_id: u128 = 1;
+ let network_opts = packet::PacketSimulatorOptions {
+ node_count: replica_count,
+ client_count: 1,
+ ..packet::PacketSimulatorOptions::default()
+ };
+
+ let mut sim = Simulator::new(
+ replica_count as usize,
+ std::iter::once(client_id),
+ network_opts,
+ );
+ let client = SimClient::new(client_id);
+ let ns = IggyNamespace::new(1, 1, 0);
+ sim.init_partition(ns);
+
+ // Send several messages so the primary commits ahead of backups.
+ // Backups receive prepares but may not have committed all of them
+ // (commit_max lags behind the primary's commit_min because the
+ // primary's commit point is only propagated via subsequent Prepare
+ // headers or Commit heartbeats).
+ for i in 0..3 {
+ let msg = client.send_messages(ns, &[format!("msg-{i}").as_bytes()]);
+ sim.submit_request(client_id, 0, msg.into_generic());
+ // Only a few steps — enough for replication but not for the
+ // backup to fully learn the commit point.
+ for _ in 0..10 {
+ sim.step();
+ }
+ }
+
+ // Crash the primary immediately. Backups may have commit_min < commit_max.
+ sim.replica_crash(0);
+
+ // Run view change. This must not panic in handle_commit_message_timeout.
+ for _ in 0..800 {
+ sim.step();
+ }
+
+ // Verify a new primary was elected and is functional.
+ let mut new_primary_found = false;
+ for idx in 1..replica_count {
+ let c = sim.replicas[idx as usize]
+ .plane
+ .partitions()
+ .consensus()
+ .unwrap();
+ if c.view() > 0 && c.status() == Status::Normal && c.is_primary() {
+ new_primary_found = true;
+ }
+ }
+ assert!(new_primary_found, "expected a new primary");
+ }
+}
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index c9373d3ef..d630c4a02 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -31,6 +31,12 @@ use std::sync::Arc;
// TODO: Make configurable
const CLUSTER_ID: u128 = 1;
+/// Consensus-level namespace for the partitions plane.
+///
+/// Must differ from the metadata namespace (0) so that view change messages
+/// can be routed to the correct consensus group.
+pub const PARTITIONS_CONSENSUS_NAMESPACE: u64 = 1;
+
// For now there is only one shard per replica,
// we will add support for multiple shards per replica in the future.
pub type Replica = shard::IggyShard<
@@ -76,13 +82,11 @@ pub fn new_replica(id: u8, name: String, bus: &Arc<SimOutbox>, replica_count: u8
let mut partitions = IggyPartitions::new(ShardId::new(u16::from(id)), partitions_config);
partitions.set_journal(SimJournal::<MemStorage>::default());
- // TODO: namespace=0 collides with metadata consensus. Safe for now because the simulator
- // routes by Operation type, but a shared view change bus would produce namespace collisions.
let partition_consensus = VsrConsensus::new(
CLUSTER_ID,
id,
replica_count,
- 0,
+ PARTITIONS_CONSENSUS_NAMESPACE,
SharedSimOutbox(Arc::clone(bus)),
NamespacedPipeline::new(),
);