krishvishal commented on code in PR #3092:
URL: https://github.com/apache/iggy/pull/3092#discussion_r3063732274
##########
core/shard/src/lib.rs:
##########
@@ -398,29 +595,245 @@ where
let Some(consensus) = partitions.consensus() else {
return;
};
- let Some(journal) = partitions.journal() else {
+
+ let current_op = consensus.sequencer().current_sequence();
+ let current_commit = consensus.commit_min();
+ let actions = consensus.tick(PlaneKind::Partitions, current_op,
current_commit);
+
+ dispatch_vsr_actions(consensus, partitions.journal(), &actions).await;
+ }
+
+ /// Tick the metadata consensus and dispatch any resulting actions.
+ #[allow(clippy::future_not_send)]
+ pub async fn tick_metadata(&self)
+ where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ {
+ let metadata = self.plane.metadata();
+ let Some(ref consensus) = metadata.consensus else {
return;
};
let current_op = consensus.sequencer().current_sequence();
let current_commit = consensus.commit_min();
- let actions = consensus.tick(PlaneKind::Partitions, current_op,
current_commit);
+ let actions = consensus.tick(PlaneKind::Metadata, current_op,
current_commit);
+
+ dispatch_vsr_actions(consensus, metadata.journal.as_ref(),
&actions).await;
+ }
+}
+
+/// Dispatch a list of `VsrAction`s by constructing the appropriate
+/// protocol messages and sending them via the consensus message bus.
+#[allow(
+ clippy::future_not_send,
+ clippy::too_many_lines,
+ clippy::cast_possible_truncation
+)]
+async fn dispatch_vsr_actions<B, P, J>(
+ consensus: &VsrConsensus<B, P>,
+ journal: Option<&J>,
+ actions: &[VsrAction],
+) where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Entry = consensus::PipelineEntry>,
+ J: JournalHandle,
+ <J as JournalHandle>::Target: Journal<
+ <J as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+{
+ use std::mem::size_of;
+
+ let bus = consensus.message_bus();
+ let self_id = consensus.replica();
+ let cluster = consensus.cluster();
+ let replica_count = consensus.replica_count();
- for action in actions {
- if let consensus::VsrAction::RetransmitPrepares { targets } =
action {
+ let send = |target: u8, msg: Message<GenericHeader>| async move {
+ if let Err(e) = bus.send_to_replica(target, msg).await {
+ tracing::debug!(replica = self_id, target, "bus send failed: {e}");
+ }
+ };
+
+ for action in actions {
+ match action {
+ VsrAction::SendStartViewChange { view, namespace } => {
+ let msg =
Message::<StartViewChangeHeader>::new(size_of::<StartViewChangeHeader>())
+ .transmute_header(|_, h: &mut StartViewChangeHeader| {
+ h.command = Command2::StartViewChange;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.namespace = *namespace;
+ h.size = size_of::<StartViewChangeHeader>() as u32;
+ });
+ for target in 0..replica_count {
+ if target != self_id {
+ send(target, msg.deep_copy().into_generic()).await;
+ }
+ }
+ }
+ VsrAction::SendDoViewChange {
+ view,
+ target,
+ log_view,
+ op,
+ commit,
+ namespace,
+ } => {
+ let msg =
Message::<DoViewChangeHeader>::new(size_of::<DoViewChangeHeader>())
+ .transmute_header(|_, h: &mut DoViewChangeHeader| {
+ h.command = Command2::DoViewChange;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.log_view = *log_view;
+ h.op = *op;
+ h.commit = *commit;
+ h.namespace = *namespace;
+ h.size = size_of::<DoViewChangeHeader>() as u32;
+ });
+ send(*target, msg.into_generic()).await;
+ }
+ VsrAction::SendStartView {
+ view,
+ op,
+ commit,
+ namespace,
+ } => {
+ let msg =
Message::<StartViewHeader>::new(size_of::<StartViewHeader>())
+ .transmute_header(|_, h: &mut StartViewHeader| {
+ h.command = Command2::StartView;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.op = *op;
+ h.commit = *commit;
+ h.namespace = *namespace;
+ h.size = size_of::<StartViewHeader>() as u32;
+ });
+ for target in 0..replica_count {
+ if target != self_id {
+ send(target, msg.deep_copy().into_generic()).await;
+ }
+ }
+ }
+ VsrAction::SendPrepareOk {
+ view,
+ from_op,
+ to_op,
+ target,
+ namespace,
+ } => {
+ let Some(journal) = journal else {
+ continue;
+ };
+ for op in *from_op..=*to_op {
+ let Some(prepare_header) = journal.handle().header(op as
usize) else {
+ continue;
+ };
+ let prepare_header = *prepare_header;
+ let msg =
Message::<PrepareOkHeader>::new(size_of::<PrepareOkHeader>())
+ .transmute_header(|_, h: &mut PrepareOkHeader| {
+ h.command = Command2::PrepareOk;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.op = op;
+ h.commit = consensus.commit_max();
+ h.timestamp = prepare_header.timestamp;
+ h.parent = prepare_header.parent;
+ h.prepare_checksum = prepare_header.checksum;
+ h.request = prepare_header.request;
+ h.operation = prepare_header.operation;
+ h.namespace = *namespace;
+ h.size = size_of::<PrepareOkHeader>() as u32;
+ });
+ send(*target, msg.into_generic()).await;
+ }
+ }
+ VsrAction::RetransmitPrepares { targets } => {
+ let Some(journal) = journal else {
+ continue;
+ };
for (header, replicas) in targets {
- let Some(prepare) = journal.handle().entry(&header).await
else {
+ let Some(prepare) = journal.handle().entry(header).await
else {
continue;
};
for replica in replicas {
- let _ = consensus
- .message_bus()
- .send_to_replica(replica,
prepare.clone().into_generic())
- .await;
+ send(*replica, prepare.clone().into_generic()).await;
+ }
+ }
+ }
+ VsrAction::RebuildPipeline { from_op, to_op } => {
+ let Some(journal) = journal else {
+ continue;
+ };
+ // Collect headers before borrowing the pipeline to avoid
+ // holding borrow_mut() across journal reads.
+ let mut gap_at = None;
+ let entries: Vec<_> = (*from_op..=*to_op)
+ .map_while(|op| {
Review Comment:
Done.
##########
core/shard/src/router.rs:
##########
@@ -79,6 +97,12 @@ where
0
})
} else {
+ // TODO: View change messages (StartViewChange, DoViewChange,
StartView) return
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org