hubcio commented on code in PR #3092:
URL: https://github.com/apache/iggy/pull/3092#discussion_r3061262688
##########
core/consensus/src/impls.rs:
##########
@@ -1397,6 +1459,58 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
}
}
+ /// Handle a `Commit` (heartbeat) message from the primary.
+ ///
+ /// Advances `commit_max` and resets the backup's `NormalHeartbeat` timeout
+ /// so it doesn't start a spurious view change. Returns `true` if
+ /// `commit_max` advanced, signalling the caller to run `commit_journal`.
+ ///
+ /// Only accepts heartbeats with a strictly newer monotonic timestamp
+ /// to prevent old/replayed messages from suppressing view changes.
+ ///
+ /// # Panics
+ /// - If `header.namespace` does not match this replica's namespace.
+ /// - If `header.replica` is not the primary for `header.view` (protocol violation).
+ pub fn handle_commit(&self, header: &iggy_binary_protocol::CommitHeader) -> bool {
+ assert_eq!(
+ header.namespace, self.namespace,
+ "Commit routed to wrong group"
+ );
+
+ if self.is_primary() {
+ return false;
+ }
+
+ if self.status.get() != Status::Normal {
+ return false;
+ }
+
+ if header.view != self.view.get() {
+ return false;
+ }
+
+ assert_eq!(
Review Comment:
This `assert_eq!` is reachable from untrusted network input: `on_message` ->
`MessageBag::Commit` -> `on_commit` -> `handle_commit` -> panic. A single
malformed Commit with the wrong `replica` field kills the shard.
`handle_start_view` (line 1392) uses `if != { return }` for the identical
check, so this should follow the same pattern.
Fix: replace the assert with `if header.replica != self.primary_index(header.view) { return false; }`.
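A minimal sketch of that guard, reusing only names already present in this diff (the shape of the change, not a final patch):

```rust
// Drop Commits whose `replica` is not the primary of the claimed view,
// mirroring handle_start_view, instead of panicking on untrusted input.
if header.replica != self.primary_index(header.view) {
    return false;
}
```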
##########
core/shard/src/lib.rs:
##########
@@ -398,29 +595,245 @@ where
let Some(consensus) = partitions.consensus() else {
return;
};
- let Some(journal) = partitions.journal() else {
+
+ let current_op = consensus.sequencer().current_sequence();
+ let current_commit = consensus.commit_min();
+ let actions = consensus.tick(PlaneKind::Partitions, current_op, current_commit);
+
+ dispatch_vsr_actions(consensus, partitions.journal(), &actions).await;
+ }
+
+ /// Tick the metadata consensus and dispatch any resulting actions.
+ #[allow(clippy::future_not_send)]
+ pub async fn tick_metadata(&self)
+ where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ {
+ let metadata = self.plane.metadata();
+ let Some(ref consensus) = metadata.consensus else {
return;
};
let current_op = consensus.sequencer().current_sequence();
let current_commit = consensus.commit_min();
- let actions = consensus.tick(PlaneKind::Partitions, current_op, current_commit);
+ let actions = consensus.tick(PlaneKind::Metadata, current_op, current_commit);
+
+ dispatch_vsr_actions(consensus, metadata.journal.as_ref(), &actions).await;
+ }
+}
+
+/// Dispatch a list of `VsrAction`s by constructing the appropriate
+/// protocol messages and sending them via the consensus message bus.
+#[allow(
+ clippy::future_not_send,
+ clippy::too_many_lines,
+ clippy::cast_possible_truncation
+)]
+async fn dispatch_vsr_actions<B, P, J>(
+ consensus: &VsrConsensus<B, P>,
+ journal: Option<&J>,
+ actions: &[VsrAction],
+) where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Entry = consensus::PipelineEntry>,
+ J: JournalHandle,
+ <J as JournalHandle>::Target: Journal<
+ <J as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+{
+ use std::mem::size_of;
+
+ let bus = consensus.message_bus();
+ let self_id = consensus.replica();
+ let cluster = consensus.cluster();
+ let replica_count = consensus.replica_count();
- for action in actions {
- if let consensus::VsrAction::RetransmitPrepares { targets } = action {
+ let send = |target: u8, msg: Message<GenericHeader>| async move {
+ if let Err(e) = bus.send_to_replica(target, msg).await {
+ tracing::debug!(replica = self_id, target, "bus send failed: {e}");
+ }
+ };
+
+ for action in actions {
+ match action {
+ VsrAction::SendStartViewChange { view, namespace } => {
+ let msg = Message::<StartViewChangeHeader>::new(size_of::<StartViewChangeHeader>())
+ .transmute_header(|_, h: &mut StartViewChangeHeader| {
+ h.command = Command2::StartViewChange;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.namespace = *namespace;
+ h.size = size_of::<StartViewChangeHeader>() as u32;
+ });
+ for target in 0..replica_count {
+ if target != self_id {
+ send(target, msg.deep_copy().into_generic()).await;
+ }
+ }
+ }
+ VsrAction::SendDoViewChange {
+ view,
+ target,
+ log_view,
+ op,
+ commit,
+ namespace,
+ } => {
+ let msg = Message::<DoViewChangeHeader>::new(size_of::<DoViewChangeHeader>())
+ .transmute_header(|_, h: &mut DoViewChangeHeader| {
+ h.command = Command2::DoViewChange;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.log_view = *log_view;
+ h.op = *op;
+ h.commit = *commit;
+ h.namespace = *namespace;
+ h.size = size_of::<DoViewChangeHeader>() as u32;
+ });
+ send(*target, msg.into_generic()).await;
+ }
+ VsrAction::SendStartView {
+ view,
+ op,
+ commit,
+ namespace,
+ } => {
+ let msg = Message::<StartViewHeader>::new(size_of::<StartViewHeader>())
+ .transmute_header(|_, h: &mut StartViewHeader| {
+ h.command = Command2::StartView;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.op = *op;
+ h.commit = *commit;
+ h.namespace = *namespace;
+ h.size = size_of::<StartViewHeader>() as u32;
+ });
+ for target in 0..replica_count {
+ if target != self_id {
+ send(target, msg.deep_copy().into_generic()).await;
+ }
+ }
+ }
+ VsrAction::SendPrepareOk {
+ view,
+ from_op,
+ to_op,
+ target,
+ namespace,
+ } => {
+ let Some(journal) = journal else {
+ continue;
+ };
+ for op in *from_op..=*to_op {
+ let Some(prepare_header) = journal.handle().header(op as usize) else {
+ continue;
+ };
+ let prepare_header = *prepare_header;
+ let msg = Message::<PrepareOkHeader>::new(size_of::<PrepareOkHeader>())
+ .transmute_header(|_, h: &mut PrepareOkHeader| {
+ h.command = Command2::PrepareOk;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.op = op;
+ h.commit = consensus.commit_max();
+ h.timestamp = prepare_header.timestamp;
+ h.parent = prepare_header.parent;
+ h.prepare_checksum = prepare_header.checksum;
+ h.request = prepare_header.request;
+ h.operation = prepare_header.operation;
+ h.namespace = *namespace;
+ h.size = size_of::<PrepareOkHeader>() as u32;
+ });
+ send(*target, msg.into_generic()).await;
+ }
+ }
+ VsrAction::RetransmitPrepares { targets } => {
+ let Some(journal) = journal else {
+ continue;
+ };
for (header, replicas) in targets {
- let Some(prepare) = journal.handle().entry(&header).await else {
+ let Some(prepare) = journal.handle().entry(header).await else {
continue;
};
for replica in replicas {
- let _ = consensus
- .message_bus()
- .send_to_replica(replica, prepare.clone().into_generic())
- .await;
+ send(*replica, prepare.clone().into_generic()).await;
+ }
+ }
+ }
+ VsrAction::RebuildPipeline { from_op, to_op } => {
+ let Some(journal) = journal else {
+ continue;
+ };
+ // Collect headers before borrowing the pipeline to avoid
+ // holding borrow_mut() across journal reads.
+ let mut gap_at = None;
+ let entries: Vec<_> = (*from_op..=*to_op)
+ .map_while(|op| {
Review Comment:
When `map_while` stops at a journal gap, the pipeline tail ends at `gap_at - 1`,
but `complete_view_change_as_primary` has already set the sequencer to `new_op`
(impls.rs:1538). On the next client request:
1. `next_sequence()` returns `new_op + 1`
2. `pipeline.push()` asserts `header.op == tail.header.op + 1` (impls.rs:203)
3. the tail expects `gap_at` next, and `new_op + 1 != gap_at` -> panic
So the primary crashes on the first client request after a view change that left
a journal gap. In practice C2 (the commit_min/commit_max assert) fires first,
since the heartbeat timeout is shorter, but this is independently a guaranteed
crash.
Fix: when a gap is encountered, adjust the sequencer to `gap_at - 1` so the next
prepare chains correctly, or refuse to enter `Status::Normal` while the journal
is incomplete in the uncommitted range. The first option is sketched below.
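A rough sketch of that first option, placed after the `map_while` collection;
`set_sequence` is hypothetical here and stands in for whatever API the sequencer
exposes (or would need to grow) to rewind its counter:

```rust
// If the journal had a hole, the pipeline tail now ends at gap_at - 1, so the
// next op handed out must be gap_at rather than new_op + 1.
// NOTE: `set_sequence` is an assumed/hypothetical method, not an existing one.
if let Some(gap) = gap_at {
    consensus.sequencer().set_sequence(gap.saturating_sub(1));
}
```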
##########
core/consensus/src/impls.rs:
##########
@@ -1043,6 +1076,35 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
vec![VsrAction::RetransmitPrepares { targets }]
}
+ /// Primary heartbeat: send commit point to all backups so they know
+ /// the primary is alive and can advance their own `commit_max`.
+ fn handle_commit_message_timeout(&self) -> Vec<VsrAction> {
+ if !self.is_primary() || self.status.get() != Status::Normal {
+ return Vec::new();
+ }
+
+ assert_eq!(
Review Comment:
This assert will deterministically panic whenever a lagging backup becomes the
new primary after a view change. `complete_view_change_as_primary` calls
`advance_commit_max(max_commit)` (line 1537) without touching `commit_min`, then
starts the `CommitMessage` timeout (line 1555). If the new primary had
`commit_min < max_commit` (the normal case when it missed some commits), this
assert fires on the next tick; no `commit_journal` call anywhere in the view
change path catches `commit_min` up.
Cascading failure: the crashed primary triggers another view change, the next
primary may also be behind, and in the worst case this exhausts all replicas.
The existing test doesn't cover this because all backups are fully caught up
before the crash.
Fix options: (a) call `commit_journal` in the view change completion path before
starting the CommitMessage timeout, (b) defer the CommitMessage timeout until
`commit_min == commit_max`, or (c) replace the assert with a guard that skips
the heartbeat when `commit_min < commit_max`. Option (c) is sketched below.
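A sketch of option (c), assuming the same `commit_min()`/`commit_max()`
accessors used elsewhere in this diff:

```rust
// Guard instead of assert: a primary fresh out of a view change can
// legitimately sit at commit_min < commit_max until commit_journal catches up,
// so skip this heartbeat tick rather than panicking.
if self.commit_min() < self.commit_max() {
    return Vec::new();
}
```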
##########
core/shard/src/router.rs:
##########
@@ -79,6 +97,12 @@ where
0
})
} else {
+ // TODO: View change messages (StartViewChange, DoViewChange, StartView) return
Review Comment:
Commit messages also return `Operation::Reserved` from
`CommitHeader::operation()`, so they fall through here and route to shard 0.
That's fine for the single-shard simulator, but in a multi-shard production
deployment, partition-plane Commit heartbeats go to the wrong shard, fail the
namespace check, and get dropped, causing spurious view changes.
Worth adding Commit to the TODO list alongside the view change messages, e.g.
as sketched below.
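Something along these lines for the amended comment (wording only, no behaviour
change implied):

```rust
// TODO: View change messages (StartViewChange, DoViewChange, StartView) and
// Commit heartbeats all return Operation::Reserved, so they land here and
// route to shard 0, which is correct only in single-shard deployments.
```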
##########
core/shard/src/router.rs:
##########
@@ -79,6 +97,12 @@ where
0
})
} else {
+ // TODO: View change messages (StartViewChange, DoViewChange, StartView) return
+ // Operation::Reserved, so they always land here and route to shard 0. This is
+ // correct only in single-shard deployments. In multi-shard, partition-plane view
+ // change messages must reach the shard owning that consensus group. Fixing this
+ // requires routing by Command2 + consensus namespace (a u64) rather than by
+ // Operation + IggyNamespace, since view change headers don't carry an IggyNamespace.
0
};
let _ = self.senders[target as usize].send(ShardFrame::fire_and_forget(generic));
Review Comment:
Now that view change messages (SVC, DVC, SV) and Commit heartbeats flow through
this path, `let _ =` silently dropping on a full channel has worse consequences
than before: dropped heartbeats cause backup timeouts, and dropped SVC/DVC
messages delay leader election. VSR retransmission handles eventual recovery,
but the feedback loop (dropped heartbeats -> view changes -> more messages ->
more drops) is worth keeping in mind.
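A cheap first step would be to at least make the drop observable; a sketch
following the tracing style used elsewhere in this PR (the exact logging and
backpressure policy is of course up to you):

```rust
// Log instead of silently discarding the send result, so dropped heartbeats
// and view change messages show up when a shard channel is full or closed.
if self.senders[target as usize]
    .send(ShardFrame::fire_and_forget(generic))
    .is_err()
{
    tracing::warn!(shard = target, "dropping consensus message: shard channel send failed");
}
```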