This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch partition_remaster
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit fbf459b936601778c8c09df9d0da0d3a4de86c72
Author: numinex <[email protected]>
AuthorDate: Mon Mar 30 14:28:18 2026 +0200

    observability
---
 Cargo.lock                                |   1 +
 core/consensus/Cargo.toml                 |   1 +
 core/consensus/src/impls.rs               | 312 ++++++++++++---
 core/consensus/src/lib.rs                 |   6 +-
 core/consensus/src/namespaced_pipeline.rs |   4 +
 core/consensus/src/observability.rs       | 604 ++++++++++++++++++++++++++++++
 core/consensus/src/plane_helpers.rs       |  52 ++-
 core/metadata/src/impls/metadata.rs       | 131 +++++--
 core/partitions/src/iggy_index_writer.rs  |  14 +-
 core/partitions/src/iggy_partition.rs     |   1 +
 core/partitions/src/iggy_partitions.rs    | 309 +++++++++++----
 core/partitions/src/messages_writer.rs    |   2 +
 12 files changed, 1262 insertions(+), 175 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index dfcd39e28..1276be916 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2237,6 +2237,7 @@ dependencies = [
  "message_bus",
  "rand 0.10.0",
  "rand_xoshiro",
+ "tracing",
 ]
 
 [[package]]
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 0428a64d6..dac219e3a 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -37,6 +37,7 @@ iobuf = { workspace = true }
 message_bus = { workspace = true }
 rand = { workspace = true }
 rand_xoshiro = { workspace = true }
+tracing = { workspace = true }
 
 [dev-dependencies]
 futures = { workspace = true }
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 5bea3adc6..bf9a52412 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -17,8 +17,10 @@
 
 use crate::vsr_timeout::{TimeoutKind, TimeoutManager};
 use crate::{
-    Consensus, DvcQuorumArray, Pipeline, Project, StoredDvc, dvc_count, dvc_max_commit,
-    dvc_quorum_array_empty, dvc_record, dvc_reset, dvc_select_winner,
+    AckLogEvent, Consensus, ControlActionLogEvent, DvcQuorumArray, IgnoreReason, Pipeline,
+    PlaneKind, PrepareLogEvent, Project, ReplicaLogContext, SimEventKind, StoredDvc,
+    ViewChangeLogEvent, ViewChangeReason, dvc_count, dvc_max_commit, dvc_quorum_array_empty,
+    dvc_record, dvc_reset, dvc_select_winner, emit_replica_event, emit_sim_event,
 };
 use bit_set::BitSet;
 use iggy_binary_protocol::{
@@ -384,6 +386,10 @@ impl Pipeline for LocalPipeline {
         Self::is_empty(self)
     }
 
+    fn len(&self) -> usize {
+        self.prepare_count()
+    }
+
     fn verify(&self) {
         Self::verify(self);
     }
@@ -426,6 +432,27 @@ pub enum VsrAction {
     },
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
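+/// Outcome of processing a `PrepareOk` message: either the ack was recorded
+/// (with the running ack count and whether quorum was just reached), or it
+/// was ignored for the given [`IgnoreReason`].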
+pub enum PrepareOkOutcome {
+    Accepted {
+        ack_count: usize,
+        quorum_reached: bool,
+    },
+    Ignored {
+        reason: IgnoreReason,
+    },
+}
+
+impl PrepareOkOutcome {
+    #[must_use]
+    pub const fn quorum_reached(self) -> bool {
+        match self {
+            Self::Accepted { quorum_reached, .. } => quorum_reached,
+            Self::Ignored { .. } => false,
+        }
+    }
+}
+
 #[allow(unused)]
 #[derive(Debug)]
 pub struct VsrConsensus<B = IggyMessageBus, P = LocalPipeline>
@@ -691,7 +718,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
     ///
     /// Returns a list of actions to take based on fired timeouts.
     /// Empty vec means no actions needed.
-    pub fn tick(&self, current_op: u64, current_commit: u64) -> Vec<VsrAction> {
+    pub fn tick(&self, plane: PlaneKind, current_op: u64, current_commit: u64) -> Vec<VsrAction> {
         let mut actions = Vec::new();
         let mut timeouts = self.timeouts.borrow_mut();
 
@@ -701,25 +728,29 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
         // Phase 2: Handle fired timeouts
         if timeouts.fired(TimeoutKind::NormalHeartbeat) {
             drop(timeouts);
-            actions.extend(self.handle_normal_heartbeat_timeout());
+            actions.extend(self.handle_normal_heartbeat_timeout(plane));
             timeouts = self.timeouts.borrow_mut();
         }
 
         if timeouts.fired(TimeoutKind::StartViewChangeMessage) {
             drop(timeouts);
-            actions.extend(self.handle_start_view_change_message_timeout());
+            actions.extend(self.handle_start_view_change_message_timeout(plane));
             timeouts = self.timeouts.borrow_mut();
         }
 
         if timeouts.fired(TimeoutKind::DoViewChangeMessage) {
             drop(timeouts);
-            actions.extend(self.handle_do_view_change_message_timeout(current_op, current_commit));
+            actions.extend(self.handle_do_view_change_message_timeout(
+                plane,
+                current_op,
+                current_commit,
+            ));
             timeouts = self.timeouts.borrow_mut();
         }
 
         if timeouts.fired(TimeoutKind::ViewChangeStatus) {
             drop(timeouts);
-            actions.extend(self.handle_view_change_status_timeout());
+            actions.extend(self.handle_view_change_status_timeout(plane));
             // timeouts = self.timeouts.borrow_mut(); // Not needed if last
         }
 
@@ -728,7 +759,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
 
     /// Called when `normal_heartbeat` timeout fires.
     /// Backup hasn't heard from primary - start view change.
-    fn handle_normal_heartbeat_timeout(&self) -> Vec<VsrAction> {
+    fn handle_normal_heartbeat_timeout(&self, plane: PlaneKind) -> Vec<VsrAction> {
         // Only backups trigger view change on heartbeat timeout
         if self.is_primary() {
             return Vec::new();
@@ -740,7 +771,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
         }
 
         // Advance to new view and transition to view change
-        let new_view = self.view.get() + 1;
+        let old_view = self.view.get();
+        let new_view = old_view + 1;
 
         self.view.set(new_view);
         self.status.set(Status::ViewChange);
@@ -758,14 +790,32 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             timeouts.start(TimeoutKind::ViewChangeStatus);
         }
 
-        vec![VsrAction::SendStartViewChange {
+        emit_sim_event(
+            SimEventKind::ViewChangeStarted,
+            &ViewChangeLogEvent {
+                replica: ReplicaLogContext::from_consensus(self, plane),
+                old_view,
+                new_view,
+                reason: ViewChangeReason::NormalHeartbeatTimeout,
+            },
+        );
+
+        let action = VsrAction::SendStartViewChange {
             view: new_view,
             namespace: self.namespace,
-        }]
+        };
+        emit_sim_event(
+            SimEventKind::ControlMessageScheduled,
+            &ControlActionLogEvent::from_vsr_action(
+                ReplicaLogContext::from_consensus(self, plane),
+                &action,
+            ),
+        );
+        vec![action]
     }
 
     /// Resend SVC message if we've started view change.
-    fn handle_start_view_change_message_timeout(&self) -> Vec<VsrAction> {
+    fn handle_start_view_change_message_timeout(&self, plane: PlaneKind) -> Vec<VsrAction> {
         if !self.sent_own_start_view_change.get() {
             return Vec::new();
         }
@@ -774,15 +824,24 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             .borrow_mut()
             .reset(TimeoutKind::StartViewChangeMessage);
 
-        vec![VsrAction::SendStartViewChange {
+        let action = VsrAction::SendStartViewChange {
             view: self.view.get(),
             namespace: self.namespace,
-        }]
+        };
+        emit_sim_event(
+            SimEventKind::ControlMessageScheduled,
+            &ControlActionLogEvent::from_vsr_action(
+                ReplicaLogContext::from_consensus(self, plane),
+                &action,
+            ),
+        );
+        vec![action]
     }
 
     /// Resend DVC message if we've sent one.
     fn handle_do_view_change_message_timeout(
         &self,
+        plane: PlaneKind,
         current_op: u64,
         current_commit: u64,
     ) -> Vec<VsrAction> {
@@ -803,24 +862,33 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             .borrow_mut()
             .reset(TimeoutKind::DoViewChangeMessage);
 
-        vec![VsrAction::SendDoViewChange {
+        let action = VsrAction::SendDoViewChange {
             view: self.view.get(),
             target: self.primary_index(self.view.get()),
             log_view: self.log_view.get(),
             op: current_op,
             commit: current_commit,
             namespace: self.namespace,
-        }]
+        };
+        emit_sim_event(
+            SimEventKind::ControlMessageScheduled,
+            &ControlActionLogEvent::from_vsr_action(
+                ReplicaLogContext::from_consensus(self, plane),
+                &action,
+            ),
+        );
+        vec![action]
     }
 
     /// Escalate to next view if stuck in view change.
-    fn handle_view_change_status_timeout(&self) -> Vec<VsrAction> {
+    fn handle_view_change_status_timeout(&self, plane: PlaneKind) -> Vec<VsrAction> {
         if self.status.get() != Status::ViewChange {
             return Vec::new();
         }
 
         // Escalate: try next view
-        let next_view = self.view.get() + 1;
+        let old_view = self.view.get();
+        let next_view = old_view + 1;
 
         self.view.set(next_view);
         self.reset_view_change_state();
@@ -833,10 +901,28 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             .borrow_mut()
             .reset(TimeoutKind::ViewChangeStatus);
 
-        vec![VsrAction::SendStartViewChange {
+        emit_sim_event(
+            SimEventKind::ViewChangeStarted,
+            &ViewChangeLogEvent {
+                replica: ReplicaLogContext::from_consensus(self, plane),
+                old_view,
+                new_view: next_view,
+                reason: ViewChangeReason::ViewChangeStatusTimeout,
+            },
+        );
+
+        let action = VsrAction::SendStartViewChange {
             view: next_view,
             namespace: self.namespace,
-        }]
+        };
+        emit_sim_event(
+            SimEventKind::ControlMessageScheduled,
+            &ControlActionLogEvent::from_vsr_action(
+                ReplicaLogContext::from_consensus(self, plane),
+                &action,
+            ),
+        );
+        vec![action]
     }
 
     /// Handle a received `StartViewChange` message.
@@ -847,7 +933,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
     ///
     /// # Panics
     /// If `header.namespace` does not match this replica's namespace.
-    pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) -> Vec<VsrAction> {
+    pub fn handle_start_view_change(
+        &self,
+        plane: PlaneKind,
+        header: &StartViewChangeHeader,
+    ) -> Vec<VsrAction> {
         assert_eq!(
             header.namespace, self.namespace,
             "SVC routed to wrong group"
@@ -864,6 +954,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
 
         // If SVC is for a higher view, advance to that view
         if msg_view > self.view.get() {
+            let old_view = self.view.get();
             self.view.set(msg_view);
             self.status.set(Status::ViewChange);
             self.reset_view_change_state();
@@ -880,11 +971,29 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
                 timeouts.start(TimeoutKind::ViewChangeStatus);
             }
 
+            emit_sim_event(
+                SimEventKind::ViewChangeStarted,
+                &ViewChangeLogEvent {
+                    replica: ReplicaLogContext::from_consensus(self, plane),
+                    old_view,
+                    new_view: msg_view,
+                    reason: ViewChangeReason::ReceivedStartViewChange,
+                },
+            );
+
             // Send our own SVC
-            actions.push(VsrAction::SendStartViewChange {
+            let action = VsrAction::SendStartViewChange {
                 view: msg_view,
                 namespace: self.namespace,
-            });
+            };
+            emit_sim_event(
+                SimEventKind::ControlMessageScheduled,
+                &ControlActionLogEvent::from_vsr_action(
+                    ReplicaLogContext::from_consensus(self, plane),
+                    &action,
+                ),
+            );
+            actions.push(action);
         }
 
         // Record the SVC from sender
@@ -908,14 +1017,22 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
                 .borrow_mut()
                 .start(TimeoutKind::DoViewChangeMessage);
 
-            actions.push(VsrAction::SendDoViewChange {
+            let action = VsrAction::SendDoViewChange {
                 view: self.view.get(),
                 target: primary_candidate,
                 log_view: self.log_view.get(),
                 op: current_op,
                 commit: current_commit,
                 namespace: self.namespace,
-            });
+            };
+            emit_sim_event(
+                SimEventKind::ControlMessageScheduled,
+                &ControlActionLogEvent::from_vsr_action(
+                    ReplicaLogContext::from_consensus(self, plane),
+                    &action,
+                ),
+            );
+            actions.push(action);
 
             // If we are the primary candidate, record our own DVC
             if primary_candidate == self.replica {
@@ -933,7 +1050,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
                 // Check if we now have quorum
                 if dvc_count(&self.do_view_change_from_all_replicas.borrow()) >= self.quorum() {
                     self.do_view_change_quorum.set(true);
-                    actions.extend(self.complete_view_change_as_primary());
+                    actions.extend(self.complete_view_change_as_primary(plane));
                 }
             }
         }
@@ -949,7 +1066,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
     ///
     /// # Panics
     /// If `header.namespace` does not match this replica's namespace.
-    pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) -> Vec<VsrAction> {
+    pub fn handle_do_view_change(
+        &self,
+        plane: PlaneKind,
+        header: &DoViewChangeHeader,
+    ) -> Vec<VsrAction> {
         assert_eq!(
             header.namespace, self.namespace,
             "DVC routed to wrong group"
@@ -969,6 +1090,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
 
         // If DVC is for a higher view, advance to that view
         if msg_view > self.view.get() {
+            let old_view = self.view.get();
             self.view.set(msg_view);
             self.status.set(Status::ViewChange);
             self.reset_view_change_state();
@@ -985,11 +1107,29 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
                 timeouts.start(TimeoutKind::ViewChangeStatus);
             }
 
+            emit_sim_event(
+                SimEventKind::ViewChangeStarted,
+                &ViewChangeLogEvent {
+                    replica: ReplicaLogContext::from_consensus(self, plane),
+                    old_view,
+                    new_view: msg_view,
+                    reason: ViewChangeReason::ReceivedDoViewChange,
+                },
+            );
+
             // Send our own SVC
-            actions.push(VsrAction::SendStartViewChange {
+            let action = VsrAction::SendStartViewChange {
                 view: msg_view,
                 namespace: self.namespace,
-            });
+            };
+            emit_sim_event(
+                SimEventKind::ControlMessageScheduled,
+                &ControlActionLogEvent::from_vsr_action(
+                    ReplicaLogContext::from_consensus(self, plane),
+                    &action,
+                ),
+            );
+            actions.push(action);
         }
 
         // Only the primary candidate processes DVCs for quorum
@@ -1035,7 +1175,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             && dvc_count(&self.do_view_change_from_all_replicas.borrow()) >= self.quorum()
         {
             self.do_view_change_quorum.set(true);
-            actions.extend(self.complete_view_change_as_primary());
+            actions.extend(self.complete_view_change_as_primary(plane));
         }
 
         actions
@@ -1050,7 +1190,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
     ///
     /// # Panics
     /// If `header.namespace` does not match this replica's namespace.
-    pub fn handle_start_view(&self, header: &StartViewHeader) -> Vec<VsrAction> {
+    pub fn handle_start_view(&self, plane: PlaneKind, header: &StartViewHeader) -> Vec<VsrAction> {
         assert_eq!(header.namespace, self.namespace, "SV routed to wrong group");
         let from_replica = header.replica;
         let msg_view = header.view;
@@ -1097,19 +1237,32 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
         // Send PrepareOK for uncommitted ops (commit+1 to op)
         let mut actions = Vec::new();
         for op_num in (msg_commit + 1)..=msg_op {
-            actions.push(VsrAction::SendPrepareOk {
+            let action = VsrAction::SendPrepareOk {
                 view: msg_view,
                 op: op_num,
                 target: from_replica,
                 namespace: self.namespace,
-            });
+            };
+            emit_sim_event(
+                SimEventKind::ControlMessageScheduled,
+                &ControlActionLogEvent::from_vsr_action(
+                    ReplicaLogContext::from_consensus(self, plane),
+                    &action,
+                ),
+            );
+            actions.push(action);
         }
 
+        emit_replica_event(
+            SimEventKind::ReplicaStateChanged,
+            &ReplicaLogContext::from_consensus(self, plane),
+        );
+
         actions
     }
 
     /// Complete view change as the new primary after collecting DVC quorum.
-    fn complete_view_change_as_primary(&self) -> Vec<VsrAction> {
+    fn complete_view_change_as_primary(&self, plane: PlaneKind) -> Vec<VsrAction> {
         let dvc_array = self.do_view_change_from_all_replicas.borrow();
 
         let Some(winner) = dvc_select_winner(&dvc_array) else {
@@ -1143,23 +1296,39 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             timeouts.start(TimeoutKind::CommitMessage);
         }
 
-        vec![VsrAction::SendStartView {
+        let state = ReplicaLogContext::from_consensus(self, plane);
+        emit_replica_event(SimEventKind::PrimaryElected, &state);
+        emit_replica_event(SimEventKind::ReplicaStateChanged, &state);
+
+        let action = VsrAction::SendStartView {
             view: self.view.get(),
             op: new_op,
             commit: max_commit,
             namespace: self.namespace,
-        }]
+        };
+        emit_sim_event(
+            SimEventKind::ControlMessageScheduled,
+            &ControlActionLogEvent::from_vsr_action(
+                ReplicaLogContext::from_consensus(self, plane),
+                &action,
+            ),
+        );
+        vec![action]
     }
 
     /// Handle a `PrepareOk` message from a replica.
     ///
-    /// Returns `true` if quorum was just reached for this op.
+    /// Returns rich ack-progress information for structured logging.
     /// Caller (`on_ack`) should validate `is_primary` and status before calling.
     ///
     /// # Panics
     /// - If `header.command` is not `Command2::PrepareOk`.
     /// - If `header.replica >= self.replica_count`.
-    pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool {
+    pub fn handle_prepare_ok(
+        &self,
+        plane: PlaneKind,
+        header: &PrepareOkHeader,
+    ) -> PrepareOkOutcome {
         assert_eq!(header.command, Command2::PrepareOk);
         assert!(
             header.replica < self.replica_count,
@@ -1169,17 +1338,23 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
 
         // Ignore if from older view
         if header.view < self.view() {
-            return false;
+            return PrepareOkOutcome::Ignored {
+                reason: IgnoreReason::OlderView,
+            };
         }
 
         // Ignore if from newer view
         if header.view > self.view() {
-            return false;
+            return PrepareOkOutcome::Ignored {
+                reason: IgnoreReason::NewerView,
+            };
         }
 
         // Ignore if syncing
         if self.is_syncing() {
-            return false;
+            return PrepareOkOutcome::Ignored {
+                reason: IgnoreReason::Syncing,
+            };
         }
 
         // Find the prepare in our pipeline
@@ -1187,30 +1362,54 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
 
         let Some(entry) = pipeline.entry_by_op_mut(header.op) else {
             // Not in pipeline - could be old/duplicate or already committed
-            return false;
+            return PrepareOkOutcome::Ignored {
+                reason: IgnoreReason::UnknownPrepare,
+            };
         };
 
         // Verify checksum matches
         if entry.header.checksum != header.prepare_checksum {
-            return false;
+            return PrepareOkOutcome::Ignored {
+                reason: IgnoreReason::ChecksumMismatch,
+            };
         }
 
         // Check for duplicate ack
         if entry.has_ack(header.replica) {
-            return false;
+            return PrepareOkOutcome::Ignored {
+                reason: IgnoreReason::DuplicateAck,
+            };
         }
 
         // Record the ack from this replica
         let ack_count = entry.add_ack(header.replica);
         let quorum = self.quorum();
+        let quorum_reached = ack_count >= quorum && !entry.ok_quorum_received;
 
         // Check if we've reached quorum
-        if ack_count >= quorum && !entry.ok_quorum_received {
+        if quorum_reached {
             entry.ok_quorum_received = true;
-            return true;
         }
 
-        false
+        drop(pipeline);
+
+        emit_sim_event(
+            SimEventKind::PrepareAcked,
+            &AckLogEvent {
+                replica: ReplicaLogContext::from_consensus(self, plane),
+                op: header.op,
+                prepare_checksum: header.prepare_checksum,
+                ack_from_replica: header.replica,
+                ack_count,
+                quorum,
+                quorum_reached,
+            },
+        );
+
+        PrepareOkOutcome::Accepted {
+            ack_count,
+            quorum_reached,
+        }
     }
 
     /// Enqueue a self-addressed message for processing in the next loopback drain.
@@ -1339,11 +1538,28 @@ where
     // (push_loopback / drain_loopback_into) rather than inline here,
     // so that WAL persistence can happen between pipeline insertion
     // and ack recording.
-    fn pipeline_message(&self, message: &Self::Message<Self::ReplicateHeader>) {
+    fn pipeline_message(&self, plane: PlaneKind, message: &Self::Message<Self::ReplicateHeader>) {
         assert!(self.is_primary(), "only primary can pipeline messages");
 
         let mut pipeline = self.pipeline.borrow_mut();
         pipeline.push(PipelineEntry::new(*message.header()));
+        let pipeline_depth = pipeline.len();
+        drop(pipeline);
+
+        let header = message.header();
+        emit_sim_event(
+            SimEventKind::PrepareQueued,
+            &PrepareLogEvent {
+                replica: ReplicaLogContext::from_consensus(self, plane),
+                op: header.op,
+                parent_checksum: header.parent,
+                prepare_checksum: header.checksum,
+                client_id: header.client,
+                request_id: header.request,
+                operation: header.operation,
+                pipeline_depth,
+            },
+        );
     }
 
     fn verify_pipeline(&self) {
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 55114ab43..200ee87ad 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -44,6 +44,8 @@ pub trait Pipeline {
 
     fn is_empty(&self) -> bool;
 
+    fn len(&self) -> usize;
+
     fn verify(&self);
 }
 
@@ -63,7 +65,7 @@ pub trait Consensus: Sized {
     type Sequencer: Sequencer;
     type Pipeline: Pipeline;
 
-    fn pipeline_message(&self, message: &Self::Message<Self::ReplicateHeader>);
+    fn pipeline_message(&self, plane: PlaneKind, message: &Self::Message<Self::ReplicateHeader>);
     fn verify_pipeline(&self);
 
     fn is_follower(&self) -> bool;
@@ -109,6 +111,8 @@ mod namespaced_pipeline;
 pub use namespaced_pipeline::*;
 mod plane_helpers;
 pub use plane_helpers::*;
+mod observability;
+pub use observability::*;
 
 mod view_change_quorum;
 pub use view_change_quorum::*;
diff --git a/core/consensus/src/namespaced_pipeline.rs b/core/consensus/src/namespaced_pipeline.rs
index 153282680..71e18fb71 100644
--- a/core/consensus/src/namespaced_pipeline.rs
+++ b/core/consensus/src/namespaced_pipeline.rs
@@ -264,6 +264,10 @@ impl Pipeline for NamespacedPipeline {
         self.total_count == 0
     }
 
+    fn len(&self) -> usize {
+        self.total_count
+    }
+
     fn verify(&self) {
         assert!(self.total_count <= PIPELINE_PREPARE_QUEUE_MAX);
 
diff --git a/core/consensus/src/observability.rs b/core/consensus/src/observability.rs
new file mode 100644
index 000000000..865935faf
--- /dev/null
+++ b/core/consensus/src/observability.rs
@@ -0,0 +1,604 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
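+//! Structured observability events for the VSR consensus planes.
+//!
+//! All events are emitted to the `iggy.sim` tracing target at DEBUG level.
+//! A minimal usage sketch (hypothetical `consensus` handle, metadata plane):
+//!
+//! ```ignore
+//! let ctx = ReplicaLogContext::from_consensus(&consensus, PlaneKind::Metadata);
+//! emit_replica_event(SimEventKind::ReplicaStateChanged, &ctx);
+//! ```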
+use crate::{Pipeline, PipelineEntry, Status, VsrAction, VsrConsensus};
+use iggy_binary_protocol::Operation;
+use iggy_common::sharding::IggyNamespace;
+use message_bus::MessageBus;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
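+/// Consensus plane an event refers to: the metadata plane or the partitions plane.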
+pub enum PlaneKind {
+    Metadata,
+    Partitions,
+}
+
+impl PlaneKind {
+    #[must_use]
+    pub const fn as_str(self) -> &'static str {
+        match self {
+            Self::Metadata => "metadata",
+            Self::Partitions => "partitions",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
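+/// Whether the replica currently acts as primary or backup in its view.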
+pub enum ReplicaRole {
+    Primary,
+    Backup,
+}
+
+impl ReplicaRole {
+    #[must_use]
+    pub const fn as_str(self) -> &'static str {
+        match self {
+            Self::Primary => "primary",
+            Self::Backup => "backup",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
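+/// Why a replica entered (or escalated) a view change.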
+pub enum ViewChangeReason {
+    NormalHeartbeatTimeout,
+    ViewChangeStatusTimeout,
+    ReceivedStartViewChange,
+    ReceivedDoViewChange,
+}
+
+impl ViewChangeReason {
+    #[must_use]
+    pub const fn as_str(self) -> &'static str {
+        match self {
+            Self::NormalHeartbeatTimeout => "normal_heartbeat_timeout",
+            Self::ViewChangeStatusTimeout => "view_change_status_timeout",
+            Self::ReceivedStartViewChange => "received_start_view_change",
+            Self::ReceivedDoViewChange => "received_do_view_change",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
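+/// Why an incoming message or ack was ignored rather than processed.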
+pub enum IgnoreReason {
+    NotPrimary,
+    NotNormal,
+    Syncing,
+    NewerView,
+    OlderView,
+    OldPrepare,
+    UnknownPrepare,
+    DuplicateAck,
+    ChecksumMismatch,
+    InvalidOperation,
+    ApplyFailed,
+    PersistFailed,
+}
+
+impl IgnoreReason {
+    #[must_use]
+    pub const fn as_str(self) -> &'static str {
+        match self {
+            Self::NotPrimary => "not_primary",
+            Self::NotNormal => "not_normal",
+            Self::Syncing => "syncing",
+            Self::NewerView => "newer_view",
+            Self::OlderView => "older_view",
+            Self::OldPrepare => "old_prepare",
+            Self::UnknownPrepare => "unknown_prepare",
+            Self::DuplicateAck => "duplicate_ack",
+            Self::ChecksumMismatch => "checksum_mismatch",
+            Self::InvalidOperation => "invalid_operation",
+            Self::ApplyFailed => "apply_failed",
+            Self::PersistFailed => "persist_failed",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
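+/// Kind of VSR control message scheduled by the consensus core (see [`VsrAction`]).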
+pub enum ControlActionKind {
+    SendStartViewChange,
+    SendDoViewChange,
+    SendStartView,
+    SendPrepareOk,
+}
+
+impl ControlActionKind {
+    #[must_use]
+    pub const fn as_str(self) -> &'static str {
+        match self {
+            Self::SendStartViewChange => "send_start_view_change",
+            Self::SendDoViewChange => "send_do_view_change",
+            Self::SendStartView => "send_start_view",
+            Self::SendPrepareOk => "send_prepare_ok",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
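+/// High-level event kinds emitted to the `iggy.sim` tracing target.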
+pub enum SimEventKind {
+    ClientRequestReceived,
+    PrepareQueued,
+    PrepareAcked,
+    OperationCommitted,
+    ClientReplyEmitted,
+    ViewChangeStarted,
+    PrimaryElected,
+    ReplicaStateChanged,
+    NamespaceProgressUpdated,
+    ControlMessageScheduled,
+}
+
+impl SimEventKind {
+    #[must_use]
+    pub const fn as_str(self) -> &'static str {
+        match self {
+            Self::ClientRequestReceived => "ClientRequestReceived",
+            Self::PrepareQueued => "PrepareQueued",
+            Self::PrepareAcked => "PrepareAcked",
+            Self::OperationCommitted => "OperationCommitted",
+            Self::ClientReplyEmitted => "ClientReplyEmitted",
+            Self::ViewChangeStarted => "ViewChangeStarted",
+            Self::PrimaryElected => "PrimaryElected",
+            Self::ReplicaStateChanged => "ReplicaStateChanged",
+            Self::NamespaceProgressUpdated => "NamespaceProgressUpdated",
+            Self::ControlMessageScheduled => "ControlMessageScheduled",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
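+/// Namespace fields attached to log events; stream/topic/partition ids are
+/// only decoded for the partitions plane.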
+pub struct NamespaceLogContext {
+    pub raw: u64,
+    pub stream_id: Option<u32>,
+    pub topic_id: Option<u32>,
+    pub partition_id: Option<u32>,
+}
+
+impl NamespaceLogContext {
+    #[must_use]
+    pub fn from_raw(plane: PlaneKind, raw: u64) -> Self {
+        if matches!(plane, PlaneKind::Metadata) {
+            return Self {
+                raw,
+                stream_id: None,
+                topic_id: None,
+                partition_id: None,
+            };
+        }
+
+        let namespace = IggyNamespace::from_raw(raw);
+        #[allow(clippy::cast_possible_truncation)]
+        let stream_id = namespace.stream_id() as u32;
+        #[allow(clippy::cast_possible_truncation)]
+        let topic_id = namespace.topic_id() as u32;
+        #[allow(clippy::cast_possible_truncation)]
+        let partition_id = namespace.partition_id() as u32;
+
+        Self {
+            raw,
+            stream_id: Some(stream_id),
+            topic_id: Some(topic_id),
+            partition_id: Some(partition_id),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
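+/// Snapshot of a replica's consensus state (view, commit, status, role)
+/// captured at the moment an event is emitted.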
+pub struct ReplicaLogContext {
+    pub plane: PlaneKind,
+    pub cluster_id: u128,
+    pub replica_id: u8,
+    pub namespace: NamespaceLogContext,
+    pub view: u32,
+    pub log_view: u32,
+    pub commit: u64,
+    pub status: Status,
+    pub role: ReplicaRole,
+}
+
+impl ReplicaLogContext {
+    #[must_use]
+    pub fn from_consensus<B, P>(consensus: &VsrConsensus<B, P>, plane: PlaneKind) -> Self
+    where
+        B: MessageBus,
+        P: Pipeline<Entry = PipelineEntry>,
+    {
+        let role = if consensus.is_primary() {
+            ReplicaRole::Primary
+        } else {
+            ReplicaRole::Backup
+        };
+
+        Self {
+            plane,
+            cluster_id: consensus.cluster(),
+            replica_id: consensus.replica(),
+            namespace: NamespaceLogContext::from_raw(plane, consensus.namespace()),
+            view: consensus.view(),
+            log_view: consensus.log_view(),
+            commit: consensus.commit(),
+            status: consensus.status(),
+            role,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct RequestLogEvent {
+    pub replica: ReplicaLogContext,
+    pub client_id: u128,
+    pub request_id: u64,
+    pub operation: Operation,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct PrepareLogEvent {
+    pub replica: ReplicaLogContext,
+    pub op: u64,
+    pub parent_checksum: u128,
+    pub prepare_checksum: u128,
+    pub client_id: u128,
+    pub request_id: u64,
+    pub operation: Operation,
+    pub pipeline_depth: usize,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct AckLogEvent {
+    pub replica: ReplicaLogContext,
+    pub op: u64,
+    pub prepare_checksum: u128,
+    pub ack_from_replica: u8,
+    pub ack_count: usize,
+    pub quorum: usize,
+    pub quorum_reached: bool,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct CommitLogEvent {
+    pub replica: ReplicaLogContext,
+    pub op: u64,
+    pub client_id: u128,
+    pub request_id: u64,
+    pub operation: Operation,
+    pub pipeline_depth: usize,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct ViewChangeLogEvent {
+    pub replica: ReplicaLogContext,
+    pub old_view: u32,
+    pub new_view: u32,
+    pub reason: ViewChangeReason,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct ControlActionLogEvent {
+    pub replica: ReplicaLogContext,
+    pub action: ControlActionKind,
+    pub target_replica: Option<u8>,
+    pub op: Option<u64>,
+    pub commit: Option<u64>,
+}
+
+impl ControlActionLogEvent {
+    #[must_use]
+    pub const fn from_vsr_action(replica: ReplicaLogContext, action: &VsrAction) -> Self {
+        match *action {
+            VsrAction::SendStartViewChange { .. } => Self {
+                replica,
+                action: ControlActionKind::SendStartViewChange,
+                target_replica: None,
+                op: None,
+                commit: None,
+            },
+            VsrAction::SendDoViewChange {
+                target, op, commit, ..
+            } => Self {
+                replica,
+                action: ControlActionKind::SendDoViewChange,
+                target_replica: Some(target),
+                op: Some(op),
+                commit: Some(commit),
+            },
+            VsrAction::SendStartView { op, commit, .. } => Self {
+                replica,
+                action: ControlActionKind::SendStartView,
+                target_replica: None,
+                op: Some(op),
+                commit: Some(commit),
+            },
+            VsrAction::SendPrepareOk { target, op, .. } => Self {
+                replica,
+                action: ControlActionKind::SendPrepareOk,
+                target_replica: Some(target),
+                op: Some(op),
+                commit: None,
+            },
+        }
+    }
+}
+
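+/// Implemented by event payloads that know how to emit themselves as a
+/// structured `tracing` event for a given [`SimEventKind`].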
+pub trait StructuredSimEvent {
+    fn emit(&self, sim_event: SimEventKind);
+}
+
+#[must_use]
+pub const fn status_as_str(status: Status) -> &'static str {
+    match status {
+        Status::Normal => "normal",
+        Status::ViewChange => "view_change",
+        Status::Recovering => "recovering",
+    }
+}
+
+#[must_use]
+pub const fn operation_as_str(operation: Operation) -> &'static str {
+    match operation {
+        Operation::Reserved => "reserved",
+        Operation::CreateStream => "create_stream",
+        Operation::UpdateStream => "update_stream",
+        Operation::DeleteStream => "delete_stream",
+        Operation::PurgeStream => "purge_stream",
+        Operation::CreateTopic => "create_topic",
+        Operation::UpdateTopic => "update_topic",
+        Operation::DeleteTopic => "delete_topic",
+        Operation::PurgeTopic => "purge_topic",
+        Operation::CreatePartitions => "create_partitions",
+        Operation::DeletePartitions => "delete_partitions",
+        Operation::DeleteSegments => "delete_segments",
+        Operation::CreateConsumerGroup => "create_consumer_group",
+        Operation::DeleteConsumerGroup => "delete_consumer_group",
+        Operation::CreateUser => "create_user",
+        Operation::UpdateUser => "update_user",
+        Operation::DeleteUser => "delete_user",
+        Operation::ChangePassword => "change_password",
+        Operation::UpdatePermissions => "update_permissions",
+        Operation::CreatePersonalAccessToken => "create_personal_access_token",
+        Operation::DeletePersonalAccessToken => "delete_personal_access_token",
+        Operation::SendMessages => "send_messages",
+        Operation::StoreConsumerOffset => "store_consumer_offset",
+    }
+}
+
+#[must_use]
+pub const fn namespace_component(component: Option<u32>) -> u32 {
+    match component {
+        Some(value) => value,
+        None => 0,
+    }
+}
+
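+/// Emit a structured event payload under the given [`SimEventKind`].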
+pub fn emit_sim_event<T>(sim_event: SimEventKind, event: &T)
+where
+    T: StructuredSimEvent,
+{
+    event.emit(sim_event);
+}
+
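+/// Emit a bare replica-state event carrying only the replica context.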
+pub fn emit_replica_event(sim_event: SimEventKind, ctx: &ReplicaLogContext) {
+    tracing::event!(
+        target: "iggy.sim",
+        tracing::Level::DEBUG,
+        sim_event = sim_event.as_str(),
+        plane = ctx.plane.as_str(),
+        cluster_id = ctx.cluster_id,
+        replica_id = ctx.replica_id,
+        namespace_raw = ctx.namespace.raw,
+        stream_id = namespace_component(ctx.namespace.stream_id),
+        topic_id = namespace_component(ctx.namespace.topic_id),
+        partition_id = namespace_component(ctx.namespace.partition_id),
+        view = ctx.view,
+        log_view = ctx.log_view,
+        commit = ctx.commit,
+        status = status_as_str(ctx.status),
+        role = ctx.role.as_str(),
+    );
+}
+
+pub fn emit_namespace_progress_event(
+    sim_event: SimEventKind,
+    ctx: &ReplicaLogContext,
+    op: u64,
+    pipeline_depth: usize,
+) {
+    tracing::event!(
+        target: "iggy.sim",
+        tracing::Level::DEBUG,
+        sim_event = sim_event.as_str(),
+        plane = ctx.plane.as_str(),
+        cluster_id = ctx.cluster_id,
+        replica_id = ctx.replica_id,
+        namespace_raw = ctx.namespace.raw,
+        stream_id = namespace_component(ctx.namespace.stream_id),
+        topic_id = namespace_component(ctx.namespace.topic_id),
+        partition_id = namespace_component(ctx.namespace.partition_id),
+        view = ctx.view,
+        log_view = ctx.log_view,
+        commit = ctx.commit,
+        status = status_as_str(ctx.status),
+        role = ctx.role.as_str(),
+        op,
+        pipeline_depth,
+    );
+}
+
+impl StructuredSimEvent for RequestLogEvent {
+    fn emit(&self, sim_event: SimEventKind) {
+        let ctx = self.replica;
+        tracing::event!(
+            target: "iggy.sim",
+            tracing::Level::DEBUG,
+            sim_event = sim_event.as_str(),
+            plane = ctx.plane.as_str(),
+            cluster_id = ctx.cluster_id,
+            replica_id = ctx.replica_id,
+            namespace_raw = ctx.namespace.raw,
+            stream_id = namespace_component(ctx.namespace.stream_id),
+            topic_id = namespace_component(ctx.namespace.topic_id),
+            partition_id = namespace_component(ctx.namespace.partition_id),
+            view = ctx.view,
+            log_view = ctx.log_view,
+            commit = ctx.commit,
+            status = status_as_str(ctx.status),
+            role = ctx.role.as_str(),
+            client_id = self.client_id,
+            request_id = self.request_id,
+            operation = operation_as_str(self.operation),
+        );
+    }
+}
+
+impl StructuredSimEvent for PrepareLogEvent {
+    fn emit(&self, sim_event: SimEventKind) {
+        let ctx = self.replica;
+        tracing::event!(
+            target: "iggy.sim",
+            tracing::Level::DEBUG,
+            sim_event = sim_event.as_str(),
+            plane = ctx.plane.as_str(),
+            cluster_id = ctx.cluster_id,
+            replica_id = ctx.replica_id,
+            namespace_raw = ctx.namespace.raw,
+            stream_id = namespace_component(ctx.namespace.stream_id),
+            topic_id = namespace_component(ctx.namespace.topic_id),
+            partition_id = namespace_component(ctx.namespace.partition_id),
+            view = ctx.view,
+            log_view = ctx.log_view,
+            commit = ctx.commit,
+            status = status_as_str(ctx.status),
+            role = ctx.role.as_str(),
+            op = self.op,
+            parent_checksum = self.parent_checksum,
+            prepare_checksum = self.prepare_checksum,
+            client_id = self.client_id,
+            request_id = self.request_id,
+            operation = operation_as_str(self.operation),
+            pipeline_depth = self.pipeline_depth,
+        );
+    }
+}
+
+impl StructuredSimEvent for AckLogEvent {
+    fn emit(&self, sim_event: SimEventKind) {
+        let ctx = self.replica;
+        tracing::event!(
+            target: "iggy.sim",
+            tracing::Level::DEBUG,
+            sim_event = sim_event.as_str(),
+            plane = ctx.plane.as_str(),
+            cluster_id = ctx.cluster_id,
+            replica_id = ctx.replica_id,
+            namespace_raw = ctx.namespace.raw,
+            stream_id = namespace_component(ctx.namespace.stream_id),
+            topic_id = namespace_component(ctx.namespace.topic_id),
+            partition_id = namespace_component(ctx.namespace.partition_id),
+            view = ctx.view,
+            log_view = ctx.log_view,
+            commit = ctx.commit,
+            status = status_as_str(ctx.status),
+            role = ctx.role.as_str(),
+            op = self.op,
+            prepare_checksum = self.prepare_checksum,
+            ack_from_replica = self.ack_from_replica,
+            ack_count = self.ack_count,
+            quorum = self.quorum,
+            quorum_reached = self.quorum_reached,
+        );
+    }
+}
+
+impl StructuredSimEvent for CommitLogEvent {
+    fn emit(&self, sim_event: SimEventKind) {
+        let ctx = self.replica;
+        tracing::event!(
+            target: "iggy.sim",
+            tracing::Level::DEBUG,
+            sim_event = sim_event.as_str(),
+            plane = ctx.plane.as_str(),
+            cluster_id = ctx.cluster_id,
+            replica_id = ctx.replica_id,
+            namespace_raw = ctx.namespace.raw,
+            stream_id = namespace_component(ctx.namespace.stream_id),
+            topic_id = namespace_component(ctx.namespace.topic_id),
+            partition_id = namespace_component(ctx.namespace.partition_id),
+            view = ctx.view,
+            log_view = ctx.log_view,
+            commit = ctx.commit,
+            status = status_as_str(ctx.status),
+            role = ctx.role.as_str(),
+            op = self.op,
+            client_id = self.client_id,
+            request_id = self.request_id,
+            operation = operation_as_str(self.operation),
+            pipeline_depth = self.pipeline_depth,
+        );
+    }
+}
+
+impl StructuredSimEvent for ViewChangeLogEvent {
+    fn emit(&self, sim_event: SimEventKind) {
+        let ctx = self.replica;
+        tracing::event!(
+            target: "iggy.sim",
+            tracing::Level::DEBUG,
+            sim_event = sim_event.as_str(),
+            plane = ctx.plane.as_str(),
+            cluster_id = ctx.cluster_id,
+            replica_id = ctx.replica_id,
+            namespace_raw = ctx.namespace.raw,
+            stream_id = namespace_component(ctx.namespace.stream_id),
+            topic_id = namespace_component(ctx.namespace.topic_id),
+            partition_id = namespace_component(ctx.namespace.partition_id),
+            view = ctx.view,
+            log_view = ctx.log_view,
+            commit = ctx.commit,
+            status = status_as_str(ctx.status),
+            role = ctx.role.as_str(),
+            old_view = self.old_view,
+            new_view = self.new_view,
+            reason = self.reason.as_str(),
+        );
+    }
+}
+
+impl StructuredSimEvent for ControlActionLogEvent {
+    fn emit(&self, sim_event: SimEventKind) {
+        let ctx = self.replica;
+        tracing::event!(
+            target: "iggy.sim",
+            tracing::Level::DEBUG,
+            sim_event = sim_event.as_str(),
+            plane = ctx.plane.as_str(),
+            cluster_id = ctx.cluster_id,
+            replica_id = ctx.replica_id,
+            namespace_raw = ctx.namespace.raw,
+            stream_id = namespace_component(ctx.namespace.stream_id),
+            topic_id = namespace_component(ctx.namespace.topic_id),
+            partition_id = namespace_component(ctx.namespace.partition_id),
+            view = ctx.view,
+            log_view = ctx.log_view,
+            commit = ctx.commit,
+            status = status_as_str(ctx.status),
+            role = ctx.role.as_str(),
+            action = self.action.as_str(),
+            target_replica = self.target_replica.unwrap_or_default(),
+            op = self.op.unwrap_or_default(),
+            action_commit = self.commit.unwrap_or_default(),
+        );
+    }
+}
diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs
index be33c1190..443682f8f 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{Consensus, Pipeline, PipelineEntry, Sequencer, Status, VsrConsensus};
+use crate::{
+    Consensus, IgnoreReason, Pipeline, PipelineEntry, PlaneKind, PrepareOkOutcome, Sequencer,
+    Status, VsrConsensus,
+};
 use iggy_binary_protocol::{
     Command2, GenericHeader, Message, PrepareHeader, PrepareOkHeader, ReplyHeader,
 };
@@ -34,6 +37,7 @@ use std::ops::AsyncFnOnce;
 #[allow(clippy::future_not_send)]
 pub async fn pipeline_prepare_common<C, F>(
     consensus: &C,
+    plane: PlaneKind,
     prepare: C::Message<C::ReplicateHeader>,
     on_replicate: F,
 ) where
@@ -45,7 +49,7 @@ pub async fn pipeline_prepare_common<C, F>(
     assert!(!consensus.is_syncing(), "on_request: must not be syncing");
 
     consensus.verify_pipeline();
-    consensus.pipeline_message(&prepare);
+    consensus.pipeline_message(plane, &prepare);
     on_replicate(prepare).await;
 }
 
@@ -116,7 +120,7 @@ where
 pub fn replicate_preflight<B, P>(
     consensus: &VsrConsensus<B, P>,
     header: &PrepareHeader,
-) -> Result<u64, &'static str>
+) -> Result<u64, IgnoreReason>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Entry = PipelineEntry>,
@@ -124,17 +128,17 @@ where
     assert_eq!(header.command, Command2::Prepare);
 
     if consensus.is_syncing() {
-        return Err("sync");
+        return Err(IgnoreReason::Syncing);
     }
 
     let current_op = consensus.sequencer().current_sequence();
 
     if consensus.status() != Status::Normal {
-        return Err("not normal state");
+        return Err(IgnoreReason::NotNormal);
     }
 
     if header.view > consensus.view() {
-        return Err("newer view");
+        return Err(IgnoreReason::NewerView);
     }
 
     if consensus.is_follower() {
@@ -149,17 +153,17 @@ where
 /// # Errors
 /// Returns a static error string if the replica is not primary or not in
 /// normal status.
-pub fn ack_preflight<B, P>(consensus: &VsrConsensus<B, P>) -> Result<(), &'static str>
+pub fn ack_preflight<B, P>(consensus: &VsrConsensus<B, P>) -> Result<(), IgnoreReason>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Entry = PipelineEntry>,
 {
     if !consensus.is_primary() {
-        return Err("not primary");
+        return Err(IgnoreReason::NotPrimary);
     }
 
     if consensus.status() != Status::Normal {
-        return Err("not normal");
+        return Err(IgnoreReason::NotNormal);
     }
 
     Ok(())
@@ -170,12 +174,22 @@ where
 /// After recording the ack, walks forward from `current_commit + 1` advancing
 /// the commit number only while consecutive ops have achieved quorum. This
 /// prevents committing ops that have gaps in quorum acknowledgment.
-pub fn ack_quorum_reached<B, P>(consensus: &VsrConsensus<B, P>, ack: &PrepareOkHeader) -> bool
+pub fn ack_quorum_reached<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    plane: PlaneKind,
+    ack: &PrepareOkHeader,
+) -> bool
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Entry = PipelineEntry>,
 {
-    if !consensus.handle_prepare_ok(ack) {
+    if !matches!(
+        consensus.handle_prepare_ok(plane, ack),
+        PrepareOkOutcome::Accepted {
+            quorum_reached: true,
+            ..
+        }
+    ) {
         return false;
     }
 
@@ -506,7 +520,7 @@ mod tests {
             namespace: 0,
             reserved: [0; 120],
         };
-        let _ = consensus.handle_start_view_change(&svc);
+        let _ = consensus.handle_start_view_change(PlaneKind::Metadata, &svc);
 
         // Simulate an in-flight loopback message queued between SVC and DVC quorum.
         let stale_msg = Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>());
@@ -529,7 +543,7 @@ mod tests {
             log_view: 0,
             reserved: [0; 100],
         };
-        let actions = consensus.handle_do_view_change(&dvc);
+        let actions = consensus.handle_do_view_change(PlaneKind::Metadata, &dvc);
 
         // View change completed: should have SendStartView action.
         assert!(
@@ -655,9 +669,9 @@ mod tests {
         let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new());
         consensus.init();
 
-        consensus.pipeline_message(&prepare_message(1, 0, 10));
-        consensus.pipeline_message(&prepare_message(2, 10, 20));
-        consensus.pipeline_message(&prepare_message(3, 20, 30));
+        consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(1, 0, 10));
+        consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(2, 10, 20));
+        consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(3, 20, 30));
 
         consensus.advance_commit_number(3);
 
@@ -672,9 +686,9 @@ mod tests {
         let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new());
         consensus.init();
 
-        consensus.pipeline_message(&prepare_message(5, 0, 50));
-        consensus.pipeline_message(&prepare_message(6, 50, 60));
-        consensus.pipeline_message(&prepare_message(7, 60, 70));
+        consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(5, 0, 50));
+        consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(6, 50, 60));
+        consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(7, 60, 70));
 
         consensus.advance_commit_number(6);
         let drained = drain_committable_prefix(&consensus);
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index bd7fab4be..484135f19 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -17,8 +17,9 @@
 use crate::stm::StateMachine;
 use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError};
 use consensus::{
-    Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity, Project, Sequencer, VsrConsensus,
-    ack_preflight, ack_quorum_reached, build_reply_message, drain_committable_prefix,
+    CommitLogEvent, Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity, PlaneKind, Project,
+    ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, ack_preflight,
+    ack_quorum_reached, build_reply_message, drain_committable_prefix, emit_sim_event,
     fence_old_prepare_by_commit, panic_if_hash_chain_would_break_in_same_view,
     pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
     send_prepare_ok as send_prepare_ok_common,
@@ -291,10 +292,20 @@ where
     async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::Message<RequestHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
 
-        // TODO: Bunch of asserts.
-        debug!("handling metadata request");
+        emit_sim_event(
+            SimEventKind::ClientRequestReceived,
+            &RequestLogEvent {
+                replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Metadata),
+                client_id: message.header().client,
+                request_id: message.header().request,
+                operation: message.header().operation,
+            },
+        );
         let prepare = message.project(consensus);
-        pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await;
+        pipeline_prepare_common(consensus, PlaneKind::Metadata, prepare, |prepare| {
+            self.on_replicate(prepare)
+        })
+        .await;
     }
 
     async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareHeader>) {
@@ -307,8 +318,14 @@ where
             Ok(current_op) => current_op,
             Err(reason) => {
                 warn!(
-                    replica = consensus.replica(),
-                    "on_replicate: ignoring ({reason})"
+                    target: "iggy.metadata.diag",
+                    plane = "metadata",
+                    replica_id = consensus.replica(),
+                    view = consensus.view(),
+                    op = header.op,
+                    operation = ?header.operation,
+                    reason = reason.as_str(),
+                    "ignoring prepare during replicate preflight"
                 );
                 return;
             }
@@ -319,7 +336,16 @@ where
         let is_old_prepare = fence_old_prepare_by_commit(consensus, &header)
             || journal.handle().header(header.op as usize).is_some();
         let message = if is_old_prepare {
-            warn!("received old prepare, not replicating");
+            warn!(
+                target: "iggy.metadata.diag",
+                plane = "metadata",
+                replica_id = consensus.replica(),
+                view = consensus.view(),
+                op = header.op,
+                commit = consensus.commit(),
+                operation = ?header.operation,
+                "received old prepare, skipping replication"
+            );
             message
         } else {
             self.replicate(message).await
@@ -338,15 +364,22 @@ where
             {
                 Ok(true) => {
                     debug!(
-                        replica = consensus.replica(),
-                        "on_replicate: forced checkpoint at op={snap_op}"
+                        target: "iggy.metadata.diag",
+                        plane = "metadata",
+                        replica_id = consensus.replica(),
+                        checkpoint_op = snap_op,
+                        "forced checkpoint completed"
                     );
                 }
                 Ok(false) => {}
                 Err(e) => {
                     error!(
-                        replica = consensus.replica(),
-                        "on_replicate: forced checkpoint failed: {e}"
+                        target: "iggy.metadata.diag",
+                        plane = "metadata",
+                        replica_id = consensus.replica(),
+                        checkpoint_op = snap_op,
+                        error = %e,
+                        "forced checkpoint failed"
                     );
                     return;
                 }
@@ -366,8 +399,13 @@ where
         // Append to journal.
         if let Err(e) = journal.handle().append(message).await {
             error!(
-                replica = consensus.replica(),
-                "on_replicate: journal append failed: {e}"
+                target: "iggy.metadata.diag",
+                plane = "metadata",
+                replica_id = consensus.replica(),
+                op = header.op,
+                operation = ?header.operation,
+                error = %e,
+                "journal append failed"
             );
             return;
         }
@@ -386,7 +424,15 @@ where
         let header = message.header();
 
         if let Err(reason) = ack_preflight(consensus) {
-            warn!("on_ack: ignoring ({reason})");
+            warn!(
+                target: "iggy.metadata.diag",
+                plane = "metadata",
+                replica_id = consensus.replica(),
+                view = consensus.view(),
+                op = header.op,
+                reason = reason.as_str(),
+                "ignoring ack during preflight"
+            );
             return;
         }
 
@@ -396,23 +442,39 @@ where
                 .entry_by_op_and_checksum(header.op, header.prepare_checksum)
                 .is_none()
             {
-                debug!("on_ack: prepare not in pipeline op={}", header.op);
+                debug!(
+                    target: "iggy.metadata.diag",
+                    plane = "metadata",
+                    replica_id = consensus.replica(),
+                    op = header.op,
+                    prepare_checksum = header.prepare_checksum,
+                    "ack target prepare not in pipeline"
+                );
                 return;
             }
         }
 
-        if ack_quorum_reached(consensus, header) {
+        if ack_quorum_reached(consensus, PlaneKind::Metadata, header) {
             let journal = self.journal.as_ref().unwrap();
 
-            debug!("on_ack: quorum received for op={}", header.op);
+            debug!(
+                target: "iggy.metadata.diag",
+                plane = "metadata",
+                replica_id = consensus.replica(),
+                op = header.op,
+                "ack quorum received"
+            );
 
             let drained = drain_committable_prefix(consensus);
             if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
                 debug!(
-                    "on_ack: draining committed prefix ops=[{}..={}] count={}",
-                    first.header.op,
-                    last.header.op,
-                    drained.len()
+                    target: "iggy.metadata.diag",
+                    plane = "metadata",
+                    replica_id = consensus.replica(),
+                    first_op = first.header.op,
+                    last_op = last.header.op,
+                    drained_count = drained.len(),
+                    "draining committed metadata prefix"
                 );
             }
 
@@ -434,20 +496,31 @@ where
 
                 let response = self.mux_stm.update(prepare).unwrap_or_else(|err| {
                     warn!(
-                        "on_ack: state machine error for op={}: {err}",
-                        prepare_header.op
+                        target: "iggy.metadata.diag",
+                        plane = "metadata",
+                        replica_id = consensus.replica(),
+                        op = prepare_header.op,
+                        operation = ?prepare_header.operation,
+                        error = %err,
+                        "state machine update failed for committed metadata 
entry"
                     );
                     bytes::Bytes::new()
                 });
-                debug!("on_ack: state applied for op={}", prepare_header.op);
+                let pipeline_depth = consensus.pipeline().borrow().len();
+                let event = CommitLogEvent {
+                    replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Metadata),
+                    op: prepare_header.op,
+                    client_id: prepare_header.client,
+                    request_id: prepare_header.request,
+                    operation: prepare_header.operation,
+                    pipeline_depth,
+                };
+                emit_sim_event(SimEventKind::OperationCommitted, &event);
 
                 let generic_reply =
                     build_reply_message(consensus, &prepare_header, response).into_generic();
                 let reply_buffers = freeze_client_reply(generic_reply);
-                debug!(
-                    "on_ack: sending reply to client={} for op={}",
-                    prepare_header.client, prepare_header.op
-                );
+                emit_sim_event(SimEventKind::ClientReplyEmitted, &event);
 
                 // TODO: Propagate send error instead of panicking; requires bus error design.
                 consensus
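
The metadata plane above now funnels request, commit, and reply notifications through emit_sim_event instead of ad-hoc debug! lines. The concrete types live in the new core/consensus/src/observability.rs and are not reproduced in this mail; the sketch below is only a guess at the shape implied by the call sites, with field types, the flattened replica fields, and the target name invented for illustration.

    // Sketch only: the real SimEventKind / CommitLogEvent / emit_sim_event are
    // defined in core/consensus/src/observability.rs and may differ. Field types
    // and the "iggy.sim.events" target are assumptions made for this example.
    use tracing::info;

    #[derive(Debug, Clone, Copy)]
    enum SimEventKind {
        ClientRequestReceived,
        OperationCommitted,
        ClientReplyEmitted,
        NamespaceProgressUpdated,
    }

    #[derive(Debug)]
    struct CommitLogEvent {
        replica_id: u8,        // assumed flattening of ReplicaLogContext
        view: u64,             // assumed
        op: u64,
        client_id: u64,        // assumed width
        request_id: u32,       // assumed width
        operation: String,     // stand-in for the protocol Operation enum
        pipeline_depth: usize,
    }

    // Emit one structured record per simulation-visible event, keyed by a
    // dedicated target so a test harness or log scraper can filter on it.
    fn emit_sim_event<E: std::fmt::Debug>(kind: SimEventKind, event: &E) {
        info!(target: "iggy.sim.events", kind = ?kind, event = ?event, "sim event");
    }

    fn main() {
        tracing_subscriber::fmt().init();
        emit_sim_event(
            SimEventKind::OperationCommitted,
            &CommitLogEvent {
                replica_id: 0,
                view: 1,
                op: 42,
                client_id: 7,
                request_id: 3,
                operation: "SendMessages".to_string(),
                pipeline_depth: 1,
            },
        );
    }
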
diff --git a/core/partitions/src/iggy_index_writer.rs b/core/partitions/src/iggy_index_writer.rs
index 31e016059..3aa05ac5b 100644
--- a/core/partitions/src/iggy_index_writer.rs
+++ b/core/partitions/src/iggy_index_writer.rs
@@ -58,8 +58,10 @@ impl IggyIndexWriter {
 
         let size = index_size_bytes.load(Ordering::Relaxed);
         trace!(
-            "Opened sparse index file for writing: {file_path}, size: {}",
-            size
+            target: "iggy.partitions.storage",
+            file = file_path,
+            size,
+            "opened sparse index file for writing"
         );
 
         Ok(Self {
@@ -91,7 +93,13 @@ impl IggyIndexWriter {
             self.fsync().await?;
         }
 
-        trace!("Saved {len} sparse index bytes to file: {}", self.file_path);
+        trace!(
+            target: "iggy.partitions.storage",
+            file = self.file_path.as_str(),
+            bytes = len,
+            position,
+            "saved sparse index bytes to file"
+        );
         Ok(())
     }
 
diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs
index 380dbb30b..c49911b3d 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -194,6 +194,7 @@ impl Partition for IggyPartition {
             if let Err(err) = self.store_consumer_offset(consumer, last_offset) {
                 // warning for now.
                 warn!(
+                    target: "iggy.partitions.diag",
                     consumer = ?consumer,
                     last_offset,
                     %err,
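
Both the storage- and partition-level sites above now carry an explicit target ("iggy.partitions.storage", "iggy.partitions.diag") plus structured fields instead of interpolated strings. Assuming a stock tracing-subscriber setup with the env-filter feature (the binary's actual subscriber wiring is not part of this diff), that makes the verbose storage tracing switchable per target rather than globally, roughly as sketched here:

    // Sketch assuming tracing-subscriber with the "env-filter" feature enabled;
    // only the target names are taken from the hunks above.
    use tracing_subscriber::EnvFilter;

    fn main() {
        // Same effect as RUST_LOG="warn,iggy.partitions.storage=trace,iggy.partitions.diag=debug".
        let filter =
            EnvFilter::new("warn,iggy.partitions.storage=trace,iggy.partitions.diag=debug");
        tracing_subscriber::fmt().with_env_filter(filter).init();

        // Events emitted with these targets now pass the filter while every
        // other module stays at `warn`.
        tracing::trace!(
            target: "iggy.partitions.storage",
            file = "example.index",
            size = 0u64,
            "opened sparse index file for writing"
        );
    }
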
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index 26a1c2d13..94614d715 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -27,10 +27,11 @@ use crate::segment::Segment;
 use crate::types::PartitionsConfig;
 use consensus::PlaneIdentity;
 use consensus::{
-    Consensus, NamespacedPipeline, Pipeline, PipelineEntry, Plane, Project, Sequencer,
-    VsrConsensus, ack_preflight, build_reply_message, fence_old_prepare_by_commit,
-    pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
-    send_prepare_ok as send_prepare_ok_common,
+    CommitLogEvent, Consensus, NamespacedPipeline, Pipeline, PipelineEntry, Plane, PlaneKind,
+    Project, ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus,
+    ack_preflight, build_reply_message, emit_namespace_progress_event, emit_sim_event,
+    fence_old_prepare_by_commit, pipeline_prepare_common, replicate_preflight,
+    replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_binary_protocol::{
     Command2, ConsensusHeader, GenericHeader, Message, Operation, PrepareHeader, PrepareOkHeader,
@@ -397,12 +398,28 @@ where
             .consensus()
             .expect("on_request: consensus not initialized");
 
-        debug!(?namespace, "handling partition request");
+        emit_sim_event(
+            SimEventKind::ClientRequestReceived,
+            &RequestLogEvent {
+                replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions),
+                client_id: message.header().client,
+                request_id: message.header().request,
+                operation: message.header().operation,
+            },
+        );
         let message = if message.header().operation == Operation::SendMessages {
             match convert_request_message(namespace, message) {
                 Ok(message) => message,
                 Err(error) => {
-                    warn!(?namespace, %error, "on_request: failed to convert SendMessages");
+                    warn!(
+                        target: "iggy.partitions.diag",
+                        plane = "partitions",
+                        replica_id = consensus.replica(),
+                        namespace_raw = namespace.inner(),
+                        operation = ?Operation::SendMessages,
+                        error = %error,
+                        "failed to convert send_messages request"
+                    );
                     return;
                 }
             }
@@ -410,7 +427,10 @@ where
             message
         };
         let prepare = message.project(consensus);
-        pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await;
+        pipeline_prepare_common(consensus, PlaneKind::Partitions, prepare, |prepare| {
+            self.on_replicate(prepare)
+        })
+        .await;
     }
 
     async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareHeader>) {
@@ -424,8 +444,15 @@ where
             Ok(current_op) => current_op,
             Err(reason) => {
                 warn!(
-                    replica = consensus.replica(),
-                    "on_replicate: ignoring ({reason})"
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica_id = consensus.replica(),
+                    view = consensus.view(),
+                    op = header.op,
+                    namespace_raw = header.namespace,
+                    operation = ?header.operation,
+                    reason = reason.as_str(),
+                    "ignoring prepare during replicate preflight"
                 );
                 return;
             }
@@ -433,7 +460,17 @@ where
 
         let is_old_prepare = fence_old_prepare_by_commit(consensus, &header);
         if is_old_prepare {
-            warn!("received old prepare, not replicating");
+            warn!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id = consensus.replica(),
+                view = consensus.view(),
+                op = header.op,
+                commit = consensus.commit(),
+                namespace_raw = header.namespace,
+                operation = ?header.operation,
+                "received old prepare, skipping replication"
+            );
             return;
         }
 
@@ -444,11 +481,14 @@ where
 
         if let Err(error) = self.apply_replicated_operation(&namespace, message).await {
             warn!(
-                replica = consensus.replica(),
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id = consensus.replica(),
                 op = header.op,
-                ?namespace,
+                namespace_raw = namespace.inner(),
+                operation = ?header.operation,
                 %error,
-                "on_replicate: failed to apply replicated operation"
+                "failed to apply replicated partition operation"
             );
             return;
         }
@@ -457,11 +497,14 @@ where
             && let Err(error) = self.commit_messages(&namespace, true).await
         {
             warn!(
-                replica = consensus.replica(),
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id = consensus.replica(),
                 op = header.op,
-                ?namespace,
+                namespace_raw = namespace.inner(),
+                operation = ?header.operation,
                 %error,
-                "on_replicate: failed to durably persist replicated operation"
+                "failed to durably persist replicated partition operation"
             );
             return;
         }
@@ -470,6 +513,12 @@ where
         debug_assert_eq!(header.op, current_op + 1);
         consensus.sequencer().set_sequence(header.op);
         consensus.set_last_prepare_checksum(header.checksum);
+        emit_namespace_progress_event(
+            SimEventKind::NamespaceProgressUpdated,
+            &ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions),
+            header.op,
+            consensus.pipeline().borrow().len(),
+        );
 
         self.send_prepare_ok(&header).await;
     }
@@ -479,7 +528,15 @@ where
         let consensus = self.consensus().expect("on_ack: consensus not 
initialized");
 
         if let Err(reason) = ack_preflight(consensus) {
-            warn!("on_ack: ignoring ({reason})");
+            warn!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id = consensus.replica(),
+                view = consensus.view(),
+                op = header.op,
+                reason = reason.as_str(),
+                "ignoring ack during preflight"
+            );
             return;
         }
 
@@ -489,12 +546,19 @@ where
                 .entry_by_op_and_checksum(header.op, header.prepare_checksum)
                 .is_none()
             {
-                debug!("on_ack: prepare not in pipeline op={}", header.op);
+                debug!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica_id = consensus.replica(),
+                    op = header.op,
+                    prepare_checksum = header.prepare_checksum,
+                    "ack target prepare not in pipeline"
+                );
                 return;
             }
         }
 
-        consensus.handle_prepare_ok(header);
+        consensus.handle_prepare_ok(PlaneKind::Partitions, header);
 
         // SAFETY(IGGY-66): Per-namespace drain independent of global commit.
         //
@@ -528,6 +592,12 @@ where
         let new_commit = pipeline.global_commit_frontier(consensus.commit());
         drop(pipeline);
         consensus.advance_commit_number(new_commit);
+        emit_namespace_progress_event(
+            SimEventKind::NamespaceProgressUpdated,
+            &ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions),
+            new_commit,
+            consensus.pipeline().borrow().len(),
+        );
     }
 }
 
@@ -578,10 +648,13 @@ where
                 self.append_send_messages_to_journal(namespace, message)
                     .await?;
                 debug!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
                     replica = consensus.replica(),
                     op = header.op,
-                    ?namespace,
-                    "on_replicate: send_messages appended to partition journal"
+                    namespace_raw = namespace.inner(),
+                    operation = ?header.operation,
+                    "replicated send_messages appended to partition journal"
                 );
                 Ok(())
             }
@@ -611,10 +684,14 @@ where
                     2 => PollingConsumer::ConsumerGroup(consumer_id, 0),
                     _ => {
                         warn!(
+                            target: "iggy.partitions.diag",
+                            plane = "partitions",
                             replica = consensus.replica(),
                             op = header.op,
+                            namespace_raw = namespace.inner(),
+                            operation = ?header.operation,
                             consumer_kind,
-                            "on_replicate: unknown consumer kind"
+                            "unknown consumer kind while applying replicated 
offset update"
                         );
                         return Err(IggyError::InvalidCommand);
                     }
@@ -626,21 +703,28 @@ where
                 let _ = partition.store_consumer_offset(consumer, offset);
 
                 debug!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
                     replica = consensus.replica(),
                     op = header.op,
+                    namespace_raw = namespace.inner(),
+                    operation = ?header.operation,
                     consumer_kind,
                     consumer_id,
                     offset,
-                    "on_replicate: consumer offset stored"
+                    "replicated consumer offset stored"
                 );
                 Ok(())
             }
             _ => {
                 warn!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
                     replica = consensus.replica(),
+                    namespace_raw = namespace.inner(),
                     op = header.op,
-                    "on_replicate: unexpected operation {:?}",
-                    header.operation
+                    operation = ?header.operation,
+                    "unexpected replicated partition operation"
                 );
                 Ok(())
             }
@@ -769,8 +853,10 @@ where
 
         let Some(index_bytes) = index_bytes else {
             warn!(
-                ?namespace,
-                "commit_messages: failed to build a sparse index entry from 
pending journal batches"
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                namespace_raw = namespace.inner(),
+                "failed to build sparse index entry from pending journal 
batches"
             );
             return Err(IggyError::InvalidCommand);
         };
@@ -824,10 +910,13 @@ where
     ) {
         if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
             debug!(
-                "on_ack: draining committed ops=[{}..={}] count={}",
-                first.header.op,
-                last.header.op,
-                drained.len()
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id = consensus.replica(),
+                first_op = first.header.op,
+                last_op = last.header.op,
+                drained_count = drained.len(),
+                "draining committed partition ops"
             );
         }
 
@@ -840,47 +929,40 @@ where
         } in drained
         {
             let entry_namespace = IggyNamespace::from_raw(prepare_header.namespace);
-
-            match prepare_header.operation {
-                Operation::SendMessages => {
-                    if committed_ns.insert(entry_namespace)
-                        && let Err(error) = self.commit_messages(&entry_namespace, false).await
-                    {
-                        failed_ns.insert(entry_namespace);
-                        warn!(
-                            ?entry_namespace,
-                            op = prepare_header.op,
-                            %error,
-                            "on_ack: failed to commit partition messages"
-                        );
-                    }
-                    if failed_ns.contains(&entry_namespace) {
-                        continue;
-                    }
-                    debug!("on_ack: messages committed for op={}", 
prepare_header.op);
-                }
-                Operation::StoreConsumerOffset => {
-                    // TODO: Commit consumer offset update.
-                    debug!(
-                        "on_ack: consumer offset committed for op={}",
-                        prepare_header.op
-                    );
-                }
-                _ => {
-                    warn!(
-                        "on_ack: unexpected operation {:?} for op={}",
-                        prepare_header.operation, prepare_header.op
-                    );
-                }
+            if !self
+                .commit_partition_entry(
+                    consensus,
+                    prepare_header,
+                    entry_namespace,
+                    &mut committed_ns,
+                    &mut failed_ns,
+                )
+                .await
+            {
+                continue;
             }
 
+            let pipeline_depth = consensus.pipeline().borrow().len();
+            let event = CommitLogEvent {
+                replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions),
+                op: prepare_header.op,
+                client_id: prepare_header.client,
+                request_id: prepare_header.request,
+                operation: prepare_header.operation,
+                pipeline_depth,
+            };
+            emit_sim_event(SimEventKind::OperationCommitted, &event);
+            emit_namespace_progress_event(
+                SimEventKind::NamespaceProgressUpdated,
+                &event.replica,
+                prepare_header.op,
+                pipeline_depth,
+            );
+
             let generic_reply =
                 build_reply_message(consensus, &prepare_header, bytes::Bytes::new()).into_generic();
             let reply_buffers = freeze_client_reply(generic_reply);
-            debug!(
-                "on_ack: sending reply to client={} for op={}",
-                prepare_header.client, prepare_header.op
-            );
+            emit_sim_event(SimEventKind::ClientReplyEmitted, &event);
 
             if let Err(error) = consensus
                 .message_bus()
@@ -888,21 +970,81 @@ where
                 .await
             {
                 warn!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
                     client = prepare_header.client,
                     op = prepare_header.op,
+                    namespace_raw = entry_namespace.inner(),
                     %error,
-                    "on_ack: failed to send reply to client"
+                    "failed to send reply to client"
                 );
             }
         }
         if !failed_ns.is_empty() {
             warn!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id = consensus.replica(),
                 failed_namespaces = failed_ns.len(),
-                "on_ack: some namespaces failed local commit handling"
+                "some namespaces failed local commit handling"
             );
         }
     }
 
+    async fn commit_partition_entry(
+        &self,
+        consensus: &VsrConsensus<B, NamespacedPipeline>,
+        prepare_header: PrepareHeader,
+        entry_namespace: IggyNamespace,
+        committed_ns: &mut HashSet<IggyNamespace>,
+        failed_ns: &mut HashSet<IggyNamespace>,
+    ) -> bool {
+        match prepare_header.operation {
+            Operation::SendMessages => {
+                if committed_ns.insert(entry_namespace)
+                    && let Err(error) = self.commit_messages(&entry_namespace, false).await
+                {
+                    failed_ns.insert(entry_namespace);
+                    warn!(
+                        target: "iggy.partitions.diag",
+                        plane = "partitions",
+                        replica_id = consensus.replica(),
+                        namespace_raw = entry_namespace.inner(),
+                        op = prepare_header.op,
+                        operation = ?prepare_header.operation,
+                        %error,
+                        "failed to commit partition messages"
+                    );
+                }
+                !failed_ns.contains(&entry_namespace)
+            }
+            Operation::StoreConsumerOffset => {
+                // TODO: Commit consumer offset update.
+                debug!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica_id = consensus.replica(),
+                    op = prepare_header.op,
+                    namespace_raw = entry_namespace.inner(),
+                    "consumer offset committed"
+                );
+                true
+            }
+            _ => {
+                warn!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica_id = consensus.replica(),
+                    op = prepare_header.op,
+                    namespace_raw = entry_namespace.inner(),
+                    operation = ?prepare_header.operation,
+                    "unexpected committed partition operation"
+                );
+                true
+            }
+        }
+    }
+
     /// Persist frozen batches to disk and update segment bookkeeping.
     async fn persist_frozen_batches_to_disk(
         &self,
@@ -947,10 +1089,12 @@ where
             .await
             .map_err(|error| {
                 warn!(
-                    ?namespace,
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    namespace_raw = namespace.inner(),
                     batch_count,
                     %error,
-                    "persist: failed to save frozen batches"
+                    "failed to save frozen batches"
                 );
                 error
             })?;
@@ -960,15 +1104,24 @@ where
             .await
             .map_err(|error| {
                 warn!(
-                    ?namespace,
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    namespace_raw = namespace.inner(),
                     batch_count,
                     %error,
-                    "persist: failed to save sparse indexes"
+                    "failed to save sparse indexes"
                 );
                 error
             })?;
 
-        debug!(?namespace, batch_count, ?saved, "persisted batches to disk");
+        debug!(
+            target: "iggy.partitions.diag",
+            plane = "partitions",
+            namespace_raw = namespace.inner(),
+            batch_count,
+            saved_bytes = saved.as_bytes_u64(),
+            "persisted batches to disk"
+        );
 
         let partition = self
             .get_mut_by_ns(namespace)
@@ -1063,7 +1216,13 @@ where
         );
         partition.stats.increment_segments_count(1);
 
-        debug!(?namespace, start_offset, "rotated to new segment");
+        debug!(
+            target: "iggy.partitions.diag",
+            plane = "partitions",
+            namespace_raw = namespace.inner(),
+            start_offset,
+            "rotated to new segment"
+        );
         Ok(())
     }
 
diff --git a/core/partitions/src/messages_writer.rs b/core/partitions/src/messages_writer.rs
index 98d5b688e..330937c73 100644
--- a/core/partitions/src/messages_writer.rs
+++ b/core/partitions/src/messages_writer.rs
@@ -126,7 +126,9 @@ async fn write_frozen_chunked<const ALIGN: usize>(
             .into();
         result.map_err(|err| {
             error!(
+                target: "iggy.partitions.storage",
                 file = file_path,
+                write_position = position,
                 %err,
                 "failed to write frozen messages to segment file"
             );

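The converted log sites throughout this commit lean on tracing's field syntax: a bare identifier records a local binding under its own name, %value records via Display, ?value via Debug, and target: overrides the default module-path target. A minimal, self-contained illustration follows; the function, field values, and error type are made up for the example, and only the target and field names mirror the hunks above.

    // Illustration of the field sigils used above; assumes the `tracing` and
    // `tracing-subscriber` crates, everything else is invented for the example.
    use tracing::warn;

    fn report_persist_failure(namespace_raw: u64, batch_count: usize, error: &std::io::Error) {
        warn!(
            target: "iggy.partitions.diag",
            plane = "partitions",
            namespace_raw,                  // shorthand: records the local binding
            batch_count,
            operation = ?"SendMessages",    // Debug-formatted stand-in for the protocol enum
            %error,                         // Display-formatted
            "failed to save frozen batches"
        );
    }

    fn main() {
        tracing_subscriber::fmt().init();
        let error = std::io::Error::new(std::io::ErrorKind::Other, "disk full");
        report_persist_failure(42, 3, &error);
    }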