This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch partition_remaster
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit fbf459b936601778c8c09df9d0da0d3a4de86c72 Author: numinex <[email protected]> AuthorDate: Mon Mar 30 14:28:18 2026 +0200 obersvability --- Cargo.lock | 1 + core/consensus/Cargo.toml | 1 + core/consensus/src/impls.rs | 312 ++++++++++++--- core/consensus/src/lib.rs | 6 +- core/consensus/src/namespaced_pipeline.rs | 4 + core/consensus/src/observability.rs | 604 ++++++++++++++++++++++++++++++ core/consensus/src/plane_helpers.rs | 52 ++- core/metadata/src/impls/metadata.rs | 131 +++++-- core/partitions/src/iggy_index_writer.rs | 14 +- core/partitions/src/iggy_partition.rs | 1 + core/partitions/src/iggy_partitions.rs | 309 +++++++++++---- core/partitions/src/messages_writer.rs | 2 + 12 files changed, 1262 insertions(+), 175 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dfcd39e28..1276be916 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2237,6 +2237,7 @@ dependencies = [ "message_bus", "rand 0.10.0", "rand_xoshiro", + "tracing", ] [[package]] diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml index 0428a64d6..dac219e3a 100644 --- a/core/consensus/Cargo.toml +++ b/core/consensus/Cargo.toml @@ -37,6 +37,7 @@ iobuf = { workspace = true } message_bus = { workspace = true } rand = { workspace = true } rand_xoshiro = { workspace = true } +tracing = { workspace = true } [dev-dependencies] futures = { workspace = true } diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs index 5bea3adc6..bf9a52412 100644 --- a/core/consensus/src/impls.rs +++ b/core/consensus/src/impls.rs @@ -17,8 +17,10 @@ use crate::vsr_timeout::{TimeoutKind, TimeoutManager}; use crate::{ - Consensus, DvcQuorumArray, Pipeline, Project, StoredDvc, dvc_count, dvc_max_commit, - dvc_quorum_array_empty, dvc_record, dvc_reset, dvc_select_winner, + AckLogEvent, Consensus, ControlActionLogEvent, DvcQuorumArray, IgnoreReason, Pipeline, + PlaneKind, PrepareLogEvent, Project, ReplicaLogContext, SimEventKind, StoredDvc, + ViewChangeLogEvent, ViewChangeReason, dvc_count, dvc_max_commit, dvc_quorum_array_empty, + dvc_record, dvc_reset, dvc_select_winner, emit_replica_event, emit_sim_event, }; use bit_set::BitSet; use iggy_binary_protocol::{ @@ -384,6 +386,10 @@ impl Pipeline for LocalPipeline { Self::is_empty(self) } + fn len(&self) -> usize { + self.prepare_count() + } + fn verify(&self) { Self::verify(self); } @@ -426,6 +432,27 @@ pub enum VsrAction { }, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PrepareOkOutcome { + Accepted { + ack_count: usize, + quorum_reached: bool, + }, + Ignored { + reason: IgnoreReason, + }, +} + +impl PrepareOkOutcome { + #[must_use] + pub const fn quorum_reached(self) -> bool { + match self { + Self::Accepted { quorum_reached, .. } => quorum_reached, + Self::Ignored { .. } => false, + } + } +} + #[allow(unused)] #[derive(Debug)] pub struct VsrConsensus<B = IggyMessageBus, P = LocalPipeline> @@ -691,7 +718,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { /// /// Returns a list of actions to take based on fired timeouts. /// Empty vec means no actions needed. 
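(Aside, not part of the patch: a minimal sketch of how a caller might consume the new `tick` signature and the `VsrAction`s it returns. The helper, its name, and the choice of `PlaneKind::Partitions` are illustrative only.)

use consensus::{Pipeline, PipelineEntry, PlaneKind, VsrAction, VsrConsensus};
use message_bus::MessageBus;

// Illustrative sketch: collect the control messages a single tick wants to
// send, tagged with a readable kind; a real caller would hand each action to
// its message bus instead.
fn tick_once<B, P>(
    consensus: &VsrConsensus<B, P>,
    current_op: u64,
    current_commit: u64,
) -> Vec<(&'static str, VsrAction)>
where
    B: MessageBus,
    P: Pipeline<Entry = PipelineEntry>,
{
    consensus
        .tick(PlaneKind::Partitions, current_op, current_commit)
        .into_iter()
        .map(|action| {
            let kind = match action {
                VsrAction::SendStartViewChange { .. } => "start_view_change",
                VsrAction::SendDoViewChange { .. } => "do_view_change",
                VsrAction::SendStartView { .. } => "start_view",
                VsrAction::SendPrepareOk { .. } => "prepare_ok",
            };
            (kind, action)
        })
        .collect()
}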
- pub fn tick(&self, current_op: u64, current_commit: u64) -> Vec<VsrAction> { + pub fn tick(&self, plane: PlaneKind, current_op: u64, current_commit: u64) -> Vec<VsrAction> { let mut actions = Vec::new(); let mut timeouts = self.timeouts.borrow_mut(); @@ -701,25 +728,29 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { // Phase 2: Handle fired timeouts if timeouts.fired(TimeoutKind::NormalHeartbeat) { drop(timeouts); - actions.extend(self.handle_normal_heartbeat_timeout()); + actions.extend(self.handle_normal_heartbeat_timeout(plane)); timeouts = self.timeouts.borrow_mut(); } if timeouts.fired(TimeoutKind::StartViewChangeMessage) { drop(timeouts); - actions.extend(self.handle_start_view_change_message_timeout()); + actions.extend(self.handle_start_view_change_message_timeout(plane)); timeouts = self.timeouts.borrow_mut(); } if timeouts.fired(TimeoutKind::DoViewChangeMessage) { drop(timeouts); - actions.extend(self.handle_do_view_change_message_timeout(current_op, current_commit)); + actions.extend(self.handle_do_view_change_message_timeout( + plane, + current_op, + current_commit, + )); timeouts = self.timeouts.borrow_mut(); } if timeouts.fired(TimeoutKind::ViewChangeStatus) { drop(timeouts); - actions.extend(self.handle_view_change_status_timeout()); + actions.extend(self.handle_view_change_status_timeout(plane)); // timeouts = self.timeouts.borrow_mut(); // Not needed if last } @@ -728,7 +759,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { /// Called when `normal_heartbeat` timeout fires. /// Backup hasn't heard from primary - start view change. - fn handle_normal_heartbeat_timeout(&self) -> Vec<VsrAction> { + fn handle_normal_heartbeat_timeout(&self, plane: PlaneKind) -> Vec<VsrAction> { // Only backups trigger view change on heartbeat timeout if self.is_primary() { return Vec::new(); @@ -740,7 +771,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { } // Advance to new view and transition to view change - let new_view = self.view.get() + 1; + let old_view = self.view.get(); + let new_view = old_view + 1; self.view.set(new_view); self.status.set(Status::ViewChange); @@ -758,14 +790,32 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { timeouts.start(TimeoutKind::ViewChangeStatus); } - vec![VsrAction::SendStartViewChange { + emit_sim_event( + SimEventKind::ViewChangeStarted, + &ViewChangeLogEvent { + replica: ReplicaLogContext::from_consensus(self, plane), + old_view, + new_view, + reason: ViewChangeReason::NormalHeartbeatTimeout, + }, + ); + + let action = VsrAction::SendStartViewChange { view: new_view, namespace: self.namespace, - }] + }; + emit_sim_event( + SimEventKind::ControlMessageScheduled, + &ControlActionLogEvent::from_vsr_action( + ReplicaLogContext::from_consensus(self, plane), + &action, + ), + ); + vec![action] } /// Resend SVC message if we've started view change. 
- fn handle_start_view_change_message_timeout(&self) -> Vec<VsrAction> { + fn handle_start_view_change_message_timeout(&self, plane: PlaneKind) -> Vec<VsrAction> { if !self.sent_own_start_view_change.get() { return Vec::new(); } @@ -774,15 +824,24 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { .borrow_mut() .reset(TimeoutKind::StartViewChangeMessage); - vec![VsrAction::SendStartViewChange { + let action = VsrAction::SendStartViewChange { view: self.view.get(), namespace: self.namespace, - }] + }; + emit_sim_event( + SimEventKind::ControlMessageScheduled, + &ControlActionLogEvent::from_vsr_action( + ReplicaLogContext::from_consensus(self, plane), + &action, + ), + ); + vec![action] } /// Resend DVC message if we've sent one. fn handle_do_view_change_message_timeout( &self, + plane: PlaneKind, current_op: u64, current_commit: u64, ) -> Vec<VsrAction> { @@ -803,24 +862,33 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { .borrow_mut() .reset(TimeoutKind::DoViewChangeMessage); - vec![VsrAction::SendDoViewChange { + let action = VsrAction::SendDoViewChange { view: self.view.get(), target: self.primary_index(self.view.get()), log_view: self.log_view.get(), op: current_op, commit: current_commit, namespace: self.namespace, - }] + }; + emit_sim_event( + SimEventKind::ControlMessageScheduled, + &ControlActionLogEvent::from_vsr_action( + ReplicaLogContext::from_consensus(self, plane), + &action, + ), + ); + vec![action] } /// Escalate to next view if stuck in view change. - fn handle_view_change_status_timeout(&self) -> Vec<VsrAction> { + fn handle_view_change_status_timeout(&self, plane: PlaneKind) -> Vec<VsrAction> { if self.status.get() != Status::ViewChange { return Vec::new(); } // Escalate: try next view - let next_view = self.view.get() + 1; + let old_view = self.view.get(); + let next_view = old_view + 1; self.view.set(next_view); self.reset_view_change_state(); @@ -833,10 +901,28 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { .borrow_mut() .reset(TimeoutKind::ViewChangeStatus); - vec![VsrAction::SendStartViewChange { + emit_sim_event( + SimEventKind::ViewChangeStarted, + &ViewChangeLogEvent { + replica: ReplicaLogContext::from_consensus(self, plane), + old_view, + new_view: next_view, + reason: ViewChangeReason::ViewChangeStatusTimeout, + }, + ); + + let action = VsrAction::SendStartViewChange { view: next_view, namespace: self.namespace, - }] + }; + emit_sim_event( + SimEventKind::ControlMessageScheduled, + &ControlActionLogEvent::from_vsr_action( + ReplicaLogContext::from_consensus(self, plane), + &action, + ), + ); + vec![action] } /// Handle a received `StartViewChange` message. @@ -847,7 +933,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { /// /// # Panics /// If `header.namespace` does not match this replica's namespace. 
- pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) -> Vec<VsrAction> { + pub fn handle_start_view_change( + &self, + plane: PlaneKind, + header: &StartViewChangeHeader, + ) -> Vec<VsrAction> { assert_eq!( header.namespace, self.namespace, "SVC routed to wrong group" @@ -864,6 +954,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { // If SVC is for a higher view, advance to that view if msg_view > self.view.get() { + let old_view = self.view.get(); self.view.set(msg_view); self.status.set(Status::ViewChange); self.reset_view_change_state(); @@ -880,11 +971,29 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { timeouts.start(TimeoutKind::ViewChangeStatus); } + emit_sim_event( + SimEventKind::ViewChangeStarted, + &ViewChangeLogEvent { + replica: ReplicaLogContext::from_consensus(self, plane), + old_view, + new_view: msg_view, + reason: ViewChangeReason::ReceivedStartViewChange, + }, + ); + // Send our own SVC - actions.push(VsrAction::SendStartViewChange { + let action = VsrAction::SendStartViewChange { view: msg_view, namespace: self.namespace, - }); + }; + emit_sim_event( + SimEventKind::ControlMessageScheduled, + &ControlActionLogEvent::from_vsr_action( + ReplicaLogContext::from_consensus(self, plane), + &action, + ), + ); + actions.push(action); } // Record the SVC from sender @@ -908,14 +1017,22 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { .borrow_mut() .start(TimeoutKind::DoViewChangeMessage); - actions.push(VsrAction::SendDoViewChange { + let action = VsrAction::SendDoViewChange { view: self.view.get(), target: primary_candidate, log_view: self.log_view.get(), op: current_op, commit: current_commit, namespace: self.namespace, - }); + }; + emit_sim_event( + SimEventKind::ControlMessageScheduled, + &ControlActionLogEvent::from_vsr_action( + ReplicaLogContext::from_consensus(self, plane), + &action, + ), + ); + actions.push(action); // If we are the primary candidate, record our own DVC if primary_candidate == self.replica { @@ -933,7 +1050,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { // Check if we now have quorum if dvc_count(&self.do_view_change_from_all_replicas.borrow()) >= self.quorum() { self.do_view_change_quorum.set(true); - actions.extend(self.complete_view_change_as_primary()); + actions.extend(self.complete_view_change_as_primary(plane)); } } } @@ -949,7 +1066,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { /// /// # Panics /// If `header.namespace` does not match this replica's namespace. 
- pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) -> Vec<VsrAction> { + pub fn handle_do_view_change( + &self, + plane: PlaneKind, + header: &DoViewChangeHeader, + ) -> Vec<VsrAction> { assert_eq!( header.namespace, self.namespace, "DVC routed to wrong group" @@ -969,6 +1090,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { // If DVC is for a higher view, advance to that view if msg_view > self.view.get() { + let old_view = self.view.get(); self.view.set(msg_view); self.status.set(Status::ViewChange); self.reset_view_change_state(); @@ -985,11 +1107,29 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { timeouts.start(TimeoutKind::ViewChangeStatus); } + emit_sim_event( + SimEventKind::ViewChangeStarted, + &ViewChangeLogEvent { + replica: ReplicaLogContext::from_consensus(self, plane), + old_view, + new_view: msg_view, + reason: ViewChangeReason::ReceivedDoViewChange, + }, + ); + // Send our own SVC - actions.push(VsrAction::SendStartViewChange { + let action = VsrAction::SendStartViewChange { view: msg_view, namespace: self.namespace, - }); + }; + emit_sim_event( + SimEventKind::ControlMessageScheduled, + &ControlActionLogEvent::from_vsr_action( + ReplicaLogContext::from_consensus(self, plane), + &action, + ), + ); + actions.push(action); } // Only the primary candidate processes DVCs for quorum @@ -1035,7 +1175,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { && dvc_count(&self.do_view_change_from_all_replicas.borrow()) >= self.quorum() { self.do_view_change_quorum.set(true); - actions.extend(self.complete_view_change_as_primary()); + actions.extend(self.complete_view_change_as_primary(plane)); } actions @@ -1050,7 +1190,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { /// /// # Panics /// If `header.namespace` does not match this replica's namespace. - pub fn handle_start_view(&self, header: &StartViewHeader) -> Vec<VsrAction> { + pub fn handle_start_view(&self, plane: PlaneKind, header: &StartViewHeader) -> Vec<VsrAction> { assert_eq!(header.namespace, self.namespace, "SV routed to wrong group"); let from_replica = header.replica; let msg_view = header.view; @@ -1097,19 +1237,32 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { // Send PrepareOK for uncommitted ops (commit+1 to op) let mut actions = Vec::new(); for op_num in (msg_commit + 1)..=msg_op { - actions.push(VsrAction::SendPrepareOk { + let action = VsrAction::SendPrepareOk { view: msg_view, op: op_num, target: from_replica, namespace: self.namespace, - }); + }; + emit_sim_event( + SimEventKind::ControlMessageScheduled, + &ControlActionLogEvent::from_vsr_action( + ReplicaLogContext::from_consensus(self, plane), + &action, + ), + ); + actions.push(action); } + emit_replica_event( + SimEventKind::ReplicaStateChanged, + &ReplicaLogContext::from_consensus(self, plane), + ); + actions } /// Complete view change as the new primary after collecting DVC quorum. 
- fn complete_view_change_as_primary(&self) -> Vec<VsrAction> { + fn complete_view_change_as_primary(&self, plane: PlaneKind) -> Vec<VsrAction> { let dvc_array = self.do_view_change_from_all_replicas.borrow(); let Some(winner) = dvc_select_winner(&dvc_array) else { @@ -1143,23 +1296,39 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { timeouts.start(TimeoutKind::CommitMessage); } - vec![VsrAction::SendStartView { + let state = ReplicaLogContext::from_consensus(self, plane); + emit_replica_event(SimEventKind::PrimaryElected, &state); + emit_replica_event(SimEventKind::ReplicaStateChanged, &state); + + let action = VsrAction::SendStartView { view: self.view.get(), op: new_op, commit: max_commit, namespace: self.namespace, - }] + }; + emit_sim_event( + SimEventKind::ControlMessageScheduled, + &ControlActionLogEvent::from_vsr_action( + ReplicaLogContext::from_consensus(self, plane), + &action, + ), + ); + vec![action] } /// Handle a `PrepareOk` message from a replica. /// - /// Returns `true` if quorum was just reached for this op. + /// Returns rich ack-progress information for structured logging. /// Caller (`on_ack`) should validate `is_primary` and status before calling. /// /// # Panics /// - If `header.command` is not `Command2::PrepareOk`. /// - If `header.replica >= self.replica_count`. - pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool { + pub fn handle_prepare_ok( + &self, + plane: PlaneKind, + header: &PrepareOkHeader, + ) -> PrepareOkOutcome { assert_eq!(header.command, Command2::PrepareOk); assert!( header.replica < self.replica_count, @@ -1169,17 +1338,23 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { // Ignore if from older view if header.view < self.view() { - return false; + return PrepareOkOutcome::Ignored { + reason: IgnoreReason::OlderView, + }; } // Ignore if from newer view if header.view > self.view() { - return false; + return PrepareOkOutcome::Ignored { + reason: IgnoreReason::NewerView, + }; } // Ignore if syncing if self.is_syncing() { - return false; + return PrepareOkOutcome::Ignored { + reason: IgnoreReason::Syncing, + }; } // Find the prepare in our pipeline @@ -1187,30 +1362,54 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { let Some(entry) = pipeline.entry_by_op_mut(header.op) else { // Not in pipeline - could be old/duplicate or already committed - return false; + return PrepareOkOutcome::Ignored { + reason: IgnoreReason::UnknownPrepare, + }; }; // Verify checksum matches if entry.header.checksum != header.prepare_checksum { - return false; + return PrepareOkOutcome::Ignored { + reason: IgnoreReason::ChecksumMismatch, + }; } // Check for duplicate ack if entry.has_ack(header.replica) { - return false; + return PrepareOkOutcome::Ignored { + reason: IgnoreReason::DuplicateAck, + }; } // Record the ack from this replica let ack_count = entry.add_ack(header.replica); let quorum = self.quorum(); + let quorum_reached = ack_count >= quorum && !entry.ok_quorum_received; // Check if we've reached quorum - if ack_count >= quorum && !entry.ok_quorum_received { + if quorum_reached { entry.ok_quorum_received = true; - return true; } - false + drop(pipeline); + + emit_sim_event( + SimEventKind::PrepareAcked, + &AckLogEvent { + replica: ReplicaLogContext::from_consensus(self, plane), + op: header.op, + prepare_checksum: header.prepare_checksum, + ack_from_replica: header.replica, + ack_count, + quorum, + quorum_reached, + }, + ); + + PrepareOkOutcome::Accepted { + 
ack_count, + quorum_reached, + } } /// Enqueue a self-addressed message for processing in the next loopback drain. @@ -1339,11 +1538,28 @@ where // (push_loopback / drain_loopback_into) rather than inline here, // so that WAL persistence can happen between pipeline insertion // and ack recording. - fn pipeline_message(&self, message: &Self::Message<Self::ReplicateHeader>) { + fn pipeline_message(&self, plane: PlaneKind, message: &Self::Message<Self::ReplicateHeader>) { assert!(self.is_primary(), "only primary can pipeline messages"); let mut pipeline = self.pipeline.borrow_mut(); pipeline.push(PipelineEntry::new(*message.header())); + let pipeline_depth = pipeline.len(); + drop(pipeline); + + let header = message.header(); + emit_sim_event( + SimEventKind::PrepareQueued, + &PrepareLogEvent { + replica: ReplicaLogContext::from_consensus(self, plane), + op: header.op, + parent_checksum: header.parent, + prepare_checksum: header.checksum, + client_id: header.client, + request_id: header.request, + operation: header.operation, + pipeline_depth, + }, + ); } fn verify_pipeline(&self) { diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index 55114ab43..200ee87ad 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -44,6 +44,8 @@ pub trait Pipeline { fn is_empty(&self) -> bool; + fn len(&self) -> usize; + fn verify(&self); } @@ -63,7 +65,7 @@ pub trait Consensus: Sized { type Sequencer: Sequencer; type Pipeline: Pipeline; - fn pipeline_message(&self, message: &Self::Message<Self::ReplicateHeader>); + fn pipeline_message(&self, plane: PlaneKind, message: &Self::Message<Self::ReplicateHeader>); fn verify_pipeline(&self); fn is_follower(&self) -> bool; @@ -109,6 +111,8 @@ mod namespaced_pipeline; pub use namespaced_pipeline::*; mod plane_helpers; pub use plane_helpers::*; +mod observability; +pub use observability::*; mod view_change_quorum; pub use view_change_quorum::*; diff --git a/core/consensus/src/namespaced_pipeline.rs b/core/consensus/src/namespaced_pipeline.rs index 153282680..71e18fb71 100644 --- a/core/consensus/src/namespaced_pipeline.rs +++ b/core/consensus/src/namespaced_pipeline.rs @@ -264,6 +264,10 @@ impl Pipeline for NamespacedPipeline { self.total_count == 0 } + fn len(&self) -> usize { + self.total_count + } + fn verify(&self) { assert!(self.total_count <= PIPELINE_PREPARE_QUEUE_MAX); diff --git a/core/consensus/src/observability.rs b/core/consensus/src/observability.rs new file mode 100644 index 000000000..865935faf --- /dev/null +++ b/core/consensus/src/observability.rs @@ -0,0 +1,604 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
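(Aside, not part of the patch: every event in this module is emitted at DEBUG under the `iggy.sim` target, so a simulator or test harness can opt into just these structured events. A sketch of doing that, assuming the `tracing-subscriber` crate with its `env-filter` feature — an assumption of this example, not a dependency this commit adds.)

use tracing_subscriber::EnvFilter;

// Illustrative sketch: keep regular logs at INFO but let every `iggy.sim`
// event through at DEBUG, so the structured fields (sim_event, plane, view,
// op, ...) become visible.
fn init_sim_logging() {
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("info,iggy.sim=debug"))
        .init();
}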
+ +use crate::{Pipeline, PipelineEntry, Status, VsrAction, VsrConsensus}; +use iggy_binary_protocol::Operation; +use iggy_common::sharding::IggyNamespace; +use message_bus::MessageBus; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PlaneKind { + Metadata, + Partitions, +} + +impl PlaneKind { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Metadata => "metadata", + Self::Partitions => "partitions", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ReplicaRole { + Primary, + Backup, +} + +impl ReplicaRole { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Primary => "primary", + Self::Backup => "backup", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ViewChangeReason { + NormalHeartbeatTimeout, + ViewChangeStatusTimeout, + ReceivedStartViewChange, + ReceivedDoViewChange, +} + +impl ViewChangeReason { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::NormalHeartbeatTimeout => "normal_heartbeat_timeout", + Self::ViewChangeStatusTimeout => "view_change_status_timeout", + Self::ReceivedStartViewChange => "received_start_view_change", + Self::ReceivedDoViewChange => "received_do_view_change", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum IgnoreReason { + NotPrimary, + NotNormal, + Syncing, + NewerView, + OlderView, + OldPrepare, + UnknownPrepare, + DuplicateAck, + ChecksumMismatch, + InvalidOperation, + ApplyFailed, + PersistFailed, +} + +impl IgnoreReason { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::NotPrimary => "not_primary", + Self::NotNormal => "not_normal", + Self::Syncing => "syncing", + Self::NewerView => "newer_view", + Self::OlderView => "older_view", + Self::OldPrepare => "old_prepare", + Self::UnknownPrepare => "unknown_prepare", + Self::DuplicateAck => "duplicate_ack", + Self::ChecksumMismatch => "checksum_mismatch", + Self::InvalidOperation => "invalid_operation", + Self::ApplyFailed => "apply_failed", + Self::PersistFailed => "persist_failed", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ControlActionKind { + SendStartViewChange, + SendDoViewChange, + SendStartView, + SendPrepareOk, +} + +impl ControlActionKind { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::SendStartViewChange => "send_start_view_change", + Self::SendDoViewChange => "send_do_view_change", + Self::SendStartView => "send_start_view", + Self::SendPrepareOk => "send_prepare_ok", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SimEventKind { + ClientRequestReceived, + PrepareQueued, + PrepareAcked, + OperationCommitted, + ClientReplyEmitted, + ViewChangeStarted, + PrimaryElected, + ReplicaStateChanged, + NamespaceProgressUpdated, + ControlMessageScheduled, +} + +impl SimEventKind { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::ClientRequestReceived => "ClientRequestReceived", + Self::PrepareQueued => "PrepareQueued", + Self::PrepareAcked => "PrepareAcked", + Self::OperationCommitted => "OperationCommitted", + Self::ClientReplyEmitted => "ClientReplyEmitted", + Self::ViewChangeStarted => "ViewChangeStarted", + Self::PrimaryElected => "PrimaryElected", + Self::ReplicaStateChanged => "ReplicaStateChanged", + Self::NamespaceProgressUpdated => "NamespaceProgressUpdated", + Self::ControlMessageScheduled => "ControlMessageScheduled", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub 
struct NamespaceLogContext { + pub raw: u64, + pub stream_id: Option<u32>, + pub topic_id: Option<u32>, + pub partition_id: Option<u32>, +} + +impl NamespaceLogContext { + #[must_use] + pub fn from_raw(plane: PlaneKind, raw: u64) -> Self { + if matches!(plane, PlaneKind::Metadata) { + return Self { + raw, + stream_id: None, + topic_id: None, + partition_id: None, + }; + } + + let namespace = IggyNamespace::from_raw(raw); + #[allow(clippy::cast_possible_truncation)] + let stream_id = namespace.stream_id() as u32; + #[allow(clippy::cast_possible_truncation)] + let topic_id = namespace.topic_id() as u32; + #[allow(clippy::cast_possible_truncation)] + let partition_id = namespace.partition_id() as u32; + + Self { + raw, + stream_id: Some(stream_id), + topic_id: Some(topic_id), + partition_id: Some(partition_id), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ReplicaLogContext { + pub plane: PlaneKind, + pub cluster_id: u128, + pub replica_id: u8, + pub namespace: NamespaceLogContext, + pub view: u32, + pub log_view: u32, + pub commit: u64, + pub status: Status, + pub role: ReplicaRole, +} + +impl ReplicaLogContext { + #[must_use] + pub fn from_consensus<B, P>(consensus: &VsrConsensus<B, P>, plane: PlaneKind) -> Self + where + B: MessageBus, + P: Pipeline<Entry = PipelineEntry>, + { + let role = if consensus.is_primary() { + ReplicaRole::Primary + } else { + ReplicaRole::Backup + }; + + Self { + plane, + cluster_id: consensus.cluster(), + replica_id: consensus.replica(), + namespace: NamespaceLogContext::from_raw(plane, consensus.namespace()), + view: consensus.view(), + log_view: consensus.log_view(), + commit: consensus.commit(), + status: consensus.status(), + role, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct RequestLogEvent { + pub replica: ReplicaLogContext, + pub client_id: u128, + pub request_id: u64, + pub operation: Operation, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct PrepareLogEvent { + pub replica: ReplicaLogContext, + pub op: u64, + pub parent_checksum: u128, + pub prepare_checksum: u128, + pub client_id: u128, + pub request_id: u64, + pub operation: Operation, + pub pipeline_depth: usize, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct AckLogEvent { + pub replica: ReplicaLogContext, + pub op: u64, + pub prepare_checksum: u128, + pub ack_from_replica: u8, + pub ack_count: usize, + pub quorum: usize, + pub quorum_reached: bool, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct CommitLogEvent { + pub replica: ReplicaLogContext, + pub op: u64, + pub client_id: u128, + pub request_id: u64, + pub operation: Operation, + pub pipeline_depth: usize, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ViewChangeLogEvent { + pub replica: ReplicaLogContext, + pub old_view: u32, + pub new_view: u32, + pub reason: ViewChangeReason, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ControlActionLogEvent { + pub replica: ReplicaLogContext, + pub action: ControlActionKind, + pub target_replica: Option<u8>, + pub op: Option<u64>, + pub commit: Option<u64>, +} + +impl ControlActionLogEvent { + #[must_use] + pub const fn from_vsr_action(replica: ReplicaLogContext, action: &VsrAction) -> Self { + match *action { + VsrAction::SendStartViewChange { .. } => Self { + replica, + action: ControlActionKind::SendStartViewChange, + target_replica: None, + op: None, + commit: None, + }, + VsrAction::SendDoViewChange { + target, op, commit, .. 
+ } => Self { + replica, + action: ControlActionKind::SendDoViewChange, + target_replica: Some(target), + op: Some(op), + commit: Some(commit), + }, + VsrAction::SendStartView { op, commit, .. } => Self { + replica, + action: ControlActionKind::SendStartView, + target_replica: None, + op: Some(op), + commit: Some(commit), + }, + VsrAction::SendPrepareOk { target, op, .. } => Self { + replica, + action: ControlActionKind::SendPrepareOk, + target_replica: Some(target), + op: Some(op), + commit: None, + }, + } + } +} + +pub trait StructuredSimEvent { + fn emit(&self, sim_event: SimEventKind); +} + +#[must_use] +pub const fn status_as_str(status: Status) -> &'static str { + match status { + Status::Normal => "normal", + Status::ViewChange => "view_change", + Status::Recovering => "recovering", + } +} + +#[must_use] +pub const fn operation_as_str(operation: Operation) -> &'static str { + match operation { + Operation::Reserved => "reserved", + Operation::CreateStream => "create_stream", + Operation::UpdateStream => "update_stream", + Operation::DeleteStream => "delete_stream", + Operation::PurgeStream => "purge_stream", + Operation::CreateTopic => "create_topic", + Operation::UpdateTopic => "update_topic", + Operation::DeleteTopic => "delete_topic", + Operation::PurgeTopic => "purge_topic", + Operation::CreatePartitions => "create_partitions", + Operation::DeletePartitions => "delete_partitions", + Operation::DeleteSegments => "delete_segments", + Operation::CreateConsumerGroup => "create_consumer_group", + Operation::DeleteConsumerGroup => "delete_consumer_group", + Operation::CreateUser => "create_user", + Operation::UpdateUser => "update_user", + Operation::DeleteUser => "delete_user", + Operation::ChangePassword => "change_password", + Operation::UpdatePermissions => "update_permissions", + Operation::CreatePersonalAccessToken => "create_personal_access_token", + Operation::DeletePersonalAccessToken => "delete_personal_access_token", + Operation::SendMessages => "send_messages", + Operation::StoreConsumerOffset => "store_consumer_offset", + } +} + +#[must_use] +pub const fn namespace_component(component: Option<u32>) -> u32 { + match component { + Some(value) => value, + None => 0, + } +} + +pub fn emit_sim_event<T>(sim_event: SimEventKind, event: &T) +where + T: StructuredSimEvent, +{ + event.emit(sim_event); +} + +pub fn emit_replica_event(sim_event: SimEventKind, ctx: &ReplicaLogContext) { + tracing::event!( + target: "iggy.sim", + tracing::Level::DEBUG, + sim_event = sim_event.as_str(), + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + ); +} + +pub fn emit_namespace_progress_event( + sim_event: SimEventKind, + ctx: &ReplicaLogContext, + op: u64, + pipeline_depth: usize, +) { + tracing::event!( + target: "iggy.sim", + tracing::Level::DEBUG, + sim_event = sim_event.as_str(), + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = 
ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + op, + pipeline_depth, + ); +} + +impl StructuredSimEvent for RequestLogEvent { + fn emit(&self, sim_event: SimEventKind) { + let ctx = self.replica; + tracing::event!( + target: "iggy.sim", + tracing::Level::DEBUG, + sim_event = sim_event.as_str(), + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + client_id = self.client_id, + request_id = self.request_id, + operation = operation_as_str(self.operation), + ); + } +} + +impl StructuredSimEvent for PrepareLogEvent { + fn emit(&self, sim_event: SimEventKind) { + let ctx = self.replica; + tracing::event!( + target: "iggy.sim", + tracing::Level::DEBUG, + sim_event = sim_event.as_str(), + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + op = self.op, + parent_checksum = self.parent_checksum, + prepare_checksum = self.prepare_checksum, + client_id = self.client_id, + request_id = self.request_id, + operation = operation_as_str(self.operation), + pipeline_depth = self.pipeline_depth, + ); + } +} + +impl StructuredSimEvent for AckLogEvent { + fn emit(&self, sim_event: SimEventKind) { + let ctx = self.replica; + tracing::event!( + target: "iggy.sim", + tracing::Level::DEBUG, + sim_event = sim_event.as_str(), + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + op = self.op, + prepare_checksum = self.prepare_checksum, + ack_from_replica = self.ack_from_replica, + ack_count = self.ack_count, + quorum = self.quorum, + quorum_reached = self.quorum_reached, + ); + } +} + +impl StructuredSimEvent for CommitLogEvent { + fn emit(&self, sim_event: SimEventKind) { + let ctx = self.replica; + tracing::event!( + target: "iggy.sim", + tracing::Level::DEBUG, + sim_event = sim_event.as_str(), + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + op = self.op, + client_id = self.client_id, + request_id = self.request_id, + operation = operation_as_str(self.operation), + 
pipeline_depth = self.pipeline_depth, + ); + } +} + +impl StructuredSimEvent for ViewChangeLogEvent { + fn emit(&self, sim_event: SimEventKind) { + let ctx = self.replica; + tracing::event!( + target: "iggy.sim", + tracing::Level::DEBUG, + sim_event = sim_event.as_str(), + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + old_view = self.old_view, + new_view = self.new_view, + reason = self.reason.as_str(), + ); + } +} + +impl StructuredSimEvent for ControlActionLogEvent { + fn emit(&self, sim_event: SimEventKind) { + let ctx = self.replica; + tracing::event!( + target: "iggy.sim", + tracing::Level::DEBUG, + sim_event = sim_event.as_str(), + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + action = self.action.as_str(), + target_replica = self.target_replica.unwrap_or_default(), + op = self.op.unwrap_or_default(), + action_commit = self.commit.unwrap_or_default(), + ); + } +} diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs index be33c1190..443682f8f 100644 --- a/core/consensus/src/plane_helpers.rs +++ b/core/consensus/src/plane_helpers.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. 
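(Aside, not part of the patch: the helpers below now report `IgnoreReason`/`PrepareOkOutcome` instead of bare strings and bools. A sketch of the equivalent explicit match a caller could use when it also wants to log the ignore reason; the `tracing::debug!` call and the function name are illustrative.)

use consensus::{Pipeline, PipelineEntry, PlaneKind, PrepareOkOutcome, VsrConsensus};
use iggy_binary_protocol::PrepareOkHeader;
use message_bus::MessageBus;

// Illustrative sketch: the same decision `ack_quorum_reached` makes with
// `matches!`, written as an explicit match so the ignore reason is available.
fn ack_reached_quorum<B, P>(
    consensus: &VsrConsensus<B, P>,
    plane: PlaneKind,
    ack: &PrepareOkHeader,
) -> bool
where
    B: MessageBus,
    P: Pipeline<Entry = PipelineEntry>,
{
    match consensus.handle_prepare_ok(plane, ack) {
        PrepareOkOutcome::Accepted { quorum_reached, .. } => quorum_reached,
        PrepareOkOutcome::Ignored { reason } => {
            tracing::debug!(reason = reason.as_str(), "prepare_ok ignored");
            false
        }
    }
}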
-use crate::{Consensus, Pipeline, PipelineEntry, Sequencer, Status, VsrConsensus}; +use crate::{ + Consensus, IgnoreReason, Pipeline, PipelineEntry, PlaneKind, PrepareOkOutcome, Sequencer, + Status, VsrConsensus, +}; use iggy_binary_protocol::{ Command2, GenericHeader, Message, PrepareHeader, PrepareOkHeader, ReplyHeader, }; @@ -34,6 +37,7 @@ use std::ops::AsyncFnOnce; #[allow(clippy::future_not_send)] pub async fn pipeline_prepare_common<C, F>( consensus: &C, + plane: PlaneKind, prepare: C::Message<C::ReplicateHeader>, on_replicate: F, ) where @@ -45,7 +49,7 @@ pub async fn pipeline_prepare_common<C, F>( assert!(!consensus.is_syncing(), "on_request: must not be syncing"); consensus.verify_pipeline(); - consensus.pipeline_message(&prepare); + consensus.pipeline_message(plane, &prepare); on_replicate(prepare).await; } @@ -116,7 +120,7 @@ where pub fn replicate_preflight<B, P>( consensus: &VsrConsensus<B, P>, header: &PrepareHeader, -) -> Result<u64, &'static str> +) -> Result<u64, IgnoreReason> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, P: Pipeline<Entry = PipelineEntry>, @@ -124,17 +128,17 @@ where assert_eq!(header.command, Command2::Prepare); if consensus.is_syncing() { - return Err("sync"); + return Err(IgnoreReason::Syncing); } let current_op = consensus.sequencer().current_sequence(); if consensus.status() != Status::Normal { - return Err("not normal state"); + return Err(IgnoreReason::NotNormal); } if header.view > consensus.view() { - return Err("newer view"); + return Err(IgnoreReason::NewerView); } if consensus.is_follower() { @@ -149,17 +153,17 @@ where /// # Errors /// Returns a static error string if the replica is not primary or not in /// normal status. -pub fn ack_preflight<B, P>(consensus: &VsrConsensus<B, P>) -> Result<(), &'static str> +pub fn ack_preflight<B, P>(consensus: &VsrConsensus<B, P>) -> Result<(), IgnoreReason> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, P: Pipeline<Entry = PipelineEntry>, { if !consensus.is_primary() { - return Err("not primary"); + return Err(IgnoreReason::NotPrimary); } if consensus.status() != Status::Normal { - return Err("not normal"); + return Err(IgnoreReason::NotNormal); } Ok(()) @@ -170,12 +174,22 @@ where /// After recording the ack, walks forward from `current_commit + 1` advancing /// the commit number only while consecutive ops have achieved quorum. This /// prevents committing ops that have gaps in quorum acknowledgment. -pub fn ack_quorum_reached<B, P>(consensus: &VsrConsensus<B, P>, ack: &PrepareOkHeader) -> bool +pub fn ack_quorum_reached<B, P>( + consensus: &VsrConsensus<B, P>, + plane: PlaneKind, + ack: &PrepareOkHeader, +) -> bool where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, P: Pipeline<Entry = PipelineEntry>, { - if !consensus.handle_prepare_ok(ack) { + if !matches!( + consensus.handle_prepare_ok(plane, ack), + PrepareOkOutcome::Accepted { + quorum_reached: true, + .. + } + ) { return false; } @@ -506,7 +520,7 @@ mod tests { namespace: 0, reserved: [0; 120], }; - let _ = consensus.handle_start_view_change(&svc); + let _ = consensus.handle_start_view_change(PlaneKind::Metadata, &svc); // Simulate an in-flight loopback message queued between SVC and DVC quorum. 
let stale_msg = Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>()); @@ -529,7 +543,7 @@ mod tests { log_view: 0, reserved: [0; 100], }; - let actions = consensus.handle_do_view_change(&dvc); + let actions = consensus.handle_do_view_change(PlaneKind::Metadata, &dvc); // View change completed: should have SendStartView action. assert!( @@ -655,9 +669,9 @@ mod tests { let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new()); consensus.init(); - consensus.pipeline_message(&prepare_message(1, 0, 10)); - consensus.pipeline_message(&prepare_message(2, 10, 20)); - consensus.pipeline_message(&prepare_message(3, 20, 30)); + consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(1, 0, 10)); + consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(2, 10, 20)); + consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(3, 20, 30)); consensus.advance_commit_number(3); @@ -672,9 +686,9 @@ mod tests { let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new()); consensus.init(); - consensus.pipeline_message(&prepare_message(5, 0, 50)); - consensus.pipeline_message(&prepare_message(6, 50, 60)); - consensus.pipeline_message(&prepare_message(7, 60, 70)); + consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(5, 0, 50)); + consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(6, 50, 60)); + consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(7, 60, 70)); consensus.advance_commit_number(6); let drained = drain_committable_prefix(&consensus); diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index bd7fab4be..484135f19 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -17,8 +17,9 @@ use crate::stm::StateMachine; use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError}; use consensus::{ - Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity, Project, Sequencer, VsrConsensus, - ack_preflight, ack_quorum_reached, build_reply_message, drain_committable_prefix, + CommitLogEvent, Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity, PlaneKind, Project, + ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, ack_preflight, + ack_quorum_reached, build_reply_message, drain_committable_prefix, emit_sim_event, fence_old_prepare_by_commit, panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, @@ -291,10 +292,20 @@ where async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::Message<RequestHeader>) { let consensus = self.consensus.as_ref().unwrap(); - // TODO: Bunch of asserts. 
- debug!("handling metadata request"); + emit_sim_event( + SimEventKind::ClientRequestReceived, + &RequestLogEvent { + replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Metadata), + client_id: message.header().client, + request_id: message.header().request, + operation: message.header().operation, + }, + ); let prepare = message.project(consensus); - pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; + pipeline_prepare_common(consensus, PlaneKind::Metadata, prepare, |prepare| { + self.on_replicate(prepare) + }) + .await; } async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareHeader>) { @@ -307,8 +318,14 @@ where Ok(current_op) => current_op, Err(reason) => { warn!( - replica = consensus.replica(), - "on_replicate: ignoring ({reason})" + target: "iggy.metadata.diag", + plane = "metadata", + replica_id = consensus.replica(), + view = consensus.view(), + op = header.op, + operation = ?header.operation, + reason = reason.as_str(), + "ignoring prepare during replicate preflight" ); return; } @@ -319,7 +336,16 @@ where let is_old_prepare = fence_old_prepare_by_commit(consensus, &header) || journal.handle().header(header.op as usize).is_some(); let message = if is_old_prepare { - warn!("received old prepare, not replicating"); + warn!( + target: "iggy.metadata.diag", + plane = "metadata", + replica_id = consensus.replica(), + view = consensus.view(), + op = header.op, + commit = consensus.commit(), + operation = ?header.operation, + "received old prepare, skipping replication" + ); message } else { self.replicate(message).await @@ -338,15 +364,22 @@ where { Ok(true) => { debug!( - replica = consensus.replica(), - "on_replicate: forced checkpoint at op={snap_op}" + target: "iggy.metadata.diag", + plane = "metadata", + replica_id = consensus.replica(), + checkpoint_op = snap_op, + "forced checkpoint completed" ); } Ok(false) => {} Err(e) => { error!( - replica = consensus.replica(), - "on_replicate: forced checkpoint failed: {e}" + target: "iggy.metadata.diag", + plane = "metadata", + replica_id = consensus.replica(), + checkpoint_op = snap_op, + error = %e, + "forced checkpoint failed" ); return; } @@ -366,8 +399,13 @@ where // Append to journal. 
if let Err(e) = journal.handle().append(message).await { error!( - replica = consensus.replica(), - "on_replicate: journal append failed: {e}" + target: "iggy.metadata.diag", + plane = "metadata", + replica_id = consensus.replica(), + op = header.op, + operation = ?header.operation, + error = %e, + "journal append failed" ); return; } @@ -386,7 +424,15 @@ where let header = message.header(); if let Err(reason) = ack_preflight(consensus) { - warn!("on_ack: ignoring ({reason})"); + warn!( + target: "iggy.metadata.diag", + plane = "metadata", + replica_id = consensus.replica(), + view = consensus.view(), + op = header.op, + reason = reason.as_str(), + "ignoring ack during preflight" + ); return; } @@ -396,23 +442,39 @@ where .entry_by_op_and_checksum(header.op, header.prepare_checksum) .is_none() { - debug!("on_ack: prepare not in pipeline op={}", header.op); + debug!( + target: "iggy.metadata.diag", + plane = "metadata", + replica_id = consensus.replica(), + op = header.op, + prepare_checksum = header.prepare_checksum, + "ack target prepare not in pipeline" + ); return; } } - if ack_quorum_reached(consensus, header) { + if ack_quorum_reached(consensus, PlaneKind::Metadata, header) { let journal = self.journal.as_ref().unwrap(); - debug!("on_ack: quorum received for op={}", header.op); + debug!( + target: "iggy.metadata.diag", + plane = "metadata", + replica_id = consensus.replica(), + op = header.op, + "ack quorum received" + ); let drained = drain_committable_prefix(consensus); if let (Some(first), Some(last)) = (drained.first(), drained.last()) { debug!( - "on_ack: draining committed prefix ops=[{}..={}] count={}", - first.header.op, - last.header.op, - drained.len() + target: "iggy.metadata.diag", + plane = "metadata", + replica_id = consensus.replica(), + first_op = first.header.op, + last_op = last.header.op, + drained_count = drained.len(), + "draining committed metadata prefix" ); } @@ -434,20 +496,31 @@ where let response = self.mux_stm.update(prepare).unwrap_or_else(|err| { warn!( - "on_ack: state machine error for op={}: {err}", - prepare_header.op + target: "iggy.metadata.diag", + plane = "metadata", + replica_id = consensus.replica(), + op = prepare_header.op, + operation = ?prepare_header.operation, + error = %err, + "state machine update failed for committed metadata entry" ); bytes::Bytes::new() }); - debug!("on_ack: state applied for op={}", prepare_header.op); + let pipeline_depth = consensus.pipeline().borrow().len(); + let event = CommitLogEvent { + replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Metadata), + op: prepare_header.op, + client_id: prepare_header.client, + request_id: prepare_header.request, + operation: prepare_header.operation, + pipeline_depth, + }; + emit_sim_event(SimEventKind::OperationCommitted, &event); let generic_reply = build_reply_message(consensus, &prepare_header, response).into_generic(); let reply_buffers = freeze_client_reply(generic_reply); - debug!( - "on_ack: sending reply to client={} for op={}", - prepare_header.client, prepare_header.op - ); + emit_sim_event(SimEventKind::ClientReplyEmitted, &event); // TODO: Propagate send error instead of panicking; requires bus error design. 
consensus diff --git a/core/partitions/src/iggy_index_writer.rs b/core/partitions/src/iggy_index_writer.rs index 31e016059..3aa05ac5b 100644 --- a/core/partitions/src/iggy_index_writer.rs +++ b/core/partitions/src/iggy_index_writer.rs @@ -58,8 +58,10 @@ impl IggyIndexWriter { let size = index_size_bytes.load(Ordering::Relaxed); trace!( - "Opened sparse index file for writing: {file_path}, size: {}", - size + target: "iggy.partitions.storage", + file = file_path, + size, + "opened sparse index file for writing" ); Ok(Self { @@ -91,7 +93,13 @@ impl IggyIndexWriter { self.fsync().await?; } - trace!("Saved {len} sparse index bytes to file: {}", self.file_path); + trace!( + target: "iggy.partitions.storage", + file = self.file_path.as_str(), + bytes = len, + position, + "saved sparse index bytes to file" + ); Ok(()) } diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 380dbb30b..c49911b3d 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -194,6 +194,7 @@ impl Partition for IggyPartition { if let Err(err) = self.store_consumer_offset(consumer, last_offset) { // warning for now. warn!( + target: "iggy.partitions.diag", consumer = ?consumer, last_offset, %err, diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 26a1c2d13..94614d715 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -27,10 +27,11 @@ use crate::segment::Segment; use crate::types::PartitionsConfig; use consensus::PlaneIdentity; use consensus::{ - Consensus, NamespacedPipeline, Pipeline, PipelineEntry, Plane, Project, Sequencer, - VsrConsensus, ack_preflight, build_reply_message, fence_old_prepare_by_commit, - pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain, - send_prepare_ok as send_prepare_ok_common, + CommitLogEvent, Consensus, NamespacedPipeline, Pipeline, PipelineEntry, Plane, PlaneKind, + Project, ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, + ack_preflight, build_reply_message, emit_namespace_progress_event, emit_sim_event, + fence_old_prepare_by_commit, pipeline_prepare_common, replicate_preflight, + replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, }; use iggy_binary_protocol::{ Command2, ConsensusHeader, GenericHeader, Message, Operation, PrepareHeader, PrepareOkHeader, @@ -397,12 +398,28 @@ where .consensus() .expect("on_request: consensus not initialized"); - debug!(?namespace, "handling partition request"); + emit_sim_event( + SimEventKind::ClientRequestReceived, + &RequestLogEvent { + replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + client_id: message.header().client, + request_id: message.header().request, + operation: message.header().operation, + }, + ); let message = if message.header().operation == Operation::SendMessages { match convert_request_message(namespace, message) { Ok(message) => message, Err(error) => { - warn!(?namespace, %error, "on_request: failed to convert SendMessages"); + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = consensus.replica(), + namespace_raw = namespace.inner(), + operation = ?Operation::SendMessages, + error = %error, + "failed to convert send_messages request" + ); return; } } @@ -410,7 +427,10 @@ where message }; let prepare = message.project(consensus); - pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; + 
pipeline_prepare_common(consensus, PlaneKind::Partitions, prepare, |prepare| { + self.on_replicate(prepare) + }) + .await; } async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareHeader>) { @@ -424,8 +444,15 @@ where Ok(current_op) => current_op, Err(reason) => { warn!( - replica = consensus.replica(), - "on_replicate: ignoring ({reason})" + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = consensus.replica(), + view = consensus.view(), + op = header.op, + namespace_raw = header.namespace, + operation = ?header.operation, + reason = reason.as_str(), + "ignoring prepare during replicate preflight" ); return; } @@ -433,7 +460,17 @@ where let is_old_prepare = fence_old_prepare_by_commit(consensus, &header); if is_old_prepare { - warn!("received old prepare, not replicating"); + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = consensus.replica(), + view = consensus.view(), + op = header.op, + commit = consensus.commit(), + namespace_raw = header.namespace, + operation = ?header.operation, + "received old prepare, skipping replication" + ); return; } @@ -444,11 +481,14 @@ where if let Err(error) = self.apply_replicated_operation(&namespace, message).await { warn!( - replica = consensus.replica(), + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = consensus.replica(), op = header.op, - ?namespace, + namespace_raw = namespace.inner(), + operation = ?header.operation, %error, - "on_replicate: failed to apply replicated operation" + "failed to apply replicated partition operation" ); return; } @@ -457,11 +497,14 @@ where && let Err(error) = self.commit_messages(&namespace, true).await { warn!( - replica = consensus.replica(), + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = consensus.replica(), op = header.op, - ?namespace, + namespace_raw = namespace.inner(), + operation = ?header.operation, %error, - "on_replicate: failed to durably persist replicated operation" + "failed to durably persist replicated partition operation" ); return; } @@ -470,6 +513,12 @@ where debug_assert_eq!(header.op, current_op + 1); consensus.sequencer().set_sequence(header.op); consensus.set_last_prepare_checksum(header.checksum); + emit_namespace_progress_event( + SimEventKind::NamespaceProgressUpdated, + &ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + header.op, + consensus.pipeline().borrow().len(), + ); self.send_prepare_ok(&header).await; } @@ -479,7 +528,15 @@ where let consensus = self.consensus().expect("on_ack: consensus not initialized"); if let Err(reason) = ack_preflight(consensus) { - warn!("on_ack: ignoring ({reason})"); + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = consensus.replica(), + view = consensus.view(), + op = header.op, + reason = reason.as_str(), + "ignoring ack during preflight" + ); return; } @@ -489,12 +546,19 @@ where .entry_by_op_and_checksum(header.op, header.prepare_checksum) .is_none() { - debug!("on_ack: prepare not in pipeline op={}", header.op); + debug!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = consensus.replica(), + op = header.op, + prepare_checksum = header.prepare_checksum, + "ack target prepare not in pipeline" + ); return; } } - consensus.handle_prepare_ok(header); + consensus.handle_prepare_ok(PlaneKind::Partitions, header); // SAFETY(IGGY-66): Per-namespace drain independent of global commit. 
// @@ -528,6 +592,12 @@ where let new_commit = pipeline.global_commit_frontier(consensus.commit()); drop(pipeline); consensus.advance_commit_number(new_commit); + emit_namespace_progress_event( + SimEventKind::NamespaceProgressUpdated, + &ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + new_commit, + consensus.pipeline().borrow().len(), + ); } } @@ -578,10 +648,13 @@ where self.append_send_messages_to_journal(namespace, message) .await?; debug!( + target: "iggy.partitions.diag", + plane = "partitions", replica = consensus.replica(), op = header.op, - ?namespace, - "on_replicate: send_messages appended to partition journal" + namespace_raw = namespace.inner(), + operation = ?header.operation, + "replicated send_messages appended to partition journal" ); Ok(()) } @@ -611,10 +684,14 @@ where 2 => PollingConsumer::ConsumerGroup(consumer_id, 0), _ => { warn!( + target: "iggy.partitions.diag", + plane = "partitions", replica = consensus.replica(), op = header.op, + namespace_raw = namespace.inner(), + operation = ?header.operation, consumer_kind, - "on_replicate: unknown consumer kind" + "unknown consumer kind while applying replicated offset update" ); return Err(IggyError::InvalidCommand); } @@ -626,21 +703,28 @@ where let _ = partition.store_consumer_offset(consumer, offset); debug!( + target: "iggy.partitions.diag", + plane = "partitions", replica = consensus.replica(), op = header.op, + namespace_raw = namespace.inner(), + operation = ?header.operation, consumer_kind, consumer_id, offset, - "on_replicate: consumer offset stored" + "replicated consumer offset stored" ); Ok(()) } _ => { warn!( + target: "iggy.partitions.diag", + plane = "partitions", replica = consensus.replica(), + namespace_raw = namespace.inner(), op = header.op, - "on_replicate: unexpected operation {:?}", - header.operation + operation = ?header.operation, + "unexpected replicated partition operation" ); Ok(()) } @@ -769,8 +853,10 @@ where let Some(index_bytes) = index_bytes else { warn!( - ?namespace, - "commit_messages: failed to build a sparse index entry from pending journal batches" + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = namespace.inner(), + "failed to build sparse index entry from pending journal batches" ); return Err(IggyError::InvalidCommand); }; @@ -824,10 +910,13 @@ where ) { if let (Some(first), Some(last)) = (drained.first(), drained.last()) { debug!( - "on_ack: draining committed ops=[{}..={}] count={}", - first.header.op, - last.header.op, - drained.len() + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = consensus.replica(), + first_op = first.header.op, + last_op = last.header.op, + drained_count = drained.len(), + "draining committed partition ops" ); } @@ -840,47 +929,40 @@ where } in drained { let entry_namespace = IggyNamespace::from_raw(prepare_header.namespace); - - match prepare_header.operation { - Operation::SendMessages => { - if committed_ns.insert(entry_namespace) - && let Err(error) = self.commit_messages(&entry_namespace, false).await - { - failed_ns.insert(entry_namespace); - warn!( - ?entry_namespace, - op = prepare_header.op, - %error, - "on_ack: failed to commit partition messages" - ); - } - if failed_ns.contains(&entry_namespace) { - continue; - } - debug!("on_ack: messages committed for op={}", prepare_header.op); - } - Operation::StoreConsumerOffset => { - // TODO: Commit consumer offset update. 
- debug!(
- "on_ack: consumer offset committed for op={}",
- prepare_header.op
- );
- }
- _ => {
- warn!(
- "on_ack: unexpected operation {:?} for op={}",
- prepare_header.operation, prepare_header.op
- );
- }
+ if !self
+ .commit_partition_entry(
+ consensus,
+ prepare_header,
+ entry_namespace,
+ &mut committed_ns,
+ &mut failed_ns,
+ )
+ .await
+ {
+ continue;
}
+ let pipeline_depth = consensus.pipeline().borrow().len();
+ let event = CommitLogEvent {
+ replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions),
+ op: prepare_header.op,
+ client_id: prepare_header.client,
+ request_id: prepare_header.request,
+ operation: prepare_header.operation,
+ pipeline_depth,
+ };
+ emit_sim_event(SimEventKind::OperationCommitted, &event);
+ emit_namespace_progress_event(
+ SimEventKind::NamespaceProgressUpdated,
+ &event.replica,
+ prepare_header.op,
+ pipeline_depth,
+ );
+
let generic_reply = build_reply_message(consensus, &prepare_header, bytes::Bytes::new()).into_generic();
let reply_buffers = freeze_client_reply(generic_reply);
- debug!(
- "on_ack: sending reply to client={} for op={}",
- prepare_header.client, prepare_header.op
- );
+ emit_sim_event(SimEventKind::ClientReplyEmitted, &event);
if let Err(error) = consensus
.message_bus()
@@ -888,21 +970,81 @@ where
.await
{
warn!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
client = prepare_header.client,
op = prepare_header.op,
+ namespace_raw = entry_namespace.inner(),
%error,
- "on_ack: failed to send reply to client"
+ "failed to send reply to client"
);
}
}
if !failed_ns.is_empty() {
warn!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ replica_id = consensus.replica(),
failed_namespaces = failed_ns.len(),
- "on_ack: some namespaces failed local commit handling"
+ "some namespaces failed local commit handling"
);
}
}
+ async fn commit_partition_entry(
+ &self,
+ consensus: &VsrConsensus<B, NamespacedPipeline>,
+ prepare_header: PrepareHeader,
+ entry_namespace: IggyNamespace,
+ committed_ns: &mut HashSet<IggyNamespace>,
+ failed_ns: &mut HashSet<IggyNamespace>,
+ ) -> bool {
+ match prepare_header.operation {
+ Operation::SendMessages => {
+ if committed_ns.insert(entry_namespace)
+ && let Err(error) = self.commit_messages(&entry_namespace, false).await
+ {
+ failed_ns.insert(entry_namespace);
+ warn!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ replica_id = consensus.replica(),
+ namespace_raw = entry_namespace.inner(),
+ op = prepare_header.op,
+ operation = ?prepare_header.operation,
+ %error,
+ "failed to commit partition messages"
+ );
+ }
+ !failed_ns.contains(&entry_namespace)
+ }
+ Operation::StoreConsumerOffset => {
+ // TODO: Commit consumer offset update.
+ debug!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ replica_id = consensus.replica(),
+ op = prepare_header.op,
+ namespace_raw = entry_namespace.inner(),
+ "consumer offset committed"
+ );
+ true
+ }
+ _ => {
+ warn!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ replica_id = consensus.replica(),
+ op = prepare_header.op,
+ namespace_raw = entry_namespace.inner(),
+ operation = ?prepare_header.operation,
+ "unexpected committed partition operation"
+ );
+ true
+ }
+ }
+ }
+
/// Persist frozen batches to disk and update segment bookkeeping.
async fn persist_frozen_batches_to_disk(
&self,
@@ -947,10 +1089,12 @@ where
.await
.map_err(|error| {
warn!(
- ?namespace,
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ namespace_raw = namespace.inner(),
batch_count,
%error,
- "persist: failed to save frozen batches"
+ "failed to save frozen batches"
);
error
})?;
@@ -960,15 +1104,24 @@ where
.await
.map_err(|error| {
warn!(
- ?namespace,
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ namespace_raw = namespace.inner(),
batch_count,
%error,
- "persist: failed to save sparse indexes"
+ "failed to save sparse indexes"
);
error
})?;
- debug!(?namespace, batch_count, ?saved, "persisted batches to disk");
+ debug!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ namespace_raw = namespace.inner(),
+ batch_count,
+ saved_bytes = saved.as_bytes_u64(),
+ "persisted batches to disk"
+ );
let partition = self
.get_mut_by_ns(namespace)
@@ -1063,7 +1216,13 @@ where
);
partition.stats.increment_segments_count(1);
- debug!(?namespace, start_offset, "rotated to new segment");
+ debug!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ namespace_raw = namespace.inner(),
+ start_offset,
+ "rotated to new segment"
+ );
Ok(())
}
diff --git a/core/partitions/src/messages_writer.rs b/core/partitions/src/messages_writer.rs
index 98d5b688e..330937c73 100644
--- a/core/partitions/src/messages_writer.rs
+++ b/core/partitions/src/messages_writer.rs
@@ -126,7 +126,9 @@ async fn write_frozen_chunked<const ALIGN: usize>(
.into();
result.map_err(|err| {
error!(
+ target: "iggy.partitions.storage",
file = file_path,
+ write_position = position,
%err,
"failed to write frozen messages to segment file"
);
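The hunks above route partition diagnostics through dedicated tracing targets: "iggy.partitions.diag" for the consensus-facing path and "iggy.partitions.storage" for segment writes. A minimal sketch of how such targets could be enabled selectively, assuming the standard tracing-subscriber EnvFilter setup; the filter string and the init_diagnostics name are illustrative and not taken from this commit.

    use tracing_subscriber::EnvFilter;

    fn init_diagnostics() {
        // Keep the partitions plane at DEBUG, storage at WARN, everything else at INFO.
        let filter =
            EnvFilter::new("info,iggy.partitions.diag=debug,iggy.partitions.storage=warn");
        tracing_subscriber::fmt()
            .with_env_filter(filter)
            .init();
    }

With a directive like this, the per-op fields added in the diff (replica_id, view, op, namespace_raw, operation) stay visible at debug level for the partitions plane without raising verbosity elsewhere.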
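The emit_sim_event and emit_namespace_progress_event helpers referenced above live in the new core/consensus/src/observability.rs, which is not included in this excerpt. A rough stand-in, only to illustrate the call shape used by the commit path; the target name and the implementation are assumptions, not the module's actual code.

    use tracing::debug;

    // Hypothetical mirror of the event kinds seen in the hunks above.
    #[derive(Debug, Clone, Copy)]
    enum SimEventKind {
        OperationCommitted,
        ClientReplyEmitted,
        NamespaceProgressUpdated,
    }

    // Stand-in: emit a kind tag plus a Debug-formatted event payload as one structured log record.
    fn emit_sim_event<E: std::fmt::Debug>(kind: SimEventKind, event: &E) {
        // The target below is an assumption for illustration only.
        debug!(target: "iggy.consensus.sim", kind = ?kind, event = ?event, "sim event");
    }

    // Example call shape, mirroring the commit path above:
    // emit_sim_event(SimEventKind::OperationCommitted, &event);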