This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 52b198637 feat(consensus): add ClientTable with WAL-backed commit path
and view-change safety (#3023)
52b198637 is described below
commit 52b19863732551c2f5087f8915a919d9f44abfa0
Author: Krishna Vishal <[email protected]>
AuthorDate: Tue Apr 7 17:15:20 2026 +0530
feat(consensus): add ClientTable with WAL-backed commit path and
view-change safety (#3023)
---
core/binary_protocol/src/consensus/header.rs | 8 +-
core/binary_protocol/src/consensus/message.rs | 14 +-
core/consensus/src/client_table.rs | 577 +++++++++++++++++++++
core/consensus/src/impls.rs | 221 ++++++--
core/consensus/src/lib.rs | 3 +
core/consensus/src/namespaced_pipeline.rs | 86 ++-
core/consensus/src/observability.rs | 17 +-
core/consensus/src/plane_helpers.rs | 72 ++-
core/iobuf/src/lib.rs | 2 +-
core/journal/src/lib.rs | 2 +-
.../{metadata_journal.rs => prepare_journal.rs} | 40 +-
core/metadata/src/impls/metadata.rs | 157 ++++--
core/metadata/src/impls/recovery.rs | 10 +-
core/partitions/src/iggy_partitions.rs | 299 ++++++++---
core/shard/src/lib.rs | 139 ++++-
core/shard/src/router.rs | 26 +-
core/simulator/src/lib.rs | 11 +
core/simulator/src/replica.rs | 10 +-
18 files changed, 1469 insertions(+), 225 deletions(-)
diff --git a/core/binary_protocol/src/consensus/header.rs
b/core/binary_protocol/src/consensus/header.rs
index 40f66b1e0..ee0969378 100644
--- a/core/binary_protocol/src/consensus/header.rs
+++ b/core/binary_protocol/src/consensus/header.rs
@@ -181,6 +181,7 @@ pub struct ReplyHeader {
pub request_checksum: u128,
pub context: u128,
+ pub client: u128,
pub op: u64,
pub commit: u64,
pub timestamp: u64,
@@ -188,7 +189,7 @@ pub struct ReplyHeader {
pub operation: Operation,
pub operation_padding: [u8; 7],
pub namespace: u64,
- pub reserved: [u8; 48],
+ pub reserved: [u8; 32],
}
const _: () = {
assert!(size_of::<ReplyHeader>() == HEADER_SIZE);
@@ -196,7 +197,7 @@ const _: () = {
offset_of!(ReplyHeader, request_checksum)
== offset_of!(ReplyHeader, reserved_frame) + size_of::<[u8; 66]>()
);
- assert!(offset_of!(ReplyHeader, reserved) + size_of::<[u8; 48]>() ==
HEADER_SIZE);
+ assert!(offset_of!(ReplyHeader, reserved) + size_of::<[u8; 32]>() ==
HEADER_SIZE);
};
impl Default for ReplyHeader {
@@ -213,6 +214,7 @@ impl Default for ReplyHeader {
reserved_frame: [0; 66],
request_checksum: 0,
context: 0,
+ client: 0,
op: 0,
commit: 0,
timestamp: 0,
@@ -220,7 +222,7 @@ impl Default for ReplyHeader {
operation: Operation::Reserved,
operation_padding: [0; 7],
namespace: 0,
- reserved: [0; 48],
+ reserved: [0; 32],
}
}
}
diff --git a/core/binary_protocol/src/consensus/message.rs
b/core/binary_protocol/src/consensus/message.rs
index 834394f33..50ae07ac5 100644
--- a/core/binary_protocol/src/consensus/message.rs
+++ b/core/binary_protocol/src/consensus/message.rs
@@ -52,7 +52,7 @@ where
fn fragments(&self) -> &[Frozen<MESSAGE_ALIGN>];
}
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct RequestBacking {
owned: Owned<MESSAGE_ALIGN>,
}
@@ -362,6 +362,18 @@ where
}
}
+impl<H> Clone for Message<H, RequestBacking>
+where
+ H: ConsensusHeader,
+{
+ fn clone(&self) -> Self {
+ Self {
+ backing: self.backing.clone(),
+ _marker: PhantomData,
+ }
+ }
+}
+
impl<H> Clone for Message<H, ResponseBacking>
where
H: ConsensusHeader,
diff --git a/core/consensus/src/client_table.rs
b/core/consensus/src/client_table.rs
new file mode 100644
index 000000000..620951fa0
--- /dev/null
+++ b/core/consensus/src/client_table.rs
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_binary_protocol::{Message, ReplyHeader};
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::future::Future;
+use std::rc::Rc;
+use std::task::Waker;
+
+/// Identifies a specific request from a specific client.
+/// Used as the key for the pending-commit waiter map.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct ClientRequest {
+ pub client_id: u128,
+ pub request: u64,
+}
+
+/// Inner state shared between `Notify` clones via `Rc`.
+#[derive(Debug)]
+struct NotifyInner {
+ waker: RefCell<Option<Waker>>,
+ notified: std::cell::Cell<bool>,
+}
+
+/// Lightweight, single-threaded async notification primitive.
+///
+/// ## Usage
+///
+/// ```ignore
+/// let notify = Notify::new();
+/// let waiter = notify.clone();
+///
+/// // Producer side (in commit_reply):
+/// notify.notify();
+///
+/// // Consumer side (caller awaiting the commit):
+/// waiter.notified().await;
+/// ```
+#[derive(Debug, Clone)]
+pub struct Notify {
+ inner: Rc<NotifyInner>,
+}
+
+impl Notify {
+ /// Create a new `Notify` in the un-notified state.
+ #[must_use]
+ pub fn new() -> Self {
+ Self {
+ inner: Rc::new(NotifyInner {
+ waker: RefCell::new(None),
+ notified: std::cell::Cell::new(false),
+ }),
+ }
+ }
+
+ /// Wake the waiter, if any. If `notified()` is polled later, it will
+ /// resolve immediately.
+ pub fn notify(&self) {
+ self.inner.notified.set(true);
+ if let Some(waker) = self.inner.waker.borrow_mut().take() {
+ waker.wake();
+ }
+ }
+
+ /// Returns a future that resolves when [`notify()`](Self::notify) is
called.
+ ///
+ /// If `notify()` was already called before this future is polled, it
+ /// resolves immediately (permit is consumed).
+ #[allow(clippy::future_not_send)]
+ pub fn notified(&self) -> impl Future<Output = ()> + '_ {
+ std::future::poll_fn(move |cx| {
+ if self.inner.notified.get() {
+ self.inner.notified.set(false);
+ std::task::Poll::Ready(())
+ } else {
+ *self.inner.waker.borrow_mut() = Some(cx.waker().clone());
+ std::task::Poll::Pending
+ }
+ })
+ }
+}
+
+impl Default for Notify {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Per-client entry in the clients table (VR paper Section 4, Figure 2).
+///
+/// Stores the reply for the client's latest committed request. The client ID,
+/// request number, and commit number are all read from `reply.header()`.
+#[derive(Debug)]
+pub struct ClientEntry {
+ /// The cached reply for the client's latest committed request (header +
body).
+ pub reply: Message<ReplyHeader>,
+}
+
+/// Result of checking a request against the client table.
+#[derive(Debug)]
+pub enum RequestStatus {
+ /// Request not seen before, proceed with consensus.
+ New,
+ /// Exact request already committed, re-send cached reply.
+ Duplicate(Message<ReplyHeader>),
+ /// Request is in the pipeline awaiting commit, drop (client should wait).
+ InProgress,
+ /// Request number is older than the client's latest committed request.
+ /// Already handled in a prior commit cycle, drop silently.
+ Stale,
+}
+
+/// VSR client-table: tracks per-client request state for duplicate detection,
+/// reply caching, and async commit notification.
+///
+/// Uses a fixed-size slot array as the source of truth, with a `HashMap`
+/// as a secondary index for O(1) lookups by client ID.
+///
+/// ## Committed state (`slots` + `index`)
+///
+/// Always contains a valid `ClientEntry` with a non-optional reply.
+/// Updated by `commit_reply` when a request commits through consensus.
+///
+/// ## Pending state (`pending`)
+///
+/// Tracks in-flight requests that have been accepted for consensus but not yet
+/// committed. Each entry holds a [`Notify`] that is fired when the
corresponding
+/// `commit_reply` arrives. Keyed by `ClientRequest` to support future
+/// request pipelining (currently at most one per client).
+///
+/// The `pending` map is local notification state: it is not replicated, not
+/// serialized, and not part of the deterministic committed state.
+///
+/// ## Known gaps
+///
+/// - **Message repair**: If a backup never received a prepare (lost message),
+/// `commit_journal` stops at the gap. The client table will be missing
+/// entries for ops beyond the gap until the message repair protocol is
+/// implemented and the missing prepare is retransmitted.
+///
+/// - **Checkpoint serialization**: The slot array is laid out for
deterministic
+/// encode/decode to disk, but serialization is not yet implemented.
+#[derive(Debug)]
+pub struct ClientTable {
+ /// `None` means the slot is free.
+ /// Deterministic iteration order for eviction and serialization.
+ slots: Vec<Option<ClientEntry>>,
+ /// Secondary index: `client_id` → slot index. Rebuilt on decode.
+ index: HashMap<u128, usize>,
+ /// Pending commit waiters, keyed by `(client_id, request)`.
+ /// Keyed by request number (not just client) to support future pipelining.
+ /// Currently at most one per client.
+ pending: HashMap<ClientRequest, Notify>,
+}
+
+impl ClientTable {
+ #[must_use]
+ pub fn new(max_clients: usize) -> Self {
+ let mut slots = Vec::with_capacity(max_clients);
+ slots.resize_with(max_clients, || None);
+ Self {
+ slots,
+ index: HashMap::with_capacity(max_clients),
+ pending: HashMap::new(),
+ }
+ }
+
+ /// Check a request against the table.
+ ///
+ /// Returns:
+ /// - [`RequestStatus::New`]: not seen before, proceed with consensus
+ /// - [`RequestStatus::Duplicate`]: already committed, re-send cached reply
+ /// - [`RequestStatus::InProgress`]: in the pipeline awaiting commit
+ /// - [`RequestStatus::Stale`]: older than the client's latest committed
request
+ ///
+ /// # Panics
+ /// Panics if the internal index points to an empty slot (invariant
violation).
+ #[must_use]
+ pub fn check_request(&self, client_id: u128, request: u64) ->
RequestStatus {
+        // TODO: Once client sessions are added (a register/evict protocol),
+        // validate client_id at the session layer instead of
+        // panicking here. Unregistered or invalid clients should be rejected
+        // gracefully at ingress, not inside the client table.
+ assert!(client_id != 0, "client_id 0 is reserved for internal use");
+
+ // Check if already pending in the pipeline.
+ let key = ClientRequest { client_id, request };
+ if self.pending.contains_key(&key) {
+ return RequestStatus::InProgress;
+ }
+
+ let Some(&slot_idx) = self.index.get(&client_id) else {
+ return RequestStatus::New;
+ };
+ let entry = self.slots[slot_idx].as_ref().expect("index/slot
mismatch");
+ let committed_request = entry.reply.header().request;
+
+ if request < committed_request {
+ return RequestStatus::Stale;
+ }
+ if request == committed_request {
+ return RequestStatus::Duplicate(entry.reply.clone());
+ }
+
+ RequestStatus::New
+ }
+
+ /// Register interest in a pending request's commit.
+ ///
+ /// Returns a [`Notify`] the caller can `.notified().await` on. The
`Notify`
+ /// is cloned via `Rc`, so the caller can hold it across `.await` points
+ /// without borrowing the `ClientTable`.
+ ///
+ /// Called after `check_request` returns `New`, before submitting the
request
+ /// to the consensus pipeline.
+ ///
+ /// # Panics
+ /// Panics if there is already a pending waiter for this `(client_id,
request)`.
+ pub fn register_pending(&mut self, client_id: u128, request: u64) ->
Notify {
+ let notify = Notify::new();
+ let key = ClientRequest { client_id, request };
+ let prev = self.pending.insert(key, notify.clone());
+ assert!(
+ prev.is_none(),
+ "client {client_id} request {request} already has a pending waiter"
+ );
+ notify
+ }
+
+ /// Record a committed reply and cache it.
+ ///
+ /// If the client already has a slot, updates it in place. Otherwise
allocates
+ /// a free slot, evicting the client with the oldest commit number if the
table
+ /// is full.
+ ///
+ /// Wakes the pending [`Notify`] for this `(client_id, request)` if one
exists.
+ ///
+ /// Called in `on_ack` after `build_reply_message`.
+ ///
+ /// # Panics
+ /// Panics if the internal index points to an empty slot (invariant
violation).
+ pub fn commit_reply(&mut self, client_id: u128, reply:
Message<ReplyHeader>) {
+ assert!(client_id != 0, "client_id 0 is reserved for internal use");
+ assert_eq!(
+ client_id,
+ reply.header().client,
+ "commit_reply: client_id mismatch (arg={client_id}, header={})",
+ reply.header().client
+ );
+ let request = reply.header().request;
+
+ if let Some(&slot_idx) = self.index.get(&client_id) {
+ let slot = self.slots[slot_idx].as_mut().expect("index/slot
mismatch");
+ // Monotonicity: both commit (op) and request must not regress.
+ assert!(
+ reply.header().commit >= slot.reply.header().commit,
+ "commit_reply: commit regression for client {client_id}: {} ->
{}",
+ slot.reply.header().commit,
+ reply.header().commit
+ );
+ assert!(
+ reply.header().request >= slot.reply.header().request,
+ "commit_reply: request regression for client {client_id}: {}
-> {}",
+ slot.reply.header().request,
+ reply.header().request
+ );
+ slot.reply = reply;
+ } else {
+ // Need a free slot. Evict if full.
+ if self.index.len() >= self.slots.len() {
+ self.evict_oldest();
+ }
+
+ let slot_idx = self.first_free_slot().expect("eviction must free a
slot");
+ self.slots[slot_idx] = Some(ClientEntry { reply });
+ self.index.insert(client_id, slot_idx);
+ }
+
+ // Wake the waiter, if any.
+ let key = ClientRequest { client_id, request };
+ if let Some(notify) = self.pending.remove(&key) {
+ notify.notify();
+ }
+ }
+
+ /// Evict the client with the oldest commit number.
+ ///
+ /// Iterates the fixed-size slot array (deterministic order), so all
replicas
+ /// with the same committed state evict the same client. Ties on commit
number
+ /// are broken by slot index (lowest index wins), which is also
deterministic.
+ ///
+ /// **Dedup caveat**: until checkpoint serialization is implemented,
eviction
+ /// breaks at-most-once semantics for the evicted client — a retransmission
+ /// after eviction will be treated as `New` and re-executed.
+ fn evict_oldest(&mut self) {
+ let mut evictee: Option<(usize, u64)> = None; // (slot_idx, commit)
+
+ for (idx, slot) in self.slots.iter().enumerate() {
+ if let Some(entry) = slot {
+ let commit = entry.reply.header().commit;
+ let should_evict = match evictee {
+ None => true,
+ Some((_, min_commit)) => commit < min_commit,
+ };
+ if should_evict {
+ evictee = Some((idx, commit));
+ }
+ }
+ }
+
+ if let Some((slot_idx, _)) = evictee {
+ let entry = self.slots[slot_idx].take().expect("evictee must
exist");
+ self.index.remove(&entry.reply.header().client);
+ }
+ }
+
+ /// Find the first free slot in the array.
+ fn first_free_slot(&self) -> Option<usize> {
+ self.slots.iter().position(Option::is_none)
+ }
+
+ /// Get the cached reply for a client (for duplicate re-sends).
+ #[must_use]
+ pub fn get_reply(&self, client_id: u128) -> Option<&Message<ReplyHeader>> {
+ let &slot_idx = self.index.get(&client_id)?;
+ self.slots[slot_idx].as_ref().map(|entry| &entry.reply)
+ }
+
+ /// Number of active committed client entries.
+ #[must_use]
+ pub fn count(&self) -> usize {
+ self.index.len()
+ }
+
+ /// Number of pending (in-flight) requests.
+ #[must_use]
+ pub fn pending_count(&self) -> usize {
+ self.pending.len()
+ }
+
+ /// Clear all pending entries (e.g. during view change).
+ ///
+    /// Stale pending entries from a previous view must not survive into the
+    /// new view — otherwise `check_request` would return `InProgress` for the
+    /// orphaned keys, silently dropping valid client retries.
+ pub fn clear_pending(&mut self) {
+ self.pending.clear();
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use iggy_binary_protocol::{Command2, Operation};
+
+ fn make_reply_for(client: u128, request: u64, commit: u64) ->
Message<ReplyHeader> {
+ let header_size = std::mem::size_of::<ReplyHeader>();
+ let mut msg = Message::<ReplyHeader>::new(header_size);
+ let header = bytemuck::checked::try_from_bytes_mut::<ReplyHeader>(
+ &mut msg.as_mut_slice()[..header_size],
+ )
+ .expect("zeroed bytes are valid");
+ *header = ReplyHeader {
+ client,
+ request,
+ commit,
+ command: Command2::Reply,
+ operation: Operation::SendMessages,
+ ..ReplyHeader::default()
+ };
+ msg
+ }
+
+ fn make_reply(request: u64, commit: u64) -> Message<ReplyHeader> {
+ make_reply_for(1, request, commit)
+ }
+
+ // Notify tests
+
+ #[test]
+ fn notify_after_await_registration() {
+ let notify = Notify::new();
+ let waiter = notify.clone();
+
+ // Notify before anyone polls, should resolve immediately on first
poll.
+ notify.notify();
+
+ let waker = futures::task::noop_waker();
+ let mut cx = std::task::Context::from_waker(&waker);
+ let mut fut = std::pin::pin!(waiter.notified());
+ assert!(fut.as_mut().poll(&mut cx).is_ready());
+ }
+
+ #[test]
+ fn notify_wakes_pending_poll() {
+ let notify = Notify::new();
+ let waiter = notify.clone();
+
+ let waker = futures::task::noop_waker();
+ let mut cx = std::task::Context::from_waker(&waker);
+ let mut fut = std::pin::pin!(waiter.notified());
+
+ // First poll returns Pending (not yet notified).
+ assert!(fut.as_mut().poll(&mut cx).is_pending());
+
+ // Notify fires.
+ notify.notify();
+
+ // Next poll resolves.
+ assert!(fut.as_mut().poll(&mut cx).is_ready());
+ }
+
+ #[test]
+ fn notify_consumed_after_ready() {
+ let notify = Notify::new();
+ notify.notify();
+
+ let waker = futures::task::noop_waker();
+ let mut cx = std::task::Context::from_waker(&waker);
+
+ // First notified() consumes the permit.
+ let mut fut1 = std::pin::pin!(notify.notified());
+ assert!(fut1.as_mut().poll(&mut cx).is_ready());
+
+ // Second notified() should be pending (permit consumed).
+ let mut fut2 = std::pin::pin!(notify.notified());
+ assert!(fut2.as_mut().poll(&mut cx).is_pending());
+ }
+
+ // ClientTable tests
+
+ #[test]
+ fn check_request_new() {
+ let table = ClientTable::new(10);
+ assert!(matches!(table.check_request(1, 1), RequestStatus::New));
+ }
+
+ #[test]
+ fn check_request_duplicate_after_commit() {
+ let mut table = ClientTable::new(10);
+ table.commit_reply(1, make_reply(1, 10));
+
+ match table.check_request(1, 1) {
+ RequestStatus::Duplicate(cached) => {
+ assert_eq!(cached.header().request, 1);
+ }
+ _ => panic!("expected Duplicate"),
+ }
+ }
+
+ #[test]
+ fn check_request_stale() {
+ let mut table = ClientTable::new(10);
+ table.commit_reply(1, make_reply(5, 10));
+
+ assert!(matches!(table.check_request(1, 3), RequestStatus::Stale));
+ }
+
+ #[test]
+ fn check_request_in_progress_while_pending() {
+ let mut table = ClientTable::new(10);
+ let _notify = table.register_pending(1, 1);
+
+ assert!(matches!(
+ table.check_request(1, 1),
+ RequestStatus::InProgress
+ ));
+ }
+
+ #[test]
+ fn commit_caches_reply() {
+ let mut table = ClientTable::new(10);
+ table.commit_reply(1, make_reply(1, 10));
+
+ let cached = table.get_reply(1).expect("should have cached reply");
+ assert_eq!(cached.header().request, 1);
+ }
+
+ #[test]
+ fn commit_updates_existing_entry() {
+ let mut table = ClientTable::new(10);
+ table.commit_reply(1, make_reply(1, 10));
+ table.commit_reply(1, make_reply(2, 20));
+
+ let cached = table.get_reply(1).expect("should have cached reply");
+ assert_eq!(cached.header().request, 2);
+ assert_eq!(table.count(), 1);
+ }
+
+ #[test]
+ fn register_and_commit_notifies() {
+ let mut table = ClientTable::new(10);
+ let notify = table.register_pending(1, 1);
+
+ assert_eq!(table.pending_count(), 1);
+
+ // Commit fires the notify.
+ table.commit_reply(1, make_reply(1, 10));
+
+ assert_eq!(table.pending_count(), 0);
+
+ let waker = futures::task::noop_waker();
+ let mut cx = std::task::Context::from_waker(&waker);
+ let mut fut = std::pin::pin!(notify.notified());
+ assert!(fut.as_mut().poll(&mut cx).is_ready());
+ }
+
+ #[test]
+ fn eviction_removes_oldest_commit() {
+ let mut table = ClientTable::new(2);
+
+ table.commit_reply(100, make_reply_for(100, 1, 10));
+ table.commit_reply(200, make_reply_for(200, 1, 20));
+ table.commit_reply(300, make_reply_for(300, 1, 30));
+
+ assert!(table.get_reply(100).is_none());
+ assert!(table.get_reply(200).is_some());
+ assert!(table.get_reply(300).is_some());
+ assert_eq!(table.count(), 2);
+ }
+
+ #[test]
+ fn eviction_is_deterministic_by_slot_index() {
+ let mut table = ClientTable::new(2);
+
+ table.commit_reply(100, make_reply_for(100, 1, 10));
+ table.commit_reply(200, make_reply_for(200, 1, 10));
+ table.commit_reply(300, make_reply_for(300, 1, 30));
+
+ assert!(table.get_reply(100).is_none());
+ assert!(table.get_reply(200).is_some());
+ assert!(table.get_reply(300).is_some());
+ }
+
+ #[test]
+ fn new_request_after_commit_is_new() {
+ let mut table = ClientTable::new(10);
+ table.commit_reply(1, make_reply(1, 10));
+
+ assert!(matches!(table.check_request(1, 2), RequestStatus::New));
+ }
+
+ #[test]
+ fn slot_reuse_after_eviction() {
+ let mut table = ClientTable::new(1);
+
+ table.commit_reply(100, make_reply_for(100, 1, 10));
+ table.commit_reply(200, make_reply_for(200, 1, 20));
+
+ assert!(table.get_reply(100).is_none());
+ assert!(table.get_reply(200).is_some());
+ assert_eq!(table.count(), 1);
+ }
+
+ #[test]
+ #[should_panic(expected = "already has a pending waiter")]
+ fn register_pending_twice_panics() {
+ let mut table = ClientTable::new(10);
+ let _n1 = table.register_pending(1, 1);
+ let _n2 = table.register_pending(1, 1);
+ }
+}
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 7c4b27d04..55101854c 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::client_table::ClientTable;
use crate::vsr_timeout::{TimeoutKind, TimeoutManager};
use crate::{
AckLogEvent, Consensus, ControlActionLogEvent, DvcQuorumArray,
IgnoreReason, Pipeline,
@@ -86,6 +87,10 @@ pub const PIPELINE_PREPARE_QUEUE_MAX: usize = 8;
/// Maximum number of replicas in a cluster.
pub const REPLICAS_MAX: usize = 32;
+/// Maximum number of clients tracked in the clients table.
+/// When exceeded, the client with the oldest committed request is evicted.
+pub const CLIENTS_TABLE_MAX: usize = 8192;
+
#[derive(Debug)]
pub struct PipelineEntry {
pub header: PrepareHeader,
@@ -403,7 +408,7 @@ pub enum Status {
}
/// Actions to be taken by the caller after processing a VSR event.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone)]
pub enum VsrAction {
/// Send `StartViewChange` to all replicas.
SendStartViewChange { view: u32, namespace: u64 },
@@ -423,13 +428,26 @@ pub enum VsrAction {
commit: u64,
namespace: u64,
},
- /// Send `PrepareOK` to primary.
+ /// Send `PrepareOK` for each op in `[from_op, to_op]` that is present in
the WAL.
+ ///
+ /// The caller MUST verify each op exists in the journal before sending.
+ /// Sending `PrepareOk` for a missing op is a safety violation, it can
+ /// cause the primary to commit an op without enough replicas holding the
data.
SendPrepareOk {
view: u32,
- op: u64,
+ from_op: u64,
+ to_op: u64,
target: u8,
namespace: u64,
},
+ /// Retransmit uncommitted prepares from the WAL to replicas that haven't
acked.
+ ///
+ /// Emitted when the primary's prepare timeout fires and there are
+ /// uncommitted entries in the pipeline. Each entry is a prepare header
+ /// (for journal lookup) and the list of replica IDs that need it.
+ RetransmitPrepares {
+ targets: Vec<(PrepareHeader, Vec<u8>)>,
+ },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -477,7 +495,16 @@ where
// * `replica.log_view = 0` when replica_count=1.
log_view: Cell<u32>,
status: Cell<Status>,
- commit: Cell<u64>,
+
+ /// Highest op number that has been locally executed (state machine
applied,
+ /// client table updated). Advances one-by-one in `commit_journal` (backup)
+ /// and `on_ack` (primary). On a normal primary, `commit_min ==
commit_max`.
+ commit_min: Cell<u64>,
+
+ /// Highest op number known to be committed by the cluster. Advances
+ /// immediately when the replica learns about commits (from prepare
+ /// messages, commit heartbeats, or view change messages).
+ commit_max: Cell<u64>,
sequencer: LocalSequencer,
@@ -502,6 +529,9 @@ where
sent_own_do_view_change: Cell<bool>,
timeouts: RefCell<TimeoutManager>,
+
+ /// VSR client-table for duplicate detection and reply caching.
+ client_table: RefCell<ClientTable>,
}
impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
@@ -534,7 +564,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
log_view: Cell::new(0),
status: Cell::new(Status::Recovering),
sequencer: LocalSequencer::new(0),
- commit: Cell::new(0),
+ commit_min: Cell::new(0),
+ commit_max: Cell::new(0),
last_timestamp: Cell::new(0),
last_prepare_checksum: Cell::new(0),
pipeline: RefCell::new(pipeline),
@@ -546,6 +577,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
sent_own_start_view_change: Cell::new(false),
sent_own_do_view_change: Cell::new(false),
timeouts: RefCell::new(TimeoutManager::new(timeout_seed)),
+ client_table: RefCell::new(ClientTable::new(CLIENTS_TABLE_MAX)),
}
}
@@ -567,14 +599,38 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
self.primary_index(self.view.get()) == self.replica
}
+ /// Advance `commit_max` - the highest op known to be committed by the
cluster.
+ ///
+ /// Called when the replica learns about new commits from the primary
+ /// (via prepare messages, commit heartbeats, or view change messages).
+ ///
/// # Panics
- /// If the stored commit number somehow exceeds the given `commit` after
update.
- pub fn advance_commit_number(&self, commit: u64) {
- if commit > self.commit.get() {
- self.commit.set(commit);
+ /// If `commit_max` would be less than `commit_min` after the update
+ /// (invariant violation).
+ pub fn advance_commit_max(&self, commit: u64) {
+ if commit > self.commit_max.get() {
+ self.commit_max.set(commit);
}
+ assert!(self.commit_max.get() >= self.commit_min.get());
+ }
- assert!(self.commit.get() >= commit);
+ /// Advance `commit_min` - the highest op locally executed.
+ ///
+ /// Called after each op is applied through `commit_journal` (backup)
+ /// or `on_ack` (primary). Must advance sequentially (by 1).
+ ///
+ /// # Panics
+ /// - If `op` is not exactly `commit_min + 1` (must advance sequentially).
+ /// - If `commit_min` would exceed `commit_max` after the update.
+ pub fn advance_commit_min(&self, op: u64) {
+ assert_eq!(
+ op,
+ self.commit_min.get() + 1,
+ "commit_min must advance sequentially: expected {}, got {op}",
+ self.commit_min.get() + 1
+ );
+ self.commit_min.set(op);
+ assert!(self.commit_max.get() >= self.commit_min.get());
}
/// Maximum number of faulty replicas that can be tolerated.
@@ -590,9 +646,16 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
self.max_faulty() + 1
}
+ /// Highest op locally executed (state machine applied, client table
updated).
+ #[must_use]
+ pub const fn commit_min(&self) -> u64 {
+ self.commit_min.get()
+ }
+
+ /// Highest op known to be committed by the cluster.
#[must_use]
- pub const fn commit(&self) -> u64 {
- self.commit.get()
+ pub const fn commit_max(&self) -> u64 {
+ self.commit_max.get()
}
#[must_use]
@@ -632,6 +695,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
&mut self.pipeline
}
+ #[must_use]
+ pub const fn client_table(&self) -> &RefCell<ClientTable> {
+ &self.client_table
+ }
+
#[must_use]
pub const fn cluster(&self) -> u128 {
self.cluster
@@ -706,6 +774,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
self.sent_own_start_view_change.set(false);
self.sent_own_do_view_change.set(false);
self.loopback_queue.borrow_mut().clear();
+ self.client_table.borrow_mut().clear_pending();
}
/// Process one tick. Call this periodically (e.g., every 10ms).
@@ -742,6 +811,12 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
timeouts = self.timeouts.borrow_mut();
}
+ if timeouts.fired(TimeoutKind::Prepare) {
+ drop(timeouts);
+ actions.extend(self.handle_prepare_timeout());
+ timeouts = self.timeouts.borrow_mut();
+ }
+
if timeouts.fired(TimeoutKind::ViewChangeStatus) {
drop(timeouts);
actions.extend(self.handle_view_change_status_timeout(plane));
@@ -919,6 +994,55 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
vec![action]
}
+ /// Collect uncommitted pipeline entries that should be retransmitted.
+ ///
+ /// Returns `(PrepareHeader, Vec<u8>)` pairs: each op that hasn't reached
+ /// quorum paired with the replica IDs that haven't acked it.
+ fn retransmit_targets(&self) -> Vec<(PrepareHeader, Vec<u8>)> {
+ let pipeline = self.pipeline.borrow();
+ let current_op = self.sequencer.current_sequence();
+ let replica_count = self.replica_count;
+ let mut targets = Vec::new();
+
+ let mut op = self.commit_max() + 1;
+ while op <= current_op {
+ if let Some(entry) = pipeline.entry_by_op(op)
+ && !entry.ok_quorum_received
+ {
+ let missing: Vec<u8> = (0..replica_count).filter(|&r|
!entry.has_ack(r)).collect();
+ if !missing.is_empty() {
+ targets.push((entry.header, missing));
+ }
+ }
+ op += 1;
+ }
+
+ targets
+ }
+
+ /// Retransmit uncommitted prepares when the prepare timeout fires.
+ ///
+ /// Only acts on the primary in normal status with a non-empty pipeline.
+ /// Resets the timeout with backoff on each firing.
+ fn handle_prepare_timeout(&self) -> Vec<VsrAction> {
+ if !self.is_primary() || self.status.get() != Status::Normal {
+ return Vec::new();
+ }
+
+ if self.pipeline.borrow().is_empty() {
+ return Vec::new();
+ }
+
+ let targets = self.retransmit_targets();
+ if targets.is_empty() {
+ return Vec::new();
+ }
+
+ self.timeouts.borrow_mut().backoff(TimeoutKind::Prepare);
+
+ vec![VsrAction::RetransmitPrepares { targets }]
+ }
+
/// Handle a received `StartViewChange` message.
///
/// "When replica i receives STARTVIEWCHANGE messages for its view-number
@@ -1004,7 +1128,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
let primary_candidate = self.primary_index(self.view.get());
let current_op = self.sequencer.current_sequence();
- let current_commit = self.commit.get();
+ // DVC uses commit_min: the replica's actual execution progress.
+ let current_commit = self.commit_min.get();
// Start DVC timeout
self.timeouts
@@ -1137,7 +1262,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
}
let current_op = self.sequencer.current_sequence();
- let current_commit = self.commit.get();
+ // Use commit_min: the replica's actual execution progress.
+ let current_commit = self.commit_min.get();
// If we haven't sent our own DVC yet, record it
if !self.sent_own_do_view_change.get() {
@@ -1184,6 +1310,15 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
///
/// # Panics
/// If `header.namespace` does not match this replica's namespace.
+ /// # Client-table maintenance
+ ///
+ /// Backups maintain the client-table during normal operation via
+ /// `commit_journal` in `on_replicate`, which walks the WAL and updates
+ /// the client table for each committed op. The WAL survives view changes,
+ /// so the new primary can process any committed op it received.
+ ///
+ /// Gap: if a backup never received a prepare (lost message),
+ /// `commit_journal` stops at the gap. Requires message repair.
pub fn handle_start_view(&self, plane: PlaneKind, header:
&StartViewHeader) -> Vec<VsrAction> {
assert_eq!(header.namespace, self.namespace, "SV routed to wrong
group");
let from_replica = header.replica;
@@ -1210,13 +1345,19 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
self.view.set(msg_view);
self.log_view.set(msg_view);
self.status.set(Status::Normal);
- self.advance_commit_number(msg_commit);
+ self.advance_commit_max(msg_commit);
self.reset_view_change_state();
// Stale pipeline entries from the old view must be discarded
self.pipeline.borrow_mut().clear();
- // Update our op to match the new primary's log
+ // TODO: TigerBeetle's StartView message carries uncommitted op
headers,
+ // allowing the backup to install them into the WAL and set op to a
+ // WAL-verified value. We don't carry headers yet, so we blindly trust
+ // msg_op. This is correct for truncation (sequencer > msg_op) but
wrong
+ // when the backup is behind (sequencer < msg_op) — the gap between the
+ // WAL and msg_op becomes unreachable without message repair. Fix by
+ // either carrying headers in StartView or implementing message repair.
self.sequencer.set_sequence(msg_op);
// Update timeouts for normal backup operation
@@ -1228,12 +1369,18 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
timeouts.start(TimeoutKind::NormalHeartbeat);
}
- // Send PrepareOK for uncommitted ops (commit+1 to op)
- let mut actions = Vec::new();
- for op_num in (msg_commit + 1)..=msg_op {
+ // Send PrepareOK for uncommitted ops that we actually have in the WAL.
+ // The caller must verify each op exists before sending.
+ emit_replica_event(
+ SimEventKind::ReplicaStateChanged,
+ &ReplicaLogContext::from_consensus(self, plane),
+ );
+
+ if msg_commit < msg_op {
let action = VsrAction::SendPrepareOk {
view: msg_view,
- op: op_num,
+ from_op: msg_commit + 1,
+ to_op: msg_op,
target: from_replica,
namespace: self.namespace,
};
@@ -1244,18 +1391,22 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
&action,
),
);
- actions.push(action);
+ vec![action]
+ } else {
+ Vec::new()
}
-
- emit_replica_event(
- SimEventKind::ReplicaStateChanged,
- &ReplicaLogContext::from_consensus(self, plane),
- );
-
- actions
}
/// Complete view change as the new primary after collecting DVC quorum.
+ ///
+ /// # Client-table maintenance
+ ///
+ /// Backups populate the client-table during normal operation via
+ /// `commit_journal` in `on_replicate`. The WAL survives view changes, so
+ /// when this replica transitions from backup to primary, its table
+ /// contains entries for all committed ops it received.
+ ///
+ /// Gap: missing prepares (lost messages) require message repair.
fn complete_view_change_as_primary(&self, plane: PlaneKind) ->
Vec<VsrAction> {
let dvc_array = self.do_view_change_from_all_replicas.borrow();
@@ -1269,7 +1420,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
// Update state
self.log_view.set(self.view.get());
self.status.set(Status::Normal);
- self.advance_commit_number(max_commit);
+ self.advance_commit_max(max_commit);
self.sequencer.set_sequence(new_op);
// Stale pipeline entries from the old view are invalid in the new
view.
@@ -1472,7 +1623,7 @@ where
parent: consensus.last_prepare_checksum(),
request_checksum: old.request_checksum,
request: old.request,
- commit: consensus.commit.get(),
+ commit: consensus.commit_max.get(),
op,
timestamp: 0, // 0 for now. Implement correct way to get
timestamp later
operation: old.operation,
@@ -1502,7 +1653,7 @@ where
// It's important to use the view of the replica, not the
received prepare!
view: consensus.view.get(),
op: old.op,
- commit: consensus.commit.get(),
+ commit: consensus.commit_max.get(),
timestamp: old.timestamp,
operation: old.operation,
namespace: old.namespace,
@@ -1554,6 +1705,14 @@ where
pipeline_depth,
},
);
+
+ // Start the prepare timeout so the primary retransmits if backups
+ // don't ack in time. It is only started (not reset) so that an
+ // already-ticking timeout is not pushed out by every new request.
+ let mut timeouts = self.timeouts.borrow_mut();
+ if !timeouts.is_ticking(TimeoutKind::Prepare) {
+ timeouts.start(TimeoutKind::Prepare);
+ }
}
fn verify_pipeline(&self) {
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 200ee87ad..61352ab02 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -103,6 +103,9 @@ where
H: ConsensusHeader;
}
+pub mod client_table;
+pub use client_table::ClientTable;
+
mod impls;
pub use impls::*;
mod plane_mux;
diff --git a/core/consensus/src/namespaced_pipeline.rs
b/core/consensus/src/namespaced_pipeline.rs
index 71e18fb71..0c2328733 100644
--- a/core/consensus/src/namespaced_pipeline.rs
+++ b/core/consensus/src/namespaced_pipeline.rs
@@ -20,23 +20,15 @@ use crate::impls::{PIPELINE_PREPARE_QUEUE_MAX,
PipelineEntry};
use iggy_binary_protocol::{Message, PrepareHeader};
use std::collections::{HashMap, VecDeque};
-/// Pipeline that partitions entries by namespace for independent commit
draining.
+/// Pipeline that partitions entries by namespace.
///
/// A single global op sequence and hash chain spans all namespaces, but
entries
-/// are stored in per-namespace `VecDeques`. Each namespace tracks its own
commit
-/// frontier so `drain_committable_all` drains quorum'd entries per-namespace
-/// without waiting for the global commit to advance past unrelated namespaces.
+/// are stored in per-namespace `VecDeque`s. Ops are committed strictly in
global
+/// order via `drain_committable_prefix`, which drains the contiguous prefix of
+/// quorum'd ops regardless of namespace. This ensures `commit_min` advances
+/// sequentially.
///
-/// The global commit (on `VsrConsensus`) remains a conservative lower bound
-/// for the VSR protocol (view change, follower commit piggybacking). It only
-/// advances when all ops up to that point are drained. Per-namespace draining
-/// can run ahead of the global commit.
-///
-/// An alternative (simpler) approach would drain purely by per-entry quorum
-/// flag without tracking per-namespace commit numbers, relying solely on
-/// `global_commit_frontier` for the protocol commit. We track per-namespace
-/// commits explicitly for observability and to make the independence model
-/// visible in the data structure.
+/// Per-namespace commit frontiers (`ns_commits`) are tracked for
observability.
#[derive(Debug)]
pub struct NamespacedPipeline {
queues: HashMap<u64, VecDeque<PipelineEntry>>,
@@ -142,6 +134,58 @@ impl NamespacedPipeline {
}
commit
}
+
+ /// Drain the contiguous prefix of committed ops in global op order.
+ ///
+ /// Starting from `commit + 1`, drains ops from their per-namespace queues
+ /// while each consecutive op has `ok_quorum_received == true`. Stops at
the
+ /// first gap or non-quorum op, ensuring ops are committed strictly in
order.
+ ///
+ /// Returns `(drained_entries, new_commit)` where `new_commit` is the
highest
+ /// op drained (or `commit` if nothing was drained).
+ ///
+ /// # Panics
+ /// If internal queue state is inconsistent (namespace found in scan but
+ /// missing from map, or front entry disappears between lookup and pop).
+ pub fn drain_committable_prefix(&mut self, commit: u64) ->
(Vec<PipelineEntry>, u64) {
+ let mut drained = Vec::new();
+ let mut new_commit = commit;
+ let mut op = commit + 1;
+
+ while op <= self.last_push_op {
+ // Find the entry across all namespace queues.
+ let ns = {
+ let mut found_ns = None;
+ for (&ns, queue) in &self.queues {
+ if queue.front().is_some_and(|f| f.header.op == op) {
+ found_ns = Some(ns);
+ break;
+ }
+ }
+ match found_ns {
+ Some(ns) => ns,
+ None => break, // op not at front of any queue - gap
+ }
+ };
+
+ let queue = self.queues.get_mut(&ns).expect("ns must exist");
+ let front = queue.front().expect("front must exist");
+ if !front.ok_quorum_received {
+ break; // not yet committed
+ }
+
+ let entry = queue.pop_front().expect("front exists");
+ self.total_count -= 1;
+ if let Some(ns_commit) = self.ns_commits.get_mut(&ns) {
+ *ns_commit = entry.header.op;
+ }
+ new_commit = entry.header.op;
+ drained.push(entry);
+ op += 1;
+ }
+
+ (drained, new_commit)
+ }
}
impl Pipeline for NamespacedPipeline {
@@ -211,6 +255,11 @@ impl Pipeline for NamespacedPipeline {
for queue in self.queues.values_mut() {
queue.clear();
}
+ // Reset ns_commits: stale values from a previous view could mislead
+ // global_commit_frontier after a view change.
+ for commit in self.ns_commits.values_mut() {
+ *commit = 0;
+ }
self.total_count = 0;
self.last_push_checksum = 0;
self.last_push_op = 0;
@@ -558,23 +607,22 @@ mod tests {
}
#[test]
- fn clear_preserves_ns_commits() {
+ fn clear_resets_ns_commits() {
let mut pipeline = NamespacedPipeline::new();
pipeline.register_namespace(100);
pipeline.register_namespace(200);
pipeline.push_message(make_prepare(1, 0, 10, 100));
pipeline.push_message(make_prepare(2, 10, 20, 200));
- // Mark op 1 as committed in ns 100 before clearing
pipeline.ns_commits.insert(100, 1);
pipeline.clear();
assert!(pipeline.is_empty());
assert_eq!(pipeline.total_count, 0);
- // ns_commits must survive clear -- they represent durable knowledge
- // about already-drained ops, not pipeline state
- assert_eq!(pipeline.ns_commits.get(&100), Some(&1));
+ // ns_commits must be reset on clear -- stale values from a previous
+ // view would mislead global_commit_frontier.
+ assert_eq!(pipeline.ns_commits.get(&100), Some(&0));
assert_eq!(pipeline.ns_commits.get(&200), Some(&0));
}
}
diff --git a/core/consensus/src/observability.rs
b/core/consensus/src/observability.rs
index 865935faf..c4eaed89c 100644
--- a/core/consensus/src/observability.rs
+++ b/core/consensus/src/observability.rs
@@ -114,6 +114,7 @@ pub enum ControlActionKind {
SendDoViewChange,
SendStartView,
SendPrepareOk,
+ SendPrepare,
}
impl ControlActionKind {
@@ -124,6 +125,7 @@ impl ControlActionKind {
Self::SendDoViewChange => "send_do_view_change",
Self::SendStartView => "send_start_view",
Self::SendPrepareOk => "send_prepare_ok",
+ Self::SendPrepare => "send_prepare",
}
}
}
@@ -230,7 +232,7 @@ impl ReplicaLogContext {
namespace: NamespaceLogContext::from_raw(plane,
consensus.namespace()),
view: consensus.view(),
log_view: consensus.log_view(),
- commit: consensus.commit(),
+ commit: consensus.commit_max(),
status: consensus.status(),
role,
}
@@ -322,11 +324,20 @@ impl ControlActionLogEvent {
op: Some(op),
commit: Some(commit),
},
- VsrAction::SendPrepareOk { target, op, .. } => Self {
+ VsrAction::SendPrepareOk {
+ target, from_op, ..
+ } => Self {
replica,
action: ControlActionKind::SendPrepareOk,
target_replica: Some(target),
- op: Some(op),
+ op: Some(from_op),
+ commit: None,
+ },
+ VsrAction::RetransmitPrepares { .. } => Self {
+ replica,
+ action: ControlActionKind::SendPrepare,
+ target_replica: None,
+ op: None,
commit: None,
},
}
diff --git a/core/consensus/src/plane_helpers.rs
b/core/consensus/src/plane_helpers.rs
index 443682f8f..f980f73f9 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::client_table::{Notify, RequestStatus};
use crate::{
Consensus, IgnoreReason, Pipeline, PipelineEntry, PlaneKind,
PrepareOkOutcome, Sequencer,
Status, VsrConsensus,
@@ -28,6 +29,45 @@ use std::ops::AsyncFnOnce;
// TODO: Rework all of those helpers, once the boundaries are more clear and
we have a better picture of the commonalities between all of the planes.
+/// Shared request preflight: duplicate detection + pending registration.
+///
+/// Returns `Some(Notify)` if the request is new and should proceed through
+/// consensus. Returns `None` if the request was already handled (duplicate
+/// reply sent, in-progress, or stale); the caller should return early.
+#[allow(clippy::future_not_send)]
+pub async fn request_preflight<B, P>(
+ consensus: &VsrConsensus<B, P>,
+ client_id: u128,
+ request: u64,
+) -> Option<Notify>
+where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Entry = PipelineEntry>,
+{
+ let status = consensus
+ .client_table()
+ .borrow()
+ .check_request(client_id, request);
+ match status {
+ RequestStatus::Duplicate(cached_reply) => {
+ // Best-effort resend, client may have disconnected.
+ let _ = consensus
+ .message_bus()
+ .send_to_client(client_id, cached_reply.into_generic())
+ .await;
+ None
+ }
+ RequestStatus::InProgress | RequestStatus::Stale => None,
+ RequestStatus::New => {
+ let notify = consensus
+ .client_table()
+ .borrow_mut()
+ .register_pending(client_id, request);
+ Some(notify)
+ }
+ }
+}
+
/// Shared pipeline-first request flow used by metadata and partitions.
///
/// # Panics
@@ -54,6 +94,11 @@ pub async fn pipeline_prepare_common<C, F>(
}
/// Shared commit-based old-prepare fence.
+///
+/// Uses `commit_min` (locally executed), not `commit_max`. A backup may know
+/// that op 50 is committed (`commit_max = 50`) but only have executed up to
+/// op 14 (`commit_min = 14`). A retransmitted prepare for op 15 must NOT be
+/// fenced out; the backup still needs it in the WAL for `commit_journal`.
#[must_use]
pub const fn fence_old_prepare_by_commit<B, P>(
consensus: &VsrConsensus<B, P>,
@@ -63,14 +108,14 @@ where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
P: Pipeline<Entry = PipelineEntry>,
{
- header.op <= consensus.commit()
+ header.op <= consensus.commit_min()
}
/// Shared chain-replication forwarding to the next replica.
///
/// # Panics
/// - If `header.command` is not `Command2::Prepare`.
-/// - If `header.op <= consensus.commit()`.
+/// - If `header.op <= consensus.commit_min()`.
/// - If the computed next replica equals self.
/// - If the message bus send fails.
#[allow(clippy::future_not_send)]
@@ -85,7 +130,7 @@ where
let header = *message.header();
assert_eq!(header.command, Command2::Prepare);
- assert!(header.op > consensus.commit());
+ assert!(header.op > consensus.commit_min());
let next = (consensus.replica() + 1) % consensus.replica_count();
let primary = consensus.primary_index(header.view);
@@ -142,7 +187,7 @@ where
}
if consensus.is_follower() {
- consensus.advance_commit_number(header.commit);
+ consensus.advance_commit_max(header.commit);
}
Ok(current_op)
@@ -194,7 +239,7 @@ where
}
let pipeline = consensus.pipeline().borrow();
- let mut new_commit = consensus.commit();
+ let mut new_commit = consensus.commit_max();
while let Some(entry) = pipeline.entry_by_op(new_commit + 1) {
if !entry.ok_quorum_received {
break;
@@ -203,8 +248,8 @@ where
}
drop(pipeline);
- if new_commit > consensus.commit() {
- consensus.advance_commit_number(new_commit);
+ if new_commit > consensus.commit_max() {
+ consensus.advance_commit_max(new_commit);
return true;
}
@@ -223,7 +268,7 @@ where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
P: Pipeline<Entry = PipelineEntry>,
{
- let commit = consensus.commit();
+ let commit = consensus.commit_max();
let mut drained = Vec::new();
let mut pipeline = consensus.pipeline().borrow_mut();
@@ -273,8 +318,11 @@ where
reserved_frame: [0; 66],
request_checksum: prepare_header.request_checksum,
context: 0,
+ client: prepare_header.client,
op: prepare_header.op,
- commit: consensus.commit(),
+ // Use the prepare's op, not commit_max. This value drives eviction
+ // ordering in ClientTable; it must be deterministic across replicas.
+ commit: prepare_header.op,
timestamp: prepare_header.timestamp,
request: prepare_header.request,
operation: prepare_header.operation,
@@ -354,7 +402,7 @@ pub async fn send_prepare_ok<B, P>(
replica: consensus.replica(),
view: consensus.view(),
op: header.op,
- commit: consensus.commit(),
+ commit: consensus.commit_max(),
timestamp: header.timestamp,
parent: header.parent,
prepare_checksum: header.checksum,
@@ -673,7 +721,7 @@ mod tests {
consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(2,
10, 20));
consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(3,
20, 30));
- consensus.advance_commit_number(3);
+ consensus.advance_commit_max(3);
let drained = drain_committable_prefix(&consensus);
let drained_ops: Vec<_> = drained.into_iter().map(|entry|
entry.header.op).collect();
@@ -690,7 +738,7 @@ mod tests {
consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(6,
50, 60));
consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(7,
60, 70));
- consensus.advance_commit_number(6);
+ consensus.advance_commit_max(6);
let drained = drain_committable_prefix(&consensus);
let drained_ops: Vec<_> = drained.into_iter().map(|entry|
entry.header.op).collect();
diff --git a/core/iobuf/src/lib.rs b/core/iobuf/src/lib.rs
index 0e57c7b79..4e1355457 100644
--- a/core/iobuf/src/lib.rs
+++ b/core/iobuf/src/lib.rs
@@ -23,7 +23,7 @@ use std::sync::atomic::{AtomicUsize, Ordering, fence};
use aligned_vec::{AVec, ConstAlign};
use compio_buf::IoBuf;
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct Owned<const ALIGN: usize = 4096> {
inner: AVec<u8, ConstAlign<ALIGN>>,
}
diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index 9c610f187..629a2e4ec 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -19,7 +19,7 @@ use std::io;
use std::ops::{Deref, RangeInclusive};
pub mod file_storage;
-pub mod metadata_journal;
+pub mod prepare_journal;
pub trait Journal<S>
where
diff --git a/core/journal/src/metadata_journal.rs
b/core/journal/src/prepare_journal.rs
similarity index 95%
rename from core/journal/src/metadata_journal.rs
rename to core/journal/src/prepare_journal.rs
index 5520e4c8d..8f332ba2a 100644
--- a/core/journal/src/metadata_journal.rs
+++ b/core/journal/src/prepare_journal.rs
@@ -78,14 +78,14 @@ impl From<io::Error> for JournalError {
}
}
-/// Persistent metadata journal backed by an append-only WAL file.
+/// Persistent prepare journal backed by an append-only WAL file.
///
/// Each WAL entry is a raw `Message<PrepareHeader>`:
/// `[PrepareHeader: 256 bytes][body: header.size - 256 bytes]`
///
/// The in-memory index is a fixed-size slot array indexed by
/// `op % SLOT_COUNT`.
-pub struct MetadataJournal {
+pub struct PrepareJournal {
/// File-backed append-only WAL.
storage: FileStorage,
/// In-memory slot array of entry headers, indexed by `op % SLOT_COUNT`.
@@ -103,9 +103,9 @@ pub struct MetadataJournal {
snapshot_op: Cell<u64>,
}
-impl fmt::Debug for MetadataJournal {
+impl fmt::Debug for PrepareJournal {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("MetadataJournal")
+ f.debug_struct("PrepareJournal")
.field("write_offset", &self.storage.file_len())
.field("last_op", &self.last_op.get())
.finish_non_exhaustive()
@@ -118,7 +118,7 @@ const fn slot_for_op(op: u64) -> usize {
}
#[allow(clippy::cast_possible_truncation)]
-impl MetadataJournal {
+impl PrepareJournal {
/// Open the WAL file, scanning forward to rebuild the in-memory index.
///
/// `snapshot_op` is the highest op that has been durably snapshotted.
@@ -273,7 +273,7 @@ impl MetadataJournal {
clippy::cast_sign_loss,
clippy::future_not_send
)]
-impl Journal<FileStorage> for MetadataJournal {
+impl Journal<FileStorage> for PrepareJournal {
type Header = PrepareHeader;
type Entry = Message<PrepareHeader>;
type HeaderRef<'a> = Ref<'a, PrepareHeader>;
@@ -460,7 +460,7 @@ impl Journal<FileStorage> for MetadataJournal {
}
}
-impl JournalHandle for MetadataJournal {
+impl JournalHandle for PrepareJournal {
type Storage = FileStorage;
type Target = Self;
@@ -500,7 +500,7 @@ mod tests {
async fn open_empty_wal() {
let dir = tempdir().unwrap();
let path = dir.path().join("journal.wal");
- let journal = MetadataJournal::open(&path, 0).await.unwrap();
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
assert!(journal.last_op().is_none());
assert!(journal.header(0).is_none());
@@ -510,7 +510,7 @@ mod tests {
async fn append_and_read() {
let dir = tempdir().unwrap();
let path = dir.path().join("journal.wal");
- let journal = MetadataJournal::open(&path, 0).await.unwrap();
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
let msg1 = make_prepare(1, 64);
let msg2 = make_prepare(2, 32);
@@ -538,7 +538,7 @@ mod tests {
let path = dir.path().join("journal.wal");
{
- let journal = MetadataJournal::open(&path, 0).await.unwrap();
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
journal.append(make_prepare(1, 64)).await.unwrap();
journal.append(make_prepare(2, 128)).await.unwrap();
journal.append(make_prepare(3, 32)).await.unwrap();
@@ -546,7 +546,7 @@ mod tests {
}
// Reopen and verify index is rebuilt
- let journal = MetadataJournal::open(&path, 0).await.unwrap();
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
assert_eq!(journal.last_op(), Some(3));
for op in 1..=3 {
@@ -563,7 +563,7 @@ mod tests {
let path = dir.path().join("journal.wal");
{
- let journal = MetadataJournal::open(&path, 0).await.unwrap();
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
journal.append(make_prepare(1, 64)).await.unwrap();
journal.append(make_prepare(2, 128)).await.unwrap();
journal.storage.fsync().await.unwrap();
@@ -579,7 +579,7 @@ mod tests {
}
// Reopen, should recover only the first entry
- let journal = MetadataJournal::open(&path, 0).await.unwrap();
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
assert_eq!(journal.last_op(), Some(1));
assert!(journal.header(2).is_none());
@@ -592,7 +592,7 @@ mod tests {
async fn iter_headers_from() {
let dir = tempdir().unwrap();
let path = dir.path().join("journal.wal");
- let journal = MetadataJournal::open(&path, 0).await.unwrap();
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
journal.append(make_prepare(1, 32)).await.unwrap();
journal.append(make_prepare(2, 32)).await.unwrap();
@@ -617,7 +617,7 @@ mod tests {
async fn previous_header_navigation() {
let dir = tempdir().unwrap();
let path = dir.path().join("journal.wal");
- let journal = MetadataJournal::open(&path, 0).await.unwrap();
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
journal.append(make_prepare(0, 32)).await.unwrap();
journal.append(make_prepare(1, 32)).await.unwrap();
@@ -632,11 +632,11 @@ mod tests {
async fn slot_wraparound_evicts_snapshotted_entry() {
let dir = tempdir().unwrap();
let path = dir.path().join("journal.wal");
- let journal = MetadataJournal::open(&path, 0).await.unwrap();
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
// Op 3 goes to slot 3
journal.append(make_prepare(3, 32)).await.unwrap();
- // Mark op 3 as snapshotted — safe to evict
+ // Mark op 3 as snapshotted - safe to evict
journal.set_snapshot_op(3);
// Op 3 + SLOT_COUNT goes to the same slot, evicting op 3
let wraparound_op = 3 + SLOT_COUNT as u64;
@@ -656,7 +656,7 @@ mod tests {
async fn drain_shrinks_wal_and_preserves_live_entries() {
let dir = tempdir().unwrap();
let path = dir.path().join("journal.wal");
- let journal = MetadataJournal::open(&path, 0).await.unwrap();
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
// Append 5 entries
for op in 1..=5 {
@@ -696,7 +696,7 @@ mod tests {
// Reopen and verify the drained WAL is valid
drop(journal);
- let journal = MetadataJournal::open(&path, 3).await.unwrap();
+ let journal = PrepareJournal::open(&path, 3).await.unwrap();
assert_eq!(journal.last_op(), Some(5));
for op in 4..=5 {
let h = *journal.header(op as usize).unwrap();
@@ -711,7 +711,7 @@ mod tests {
async fn append_panics_on_evicting_unsnapshotted_entry() {
let dir = tempdir().unwrap();
let path = dir.path().join("journal.wal");
- let journal = MetadataJournal::open(&path, 0).await.unwrap();
+ let journal = PrepareJournal::open(&path, 0).await.unwrap();
journal.append(make_prepare(3, 32)).await.unwrap();
// No snapshot taken, evicting op 3 must panic
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 484135f19..e0f65fc0d 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -21,7 +21,7 @@ use consensus::{
ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus,
ack_preflight,
ack_quorum_reached, build_reply_message, drain_committable_prefix,
emit_sim_event,
fence_old_prepare_by_commit, panic_if_hash_chain_would_break_in_same_view,
- pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
+ pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
request_preflight,
send_prepare_ok as send_prepare_ok_common,
};
use iggy_binary_protocol::{
@@ -291,6 +291,29 @@ where
{
async fn on_request(&self, message: <VsrConsensus<B> as
Consensus>::Message<RequestHeader>) {
let consensus = self.consensus.as_ref().unwrap();
+ let client_id = message.header().client;
+ let request = message.header().request;
+
+ // TODO: Add a bounded request queue instead of dropping here.
+ // When the prepare queue (8 max) is full, buffer
+ // incoming requests in a request queue. On commit, pop the next
request
+ // from the request queue and begin preparing it. Only drop when both
+ // queues are full.
+ if consensus.pipeline().borrow().is_full() {
+ warn!(
+ target: "iggy.metadata.diag",
+ plane = "metadata",
+ replica_id = consensus.replica(),
+ client = client_id,
+ request = request,
+ "on_request: pipeline full, dropping request"
+ );
+ return;
+ }
+
+ let Some(_notify) = request_preflight(consensus, client_id,
request).await else {
+ return;
+ };
emit_sim_event(
SimEventKind::ClientRequestReceived,
@@ -342,7 +365,7 @@ where
replica_id = consensus.replica(),
view = consensus.view(),
op = header.op,
- commit = consensus.commit(),
+ commit = consensus.commit_max(),
operation = ?header.operation,
"received old prepare, skipping replication"
);
@@ -355,9 +378,18 @@ where
// TODO handle gap in ops.
+ // Verify hash chain integrity BEFORE checkpoint.
`checkpoint_if_needed`
+ // can drain WAL entries, making previous_header return None.
+ if let Some(previous) = journal.handle().previous_header(&header) {
+ panic_if_hash_chain_would_break_in_same_view(&previous, &header);
+ }
+
// Force a checkpoint if the journal is running low on capacity.
if let Some(coordinator) = &self.coordinator {
- let snap_op = consensus.commit();
+ // Use commit_min (locally executed), not commit_max. WAL entries
+ // between commit_min+1 and commit_max haven't been applied to the
+ // state machine yet, draining them would lose data on crash.
+ let snap_op = consensus.commit_min();
match coordinator
.checkpoint_if_needed(&self.mux_stm, journal, snap_op)
.await
@@ -386,18 +418,25 @@ where
}
}
- // Verify hash chain integrity.
- if let Some(previous) = journal.handle().previous_header(&header) {
- panic_if_hash_chain_would_break_in_same_view(&previous, &header);
+ // TODO: Restore hard assert_eq!(header.op, current_op + 1) once
message repair
+ // is implemented. Without repair, the network can deliver prepares
out of order
+ // and the replica has no way to request the missing ones.
+ if header.op != current_op + 1 {
+ warn!(
+ target: "iggy.metadata.diag",
+ plane = "metadata",
+ replica_id = consensus.replica(),
+ op = header.op,
+ expected = current_op + 1,
+ "on_replicate: dropping out-of-order prepare (gap)"
+ );
+ return;
}
- assert_eq!(header.op, current_op + 1);
-
- consensus.sequencer().set_sequence(header.op);
- consensus.set_last_prepare_checksum(header.checksum);
-
- // Append to journal.
- if let Err(e) = journal.handle().append(message).await {
+ // Append to journal first. Sequencer and checksum are updated AFTER
+ // successful append so a failed write doesn't leave consensus state
+ // pointing at a phantom entry.
+ if let Err(e) = journal.handle().append(message.clone()).await {
error!(
target: "iggy.metadata.diag",
plane = "metadata",
@@ -410,15 +449,19 @@ where
return;
}
+ consensus.sequencer().set_sequence(header.op);
+ consensus.set_last_prepare_checksum(header.checksum);
+
// After successful journal write, send prepare_ok to primary.
self.send_prepare_ok(&header).await;
// If follower, commit any newly committable entries.
if consensus.is_follower() {
- self.commit_journal();
+ self.commit_journal().await;
}
}
+ #[allow(clippy::too_many_lines)]
async fn on_ack(&self, message: <VsrConsensus<B> as
Consensus>::Message<PrepareOkHeader>) {
let consensus = self.consensus.as_ref().unwrap();
let header = message.header();
@@ -494,18 +537,15 @@ where
)
});
+ // Committed ops must be infallible — if the state machine
cannot
+ // apply a committed op, replicas will diverge.
let response =
self.mux_stm.update(prepare).unwrap_or_else(|err| {
- warn!(
- target: "iggy.metadata.diag",
- plane = "metadata",
- replica_id = consensus.replica(),
- op = prepare_header.op,
- operation = ?prepare_header.operation,
- error = %err,
- "state machine update failed for committed metadata
entry"
+ panic!(
+ "on_ack: committed metadata op={} failed to apply:
{err}",
+ prepare_header.op
);
- bytes::Bytes::new()
});
+ consensus.advance_commit_min(prepare_header.op);
let pipeline_depth = consensus.pipeline().borrow().len();
let event = CommitLogEvent {
replica: ReplicaLogContext::from_consensus(consensus,
PlaneKind::Metadata),
@@ -517,17 +557,27 @@ where
};
emit_sim_event(SimEventKind::OperationCommitted, &event);
- let generic_reply =
- build_reply_message(consensus, &prepare_header,
response).into_generic();
+ let reply = build_reply_message(consensus, &prepare_header,
response);
+ // Cache reply for duplicate detection:
+ consensus
+ .client_table()
+ .borrow_mut()
+ .commit_reply(prepare_header.client, reply.clone());
+
+ let generic_reply = reply.into_generic();
let reply_buffers = freeze_client_reply(generic_reply);
emit_sim_event(SimEventKind::ClientReplyEmitted, &event);
- // TODO: Propagate send error instead of panicking; requires
bus error design.
- consensus
+ if let Err(e) = consensus
.message_bus()
.send_to_client(prepare_header.client, reply_buffers)
.await
- .unwrap();
+ {
+ warn!(
+ "on_ack: failed to send reply to client={}: {e}",
+ prepare_header.client
+ );
+ }
}
}
}
@@ -559,7 +609,11 @@ where
P: Pipeline<Entry = PipelineEntry>,
J: JournalHandle,
J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header =
PrepareHeader>,
- M: StateMachine<Input = Message<PrepareHeader>>,
+ M: StateMachine<
+ Input = Message<PrepareHeader>,
+ Output = bytes::Bytes,
+ Error = iggy_common::IggyError,
+ >,
{
/// Replicate a prepare message to the next replica in the chain.
///
@@ -588,11 +642,46 @@ where
// TODO: Implement jump_to_newer_op
// fn jump_to_newer_op(&self, header: &PrepareHeader) {}
- #[allow(clippy::unused_self)]
- const fn commit_journal(&self) {
- // TODO: Implement commit logic
- // Walk through journal from last committed to current commit number
- // Apply each entry to the state machine
+ /// Walk ops from `commit_min+1` to `commit_max`, applying the state
machine
+ /// and updating the client table for each.
+ ///
+ /// The backup does NOT send replies to clients, only the primary does
that.
+ #[allow(clippy::cast_possible_truncation)]
+ #[allow(clippy::future_not_send)]
+ async fn commit_journal(&self) {
+ let consensus = self.consensus.as_ref().unwrap();
+ let journal = self.journal.as_ref().unwrap();
+
+ while consensus.commit_min() < consensus.commit_max() {
+ let op = consensus.commit_min() + 1;
+
+ let Some(header) = journal.handle().header(op as usize) else {
+ // TODO: Implement message repair: request missing prepare from
+ // primary or other replicas. Until then, the backup stalls
here.
+ break;
+ };
+ let header = *header;
+
+ let Some(prepare) = journal.handle().entry(&header).await else {
+ warn!("commit_journal: prepare body missing for op={op},
stopping");
+ break;
+ };
+
+ // Committed ops must be infallible (see on_ack comment).
+ let response = self.mux_stm.update(prepare).unwrap_or_else(|err| {
+ panic!("commit_journal: committed metadata op={op} failed to
apply: {err}");
+ });
+
+ consensus.advance_commit_min(op);
+
+ let reply = build_reply_message(consensus, &header, response);
+ consensus
+ .client_table()
+ .borrow_mut()
+ .commit_reply(header.client, reply);
+
+ debug!("commit_journal: committed op={op}");
+ }
}
#[allow(clippy::future_not_send, clippy::cast_possible_truncation)]
diff --git a/core/metadata/src/impls/recovery.rs
b/core/metadata/src/impls/recovery.rs
index d4f01dc6a..4d5606e4c 100644
--- a/core/metadata/src/impls/recovery.rs
+++ b/core/metadata/src/impls/recovery.rs
@@ -21,7 +21,7 @@ use crate::stm::snapshot::{MetadataSnapshot, RestoreSnapshot,
Snapshot, Snapshot
use iggy_binary_protocol::consensus::PrepareHeader;
use iggy_binary_protocol::consensus::message::Message;
use iggy_common::IggyError;
-use journal::metadata_journal::{JournalError, MetadataJournal};
+use journal::prepare_journal::{JournalError, PrepareJournal};
use std::fmt;
use std::path::Path;
@@ -82,7 +82,7 @@ impl From<std::io::Error> for RecoveryError {
/// Result of a successful metadata recovery.
pub struct RecoveredMetadata<M> {
- pub journal: MetadataJournal,
+ pub journal: PrepareJournal,
pub snapshot: IggySnapshot,
pub mux_stm: M,
/// `None` means no snapshot existed and no journal entries were replayed.
@@ -125,7 +125,7 @@ where
// 3. Open journal, scan the WAL and build index
let journal_path = metadata_dir.join("journal.wal");
- let journal = MetadataJournal::open(&journal_path,
snapshot.sequence_number()).await?;
+ let journal = PrepareJournal::open(&journal_path,
snapshot.sequence_number()).await?;
// 4. Replay journal entries after snapshot
let headers_to_replay = journal.iter_headers_from(replay_from);
@@ -214,7 +214,7 @@ mod tests {
std::fs::create_dir_all(&metadata_dir).unwrap();
{
- let journal =
MetadataJournal::open(&metadata_dir.join("journal.wal"), 0)
+ let journal =
PrepareJournal::open(&metadata_dir.join("journal.wal"), 0)
.await
.unwrap();
journal.append(make_prepare(1, 32)).await.unwrap();
@@ -242,7 +242,7 @@ mod tests {
// WAL has ops 1-10
{
- let journal =
MetadataJournal::open(&metadata_dir.join("journal.wal"), 0)
+ let journal =
PrepareJournal::open(&metadata_dir.join("journal.wal"), 0)
.await
.unwrap();
for op in 1..=10 {
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index 5d9d3d601..7134c84ff 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -29,9 +29,10 @@ use consensus::PlaneIdentity;
use consensus::{
CommitLogEvent, Consensus, NamespacedPipeline, Pipeline, PipelineEntry,
Plane, PlaneKind,
Project, ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind,
VsrConsensus,
- ack_preflight, build_reply_message, emit_namespace_progress_event,
emit_sim_event,
- fence_old_prepare_by_commit, pipeline_prepare_common, replicate_preflight,
- replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
+ ack_preflight, ack_quorum_reached, build_reply_message,
drain_committable_prefix,
+ emit_namespace_progress_event, emit_sim_event, fence_old_prepare_by_commit,
+ pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
request_preflight,
+ send_prepare_ok as send_prepare_ok_common,
};
use iggy_binary_protocol::{
Command2, ConsensusHeader, GenericHeader, Message, Operation,
PrepareHeader, PrepareOkHeader,
@@ -43,7 +44,7 @@ use iggy_common::{
sharding::{IggyNamespace, LocalIdx, ShardId},
};
use iobuf::Frozen;
-use journal::Journal as _;
+use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
use std::cell::UnsafeCell;
use std::collections::{HashMap, HashSet};
@@ -61,7 +62,7 @@ use tracing::{debug, warn};
/// For example, shard 0 might have `partition_ids` [0, 2, 4] while shard 1
/// has `partition_ids` [1, 3, 5]. The `LocalIdx` provides the actual index
/// into the `partitions` Vec.
-pub struct IggyPartitions<C> {
+pub struct IggyPartitions<C, J = ()> {
shard_id: ShardId,
config: PartitionsConfig,
/// Collection of partitions, the index of each partition isn't it's ID,
but rather an local index (`LocalIdx`) which is used for lookups.
@@ -77,13 +78,19 @@ pub struct IggyPartitions<C> {
partitions: UnsafeCell<Vec<IggyPartition>>,
namespace_to_local: HashMap<IggyNamespace, LocalIdx>,
consensus: Option<C>,
+ /// Consensus-level WAL for prepare messages.
+ ///
+ /// Stores full `Message<PrepareHeader>` keyed by op. Operations are
applied
+ /// at commit time (not replicate time) by reading from this WAL in
+ /// `commit_journal` (backup) and `on_ack` (primary).
+ journal: Option<J>,
}
const fn freeze_client_reply(message: Message<GenericHeader>) ->
Message<GenericHeader> {
message
}
-impl<C> IggyPartitions<C> {
+impl<C, J> IggyPartitions<C, J> {
#[must_use]
pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self {
Self {
@@ -92,6 +99,7 @@ impl<C> IggyPartitions<C> {
partitions: UnsafeCell::new(Vec::new()),
namespace_to_local: HashMap::new(),
consensus: None,
+ journal: None,
}
}
@@ -103,9 +111,18 @@ impl<C> IggyPartitions<C> {
partitions: UnsafeCell::new(Vec::with_capacity(capacity)),
namespace_to_local: HashMap::with_capacity(capacity),
consensus: None,
+ journal: None,
}
}
+ pub fn set_journal(&mut self, journal: J) {
+ self.journal = Some(journal);
+ }
+
+ pub const fn journal(&self) -> Option<&J> {
+ self.journal.as_ref()
+ }
+
pub const fn config(&self) -> &PartitionsConfig {
&self.config
}
@@ -389,15 +406,36 @@ impl<C> IggyPartitions<C> {
}
}
-impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B,
NamespacedPipeline>>
+impl<B, J> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B,
NamespacedPipeline>, J>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ J: JournalHandle,
+ J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header =
PrepareHeader>,
{
async fn on_request(&self, message: <VsrConsensus<B> as
Consensus>::Message<RequestHeader>) {
let namespace = IggyNamespace::from_raw(message.header().namespace);
let consensus = self
.consensus()
.expect("on_request: consensus not initialized");
+ let client_id = message.header().client;
+ let request = message.header().request;
+
+ // TODO: Add a bounded request queue instead of dropping here.
+ // When the prepare queue (8 max) is full, buffer
+ // incoming requests in a request queue. On commit, pop the next
request
+ // from the request queue and begin preparing it. Only drop when both
+ // queues are full.
+ if consensus.pipeline().borrow().is_full() {
+ warn!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ replica_id = consensus.replica(),
+ client = client_id,
+ request = request,
+ "on_request: pipeline full, dropping request"
+ );
+ return;
+ }
emit_sim_event(
SimEventKind::ClientRequestReceived,
@@ -408,6 +446,7 @@ where
operation: message.header().operation,
},
);
+
let message = if message.header().operation == Operation::SendMessages
{
match convert_request_message(namespace, message) {
Ok(message) => message,
@@ -427,6 +466,11 @@ where
} else {
message
};
+
+ let Some(_notify) = request_preflight(consensus, client_id,
request).await else {
+ return;
+ };
+
let prepare = message.project(consensus);
pipeline_prepare_common(consensus, PlaneKind::Partitions, prepare,
|prepare| {
self.on_replicate(prepare)
@@ -436,7 +480,6 @@ where
async fn on_replicate(&self, message: <VsrConsensus<B> as
Consensus>::Message<PrepareHeader>) {
let header = *message.header();
- let namespace = IggyNamespace::from_raw(header.namespace);
let consensus = self
.consensus()
.expect("on_replicate: consensus not initialized");
@@ -467,7 +510,7 @@ where
replica_id = consensus.replica(),
view = consensus.view(),
op = header.op,
- commit = consensus.commit(),
+ commit = consensus.commit_max(),
namespace_raw = header.namespace,
operation = ?header.operation,
"received old prepare, skipping replication"
@@ -475,26 +518,41 @@ where
return;
}
- // TODO: Figure out the flow of the partition operations.
- // In metadata layer we assume that when an `on_request` or
`on_replicate` is called, it's called from correct shard.
- // I think we need to do the same here, which means that the code from
below is unfallable, the partition should always exist by now!
- let message = self.replicate(message).await;
- if let Err(error) = self.apply_replicated_operation(&namespace,
message).await {
+ // TODO: Restore hard assert_eq!(header.op, current_op + 1) once
message repair
+ // is implemented. Without repair, the network can deliver prepares
out of order
+ // and the replica has no way to request the missing ones.
+ if header.op != current_op + 1 {
+ // Out-of-order prepare: the network delivered a future op before
+ // we received the preceding ones. Drop it and rely on primary
+ // retransmission to re-send both this op and the missing ones.
warn!(
target: "iggy.partitions.diag",
plane = "partitions",
replica_id = consensus.replica(),
op = header.op,
- namespace_raw = namespace.inner(),
- operation = ?header.operation,
- %error,
- "failed to apply replicated partition operation"
+ expected = current_op + 1,
+ "on_replicate: dropping out-of-order prepare (gap)"
+ );
+ return;
+ }
+
+ // Chain-replicate to the next replica before local journal append.
+ let message = self.replicate(message).await;
+
+ let journal = self
+ .journal
+ .as_ref()
+ .expect("on_replicate: journal not initialized");
+ if let Err(e) = journal.handle().append(message).await {
+ warn!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ replica_id = consensus.replica(),
+ "on_replicate: WAL append failed for op={}: {e}", header.op
);
return;
}
- // TODO: Make those assertions be toggleable through a feature flag,
so they can be used only by simulator/tests.
- debug_assert_eq!(header.op, current_op + 1);
consensus.sequencer().set_sequence(header.op);
consensus.set_last_prepare_checksum(header.checksum);
emit_namespace_progress_event(
@@ -505,8 +563,13 @@ where
);
self.send_prepare_ok(&header).await;
+
+ if consensus.is_follower() {
+ self.commit_journal().await;
+ }
}
+ #[allow(clippy::too_many_lines)]
async fn on_ack(&self, message: <VsrConsensus<B> as
Consensus>::Message<PrepareOkHeader>) {
let header = message.header();
let consensus = self.consensus().expect("on_ack: consensus not
initialized");
@@ -542,50 +605,111 @@ where
}
}
- consensus.handle_prepare_ok(PlaneKind::Partitions, header);
-
- // SAFETY(IGGY-66): Per-namespace drain independent of global commit.
- //
- // drain_committable_all() drains each namespace queue independently by
- // quorum flag, so ns_a ops can be drained and replied to clients while
- // ns_b ops block the global commit (e.g., ns_a ops 1,3 drain while
- // ns_b op 2 is pending). This is intentional for partition
independence.
- //
- // View change risk: if a view change occurs before the global commit
- // covers a drained op, the new primary replays from max_commit+1 and
- // re-executes it. append_messages is NOT idempotent -- re-execution
- // produces duplicate partition data.
- //
- // Before this path handles real traffic, two guards are required:
- // 1. Op-based dedup in apply_replicated_operation: skip append if
- // the partition journal already contains data for this op.
- // 2. Client reply dedup by (client_id, request_id): prevent
- // duplicate replies after view change re-execution.
- let drained = {
- let mut pipeline = consensus.pipeline().borrow_mut();
- pipeline.drain_committable_all()
- };
+ if !ack_quorum_reached(consensus, PlaneKind::Partitions, header) {
+ return;
+ }
+
+ let drained = drain_committable_prefix(consensus);
if drained.is_empty() {
return;
}
- self.handle_committed_entries(consensus, drained).await;
+ if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
+ debug!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ replica_id = consensus.replica(),
+ first_op = first.header.op,
+ last_op = last.header.op,
+ drained_count = drained.len(),
+ "draining committed partition ops"
+ );
+ }
- let pipeline = consensus.pipeline().borrow();
- let new_commit = pipeline.global_commit_frontier(consensus.commit());
- drop(pipeline);
- consensus.advance_commit_number(new_commit);
- emit_namespace_progress_event(
- SimEventKind::NamespaceProgressUpdated,
- &ReplicaLogContext::from_consensus(consensus,
PlaneKind::Partitions),
- new_commit,
- consensus.pipeline().borrow().len(),
- );
+ let journal = self
+ .journal
+ .as_ref()
+ .expect("on_ack: journal not initialized");
+ for PipelineEntry {
+ header: prepare_header,
+ ..
+ } in drained
+ {
+ let entry_namespace =
IggyNamespace::from_raw(prepare_header.namespace);
+
+ // Read full message from WAL and apply operation at commit time.
+ let prepare = journal
+ .handle()
+ .entry(&prepare_header)
+ .await
+ .unwrap_or_else(|| {
+ panic!(
+ "on_ack: committed prepare op={} must be in WAL",
+ prepare_header.op
+ )
+ });
+ // Committed ops must be infallible — if the state machine cannot
+ // apply a committed op, replicas will diverge.
+ if let Err(e) = self
+ .apply_replicated_operation(&entry_namespace, prepare)
+ .await
+ {
+ panic!(
+ "on_ack: committed op={} failed to apply: {e}",
+ prepare_header.op
+ );
+ }
+
+ // Flush after every op, each apply appends new data to the
+ // partition journal that must be durable before advancing
commit_min.
+ if let Err(e) = self.commit_messages(&entry_namespace).await {
+ panic!(
+ "on_ack: committed op={} failed to flush: {e}",
+ prepare_header.op
+ );
+ }
+
+ consensus.advance_commit_min(prepare_header.op);
+
+ let pipeline_depth = consensus.pipeline().borrow().len();
+ let event = CommitLogEvent {
+ replica: ReplicaLogContext::from_consensus(consensus,
PlaneKind::Partitions),
+ op: prepare_header.op,
+ client_id: prepare_header.client,
+ request_id: prepare_header.request,
+ operation: prepare_header.operation,
+ pipeline_depth,
+ };
+ emit_sim_event(SimEventKind::OperationCommitted, &event);
+
+ let reply = build_reply_message(consensus, &prepare_header,
bytes::Bytes::new());
+ consensus
+ .client_table()
+ .borrow_mut()
+ .commit_reply(prepare_header.client, reply.clone());
+
+ let reply_buffers = freeze_client_reply(reply.into_generic());
+ emit_sim_event(SimEventKind::ClientReplyEmitted, &event);
+
+ if let Err(e) = consensus
+ .message_bus()
+ .send_to_client(prepare_header.client, reply_buffers)
+ .await
+ {
+ warn!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ client = prepare_header.client,
+ op = prepare_header.op,
+ "on_ack: failed to send reply to client: {e}"
+ );
+ }
+ }
}
}
-impl<B> PlaneIdentity<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B,
NamespacedPipeline>>
+impl<B, J> PlaneIdentity<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B,
NamespacedPipeline>, J>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
{
@@ -601,9 +725,11 @@ where
}
}
-impl<B> IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
+impl<B, J> IggyPartitions<VsrConsensus<B, NamespacedPipeline>, J>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ J: JournalHandle,
+ J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header =
PrepareHeader>,
{
/// # Panics
/// Panics if consensus is not initialized.
@@ -751,6 +877,56 @@ where
replicate_to_next_in_chain(consensus, message).await
}
+ /// Walk ops from `commit_min+1` to `commit_max`, applying operations and
+ /// updating the client table for each.
+ ///
+ /// The backup does NOT send replies to clients, only the primary does
that.
+ #[allow(clippy::cast_possible_truncation)]
+ async fn commit_journal(&self) {
+ let consensus = self
+ .consensus()
+ .expect("commit_journal: consensus not initialized");
+ let journal = self
+ .journal
+ .as_ref()
+ .expect("commit_journal: journal not initialized");
+
+ while consensus.commit_min() < consensus.commit_max() {
+ let op = consensus.commit_min() + 1;
+
+ let Some(header) = journal.handle().header(op as usize) else {
+ // TODO: Implement message repair: request missing prepare from
+ // primary or other replicas. Until then, the backup stalls
here.
+ break;
+ };
+ let header = *header;
+
+ let Some(prepare) = journal.handle().entry(&header).await else {
+ warn!("commit_journal: prepare body missing for op={op},
stopping");
+ break;
+ };
+
+ let ns = IggyNamespace::from_raw(header.namespace);
+
+ // Committed ops must be infallible (see on_ack comment).
+ if let Err(e) = self.apply_replicated_operation(&ns,
prepare).await {
+ panic!("commit_journal: committed op={op} failed to apply:
{e}");
+ }
+ if let Err(e) = self.commit_messages(&ns).await {
+ panic!("commit_journal: committed op={op} failed to flush:
{e}");
+ }
+
+ consensus.advance_commit_min(op);
+
+ let reply = build_reply_message(consensus, &header,
bytes::Bytes::new());
+ consensus
+ .client_table()
+ .borrow_mut()
+ .commit_reply(header.client, reply);
+ debug!("commit_journal: committed op={op}");
+ }
+ }
+
/// Persist prepared messages to segment storage and advance the durable
frontier.
///
/// Updates segment metadata, stats, flushes journal to disk if thresholds
@@ -941,9 +1117,14 @@ where
pipeline_depth,
);
- let generic_reply =
- build_reply_message(consensus, &prepare_header,
bytes::Bytes::new()).into_generic();
- let reply_buffers = freeze_client_reply(generic_reply);
+ let reply = build_reply_message(consensus, &prepare_header,
bytes::Bytes::new());
+ // Cache reply for duplicate detection:
+ consensus
+ .client_table()
+ .borrow_mut()
+ .commit_reply(prepare_header.client, reply.clone());
+
+ let reply_buffers = freeze_client_reply(reply.into_generic());
emit_sim_event(SimEventKind::ClientReplyEmitted, &event);
if let Err(error) = consensus
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 3e17f4ba2..a77fd3742 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -18,7 +18,9 @@
mod router;
pub mod shards_table;
-use consensus::{MuxPlane, NamespacedPipeline, PartitionsHandle, Plane,
VsrConsensus};
+use consensus::{
+ MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, PlaneKind,
Sequencer, VsrConsensus,
+};
use iggy_binary_protocol::{
GenericHeader, Message, MessageBag, PrepareHeader, PrepareOkHeader,
RequestHeader,
};
@@ -31,10 +33,12 @@ use metadata::stm::StateMachine;
use partitions::IggyPartitions;
use shards_table::ShardsTable;
-pub type ShardPlane<B, J, S, M> = MuxPlane<
+// MJ - Metadata Journal
+// PJ - Partitions Journal
+pub type ShardPlane<B, MJ, S, M, PJ> = MuxPlane<
variadic!(
- IggyMetadata<VsrConsensus<B>, J, S, M>,
- IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
+ IggyMetadata<VsrConsensus<B>, MJ, S, M>,
+ IggyPartitions<VsrConsensus<B, NamespacedPipeline>, PJ>
),
>;
@@ -89,13 +93,13 @@ impl<R: Send + 'static> ShardFrame<R> {
}
}
-pub struct IggyShard<B, J, S, M, T = (), R: Send + 'static = ()>
+pub struct IggyShard<B, MJ, S, M, PJ = (), T = (), R: Send + 'static = ()>
where
B: MessageBus,
{
pub id: u16,
pub name: String,
- pub plane: ShardPlane<B, J, S, M>,
+ pub plane: ShardPlane<B, MJ, S, M, PJ>,
/// Channel senders to every shard, indexed by shard id.
/// Includes a sender to self so that local routing goes through the
@@ -110,7 +114,7 @@ where
shards_table: T,
}
-impl<B, J, S, M, T, R: Send + 'static> IggyShard<B, J, S, M, T, R>
+impl<B, MJ, S, M, PJ, T, R: Send + 'static> IggyShard<B, MJ, S, M, PJ, T, R>
where
B: MessageBus,
T: ShardsTable,
@@ -124,8 +128,8 @@ where
pub const fn new(
id: u16,
name: String,
- metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
- partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
+ metadata: IggyMetadata<VsrConsensus<B>, MJ, S, M>,
+ partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>, PJ>,
senders: Vec<Sender<ShardFrame<R>>>,
inbox: Receiver<ShardFrame<R>>,
shards_table: T,
@@ -149,8 +153,8 @@ where
pub fn without_inbox(
id: u16,
name: String,
- metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
- partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
+ metadata: IggyMetadata<VsrConsensus<B>, MJ, S, M>,
+ partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>, PJ>,
shards_table: T,
) -> Self {
// TODO: previously we used unbounded channel with flume,
@@ -176,7 +180,7 @@ where
/// Local message processing — these methods handle messages that have been
/// routed to this shard via the message pump.
-impl<B, J, S, M, T, R: Send + 'static> IggyShard<B, J, S, M, T, R>
+impl<B, MJ, S, M, PJ, T, R: Send + 'static> IggyShard<B, MJ, S, M, PJ, T, R>
where
B: MessageBus,
{
@@ -188,9 +192,15 @@ where
pub async fn on_message(&self, message: Message<GenericHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
- J: JournalHandle,
- <J as JournalHandle>::Target: Journal<
- <J as JournalHandle>::Storage,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
@@ -214,9 +224,15 @@ where
pub async fn on_request(&self, request: Message<RequestHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
- J: JournalHandle,
- <J as JournalHandle>::Target: Journal<
- <J as JournalHandle>::Storage,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
@@ -233,9 +249,15 @@ where
pub async fn on_replicate(&self, prepare: Message<PrepareHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
- J: JournalHandle,
- <J as JournalHandle>::Target: Journal<
- <J as JournalHandle>::Storage,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
@@ -252,9 +274,15 @@ where
pub async fn on_ack(&self, prepare_ok: Message<PrepareOkHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
- J: JournalHandle,
- <J as JournalHandle>::Target: Journal<
- <J as JournalHandle>::Storage,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
@@ -282,9 +310,15 @@ where
pub async fn process_loopback(&self, buf: &mut
Vec<Message<GenericHeader>>) -> usize
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
- J: JournalHandle,
- <J as JournalHandle>::Target: Journal<
- <J as JournalHandle>::Storage,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
@@ -333,9 +367,60 @@ where
Data =
iggy_binary_protocol::Message<iggy_binary_protocol::GenericHeader>,
Client = u128,
>,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
{
let partitions = self.plane.partitions_mut();
partitions.init_partition_in_memory(namespace);
partitions.register_namespace_in_pipeline(namespace.inner());
}
+
+ /// Tick the partitions consensus and dispatch any resulting actions.
+ ///
+ /// Currently handles `RetransmitPrepares` (primary retransmits
+ /// uncommitted prepares when the prepare timeout fires).
+ #[allow(clippy::future_not_send)]
+ pub async fn tick_partitions(&self)
+ where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ {
+ let partitions = self.plane.partitions();
+ let Some(consensus) = partitions.consensus() else {
+ return;
+ };
+ let Some(journal) = partitions.journal() else {
+ return;
+ };
+
+ let current_op = consensus.sequencer().current_sequence();
+ let current_commit = consensus.commit_min();
+ let actions = consensus.tick(PlaneKind::Partitions, current_op,
current_commit);
+
+ for action in actions {
+ if let consensus::VsrAction::RetransmitPrepares { targets } =
action {
+ for (header, replicas) in targets {
+ let Some(prepare) = journal.handle().entry(&header).await
else {
+ continue;
+ };
+ for replica in replicas {
+ let _ = consensus
+ .message_bus()
+ .send_to_replica(replica,
prepare.clone().into_generic())
+ .await;
+ }
+ }
+ }
+ // TODO: Dispatch other VsrActions (view change messages, etc.)
+ }
+ }
}
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 8965a6a05..177cddaa0 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -30,7 +30,7 @@ use metadata::stm::StateMachine;
/// through the channel into the target shard's message pump. This ensures
/// that every mutation on a shard is serialized through a single point (the
/// pump), preventing concurrent access from independent async tasks.
-impl<B, J, S, M, T, R> IggyShard<B, J, S, M, T, R>
+impl<B, MJ, S, M, PJ, T, R> IggyShard<B, MJ, S, M, PJ, T, R>
where
B: MessageBus,
T: ShardsTable,
@@ -144,9 +144,15 @@ where
pub async fn run_message_pump(&self, stop: Receiver<()>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
- J: JournalHandle,
- <J as JournalHandle>::Target: Journal<
- <J as JournalHandle>::Storage,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
@@ -178,9 +184,15 @@ where
async fn process_frame(&self, frame: ShardFrame<R>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
- J: JournalHandle,
- <J as JournalHandle>::Target: Journal<
- <J as JournalHandle>::Storage,
+ MJ: JournalHandle,
+ <MJ as JournalHandle>::Target: Journal<
+ <MJ as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ PJ: JournalHandle,
+ <PJ as JournalHandle>::Target: Journal<
+ <PJ as JournalHandle>::Storage,
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index a6a6b09cb..d9d9029a3 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -226,6 +226,17 @@ impl Simulator {
self.crashed.contains(&replica_index)
}
+ /// Advance consensus timeouts and dispatch actions on every replica.
+ ///
+ /// Call this once per simulation tick. Handles prepare retransmission
+ /// (and eventually view-change messages) via the timeout system.
+ #[allow(clippy::future_not_send)]
+ pub async fn tick(&self) {
+ for replica in &self.replicas {
+ replica.tick_partitions().await;
+ }
+ }
+
/// Poll messages directly from a replica's partition.
///
/// # Errors
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 730e0af18..c9373d3ef 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -33,8 +33,13 @@ const CLUSTER_ID: u128 = 1;
// For now there is only one shard per replica,
// we will add support for multiple shards per replica in the future.
-pub type Replica =
- shard::IggyShard<SharedSimOutbox, SimJournal<MemStorage>, SimSnapshot,
SimMuxStateMachine>;
+pub type Replica = shard::IggyShard<
+ SharedSimOutbox,
+ SimJournal<MemStorage>, // MJ: metadata journal
+ SimSnapshot,
+ SimMuxStateMachine,
+ SimJournal<MemStorage>, // PJ: partitions journal
+>;
pub fn new_replica(id: u8, name: String, bus: &Arc<SimOutbox>, replica_count:
u8) -> Replica {
let users: Users = UsersInner::new().into();
@@ -69,6 +74,7 @@ pub fn new_replica(id: u8, name: String, bus:
&Arc<SimOutbox>, replica_count: u8
};
let mut partitions = IggyPartitions::new(ShardId::new(u16::from(id)),
partitions_config);
+ partitions.set_journal(SimJournal::<MemStorage>::default());
// TODO: namespace=0 collides with metadata consensus. Safe for now
because the simulator
// routes by Operation type, but a shared view change bus would produce
namespace collisions.