This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 27f0f11c6 feat(consensus): wire up view change protocol in simulator 
(#3092)
27f0f11c6 is described below

commit 27f0f11c65f5289bda2f67d9cf95526a87121aa3
Author: Krishna Vishal <[email protected]>
AuthorDate: Fri Apr 10 16:38:24 2026 +0530

    feat(consensus): wire up view change protocol in simulator (#3092)
    
    - Dispatch all VsrAction variants (SVC/DVC/SV/PrepareOk/Commit) from tick
    and incoming message handlers.
    - Add CommitMessage heartbeat: the primary periodically sends its commit
    point to backups (see the sketch below).
    - Integrate consensus tick into Simulator::step().
    - Add RebuildPipeline action with primary self-ack so quorum can be reached
    after a view change.
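    A minimal, self-contained sketch of the heartbeat bookkeeping this change
    introduces (the names below are simplified stand-ins for the VsrConsensus
    fields, not the crate's actual API):

    use std::cell::Cell;

    // Stand-in for the heartbeat-related state kept by a replica.
    struct Heartbeat {
        timestamp: Cell<u64>,
        commit_max: Cell<u64>,
    }

    impl Heartbeat {
        // Primary side: bump the monotonic counter before emitting SendCommit,
        // so backups can tell fresh heartbeats from replayed ones.
        fn next_send(&self) -> u64 {
            let ts = self.timestamp.get() + 1;
            self.timestamp.set(ts);
            ts
        }

        // Backup side: only a strictly newer timestamp resets the view-change
        // timeout, so an old/replayed Commit cannot suppress view changes.
        // Returns true when commit_max advanced and the journal should be applied.
        fn on_commit(&self, timestamp_monotonic: u64, commit: u64) -> bool {
            if self.timestamp.get() < timestamp_monotonic {
                self.timestamp.set(timestamp_monotonic);
                // the real code also resets the NormalHeartbeat timeout here
            }
            let old = self.commit_max.get();
            if commit > old {
                self.commit_max.set(commit);
            }
            self.commit_max.get() > old
        }
    }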
---
 core/binary_protocol/src/consensus/message.rs |  38 ++-
 core/consensus/src/impls.rs                   | 143 +++++++-
 core/consensus/src/observability.rs           |  27 ++
 core/metadata/src/impls/metadata.rs           |   4 +-
 core/partitions/src/iggy_partitions.rs        |   4 +-
 core/shard/src/lib.rs                         | 468 +++++++++++++++++++++++++-
 core/shard/src/router.rs                      |  65 +++-
 core/simulator/src/lib.rs                     | 188 ++++++++++-
 core/simulator/src/replica.rs                 |  10 +-
 9 files changed, 911 insertions(+), 36 deletions(-)

diff --git a/core/binary_protocol/src/consensus/message.rs 
b/core/binary_protocol/src/consensus/message.rs
index 50ae07ac5..db85a7229 100644
--- a/core/binary_protocol/src/consensus/message.rs
+++ b/core/binary_protocol/src/consensus/message.rs
@@ -16,8 +16,9 @@
 // under the License.
 
 use crate::consensus::{
-    self, Command2, ConsensusError, ConsensusHeader, GenericHeader, 
PrepareHeader, PrepareOkHeader,
-    RequestHeader,
+    self, Command2, CommitHeader, ConsensusError, ConsensusHeader, 
DoViewChangeHeader,
+    GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader, 
StartViewChangeHeader,
+    StartViewHeader,
 };
 use iobuf::{Frozen, Owned};
 use smallvec::SmallVec;
@@ -458,6 +459,10 @@ pub enum MessageBag {
     Request(Message<RequestHeader>),
     Prepare(Message<PrepareHeader>),
     PrepareOk(Message<PrepareOkHeader>),
+    StartViewChange(Message<StartViewChangeHeader>),
+    DoViewChange(Message<DoViewChangeHeader>),
+    StartView(Message<StartViewHeader>),
+    Commit(Message<CommitHeader>),
 }
 
 impl MessageBag {
@@ -467,6 +472,10 @@ impl MessageBag {
             Self::Request(message) => message.header().command,
             Self::Prepare(message) => message.header().command,
             Self::PrepareOk(message) => message.header().command,
+            Self::StartViewChange(message) => message.header().command,
+            Self::DoViewChange(message) => message.header().command,
+            Self::StartView(message) => message.header().command,
+            Self::Commit(message) => message.header().command,
         }
     }
 
@@ -476,6 +485,10 @@ impl MessageBag {
             Self::Request(message) => message.header().size(),
             Self::Prepare(message) => message.header().size(),
             Self::PrepareOk(message) => message.header().size(),
+            Self::StartViewChange(message) => message.header().size(),
+            Self::DoViewChange(message) => message.header().size(),
+            Self::StartView(message) => message.header().size(),
+            Self::Commit(message) => message.header().size(),
         }
     }
 
@@ -485,6 +498,10 @@ impl MessageBag {
             Self::Request(message) => message.header().operation,
             Self::Prepare(message) => message.header().operation,
             Self::PrepareOk(message) => message.header().operation,
+            Self::StartViewChange(message) => message.header().operation(),
+            Self::DoViewChange(message) => message.header().operation(),
+            Self::StartView(message) => message.header().operation(),
+            Self::Commit(message) => message.header().operation(),
         }
     }
 }
@@ -512,6 +529,23 @@ where
                 let msg = unsafe { 
Message::<PrepareOkHeader>::from_backing_unchecked(backing) };
                 Ok(Self::PrepareOk(msg))
             }
+            Command2::StartViewChange => {
+                let msg =
+                    unsafe { 
Message::<StartViewChangeHeader>::from_backing_unchecked(backing) };
+                Ok(Self::StartViewChange(msg))
+            }
+            Command2::DoViewChange => {
+                let msg = unsafe { 
Message::<DoViewChangeHeader>::from_backing_unchecked(backing) };
+                Ok(Self::DoViewChange(msg))
+            }
+            Command2::StartView => {
+                let msg = unsafe { 
Message::<StartViewHeader>::from_backing_unchecked(backing) };
+                Ok(Self::StartView(msg))
+            }
+            Command2::Commit => {
+                let msg = unsafe { 
Message::<CommitHeader>::from_backing_unchecked(backing) };
+                Ok(Self::Commit(msg))
+            }
             other => Err(ConsensusError::InvalidCommand {
                 expected: Command2::Reserved,
                 found: other,
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 55101854c..d0932afa0 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -448,6 +448,26 @@ pub enum VsrAction {
     RetransmitPrepares {
         targets: Vec<(PrepareHeader, Vec<u8>)>,
     },
+    /// Rebuild the pipeline from the journal after a view change.
+    ///
+    /// The new primary must re-populate its pipeline with uncommitted ops
+    /// from the WAL so that incoming `PrepareOk` messages can be matched
+    /// and commits can proceed.
+    RebuildPipeline { from_op: u64, to_op: u64 },
+    /// Catch up `commit_min` to `commit_max` by applying committed ops from 
the
+    /// journal. Emitted during view change completion so the new primary
+    /// is fully caught up before accepting new requests.
+    CommitJournal,
+    /// Primary heartbeat: send current commit point to all backups.
+    ///
+    /// Emitted when the `CommitMessage` timeout fires. Prevents backups
+    /// from starting a view change during idle periods.
+    SendCommit {
+        view: u32,
+        commit: u64,
+        namespace: u64,
+        timestamp_monotonic: u64,
+    },
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -530,6 +550,10 @@ where
 
     timeouts: RefCell<TimeoutManager>,
 
+    /// Monotonic timestamp from the most recent accepted commit heartbeat.
+    /// Commit messages without a strictly newer timestamp (old/replayed) are ignored.
+    heartbeat_timestamp: Cell<u64>,
+
     /// VSR client-table for duplicate detection and reply caching.
     client_table: RefCell<ClientTable>,
 }
@@ -577,13 +601,20 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             sent_own_start_view_change: Cell::new(false),
             sent_own_do_view_change: Cell::new(false),
             timeouts: RefCell::new(TimeoutManager::new(timeout_seed)),
+            heartbeat_timestamp: Cell::new(0),
             client_table: RefCell::new(ClientTable::new(CLIENTS_TABLE_MAX)),
         }
     }
 
-    // TODO: More init logic.
     pub fn init(&self) {
         self.status.set(Status::Normal);
+        let mut timeouts = self.timeouts.borrow_mut();
+        if self.is_primary() {
+            timeouts.start(TimeoutKind::Prepare);
+            timeouts.start(TimeoutKind::CommitMessage);
+        } else {
+            timeouts.start(TimeoutKind::NormalHeartbeat);
+        }
     }
 
     #[must_use]
@@ -817,6 +848,12 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             timeouts = self.timeouts.borrow_mut();
         }
 
+        if timeouts.fired(TimeoutKind::CommitMessage) {
+            drop(timeouts);
+            actions.extend(self.handle_commit_message_timeout());
+            timeouts = self.timeouts.borrow_mut();
+        }
+
         if timeouts.fired(TimeoutKind::ViewChangeStatus) {
             drop(timeouts);
             actions.extend(self.handle_view_change_status_timeout(plane));
@@ -1043,6 +1080,30 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         vec![VsrAction::RetransmitPrepares { targets }]
     }
 
+    /// Primary heartbeat: send commit point to all backups so they know
+    /// the primary is alive and can advance their own `commit_max`.
+    fn handle_commit_message_timeout(&self) -> Vec<VsrAction> {
+        if !self.is_primary() || self.status.get() != Status::Normal {
+            return Vec::new();
+        }
+
+        self.timeouts.borrow_mut().reset(TimeoutKind::CommitMessage);
+
+        // Don't advertise a commit point we haven't locally executed yet.
+        // After view change the new primary may have commit_min < commit_max
+        // until commit_journal catches up. Send commit_min (what we've
+        // actually applied) so backups don't advance past us.
+        let ts = self.heartbeat_timestamp.get() + 1;
+        self.heartbeat_timestamp.set(ts);
+
+        vec![VsrAction::SendCommit {
+            view: self.view.get(),
+            commit: self.commit_min.get(),
+            namespace: self.namespace,
+            timestamp_monotonic: ts,
+        }]
+    }
+
     /// Handle a received `StartViewChange` message.
     ///
     /// "When replica i receives STARTVIEWCHANGE messages for its view-number
@@ -1397,6 +1458,56 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         }
     }
 
+    /// Handle a `Commit` (heartbeat) message from the primary.
+    ///
+    /// Advances `commit_max` and resets the backup's `NormalHeartbeat` timeout
+    /// so it doesn't start a spurious view change. Returns `true` if
+    /// `commit_max` advanced, signalling the caller to run `commit_journal`.
+    ///
+    /// Only accepts heartbeats with a strictly newer monotonic timestamp
+    /// to prevent old/replayed messages from suppressing view changes.
+    ///
+    /// # Panics
+    /// If `header.namespace` does not match this replica's namespace.
+    pub fn handle_commit(&self, header: &iggy_binary_protocol::CommitHeader) 
-> bool {
+        assert_eq!(
+            header.namespace, self.namespace,
+            "Commit routed to wrong group"
+        );
+
+        if self.is_primary() {
+            return false;
+        }
+
+        if self.status.get() != Status::Normal {
+            return false;
+        }
+
+        if header.view != self.view.get() {
+            return false;
+        }
+
+        // TODO: Once connection-level peer verification is added, promote
+        // this to an assert — the network layer would guarantee the sender
+        // matches header.replica.
+        if header.replica != self.primary_index(header.view) {
+            return false;
+        }
+
+        // Only accept heartbeats with a strictly newer timestamp to prevent
+        // old/replayed commit messages from resetting the timeout.
+        if self.heartbeat_timestamp.get() < header.timestamp_monotonic {
+            self.heartbeat_timestamp.set(header.timestamp_monotonic);
+            self.timeouts
+                .borrow_mut()
+                .reset(TimeoutKind::NormalHeartbeat);
+        }
+
+        let old_commit_max = self.commit_max.get();
+        self.advance_commit_max(header.commit);
+        self.commit_max.get() > old_commit_max
+    }
+
     /// Complete view change as the new primary after collecting DVC quorum.
     ///
     /// # Client-table maintenance
@@ -1439,6 +1550,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             timeouts.stop(TimeoutKind::DoViewChangeMessage);
             timeouts.stop(TimeoutKind::StartViewChangeMessage);
             timeouts.start(TimeoutKind::CommitMessage);
+            // If there are uncommitted ops in the rebuilt pipeline, start the
+            // Prepare timeout so that lost PrepareOks trigger retransmission.
+            if max_commit < new_op {
+                timeouts.start(TimeoutKind::Prepare);
+            }
         }
 
         let state = ReplicaLogContext::from_consensus(self, plane);
@@ -1458,7 +1574,30 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
                 &action,
             ),
         );
-        vec![action]
+
+        let mut actions = vec![action];
+        // Catch up commit_min to commit_max before rebuilding the pipeline.
+        // Without this, a behind backup (commit_min < max_commit) that becomes
+        // primary would have unapplied committed ops.
+        actions.push(VsrAction::CommitJournal);
+        // The new primary must rebuild its pipeline from the journal so that
+        // incoming PrepareOk messages can be matched and commits can proceed.
+        if max_commit < new_op {
+            assert!(
+                (new_op - max_commit) <= PIPELINE_PREPARE_QUEUE_MAX as u64,
+                "view change: uncommitted range {}..={} ({} ops) exceeds 
pipeline capacity ({}); \
+                 DVC winner claims more in-flight ops than the pipeline can 
hold",
+                max_commit + 1,
+                new_op,
+                new_op - max_commit,
+                PIPELINE_PREPARE_QUEUE_MAX,
+            );
+            actions.push(VsrAction::RebuildPipeline {
+                from_op: max_commit + 1,
+                to_op: new_op,
+            });
+        }
+        actions
     }
 
     /// Handle a `PrepareOk` message from a replica.
diff --git a/core/consensus/src/observability.rs 
b/core/consensus/src/observability.rs
index c4eaed89c..190e0af22 100644
--- a/core/consensus/src/observability.rs
+++ b/core/consensus/src/observability.rs
@@ -115,6 +115,9 @@ pub enum ControlActionKind {
     SendStartView,
     SendPrepareOk,
     SendPrepare,
+    RebuildPipeline,
+    CommitJournal,
+    SendCommit,
 }
 
 impl ControlActionKind {
@@ -126,6 +129,9 @@ impl ControlActionKind {
             Self::SendStartView => "send_start_view",
             Self::SendPrepareOk => "send_prepare_ok",
             Self::SendPrepare => "send_prepare",
+            Self::RebuildPipeline => "rebuild_pipeline",
+            Self::CommitJournal => "commit_journal",
+            Self::SendCommit => "send_commit",
         }
     }
 }
@@ -340,6 +346,27 @@ impl ControlActionLogEvent {
                 op: None,
                 commit: None,
             },
+            VsrAction::RebuildPipeline { from_op, to_op, .. } => Self {
+                replica,
+                action: ControlActionKind::RebuildPipeline,
+                target_replica: None,
+                op: Some(from_op),
+                commit: Some(to_op),
+            },
+            VsrAction::CommitJournal => Self {
+                replica,
+                action: ControlActionKind::CommitJournal,
+                target_replica: None,
+                op: None,
+                commit: None,
+            },
+            VsrAction::SendCommit { commit, .. } => Self {
+                replica,
+                action: ControlActionKind::SendCommit,
+                target_replica: None,
+                op: None,
+                commit: Some(commit),
+            },
         }
     }
 }
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index e0f65fc0d..79b3d98e1 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -646,9 +646,9 @@ where
     /// and updating the client table for each.
     ///
     /// The backup does NOT send replies to clients, only the primary does 
that.
-    #[allow(clippy::cast_possible_truncation)]
+    #[allow(clippy::cast_possible_truncation, clippy::missing_panics_doc)]
     #[allow(clippy::future_not_send)]
-    async fn commit_journal(&self) {
+    pub async fn commit_journal(&self) {
         let consensus = self.consensus.as_ref().unwrap();
         let journal = self.journal.as_ref().unwrap();
 
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 7134c84ff..88661ce3b 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -881,8 +881,8 @@ where
     /// updating the client table for each.
     ///
     /// The backup does NOT send replies to clients, only the primary does 
that.
-    #[allow(clippy::cast_possible_truncation)]
-    async fn commit_journal(&self) {
+    #[allow(clippy::cast_possible_truncation, clippy::missing_panics_doc)]
+    pub async fn commit_journal(&self) {
         let consensus = self
             .consensus()
             .expect("commit_journal: consensus not initialized");
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index a77fd3742..a0c2174ce 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -19,10 +19,12 @@ mod router;
 pub mod shards_table;
 
 use consensus::{
-    MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, PlaneKind, 
Sequencer, VsrConsensus,
+    MetadataHandle, MuxPlane, NamespacedPipeline, PartitionsHandle, Pipeline, 
Plane, PlaneKind,
+    Sequencer, VsrAction, VsrConsensus,
 };
 use iggy_binary_protocol::{
-    GenericHeader, Message, MessageBag, PrepareHeader, PrepareOkHeader, 
RequestHeader,
+    Command2, CommitHeader, DoViewChangeHeader, GenericHeader, Message, 
MessageBag, PrepareHeader,
+    PrepareOkHeader, RequestHeader, StartViewChangeHeader, StartViewHeader,
 };
 use iggy_common::sharding::IggyNamespace;
 use iggy_common::variadic;
@@ -214,6 +216,10 @@ where
             Ok(MessageBag::Request(request)) => self.on_request(request).await,
             Ok(MessageBag::Prepare(prepare)) => 
self.on_replicate(prepare).await,
             Ok(MessageBag::PrepareOk(prepare_ok)) => 
self.on_ack(prepare_ok).await,
+            Ok(MessageBag::StartViewChange(msg)) => 
self.on_start_view_change(msg).await,
+            Ok(MessageBag::DoViewChange(msg)) => 
self.on_do_view_change(msg).await,
+            Ok(MessageBag::StartView(msg)) => self.on_start_view(msg).await,
+            Ok(MessageBag::Commit(ref msg)) => self.on_commit(msg).await,
             Err(e) => {
                 tracing::warn!(shard = self.id, error = %e, "dropping message 
with invalid command");
             }
@@ -379,10 +385,218 @@ where
         partitions.register_namespace_in_pipeline(namespace.inner());
     }
 
-    /// Tick the partitions consensus and dispatch any resulting actions.
+    /// Handle an incoming view change message by routing it to the correct
+    /// consensus group (metadata or partitions) based on the message 
namespace.
+    #[allow(clippy::future_not_send)]
+    async fn on_start_view_change(&self, msg: Message<StartViewChangeHeader>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+    {
+        let header = *msg.header();
+        let planes = self.plane.inner();
+
+        if let Some(ref consensus) = planes.0.consensus
+            && consensus.namespace() == header.namespace
+        {
+            let actions = 
consensus.handle_start_view_change(PlaneKind::Metadata, &header);
+            dispatch_vsr_actions(consensus, planes.0.journal.as_ref(), 
&actions).await;
+            return;
+        }
+
+        if let Some(consensus) = planes.1.0.consensus()
+            && consensus.namespace() == header.namespace
+        {
+            let actions = 
consensus.handle_start_view_change(PlaneKind::Partitions, &header);
+            dispatch_vsr_actions(consensus, planes.1.0.journal(), 
&actions).await;
+            return;
+        }
+
+        tracing::warn!(
+            shard = self.id,
+            namespace = header.namespace,
+            view = header.view,
+            replica = header.replica,
+            "dropping StartViewChange: namespace matches neither metadata nor 
partitions consensus"
+        );
+    }
+
+    #[allow(clippy::future_not_send)]
+    async fn on_do_view_change(&self, msg: Message<DoViewChangeHeader>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        M: StateMachine<
+                Input = Message<PrepareHeader>,
+                Output = bytes::Bytes,
+                Error = iggy_common::IggyError,
+            >,
+    {
+        let header = *msg.header();
+        let planes = self.plane.inner();
+
+        if let Some(ref consensus) = planes.0.consensus
+            && consensus.namespace() == header.namespace
+        {
+            let actions = consensus.handle_do_view_change(PlaneKind::Metadata, 
&header);
+            dispatch_vsr_actions(consensus, planes.0.journal.as_ref(), 
&actions).await;
+            if actions
+                .iter()
+                .any(|a| matches!(a, VsrAction::CommitJournal))
+            {
+                planes.0.commit_journal().await;
+            }
+            return;
+        }
+
+        if let Some(consensus) = planes.1.0.consensus()
+            && consensus.namespace() == header.namespace
+        {
+            let actions = 
consensus.handle_do_view_change(PlaneKind::Partitions, &header);
+            dispatch_vsr_actions(consensus, planes.1.0.journal(), 
&actions).await;
+            if actions
+                .iter()
+                .any(|a| matches!(a, VsrAction::CommitJournal))
+            {
+                planes.1.0.commit_journal().await;
+            }
+            return;
+        }
+
+        tracing::warn!(
+            shard = self.id,
+            namespace = header.namespace,
+            view = header.view,
+            replica = header.replica,
+            "dropping DoViewChange: namespace matches neither metadata nor 
partitions consensus"
+        );
+    }
+
+    #[allow(clippy::future_not_send)]
+    async fn on_start_view(&self, msg: Message<StartViewHeader>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+    {
+        let header = *msg.header();
+        let planes = self.plane.inner();
+
+        if let Some(ref consensus) = planes.0.consensus
+            && consensus.namespace() == header.namespace
+        {
+            let actions = consensus.handle_start_view(PlaneKind::Metadata, 
&header);
+            dispatch_vsr_actions(consensus, planes.0.journal.as_ref(), 
&actions).await;
+            return;
+        }
+
+        if let Some(consensus) = planes.1.0.consensus()
+            && consensus.namespace() == header.namespace
+        {
+            let actions = consensus.handle_start_view(PlaneKind::Partitions, 
&header);
+            dispatch_vsr_actions(consensus, planes.1.0.journal(), 
&actions).await;
+            return;
+        }
+
+        tracing::warn!(
+            shard = self.id,
+            namespace = header.namespace,
+            view = header.view,
+            replica = header.replica,
+            "dropping StartView: namespace matches neither metadata nor 
partitions consensus"
+        );
+    }
+
+    /// Handle an incoming `Commit` (primary heartbeat) message.
     ///
-    /// Currently handles `RetransmitPrepares` (primary retransmits
-    /// uncommitted prepares when the prepare timeout fires).
+    /// Routes to the correct consensus by namespace. The backup advances
+    /// `commit_max`, resets its `NormalHeartbeat` timeout, and commits
+    /// any newly committable ops from the journal.
+    #[allow(clippy::future_not_send)]
+    async fn on_commit(&self, msg: &Message<CommitHeader>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        M: StateMachine<
+                Input = Message<PrepareHeader>,
+                Output = bytes::Bytes,
+                Error = iggy_common::IggyError,
+            >,
+    {
+        let header = *msg.header();
+        let planes = self.plane.inner();
+
+        if let Some(ref consensus) = planes.0.consensus
+            && consensus.namespace() == header.namespace
+        {
+            if consensus.handle_commit(&header) {
+                planes.0.commit_journal().await;
+            }
+            return;
+        }
+
+        if let Some(consensus) = planes.1.0.consensus()
+            && consensus.namespace() == header.namespace
+        {
+            if consensus.handle_commit(&header) {
+                planes.1.0.commit_journal().await;
+            }
+            return;
+        }
+
+        tracing::warn!(
+            shard = self.id,
+            namespace = header.namespace,
+            view = header.view,
+            replica = header.replica,
+            "dropping Commit: namespace matches neither metadata nor 
partitions consensus"
+        );
+    }
+
+    /// Tick the partitions consensus and dispatch any resulting actions.
     #[allow(clippy::future_not_send)]
     pub async fn tick_partitions(&self)
     where
@@ -398,29 +612,253 @@ where
         let Some(consensus) = partitions.consensus() else {
             return;
         };
-        let Some(journal) = partitions.journal() else {
+
+        let current_op = consensus.sequencer().current_sequence();
+        let current_commit = consensus.commit_min();
+        let actions = consensus.tick(PlaneKind::Partitions, current_op, 
current_commit);
+
+        dispatch_vsr_actions(consensus, partitions.journal(), &actions).await;
+    }
+
+    /// Tick the metadata consensus and dispatch any resulting actions.
+    #[allow(clippy::future_not_send)]
+    pub async fn tick_metadata(&self)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+    {
+        let metadata = self.plane.metadata();
+        let Some(ref consensus) = metadata.consensus else {
             return;
         };
 
         let current_op = consensus.sequencer().current_sequence();
         let current_commit = consensus.commit_min();
-        let actions = consensus.tick(PlaneKind::Partitions, current_op, 
current_commit);
+        let actions = consensus.tick(PlaneKind::Metadata, current_op, 
current_commit);
+
+        dispatch_vsr_actions(consensus, metadata.journal.as_ref(), 
&actions).await;
+    }
+}
+
+/// Dispatch a list of `VsrAction`s by constructing the appropriate
+/// protocol messages and sending them via the consensus message bus.
+#[allow(
+    clippy::future_not_send,
+    clippy::too_many_lines,
+    clippy::cast_possible_truncation
+)]
+async fn dispatch_vsr_actions<B, P, J>(
+    consensus: &VsrConsensus<B, P>,
+    journal: Option<&J>,
+    actions: &[VsrAction],
+) where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Entry = consensus::PipelineEntry>,
+    J: JournalHandle,
+    <J as JournalHandle>::Target: Journal<
+            <J as JournalHandle>::Storage,
+            Entry = Message<PrepareHeader>,
+            Header = PrepareHeader,
+        >,
+{
+    use std::mem::size_of;
 
-        for action in actions {
-            if let consensus::VsrAction::RetransmitPrepares { targets } = 
action {
+    let bus = consensus.message_bus();
+    let self_id = consensus.replica();
+    let cluster = consensus.cluster();
+    let replica_count = consensus.replica_count();
+
+    let send = |target: u8, msg: Message<GenericHeader>| async move {
+        if let Err(e) = bus.send_to_replica(target, msg).await {
+            tracing::debug!(replica = self_id, target, "bus send failed: {e}");
+        }
+    };
+
+    for action in actions {
+        match action {
+            VsrAction::SendStartViewChange { view, namespace } => {
+                let msg = 
Message::<StartViewChangeHeader>::new(size_of::<StartViewChangeHeader>())
+                    .transmute_header(|_, h: &mut StartViewChangeHeader| {
+                        h.command = Command2::StartViewChange;
+                        h.cluster = cluster;
+                        h.replica = self_id;
+                        h.view = *view;
+                        h.namespace = *namespace;
+                        h.size = size_of::<StartViewChangeHeader>() as u32;
+                    });
+                for target in 0..replica_count {
+                    if target != self_id {
+                        send(target, msg.deep_copy().into_generic()).await;
+                    }
+                }
+            }
+            VsrAction::SendDoViewChange {
+                view,
+                target,
+                log_view,
+                op,
+                commit,
+                namespace,
+            } => {
+                let msg = 
Message::<DoViewChangeHeader>::new(size_of::<DoViewChangeHeader>())
+                    .transmute_header(|_, h: &mut DoViewChangeHeader| {
+                        h.command = Command2::DoViewChange;
+                        h.cluster = cluster;
+                        h.replica = self_id;
+                        h.view = *view;
+                        h.log_view = *log_view;
+                        h.op = *op;
+                        h.commit = *commit;
+                        h.namespace = *namespace;
+                        h.size = size_of::<DoViewChangeHeader>() as u32;
+                    });
+                send(*target, msg.into_generic()).await;
+            }
+            VsrAction::SendStartView {
+                view,
+                op,
+                commit,
+                namespace,
+            } => {
+                let msg = 
Message::<StartViewHeader>::new(size_of::<StartViewHeader>())
+                    .transmute_header(|_, h: &mut StartViewHeader| {
+                        h.command = Command2::StartView;
+                        h.cluster = cluster;
+                        h.replica = self_id;
+                        h.view = *view;
+                        h.op = *op;
+                        h.commit = *commit;
+                        h.namespace = *namespace;
+                        h.size = size_of::<StartViewHeader>() as u32;
+                    });
+                for target in 0..replica_count {
+                    if target != self_id {
+                        send(target, msg.deep_copy().into_generic()).await;
+                    }
+                }
+            }
+            VsrAction::SendPrepareOk {
+                view,
+                from_op,
+                to_op,
+                target,
+                namespace,
+            } => {
+                let Some(journal) = journal else {
+                    continue;
+                };
+                for op in *from_op..=*to_op {
+                    let Some(prepare_header) = journal.handle().header(op as 
usize) else {
+                        continue;
+                    };
+                    let prepare_header = *prepare_header;
+                    let msg = 
Message::<PrepareOkHeader>::new(size_of::<PrepareOkHeader>())
+                        .transmute_header(|_, h: &mut PrepareOkHeader| {
+                            h.command = Command2::PrepareOk;
+                            h.cluster = cluster;
+                            h.replica = self_id;
+                            h.view = *view;
+                            h.op = op;
+                            h.commit = consensus.commit_max();
+                            h.timestamp = prepare_header.timestamp;
+                            h.parent = prepare_header.parent;
+                            h.prepare_checksum = prepare_header.checksum;
+                            h.request = prepare_header.request;
+                            h.operation = prepare_header.operation;
+                            h.namespace = *namespace;
+                            h.size = size_of::<PrepareOkHeader>() as u32;
+                        });
+                    send(*target, msg.into_generic()).await;
+                }
+            }
+            VsrAction::RetransmitPrepares { targets } => {
+                let Some(journal) = journal else {
+                    continue;
+                };
                 for (header, replicas) in targets {
-                    let Some(prepare) = journal.handle().entry(&header).await 
else {
+                    let Some(prepare) = journal.handle().entry(header).await 
else {
                         continue;
                     };
                     for replica in replicas {
-                        let _ = consensus
-                            .message_bus()
-                            .send_to_replica(replica, 
prepare.clone().into_generic())
-                            .await;
+                        send(*replica, prepare.clone().into_generic()).await;
+                    }
+                }
+            }
+            VsrAction::RebuildPipeline { from_op, to_op } => {
+                let Some(journal) = journal else {
+                    continue;
+                };
+                // Collect headers before borrowing the pipeline to avoid
+                // holding borrow_mut() across journal reads.
+                let mut gap_at = None;
+                let entries: Vec<_> = (*from_op..=*to_op)
+                    .map_while(|op| {
+                        let Some(header) = journal.handle().header(op as 
usize) else {
+                            gap_at = Some(op);
+                            return None;
+                        };
+                        let mut entry = consensus::PipelineEntry::new(*header);
+                        entry.add_ack(self_id);
+                        Some(entry)
+                    })
+                    .collect();
+                if let Some(missing_op) = gap_at {
+                    // Journal repair is not yet implemented. Truncate the
+                    // sequencer to the last op we could rebuild so the next
+                    // client prepare chains correctly. Ops above the gap are
+                    // lost until journal repair is added.
+                    let rebuilt_up_to = missing_op.saturating_sub(1);
+                    tracing::warn!(
+                        replica = self_id,
+                        missing_op,
+                        range_start = from_op,
+                        range_end = to_op,
+                        rebuilt = entries.len(),
+                        "RebuildPipeline: journal gap at op {missing_op}, \
+                         truncating sequencer from {to_op} to {rebuilt_up_to} \
+                         ({}/{} ops rebuilt)",
+                        entries.len(),
+                        to_op - from_op + 1,
+                    );
+                    consensus.sequencer().set_sequence(rebuilt_up_to);
+                }
+                let mut pipeline = consensus.pipeline().borrow_mut();
+                for entry in entries {
+                    pipeline.push(entry);
+                }
+            }
+            // Handled by the caller (shard view change handlers) since it
+            // requires access to the plane's commit_journal method.
+            VsrAction::CommitJournal => {}
+            VsrAction::SendCommit {
+                view,
+                commit,
+                namespace,
+                timestamp_monotonic,
+            } => {
+                let msg = 
Message::<CommitHeader>::new(size_of::<CommitHeader>()).transmute_header(
+                    |_, h: &mut CommitHeader| {
+                        h.command = Command2::Commit;
+                        h.cluster = cluster;
+                        h.replica = self_id;
+                        h.view = *view;
+                        h.commit = *commit;
+                        h.namespace = *namespace;
+                        h.timestamp_monotonic = *timestamp_monotonic;
+                        h.size = size_of::<CommitHeader>() as u32;
+                    },
+                );
+                for target in 0..replica_count {
+                    if target != self_id {
+                        send(target, msg.deep_copy().into_generic()).await;
                     }
                 }
             }
-            // TODO: Dispatch other VsrActions (view change messages, etc.)
         }
     }
 }
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 177cddaa0..70e23e40e 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -18,7 +18,9 @@
 use crate::shards_table::ShardsTable;
 use crate::{IggyShard, Receiver, ShardFrame};
 use futures::FutureExt;
-use iggy_binary_protocol::{ConsensusError, GenericHeader, Message, MessageBag, 
PrepareHeader};
+use iggy_binary_protocol::{
+    ConsensusError, ConsensusHeader, GenericHeader, Message, MessageBag, 
PrepareHeader,
+};
 use iggy_common::sharding::IggyNamespace;
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
@@ -63,6 +65,22 @@ where
                 let h = *p.header();
                 (h.operation, h.namespace, p.into_generic())
             }
+            MessageBag::StartViewChange(m) => {
+                let h = *m.header();
+                (h.operation(), h.namespace, m.into_generic())
+            }
+            MessageBag::DoViewChange(m) => {
+                let h = *m.header();
+                (h.operation(), h.namespace, m.into_generic())
+            }
+            MessageBag::StartView(m) => {
+                let h = *m.header();
+                (h.operation(), h.namespace, m.into_generic())
+            }
+            MessageBag::Commit(m) => {
+                let h = *m.header();
+                (h.operation(), h.namespace, m.into_generic())
+            }
         };
         let namespace = IggyNamespace::from_raw(namespace);
         let target = if operation.is_metadata() {
@@ -79,9 +97,26 @@ where
                 0
             })
         } else {
+            // TODO: View change messages (StartViewChange, DoViewChange, 
StartView) and
+            // Commit heartbeats return Operation::Reserved, so they always 
land here and
+            // route to shard 0. This is correct only in single-shard 
deployments. In
+            // multi-shard, partition-plane messages must reach the shard 
owning that
+            // consensus group. Fixing this requires routing by Command2 + 
consensus
+            // namespace (a u64) rather than by Operation + IggyNamespace, 
since these
+            // headers don't carry an IggyNamespace.
             0
         };
-        let _ = self.senders[target as 
usize].send(ShardFrame::fire_and_forget(generic));
+        if self.senders[target as usize]
+            .send(ShardFrame::fire_and_forget(generic))
+            .is_err()
+        {
+            tracing::warn!(
+                shard = self.id,
+                target,
+                ?operation,
+                "dispatch: shard channel full or closed, message dropped"
+            );
+        }
     }
 
     /// Dispatch a message and return a receiver that resolves when the target
@@ -107,6 +142,22 @@ where
                 let h = *p.header();
                 (h.operation, h.namespace, p.into_generic())
             }
+            MessageBag::StartViewChange(m) => {
+                let h = *m.header();
+                (h.operation(), h.namespace, m.into_generic())
+            }
+            MessageBag::DoViewChange(m) => {
+                let h = *m.header();
+                (h.operation(), h.namespace, m.into_generic())
+            }
+            MessageBag::StartView(m) => {
+                let h = *m.header();
+                (h.operation(), h.namespace, m.into_generic())
+            }
+            MessageBag::Commit(m) => {
+                let h = *m.header();
+                (h.operation(), h.namespace, m.into_generic())
+            }
         };
         let namespace = IggyNamespace::from_raw(namespace);
 
@@ -131,11 +182,19 @@ where
                 0
             })
         } else {
+            // TODO: Same view-change and Commit routing issue as dispatch() 
above.
             0
         };
         // Create a frame and send it to the target shard.
         let (frame, rx) = ShardFrame::<R>::with_response(generic);
-        let _ = self.senders[target as usize].send(frame);
+        if self.senders[target as usize].send(frame).is_err() {
+            tracing::warn!(
+                shard = self.id,
+                target,
+                ?operation,
+                "dispatch_request: shard channel full or closed, message 
dropped"
+            );
+        }
         Ok(rx)
     }
 
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index d9d9029a3..193a5697d 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -109,9 +109,11 @@ impl Simulator {
     ///
     /// Returns all client replies delivered during this tick.
     ///
-    /// The tick has three phases that never borrow replicas and network
+    /// The tick has four phases that never borrow replicas and network
     /// simultaneously:
     ///
+    /// 0. **Consensus tick**: advance consensus timeouts on live replicas,
+    ///    dispatching any resulting actions (view change messages, 
retransmissions).
     /// 1. **Deliver**: `network.step()` returns ready packets; each is
     ///    dispatched to its target replica (or collected as a client reply).
     /// 2. **Drain**: each live replica's outbox is drained and fed into
@@ -126,6 +128,14 @@ impl Simulator {
     pub fn step(&mut self) -> Vec<Message<ReplyHeader>> {
         let mut client_replies = Vec::new();
 
+        // Phase 0: Advance consensus timeouts (skip crashed replicas).
+        for (i, replica) in self.replicas.iter().enumerate() {
+            if !self.crashed.contains(&(i as u8)) {
+                futures::executor::block_on(replica.tick_partitions());
+                futures::executor::block_on(replica.tick_metadata());
+            }
+        }
+
         // Phase 1: Deliver ready packets from the network.
         let packets = self.network.step();
         for packet in &packets {
@@ -226,14 +236,17 @@ impl Simulator {
         self.crashed.contains(&replica_index)
     }
 
-    /// Advance consensus timeouts and dispatch actions on every replica.
+    /// Advance consensus timeouts and dispatch actions on every live replica.
     ///
-    /// Call this once per simulation tick. Handles prepare retransmission
-    /// (and eventually view-change messages) via the timeout system.
-    #[allow(clippy::future_not_send)]
+    /// Note: `step()` already calls this internally. This method is provided
+    /// for callers that need to tick consensus without a full step cycle.
+    #[allow(clippy::future_not_send, clippy::cast_possible_truncation)]
     pub async fn tick(&self) {
-        for replica in &self.replicas {
-            replica.tick_partitions().await;
+        for (i, replica) in self.replicas.iter().enumerate() {
+            if !self.crashed.contains(&(i as u8)) {
+                replica.tick_partitions().await;
+                replica.tick_metadata().await;
+            }
         }
     }
 
@@ -291,3 +304,164 @@ impl Simulator {
 // 2. Send a request to ns_b, step until ns_b reply arrives.
 // 3. Assert ns_b committed while ns_a pipeline is still full.
 // Requires namespace-aware stepping (filter bus by namespace) or two-phase 
delivery.
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::client::SimClient;
+    use consensus::Status;
+    use iggy_common::sharding::IggyNamespace;
+
+    /// After crashing the primary in a 5-node cluster, the 4 remaining
+    /// backups should detect the failure via heartbeat timeout and elect
+    /// a new primary through the view change protocol.
+    #[test]
+    fn view_change_after_primary_crash() {
+        iggy_common::MemoryPool::init_pool(&iggy_common::MemoryPoolConfigOther 
{
+            enabled: false,
+            size: iggy_common::IggyByteSize::from(0u64),
+            bucket_capacity: 1,
+        });
+
+        let replica_count: u8 = 5;
+        let client_id: u128 = 1;
+        let network_opts = packet::PacketSimulatorOptions {
+            node_count: replica_count,
+            client_count: 1,
+            ..packet::PacketSimulatorOptions::default()
+        };
+
+        let mut sim = Simulator::new(
+            replica_count as usize,
+            std::iter::once(client_id),
+            network_opts,
+        );
+        let client = SimClient::new(client_id);
+        let ns = IggyNamespace::new(1, 1, 0);
+        sim.init_partition(ns);
+
+        // Send a message through the primary (replica 0) to verify normal 
operation.
+        let msg = client.send_messages(ns, &[b"before crash"]);
+        sim.submit_request(client_id, 0, msg.into_generic());
+        let mut got_reply = false;
+        for _ in 0..100 {
+            if !sim.step().is_empty() {
+                got_reply = true;
+                break;
+            }
+        }
+        assert!(got_reply, "expected reply before crash");
+
+        // Crash the primary.
+        sim.replica_crash(0);
+
+        // Run enough steps for the heartbeat timeout to fire
+        // and the view change to complete across 4 surviving replicas.
+        for _ in 0..800 {
+            sim.step();
+        }
+
+        // Verify that a new primary was elected in a higher view.
+        let mut new_primary_found = false;
+        for replica_idx in 1..replica_count {
+            let replica = &sim.replicas[replica_idx as usize];
+            let partitions = replica.plane.partitions();
+            let consensus = partitions.consensus().unwrap();
+            if consensus.view() > 0
+                && consensus.status() == Status::Normal
+                && consensus.is_primary()
+            {
+                new_primary_found = true;
+            }
+        }
+        assert!(
+            new_primary_found,
+            "expected a new primary after crashing replica 0"
+        );
+
+        // Submit a request to the new primary and verify it commits.
+        let c = sim.replicas[1].plane.partitions().consensus().unwrap();
+        let new_primary_idx = c.primary_index(c.view());
+
+        let msg2 = client.send_messages(ns, &[b"after view change"]);
+        sim.submit_request(client_id, new_primary_idx, msg2.into_generic());
+        let mut got_reply_after = false;
+        for _ in 0..200 {
+            if !sim.step().is_empty() {
+                got_reply_after = true;
+                break;
+            }
+        }
+        assert!(
+            got_reply_after,
+            "expected reply from new primary after view change"
+        );
+    }
+
+    /// Regression: a behind backup (`commit_min < commit_max`) becoming the 
new
+    /// primary must not panic during the `CommitMessage` heartbeat timeout.
+    /// Previously, `handle_commit_message_timeout` asserted `commit_min == 
commit_max`,
+    /// which fails when the new primary hasn't caught up yet.
+    #[test]
+    fn view_change_behind_backup_becomes_primary() {
+        iggy_common::MemoryPool::init_pool(&iggy_common::MemoryPoolConfigOther 
{
+            enabled: false,
+            size: iggy_common::IggyByteSize::from(0u64),
+            bucket_capacity: 1,
+        });
+
+        let replica_count: u8 = 3;
+        let client_id: u128 = 1;
+        let network_opts = packet::PacketSimulatorOptions {
+            node_count: replica_count,
+            client_count: 1,
+            ..packet::PacketSimulatorOptions::default()
+        };
+
+        let mut sim = Simulator::new(
+            replica_count as usize,
+            std::iter::once(client_id),
+            network_opts,
+        );
+        let client = SimClient::new(client_id);
+        let ns = IggyNamespace::new(1, 1, 0);
+        sim.init_partition(ns);
+
+        // Send several messages so the primary commits ahead of backups.
+        // Backups receive prepares but may not have committed all of them
+        // (commit_max lags behind the primary's commit_min because the
+        // primary's commit point is only propagated via subsequent Prepare
+        // headers or Commit heartbeats).
+        for i in 0..3 {
+            let msg = client.send_messages(ns, 
&[format!("msg-{i}").as_bytes()]);
+            sim.submit_request(client_id, 0, msg.into_generic());
+            // Only a few steps — enough for replication but not for the
+            // backup to fully learn the commit point.
+            for _ in 0..10 {
+                sim.step();
+            }
+        }
+
+        // Crash the primary immediately. Backups may have commit_min < 
commit_max.
+        sim.replica_crash(0);
+
+        // Run view change. This must not panic in 
handle_commit_message_timeout.
+        for _ in 0..800 {
+            sim.step();
+        }
+
+        // Verify a new primary was elected and is functional.
+        let mut new_primary_found = false;
+        for idx in 1..replica_count {
+            let c = sim.replicas[idx as usize]
+                .plane
+                .partitions()
+                .consensus()
+                .unwrap();
+            if c.view() > 0 && c.status() == Status::Normal && c.is_primary() {
+                new_primary_found = true;
+            }
+        }
+        assert!(new_primary_found, "expected a new primary");
+    }
+}
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index c9373d3ef..d630c4a02 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -31,6 +31,12 @@ use std::sync::Arc;
 // TODO: Make configurable
 const CLUSTER_ID: u128 = 1;
 
+/// Consensus-level namespace for the partitions plane.
+///
+/// Must differ from the metadata namespace (0) so that view change messages
+/// can be routed to the correct consensus group.
+pub const PARTITIONS_CONSENSUS_NAMESPACE: u64 = 1;
+
 // For now there is only one shard per replica,
 // we will add support for multiple shards per replica in the future.
 pub type Replica = shard::IggyShard<
@@ -76,13 +82,11 @@ pub fn new_replica(id: u8, name: String, bus: 
&Arc<SimOutbox>, replica_count: u8
     let mut partitions = IggyPartitions::new(ShardId::new(u16::from(id)), 
partitions_config);
     partitions.set_journal(SimJournal::<MemStorage>::default());
 
-    // TODO: namespace=0 collides with metadata consensus. Safe for now 
because the simulator
-    // routes by Operation type, but a shared view change bus would produce 
namespace collisions.
     let partition_consensus = VsrConsensus::new(
         CLUSTER_ID,
         id,
         replica_count,
-        0,
+        PARTITIONS_CONSENSUS_NAMESPACE,
         SharedSimOutbox(Arc::clone(bus)),
         NamespacedPipeline::new(),
     );
