This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 52b198637 feat(consensus): add ClientTable with WAL-backed commit path 
and view-change safety (#3023)
52b198637 is described below

commit 52b19863732551c2f5087f8915a919d9f44abfa0
Author: Krishna Vishal <[email protected]>
AuthorDate: Tue Apr 7 17:15:20 2026 +0530

    feat(consensus): add ClientTable with WAL-backed commit path and 
view-change safety (#3023)
---
 core/binary_protocol/src/consensus/header.rs       |   8 +-
 core/binary_protocol/src/consensus/message.rs      |  14 +-
 core/consensus/src/client_table.rs                 | 577 +++++++++++++++++++++
 core/consensus/src/impls.rs                        | 221 ++++++--
 core/consensus/src/lib.rs                          |   3 +
 core/consensus/src/namespaced_pipeline.rs          |  86 ++-
 core/consensus/src/observability.rs                |  17 +-
 core/consensus/src/plane_helpers.rs                |  72 ++-
 core/iobuf/src/lib.rs                              |   2 +-
 core/journal/src/lib.rs                            |   2 +-
 .../{metadata_journal.rs => prepare_journal.rs}    |  40 +-
 core/metadata/src/impls/metadata.rs                | 157 ++++--
 core/metadata/src/impls/recovery.rs                |  10 +-
 core/partitions/src/iggy_partitions.rs             | 299 ++++++++---
 core/shard/src/lib.rs                              | 139 ++++-
 core/shard/src/router.rs                           |  26 +-
 core/simulator/src/lib.rs                          |  11 +
 core/simulator/src/replica.rs                      |  10 +-
 18 files changed, 1469 insertions(+), 225 deletions(-)

diff --git a/core/binary_protocol/src/consensus/header.rs 
b/core/binary_protocol/src/consensus/header.rs
index 40f66b1e0..ee0969378 100644
--- a/core/binary_protocol/src/consensus/header.rs
+++ b/core/binary_protocol/src/consensus/header.rs
@@ -181,6 +181,7 @@ pub struct ReplyHeader {
 
     pub request_checksum: u128,
     pub context: u128,
+    pub client: u128,
     pub op: u64,
     pub commit: u64,
     pub timestamp: u64,
@@ -188,7 +189,7 @@ pub struct ReplyHeader {
     pub operation: Operation,
     pub operation_padding: [u8; 7],
     pub namespace: u64,
-    pub reserved: [u8; 48],
+    pub reserved: [u8; 32],
 }
 const _: () = {
     assert!(size_of::<ReplyHeader>() == HEADER_SIZE);
@@ -196,7 +197,7 @@ const _: () = {
         offset_of!(ReplyHeader, request_checksum)
             == offset_of!(ReplyHeader, reserved_frame) + size_of::<[u8; 66]>()
     );
-    assert!(offset_of!(ReplyHeader, reserved) + size_of::<[u8; 48]>() == 
HEADER_SIZE);
+    assert!(offset_of!(ReplyHeader, reserved) + size_of::<[u8; 32]>() == 
HEADER_SIZE);
 };
 
 impl Default for ReplyHeader {
@@ -213,6 +214,7 @@ impl Default for ReplyHeader {
             reserved_frame: [0; 66],
             request_checksum: 0,
             context: 0,
+            client: 0,
             op: 0,
             commit: 0,
             timestamp: 0,
@@ -220,7 +222,7 @@ impl Default for ReplyHeader {
             operation: Operation::Reserved,
             operation_padding: [0; 7],
             namespace: 0,
-            reserved: [0; 48],
+            reserved: [0; 32],
         }
     }
 }
diff --git a/core/binary_protocol/src/consensus/message.rs 
b/core/binary_protocol/src/consensus/message.rs
index 834394f33..50ae07ac5 100644
--- a/core/binary_protocol/src/consensus/message.rs
+++ b/core/binary_protocol/src/consensus/message.rs
@@ -52,7 +52,7 @@ where
     fn fragments(&self) -> &[Frozen<MESSAGE_ALIGN>];
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct RequestBacking {
     owned: Owned<MESSAGE_ALIGN>,
 }
@@ -362,6 +362,18 @@ where
     }
 }
 
+impl<H> Clone for Message<H, RequestBacking>
+where
+    H: ConsensusHeader,
+{
+    fn clone(&self) -> Self {
+        Self {
+            backing: self.backing.clone(),
+            _marker: PhantomData,
+        }
+    }
+}
+
 impl<H> Clone for Message<H, ResponseBacking>
 where
     H: ConsensusHeader,
diff --git a/core/consensus/src/client_table.rs 
b/core/consensus/src/client_table.rs
new file mode 100644
index 000000000..620951fa0
--- /dev/null
+++ b/core/consensus/src/client_table.rs
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_binary_protocol::{Message, ReplyHeader};
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::future::Future;
+use std::rc::Rc;
+use std::task::Waker;
+
+/// Identifies a specific request from a specific client.
+/// Used as the key for the pending-commit waiter map.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct ClientRequest {
+    pub client_id: u128,
+    pub request: u64,
+}
+
+/// Inner state shared between `Notify` clones via `Rc`.
+#[derive(Debug)]
+struct NotifyInner {
+    waker: RefCell<Option<Waker>>,
+    notified: std::cell::Cell<bool>,
+}
+
+/// Lightweight, single-threaded async notification primitive.
+///
+/// ## Usage
+///
+/// ```ignore
+/// let notify = Notify::new();
+/// let waiter = notify.clone();
+///
+/// // Producer side (in commit_reply):
+/// notify.notify();
+///
+/// // Consumer side (caller awaiting the commit):
+/// waiter.notified().await;
+/// ```
+#[derive(Debug, Clone)]
+pub struct Notify {
+    inner: Rc<NotifyInner>,
+}
+
+impl Notify {
+    /// Create a new `Notify` in the un-notified state.
+    #[must_use]
+    pub fn new() -> Self {
+        Self {
+            inner: Rc::new(NotifyInner {
+                waker: RefCell::new(None),
+                notified: std::cell::Cell::new(false),
+            }),
+        }
+    }
+
+    /// Wake the waiter, if any. If `notified()` is polled later, it will
+    /// resolve immediately.
+    pub fn notify(&self) {
+        self.inner.notified.set(true);
+        if let Some(waker) = self.inner.waker.borrow_mut().take() {
+            waker.wake();
+        }
+    }
+
+    /// Returns a future that resolves when [`notify()`](Self::notify) is 
called.
+    ///
+    /// If `notify()` was already called before this future is polled, it
+    /// resolves immediately (permit is consumed).
+    #[allow(clippy::future_not_send)]
+    pub fn notified(&self) -> impl Future<Output = ()> + '_ {
+        std::future::poll_fn(move |cx| {
+            if self.inner.notified.get() {
+                self.inner.notified.set(false);
+                std::task::Poll::Ready(())
+            } else {
+                *self.inner.waker.borrow_mut() = Some(cx.waker().clone());
+                std::task::Poll::Pending
+            }
+        })
+    }
+}
+
+impl Default for Notify {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Per-client entry in the clients table (VR paper Section 4, Figure 2).
+///
+/// Stores the reply for the client's latest committed request. The client ID,
+/// request number, and commit number are all read from `reply.header()`.
+#[derive(Debug)]
+pub struct ClientEntry {
+    /// The cached reply for the client's latest committed request (header + 
body).
+    pub reply: Message<ReplyHeader>,
+}
+
+/// Result of checking a request against the client table.
+#[derive(Debug)]
+pub enum RequestStatus {
+    /// Request not seen before, proceed with consensus.
+    New,
+    /// Exact request already committed, re-send cached reply.
+    Duplicate(Message<ReplyHeader>),
+    /// Request is in the pipeline awaiting commit, drop (client should wait).
+    InProgress,
+    /// Request number is older than the client's latest committed request.
+    /// Already handled in a prior commit cycle, drop silently.
+    Stale,
+}
+
+/// VSR client-table: tracks per-client request state for duplicate detection,
+/// reply caching, and async commit notification.
+///
+/// Uses a fixed-size slot array as the source of truth, with a `HashMap`
+/// as a secondary index for O(1) lookups by client ID.
+///
+/// ## Committed state (`slots` + `index`)
+///
+/// Always contains a valid `ClientEntry` with a non-optional reply.
+/// Updated by `commit_reply` when a request commits through consensus.
+///
+/// ## Pending state (`pending`)
+///
+/// Tracks in-flight requests that have been accepted for consensus but not yet
+/// committed. Each entry holds a [`Notify`] that is fired when the 
corresponding
+/// `commit_reply` arrives. Keyed by `ClientRequest` to support future
+/// request pipelining (currently at most one per client).
+///
+/// The `pending` map is local notification state: not replicated, not
+/// serialized, and not part of the deterministic committed state.
+///
+/// ## Known gaps
+///
+/// - **Message repair**: If a backup never received a prepare (lost message),
+///   `commit_journal` stops at the gap. The client table will be missing
+///   entries for ops beyond the gap until the message repair protocol is
+///   implemented and the missing prepare is retransmitted.
+///
+/// - **Checkpoint serialization**: The slot array is laid out for 
deterministic
+///   encode/decode to disk, but serialization is not yet implemented.
+#[derive(Debug)]
+pub struct ClientTable {
+    /// `None` means the slot is free.
+    /// Deterministic iteration order for eviction and serialization.
+    slots: Vec<Option<ClientEntry>>,
+    /// Secondary index: `client_id` → slot index. Rebuilt on decode.
+    index: HashMap<u128, usize>,
+    /// Pending commit waiters, keyed by `(client_id, request)`.
+    /// Keyed by request number (not just client) to support future pipelining.
+    /// Currently at most one per client.
+    pending: HashMap<ClientRequest, Notify>,
+}
+
+impl ClientTable {
+    #[must_use]
+    pub fn new(max_clients: usize) -> Self {
+        let mut slots = Vec::with_capacity(max_clients);
+        slots.resize_with(max_clients, || None);
+        Self {
+            slots,
+            index: HashMap::with_capacity(max_clients),
+            pending: HashMap::new(),
+        }
+    }
+
+    /// Check a request against the table.
+    ///
+    /// Returns:
+    /// - [`RequestStatus::New`]: not seen before, proceed with consensus
+    /// - [`RequestStatus::Duplicate`]: already committed, re-send cached reply
+    /// - [`RequestStatus::InProgress`]: in the pipeline awaiting commit
+    /// - [`RequestStatus::Stale`]: older than the client's latest committed 
request
+    ///
+    /// # Panics
+    /// Panics if the internal index points to an empty slot (invariant 
violation).
+    #[must_use]
+    pub fn check_request(&self, client_id: u128, request: u64) -> 
RequestStatus {
+        // TODO: Once client sessions are added (a register/evict session
+        // protocol), validate client_id at the session layer instead of
+        // panicking here. Unregistered or invalid clients should be rejected
+        // gracefully at ingress, not inside the client table.
+        assert!(client_id != 0, "client_id 0 is reserved for internal use");
+
+        // Check if already pending in the pipeline.
+        let key = ClientRequest { client_id, request };
+        if self.pending.contains_key(&key) {
+            return RequestStatus::InProgress;
+        }
+
+        let Some(&slot_idx) = self.index.get(&client_id) else {
+            return RequestStatus::New;
+        };
+        let entry = self.slots[slot_idx].as_ref().expect("index/slot 
mismatch");
+        let committed_request = entry.reply.header().request;
+
+        if request < committed_request {
+            return RequestStatus::Stale;
+        }
+        if request == committed_request {
+            return RequestStatus::Duplicate(entry.reply.clone());
+        }
+
+        RequestStatus::New
+    }
+
+    /// Register interest in a pending request's commit.
+    ///
+    /// Returns a [`Notify`] the caller can `.notified().await` on. The 
`Notify`
+    /// is cloned via `Rc`, so the caller can hold it across `.await` points
+    /// without borrowing the `ClientTable`.
+    ///
+    /// Called after `check_request` returns `New`, before submitting the 
request
+    /// to the consensus pipeline.
+    ///
+    /// # Panics
+    /// Panics if there is already a pending waiter for this `(client_id, 
request)`.
+    pub fn register_pending(&mut self, client_id: u128, request: u64) -> 
Notify {
+        let notify = Notify::new();
+        let key = ClientRequest { client_id, request };
+        let prev = self.pending.insert(key, notify.clone());
+        assert!(
+            prev.is_none(),
+            "client {client_id} request {request} already has a pending waiter"
+        );
+        notify
+    }
+
+    /// Record a committed reply and cache it.
+    ///
+    /// If the client already has a slot, updates it in place. Otherwise 
allocates
+    /// a free slot, evicting the client with the oldest commit number if the 
table
+    /// is full.
+    ///
+    /// Wakes the pending [`Notify`] for this `(client_id, request)` if one 
exists.
+    ///
+    /// Called in `on_ack` after `build_reply_message`.
+    ///
+    /// # Panics
+    /// Panics if the internal index points to an empty slot (invariant 
violation).
+    pub fn commit_reply(&mut self, client_id: u128, reply: 
Message<ReplyHeader>) {
+        assert!(client_id != 0, "client_id 0 is reserved for internal use");
+        assert_eq!(
+            client_id,
+            reply.header().client,
+            "commit_reply: client_id mismatch (arg={client_id}, header={})",
+            reply.header().client
+        );
+        let request = reply.header().request;
+
+        if let Some(&slot_idx) = self.index.get(&client_id) {
+            let slot = self.slots[slot_idx].as_mut().expect("index/slot 
mismatch");
+            // Monotonicity: both commit (op) and request must not regress.
+            assert!(
+                reply.header().commit >= slot.reply.header().commit,
+                "commit_reply: commit regression for client {client_id}: {} -> 
{}",
+                slot.reply.header().commit,
+                reply.header().commit
+            );
+            assert!(
+                reply.header().request >= slot.reply.header().request,
+                "commit_reply: request regression for client {client_id}: {} 
-> {}",
+                slot.reply.header().request,
+                reply.header().request
+            );
+            slot.reply = reply;
+        } else {
+            // Need a free slot. Evict if full.
+            if self.index.len() >= self.slots.len() {
+                self.evict_oldest();
+            }
+
+            let slot_idx = self.first_free_slot().expect("eviction must free a 
slot");
+            self.slots[slot_idx] = Some(ClientEntry { reply });
+            self.index.insert(client_id, slot_idx);
+        }
+
+        // Wake the waiter, if any.
+        let key = ClientRequest { client_id, request };
+        if let Some(notify) = self.pending.remove(&key) {
+            notify.notify();
+        }
+    }
+
+    /// Evict the client with the oldest commit number.
+    ///
+    /// Iterates the fixed-size slot array (deterministic order), so all 
replicas
+    /// with the same committed state evict the same client. Ties on commit 
number
+    /// are broken by slot index (lowest index wins), which is also 
deterministic.
+    ///
+    /// **Dedup caveat**: until checkpoint serialization is implemented, 
eviction
+    /// breaks at-most-once semantics for the evicted client — a retransmission
+    /// after eviction will be treated as `New` and re-executed.
+    fn evict_oldest(&mut self) {
+        let mut evictee: Option<(usize, u64)> = None; // (slot_idx, commit)
+
+        for (idx, slot) in self.slots.iter().enumerate() {
+            if let Some(entry) = slot {
+                let commit = entry.reply.header().commit;
+                let should_evict = match evictee {
+                    None => true,
+                    Some((_, min_commit)) => commit < min_commit,
+                };
+                if should_evict {
+                    evictee = Some((idx, commit));
+                }
+            }
+        }
+
+        if let Some((slot_idx, _)) = evictee {
+            let entry = self.slots[slot_idx].take().expect("evictee must 
exist");
+            self.index.remove(&entry.reply.header().client);
+        }
+    }
+
+    /// Find the first free slot in the array.
+    fn first_free_slot(&self) -> Option<usize> {
+        self.slots.iter().position(Option::is_none)
+    }
+
+    /// Get the cached reply for a client (for duplicate re-sends).
+    #[must_use]
+    pub fn get_reply(&self, client_id: u128) -> Option<&Message<ReplyHeader>> {
+        let &slot_idx = self.index.get(&client_id)?;
+        self.slots[slot_idx].as_ref().map(|entry| &entry.reply)
+    }
+
+    /// Number of active committed client entries.
+    #[must_use]
+    pub fn count(&self) -> usize {
+        self.index.len()
+    }
+
+    /// Number of pending (in-flight) requests.
+    #[must_use]
+    pub fn pending_count(&self) -> usize {
+        self.pending.len()
+    }
+
+    /// Clear all pending entries (e.g. during view change).
+    ///
+    /// Stale pending entries from a previous view must not survive into the
+    /// new view; otherwise `check_request` would return `InProgress` for the
+    /// orphaned keys, silently dropping valid client retries.
+    pub fn clear_pending(&mut self) {
+        self.pending.clear();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use iggy_binary_protocol::{Command2, Operation};
+
+    fn make_reply_for(client: u128, request: u64, commit: u64) -> 
Message<ReplyHeader> {
+        let header_size = std::mem::size_of::<ReplyHeader>();
+        let mut msg = Message::<ReplyHeader>::new(header_size);
+        let header = bytemuck::checked::try_from_bytes_mut::<ReplyHeader>(
+            &mut msg.as_mut_slice()[..header_size],
+        )
+        .expect("zeroed bytes are valid");
+        *header = ReplyHeader {
+            client,
+            request,
+            commit,
+            command: Command2::Reply,
+            operation: Operation::SendMessages,
+            ..ReplyHeader::default()
+        };
+        msg
+    }
+
+    fn make_reply(request: u64, commit: u64) -> Message<ReplyHeader> {
+        make_reply_for(1, request, commit)
+    }
+
+    // Notify tests
+
+    #[test]
+    fn notify_after_await_registration() {
+        let notify = Notify::new();
+        let waiter = notify.clone();
+
+        // Notify before anyone polls, should resolve immediately on first 
poll.
+        notify.notify();
+
+        let waker = futures::task::noop_waker();
+        let mut cx = std::task::Context::from_waker(&waker);
+        let mut fut = std::pin::pin!(waiter.notified());
+        assert!(fut.as_mut().poll(&mut cx).is_ready());
+    }
+
+    #[test]
+    fn notify_wakes_pending_poll() {
+        let notify = Notify::new();
+        let waiter = notify.clone();
+
+        let waker = futures::task::noop_waker();
+        let mut cx = std::task::Context::from_waker(&waker);
+        let mut fut = std::pin::pin!(waiter.notified());
+
+        // First poll returns Pending (not yet notified).
+        assert!(fut.as_mut().poll(&mut cx).is_pending());
+
+        // Notify fires.
+        notify.notify();
+
+        // Next poll resolves.
+        assert!(fut.as_mut().poll(&mut cx).is_ready());
+    }
+
+    #[test]
+    fn notify_consumed_after_ready() {
+        let notify = Notify::new();
+        notify.notify();
+
+        let waker = futures::task::noop_waker();
+        let mut cx = std::task::Context::from_waker(&waker);
+
+        // First notified() consumes the permit.
+        let mut fut1 = std::pin::pin!(notify.notified());
+        assert!(fut1.as_mut().poll(&mut cx).is_ready());
+
+        // Second notified() should be pending (permit consumed).
+        let mut fut2 = std::pin::pin!(notify.notified());
+        assert!(fut2.as_mut().poll(&mut cx).is_pending());
+    }
+
+    // ClientTable tests
+
+    #[test]
+    fn check_request_new() {
+        let table = ClientTable::new(10);
+        assert!(matches!(table.check_request(1, 1), RequestStatus::New));
+    }
+
+    #[test]
+    fn check_request_duplicate_after_commit() {
+        let mut table = ClientTable::new(10);
+        table.commit_reply(1, make_reply(1, 10));
+
+        match table.check_request(1, 1) {
+            RequestStatus::Duplicate(cached) => {
+                assert_eq!(cached.header().request, 1);
+            }
+            _ => panic!("expected Duplicate"),
+        }
+    }
+
+    #[test]
+    fn check_request_stale() {
+        let mut table = ClientTable::new(10);
+        table.commit_reply(1, make_reply(5, 10));
+
+        assert!(matches!(table.check_request(1, 3), RequestStatus::Stale));
+    }
+
+    #[test]
+    fn check_request_in_progress_while_pending() {
+        let mut table = ClientTable::new(10);
+        let _notify = table.register_pending(1, 1);
+
+        assert!(matches!(
+            table.check_request(1, 1),
+            RequestStatus::InProgress
+        ));
+    }
+
+    #[test]
+    fn commit_caches_reply() {
+        let mut table = ClientTable::new(10);
+        table.commit_reply(1, make_reply(1, 10));
+
+        let cached = table.get_reply(1).expect("should have cached reply");
+        assert_eq!(cached.header().request, 1);
+    }
+
+    #[test]
+    fn commit_updates_existing_entry() {
+        let mut table = ClientTable::new(10);
+        table.commit_reply(1, make_reply(1, 10));
+        table.commit_reply(1, make_reply(2, 20));
+
+        let cached = table.get_reply(1).expect("should have cached reply");
+        assert_eq!(cached.header().request, 2);
+        assert_eq!(table.count(), 1);
+    }
+
+    #[test]
+    fn register_and_commit_notifies() {
+        let mut table = ClientTable::new(10);
+        let notify = table.register_pending(1, 1);
+
+        assert_eq!(table.pending_count(), 1);
+
+        // Commit fires the notify.
+        table.commit_reply(1, make_reply(1, 10));
+
+        assert_eq!(table.pending_count(), 0);
+
+        let waker = futures::task::noop_waker();
+        let mut cx = std::task::Context::from_waker(&waker);
+        let mut fut = std::pin::pin!(notify.notified());
+        assert!(fut.as_mut().poll(&mut cx).is_ready());
+    }
+
+    #[test]
+    fn eviction_removes_oldest_commit() {
+        let mut table = ClientTable::new(2);
+
+        table.commit_reply(100, make_reply_for(100, 1, 10));
+        table.commit_reply(200, make_reply_for(200, 1, 20));
+        table.commit_reply(300, make_reply_for(300, 1, 30));
+
+        assert!(table.get_reply(100).is_none());
+        assert!(table.get_reply(200).is_some());
+        assert!(table.get_reply(300).is_some());
+        assert_eq!(table.count(), 2);
+    }
+
+    #[test]
+    fn eviction_is_deterministic_by_slot_index() {
+        let mut table = ClientTable::new(2);
+
+        table.commit_reply(100, make_reply_for(100, 1, 10));
+        table.commit_reply(200, make_reply_for(200, 1, 10));
+        table.commit_reply(300, make_reply_for(300, 1, 30));
+
+        assert!(table.get_reply(100).is_none());
+        assert!(table.get_reply(200).is_some());
+        assert!(table.get_reply(300).is_some());
+    }
+
+    #[test]
+    fn new_request_after_commit_is_new() {
+        let mut table = ClientTable::new(10);
+        table.commit_reply(1, make_reply(1, 10));
+
+        assert!(matches!(table.check_request(1, 2), RequestStatus::New));
+    }
+
+    #[test]
+    fn slot_reuse_after_eviction() {
+        let mut table = ClientTable::new(1);
+
+        table.commit_reply(100, make_reply_for(100, 1, 10));
+        table.commit_reply(200, make_reply_for(200, 1, 20));
+
+        assert!(table.get_reply(100).is_none());
+        assert!(table.get_reply(200).is_some());
+        assert_eq!(table.count(), 1);
+    }
+
+    #[test]
+    #[should_panic(expected = "already has a pending waiter")]
+    fn register_pending_twice_panics() {
+        let mut table = ClientTable::new(10);
+        let _n1 = table.register_pending(1, 1);
+        let _n2 = table.register_pending(1, 1);
+    }
+}
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 7c4b27d04..55101854c 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::client_table::ClientTable;
 use crate::vsr_timeout::{TimeoutKind, TimeoutManager};
 use crate::{
     AckLogEvent, Consensus, ControlActionLogEvent, DvcQuorumArray, 
IgnoreReason, Pipeline,
@@ -86,6 +87,10 @@ pub const PIPELINE_PREPARE_QUEUE_MAX: usize = 8;
 /// Maximum number of replicas in a cluster.
 pub const REPLICAS_MAX: usize = 32;
 
+/// Maximum number of clients tracked in the clients table.
+/// When exceeded, the client with the oldest committed request is evicted.
+pub const CLIENTS_TABLE_MAX: usize = 8192;
+
 #[derive(Debug)]
 pub struct PipelineEntry {
     pub header: PrepareHeader,
@@ -403,7 +408,7 @@ pub enum Status {
 }
 
 /// Actions to be taken by the caller after processing a VSR event.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone)]
 pub enum VsrAction {
     /// Send `StartViewChange` to all replicas.
     SendStartViewChange { view: u32, namespace: u64 },
@@ -423,13 +428,26 @@ pub enum VsrAction {
         commit: u64,
         namespace: u64,
     },
-    /// Send `PrepareOK` to primary.
+    /// Send `PrepareOK` for each op in `[from_op, to_op]` that is present in 
the WAL.
+    ///
+    /// The caller MUST verify each op exists in the journal before sending.
+    /// Sending `PrepareOk` for a missing op is a safety violation, it can
+    /// cause the primary to commit an op without enough replicas holding the 
data.
     SendPrepareOk {
         view: u32,
-        op: u64,
+        from_op: u64,
+        to_op: u64,
         target: u8,
         namespace: u64,
     },
+    /// Retransmit uncommitted prepares from the WAL to replicas that haven't 
acked.
+    ///
+    /// Emitted when the primary's prepare timeout fires and there are
+    /// uncommitted entries in the pipeline. Each entry is a prepare header
+    /// (for journal lookup) and the list of replica IDs that need it.
+    RetransmitPrepares {
+        targets: Vec<(PrepareHeader, Vec<u8>)>,
+    },
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -477,7 +495,16 @@ where
     // * `replica.log_view = 0` when replica_count=1.
     log_view: Cell<u32>,
     status: Cell<Status>,
-    commit: Cell<u64>,
+
+    /// Highest op number that has been locally executed (state machine 
applied,
+    /// client table updated). Advances one-by-one in `commit_journal` (backup)
+    /// and `on_ack` (primary). On a normal primary, `commit_min == 
commit_max`.
+    commit_min: Cell<u64>,
+
+    /// Highest op number known to be committed by the cluster. Advances
+    /// immediately when the replica learns about commits (from prepare
+    /// messages, commit heartbeats, or view change messages).
+    commit_max: Cell<u64>,
 
     sequencer: LocalSequencer,
 
@@ -502,6 +529,9 @@ where
     sent_own_do_view_change: Cell<bool>,
 
     timeouts: RefCell<TimeoutManager>,
+
+    /// VSR client-table for duplicate detection and reply caching.
+    client_table: RefCell<ClientTable>,
 }
 
 impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
@@ -534,7 +564,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             log_view: Cell::new(0),
             status: Cell::new(Status::Recovering),
             sequencer: LocalSequencer::new(0),
-            commit: Cell::new(0),
+            commit_min: Cell::new(0),
+            commit_max: Cell::new(0),
             last_timestamp: Cell::new(0),
             last_prepare_checksum: Cell::new(0),
             pipeline: RefCell::new(pipeline),
@@ -546,6 +577,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             sent_own_start_view_change: Cell::new(false),
             sent_own_do_view_change: Cell::new(false),
             timeouts: RefCell::new(TimeoutManager::new(timeout_seed)),
+            client_table: RefCell::new(ClientTable::new(CLIENTS_TABLE_MAX)),
         }
     }
 
@@ -567,14 +599,38 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         self.primary_index(self.view.get()) == self.replica
     }
 
+    /// Advance `commit_max` - the highest op known to be committed by the 
cluster.
+    ///
+    /// Called when the replica learns about new commits from the primary
+    /// (via prepare messages, commit heartbeats, or view change messages).
+    ///
     /// # Panics
-    /// If the stored commit number somehow exceeds the given `commit` after 
update.
-    pub fn advance_commit_number(&self, commit: u64) {
-        if commit > self.commit.get() {
-            self.commit.set(commit);
+    /// If `commit_max` would be less than `commit_min` after the update
+    /// (invariant violation).
+    pub fn advance_commit_max(&self, commit: u64) {
+        if commit > self.commit_max.get() {
+            self.commit_max.set(commit);
         }
+        assert!(self.commit_max.get() >= self.commit_min.get());
+    }
 
-        assert!(self.commit.get() >= commit);
+    /// Advance `commit_min` - the highest op locally executed.
+    ///
+    /// Called after each op is applied through `commit_journal` (backup)
+    /// or `on_ack` (primary). Must advance sequentially (by 1).
+    ///
+    /// # Panics
+    /// - If `op` is not exactly `commit_min + 1` (must advance sequentially).
+    /// - If `commit_min` would exceed `commit_max` after the update.
+    pub fn advance_commit_min(&self, op: u64) {
+        assert_eq!(
+            op,
+            self.commit_min.get() + 1,
+            "commit_min must advance sequentially: expected {}, got {op}",
+            self.commit_min.get() + 1
+        );
+        self.commit_min.set(op);
+        assert!(self.commit_max.get() >= self.commit_min.get());
     }
 
     /// Maximum number of faulty replicas that can be tolerated.
@@ -590,9 +646,16 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         self.max_faulty() + 1
     }
 
+    /// Highest op locally executed (state machine applied, client table 
updated).
+    #[must_use]
+    pub const fn commit_min(&self) -> u64 {
+        self.commit_min.get()
+    }
+
+    /// Highest op known to be committed by the cluster.
     #[must_use]
-    pub const fn commit(&self) -> u64 {
-        self.commit.get()
+    pub const fn commit_max(&self) -> u64 {
+        self.commit_max.get()
     }
 
     #[must_use]
@@ -632,6 +695,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         &mut self.pipeline
     }
 
+    #[must_use]
+    pub const fn client_table(&self) -> &RefCell<ClientTable> {
+        &self.client_table
+    }
+
     #[must_use]
     pub const fn cluster(&self) -> u128 {
         self.cluster
@@ -706,6 +774,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         self.sent_own_start_view_change.set(false);
         self.sent_own_do_view_change.set(false);
         self.loopback_queue.borrow_mut().clear();
+        self.client_table.borrow_mut().clear_pending();
     }
 
     /// Process one tick. Call this periodically (e.g., every 10ms).
@@ -742,6 +811,12 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             timeouts = self.timeouts.borrow_mut();
         }
 
+        if timeouts.fired(TimeoutKind::Prepare) {
+            drop(timeouts);
+            actions.extend(self.handle_prepare_timeout());
+            timeouts = self.timeouts.borrow_mut();
+        }
+
         if timeouts.fired(TimeoutKind::ViewChangeStatus) {
             drop(timeouts);
             actions.extend(self.handle_view_change_status_timeout(plane));
@@ -919,6 +994,55 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         vec![action]
     }
 
+    /// Collect uncommitted pipeline entries that should be retransmitted.
+    ///
+    /// Returns `(PrepareHeader, Vec<u8>)` pairs: each op that hasn't reached
+    /// quorum paired with the replica IDs that haven't acked it.
+    fn retransmit_targets(&self) -> Vec<(PrepareHeader, Vec<u8>)> {
+        let pipeline = self.pipeline.borrow();
+        let current_op = self.sequencer.current_sequence();
+        let replica_count = self.replica_count;
+        let mut targets = Vec::new();
+
+        let mut op = self.commit_max() + 1;
+        while op <= current_op {
+            if let Some(entry) = pipeline.entry_by_op(op)
+                && !entry.ok_quorum_received
+            {
+                let missing: Vec<u8> = (0..replica_count).filter(|&r| 
!entry.has_ack(r)).collect();
+                if !missing.is_empty() {
+                    targets.push((entry.header, missing));
+                }
+            }
+            op += 1;
+        }
+
+        targets
+    }
+
+    /// Retransmit uncommitted prepares when the prepare timeout fires.
+    ///
+    /// Only acts on the primary in normal status with a non-empty pipeline.
+    /// Resets the timeout with backoff on each firing.
+    fn handle_prepare_timeout(&self) -> Vec<VsrAction> {
+        if !self.is_primary() || self.status.get() != Status::Normal {
+            return Vec::new();
+        }
+
+        if self.pipeline.borrow().is_empty() {
+            return Vec::new();
+        }
+
+        let targets = self.retransmit_targets();
+        if targets.is_empty() {
+            return Vec::new();
+        }
+
+        self.timeouts.borrow_mut().backoff(TimeoutKind::Prepare);
+
+        vec![VsrAction::RetransmitPrepares { targets }]
+    }
+
     /// Handle a received `StartViewChange` message.
     ///
     /// "When replica i receives STARTVIEWCHANGE messages for its view-number
@@ -1004,7 +1128,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
 
             let primary_candidate = self.primary_index(self.view.get());
             let current_op = self.sequencer.current_sequence();
-            let current_commit = self.commit.get();
+            // DVC uses commit_min: the replica's actual execution progress.
+            let current_commit = self.commit_min.get();
 
             // Start DVC timeout
             self.timeouts
@@ -1137,7 +1262,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         }
 
         let current_op = self.sequencer.current_sequence();
-        let current_commit = self.commit.get();
+        // Use commit_min: the replica's actual execution progress.
+        let current_commit = self.commit_min.get();
 
         // If we haven't sent our own DVC yet, record it
         if !self.sent_own_do_view_change.get() {
@@ -1184,6 +1310,15 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
     ///
     /// # Panics
     /// If `header.namespace` does not match this replica's namespace.
+    /// # Client-table maintenance
+    ///
+    /// Backups maintain the client-table during normal operation via
+    /// `commit_journal` in `on_replicate`, which walks the WAL and updates
+    /// the client table for each committed op. The WAL survives view changes,
+    /// so the new primary can process any committed op it received.
+    ///
+    /// Gap: if a backup never received a prepare (lost message),
+    /// `commit_journal` stops at the gap. Requires message repair.
     pub fn handle_start_view(&self, plane: PlaneKind, header: 
&StartViewHeader) -> Vec<VsrAction> {
         assert_eq!(header.namespace, self.namespace, "SV routed to wrong 
group");
         let from_replica = header.replica;
@@ -1210,13 +1345,19 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         self.view.set(msg_view);
         self.log_view.set(msg_view);
         self.status.set(Status::Normal);
-        self.advance_commit_number(msg_commit);
+        self.advance_commit_max(msg_commit);
         self.reset_view_change_state();
 
         // Stale pipeline entries from the old view must be discarded
         self.pipeline.borrow_mut().clear();
 
-        // Update our op to match the new primary's log
+        // TODO: TigerBeetle's StartView message carries uncommitted op 
headers,
+        // allowing the backup to install them into the WAL and set op to a
+        // WAL-verified value. We don't carry headers yet, so we blindly trust
+        // msg_op. This is correct for truncation (sequencer > msg_op) but 
wrong
+        // when the backup is behind (sequencer < msg_op) — the gap between the
+        // WAL and msg_op becomes unreachable without message repair. Fix by
+        // either carrying headers in StartView or implementing message repair.
         self.sequencer.set_sequence(msg_op);
 
         // Update timeouts for normal backup operation
@@ -1228,12 +1369,18 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             timeouts.start(TimeoutKind::NormalHeartbeat);
         }
 
-        // Send PrepareOK for uncommitted ops (commit+1 to op)
-        let mut actions = Vec::new();
-        for op_num in (msg_commit + 1)..=msg_op {
+        // Send PrepareOK for uncommitted ops that we actually have in the WAL.
+        // The caller must verify each op exists before sending.
+        emit_replica_event(
+            SimEventKind::ReplicaStateChanged,
+            &ReplicaLogContext::from_consensus(self, plane),
+        );
+
+        if msg_commit < msg_op {
             let action = VsrAction::SendPrepareOk {
                 view: msg_view,
-                op: op_num,
+                from_op: msg_commit + 1,
+                to_op: msg_op,
                 target: from_replica,
                 namespace: self.namespace,
             };
@@ -1244,18 +1391,22 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
                     &action,
                 ),
             );
-            actions.push(action);
+            vec![action]
+        } else {
+            Vec::new()
         }
-
-        emit_replica_event(
-            SimEventKind::ReplicaStateChanged,
-            &ReplicaLogContext::from_consensus(self, plane),
-        );
-
-        actions
     }
 
     /// Complete view change as the new primary after collecting DVC quorum.
+    ///
+    /// # Client-table maintenance
+    ///
+    /// Backups populate the client-table during normal operation via
+    /// `commit_journal` in `on_replicate`. The WAL survives view changes, so
+    /// when this replica transitions from backup to primary, its table
+    /// contains entries for all committed ops it received.
+    ///
+    /// Gap: missing prepares (lost messages) require message repair.
     fn complete_view_change_as_primary(&self, plane: PlaneKind) -> 
Vec<VsrAction> {
         let dvc_array = self.do_view_change_from_all_replicas.borrow();
 
@@ -1269,7 +1420,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         // Update state
         self.log_view.set(self.view.get());
         self.status.set(Status::Normal);
-        self.advance_commit_number(max_commit);
+        self.advance_commit_max(max_commit);
         self.sequencer.set_sequence(new_op);
 
         // Stale pipeline entries from the old view are invalid in the new 
view.
@@ -1472,7 +1623,7 @@ where
                 parent: consensus.last_prepare_checksum(),
                 request_checksum: old.request_checksum,
                 request: old.request,
-                commit: consensus.commit.get(),
+                commit: consensus.commit_max.get(),
                 op,
                 timestamp: 0, // 0 for now. Implement correct way to get 
timestamp later
                 operation: old.operation,
@@ -1502,7 +1653,7 @@ where
                 // It's important to use the view of the replica, not the 
received prepare!
                 view: consensus.view.get(),
                 op: old.op,
-                commit: consensus.commit.get(),
+                commit: consensus.commit_max.get(),
                 timestamp: old.timestamp,
                 operation: old.operation,
                 namespace: old.namespace,
@@ -1554,6 +1705,14 @@ where
                 pipeline_depth,
             },
         );
+
+        // Start the prepare timeout so the primary retransmits if backups
+        // don't ack in time. It is only started (not reset) so that an
+        // already-ticking timeout is not pushed out by every new request.
+        let mut timeouts = self.timeouts.borrow_mut();
+        if !timeouts.is_ticking(TimeoutKind::Prepare) {
+            timeouts.start(TimeoutKind::Prepare);
+        }
     }
 
     fn verify_pipeline(&self) {
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 200ee87ad..61352ab02 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -103,6 +103,9 @@ where
         H: ConsensusHeader;
 }
 
+pub mod client_table;
+pub use client_table::ClientTable;
+
 mod impls;
 pub use impls::*;
 mod plane_mux;
diff --git a/core/consensus/src/namespaced_pipeline.rs 
b/core/consensus/src/namespaced_pipeline.rs
index 71e18fb71..0c2328733 100644
--- a/core/consensus/src/namespaced_pipeline.rs
+++ b/core/consensus/src/namespaced_pipeline.rs
@@ -20,23 +20,15 @@ use crate::impls::{PIPELINE_PREPARE_QUEUE_MAX, 
PipelineEntry};
 use iggy_binary_protocol::{Message, PrepareHeader};
 use std::collections::{HashMap, VecDeque};
 
-/// Pipeline that partitions entries by namespace for independent commit 
draining.
+/// Pipeline that partitions entries by namespace.
 ///
 /// A single global op sequence and hash chain spans all namespaces, but 
entries
-/// are stored in per-namespace `VecDeques`. Each namespace tracks its own 
commit
-/// frontier so `drain_committable_all` drains quorum'd entries per-namespace
-/// without waiting for the global commit to advance past unrelated namespaces.
+/// are stored in per-namespace `VecDeque`s. Ops are committed strictly in 
global
+/// order via `drain_committable_prefix`, which drains the contiguous prefix of
+/// quorum'd ops regardless of namespace. This ensures `commit_min` advances
+/// sequentially.
 ///
-/// The global commit (on `VsrConsensus`) remains a conservative lower bound
-/// for the VSR protocol (view change, follower commit piggybacking). It only
-/// advances when all ops up to that point are drained. Per-namespace draining
-/// can run ahead of the global commit.
-///
-/// An alternative (simpler) approach would drain purely by per-entry quorum
-/// flag without tracking per-namespace commit numbers, relying solely on
-/// `global_commit_frontier` for the protocol commit. We track per-namespace
-/// commits explicitly for observability and to make the independence model
-/// visible in the data structure.
+/// Per-namespace commit frontiers (`ns_commits`) are tracked for 
observability.
 #[derive(Debug)]
 pub struct NamespacedPipeline {
     queues: HashMap<u64, VecDeque<PipelineEntry>>,
@@ -142,6 +134,58 @@ impl NamespacedPipeline {
         }
         commit
     }
+
+    /// Drain the contiguous prefix of committed ops in global op order.
+    ///
+    /// Starting from `commit + 1`, drains ops from their per-namespace queues
+    /// while each consecutive op has `ok_quorum_received == true`. Stops at 
the
+    /// first gap or non-quorum op, ensuring ops are committed strictly in 
order.
+    ///
+    /// Returns `(drained_entries, new_commit)` where `new_commit` is the 
highest
+    /// op drained (or `commit` if nothing was drained).
+    ///
+    /// # Panics
+    /// If internal queue state is inconsistent (namespace found in scan but
+    /// missing from map, or front entry disappears between lookup and pop).
+    pub fn drain_committable_prefix(&mut self, commit: u64) -> 
(Vec<PipelineEntry>, u64) {
+        let mut drained = Vec::new();
+        let mut new_commit = commit;
+        let mut op = commit + 1;
+
+        while op <= self.last_push_op {
+            // Find the entry across all namespace queues.
+            let ns = {
+                let mut found_ns = None;
+                for (&ns, queue) in &self.queues {
+                    if queue.front().is_some_and(|f| f.header.op == op) {
+                        found_ns = Some(ns);
+                        break;
+                    }
+                }
+                match found_ns {
+                    Some(ns) => ns,
+                    None => break, // op not at front of any queue - gap
+                }
+            };
+
+            let queue = self.queues.get_mut(&ns).expect("ns must exist");
+            let front = queue.front().expect("front must exist");
+            if !front.ok_quorum_received {
+                break; // not yet committed
+            }
+
+            let entry = queue.pop_front().expect("front exists");
+            self.total_count -= 1;
+            if let Some(ns_commit) = self.ns_commits.get_mut(&ns) {
+                *ns_commit = entry.header.op;
+            }
+            new_commit = entry.header.op;
+            drained.push(entry);
+            op += 1;
+        }
+
+        (drained, new_commit)
+    }
 }
 
 impl Pipeline for NamespacedPipeline {
@@ -211,6 +255,11 @@ impl Pipeline for NamespacedPipeline {
         for queue in self.queues.values_mut() {
             queue.clear();
         }
+        // Reset ns_commits; stale values from a previous view could mislead
+        // global_commit_frontier after a view change.
+        for commit in self.ns_commits.values_mut() {
+            *commit = 0;
+        }
         self.total_count = 0;
         self.last_push_checksum = 0;
         self.last_push_op = 0;
@@ -558,23 +607,22 @@ mod tests {
     }
 
     #[test]
-    fn clear_preserves_ns_commits() {
+    fn clear_resets_ns_commits() {
         let mut pipeline = NamespacedPipeline::new();
         pipeline.register_namespace(100);
         pipeline.register_namespace(200);
         pipeline.push_message(make_prepare(1, 0, 10, 100));
         pipeline.push_message(make_prepare(2, 10, 20, 200));
 
-        // Mark op 1 as committed in ns 100 before clearing
         pipeline.ns_commits.insert(100, 1);
 
         pipeline.clear();
         assert!(pipeline.is_empty());
         assert_eq!(pipeline.total_count, 0);
 
-        // ns_commits must survive clear -- they represent durable knowledge
-        // about already-drained ops, not pipeline state
-        assert_eq!(pipeline.ns_commits.get(&100), Some(&1));
+        // ns_commits must be reset on clear -- stale values from a previous
+        // view would mislead global_commit_frontier.
+        assert_eq!(pipeline.ns_commits.get(&100), Some(&0));
         assert_eq!(pipeline.ns_commits.get(&200), Some(&0));
     }
 }
diff --git a/core/consensus/src/observability.rs 
b/core/consensus/src/observability.rs
index 865935faf..c4eaed89c 100644
--- a/core/consensus/src/observability.rs
+++ b/core/consensus/src/observability.rs
@@ -114,6 +114,7 @@ pub enum ControlActionKind {
     SendDoViewChange,
     SendStartView,
     SendPrepareOk,
+    SendPrepare,
 }
 
 impl ControlActionKind {
@@ -124,6 +125,7 @@ impl ControlActionKind {
             Self::SendDoViewChange => "send_do_view_change",
             Self::SendStartView => "send_start_view",
             Self::SendPrepareOk => "send_prepare_ok",
+            Self::SendPrepare => "send_prepare",
         }
     }
 }
@@ -230,7 +232,7 @@ impl ReplicaLogContext {
             namespace: NamespaceLogContext::from_raw(plane, 
consensus.namespace()),
             view: consensus.view(),
             log_view: consensus.log_view(),
-            commit: consensus.commit(),
+            commit: consensus.commit_max(),
             status: consensus.status(),
             role,
         }
@@ -322,11 +324,20 @@ impl ControlActionLogEvent {
                 op: Some(op),
                 commit: Some(commit),
             },
-            VsrAction::SendPrepareOk { target, op, .. } => Self {
+            VsrAction::SendPrepareOk {
+                target, from_op, ..
+            } => Self {
                 replica,
                 action: ControlActionKind::SendPrepareOk,
                 target_replica: Some(target),
-                op: Some(op),
+                op: Some(from_op),
+                commit: None,
+            },
+            VsrAction::RetransmitPrepares { .. } => Self {
+                replica,
+                action: ControlActionKind::SendPrepare,
+                target_replica: None,
+                op: None,
                 commit: None,
             },
         }
diff --git a/core/consensus/src/plane_helpers.rs 
b/core/consensus/src/plane_helpers.rs
index 443682f8f..f980f73f9 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::client_table::{Notify, RequestStatus};
 use crate::{
     Consensus, IgnoreReason, Pipeline, PipelineEntry, PlaneKind, 
PrepareOkOutcome, Sequencer,
     Status, VsrConsensus,
@@ -28,6 +29,45 @@ use std::ops::AsyncFnOnce;
 
 // TODO: Rework all of those helpers, once the boundaries are more clear and 
we have a better picture of the commonalities between all of the planes.
 
+/// Shared request preflight: duplicate detection + pending registration.
+///
+/// Returns `Some(Notify)` if the request is new and should proceed through
+/// consensus. Returns `None` if the request was already handled (duplicate
+/// reply sent, in-progress, or stale), the caller should return early.
+#[allow(clippy::future_not_send)]
+pub async fn request_preflight<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    client_id: u128,
+    request: u64,
+) -> Option<Notify>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Entry = PipelineEntry>,
+{
+    let status = consensus
+        .client_table()
+        .borrow()
+        .check_request(client_id, request);
+    match status {
+        RequestStatus::Duplicate(cached_reply) => {
+            // Best-effort resend, client may have disconnected.
+            let _ = consensus
+                .message_bus()
+                .send_to_client(client_id, cached_reply.into_generic())
+                .await;
+            None
+        }
+        RequestStatus::InProgress | RequestStatus::Stale => None,
+        RequestStatus::New => {
+            let notify = consensus
+                .client_table()
+                .borrow_mut()
+                .register_pending(client_id, request);
+            Some(notify)
+        }
+    }
+}
+
 /// Shared pipeline-first request flow used by metadata and partitions.
 ///
 /// # Panics
@@ -54,6 +94,11 @@ pub async fn pipeline_prepare_common<C, F>(
 }
 
 /// Shared commit-based old-prepare fence.
+///
+/// Uses `commit_min` (locally executed), not `commit_max`. A backup may know
+/// that op 50 is committed (`commit_max = 50`) but only have executed up to
+/// op 14 (`commit_min = 14`). A retransmitted prepare for op 15 must NOT be
+/// fenced out; the backup still needs it in the WAL for `commit_journal`.
 #[must_use]
 pub const fn fence_old_prepare_by_commit<B, P>(
     consensus: &VsrConsensus<B, P>,
@@ -63,14 +108,14 @@ where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Entry = PipelineEntry>,
 {
-    header.op <= consensus.commit()
+    header.op <= consensus.commit_min()
 }
 
 /// Shared chain-replication forwarding to the next replica.
 ///
 /// # Panics
 /// - If `header.command` is not `Command2::Prepare`.
-/// - If `header.op <= consensus.commit()`.
+/// - If `header.op <= consensus.commit_min()`.
 /// - If the computed next replica equals self.
 /// - If the message bus send fails.
 #[allow(clippy::future_not_send)]
@@ -85,7 +130,7 @@ where
     let header = *message.header();
 
     assert_eq!(header.command, Command2::Prepare);
-    assert!(header.op > consensus.commit());
+    assert!(header.op > consensus.commit_min());
 
     let next = (consensus.replica() + 1) % consensus.replica_count();
     let primary = consensus.primary_index(header.view);
@@ -142,7 +187,7 @@ where
     }
 
     if consensus.is_follower() {
-        consensus.advance_commit_number(header.commit);
+        consensus.advance_commit_max(header.commit);
     }
 
     Ok(current_op)
@@ -194,7 +239,7 @@ where
     }
 
     let pipeline = consensus.pipeline().borrow();
-    let mut new_commit = consensus.commit();
+    let mut new_commit = consensus.commit_max();
     while let Some(entry) = pipeline.entry_by_op(new_commit + 1) {
         if !entry.ok_quorum_received {
             break;
@@ -203,8 +248,8 @@ where
     }
     drop(pipeline);
 
-    if new_commit > consensus.commit() {
-        consensus.advance_commit_number(new_commit);
+    if new_commit > consensus.commit_max() {
+        consensus.advance_commit_max(new_commit);
         return true;
     }
 
@@ -223,7 +268,7 @@ where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Entry = PipelineEntry>,
 {
-    let commit = consensus.commit();
+    let commit = consensus.commit_max();
     let mut drained = Vec::new();
     let mut pipeline = consensus.pipeline().borrow_mut();
 
@@ -273,8 +318,11 @@ where
         reserved_frame: [0; 66],
         request_checksum: prepare_header.request_checksum,
         context: 0,
+        client: prepare_header.client,
         op: prepare_header.op,
-        commit: consensus.commit(),
+        // Use the prepare's op, not commit_max. This value drives eviction
+        // ordering in ClientTable; it must be deterministic across replicas.
+        commit: prepare_header.op,
         timestamp: prepare_header.timestamp,
         request: prepare_header.request,
         operation: prepare_header.operation,
@@ -354,7 +402,7 @@ pub async fn send_prepare_ok<B, P>(
         replica: consensus.replica(),
         view: consensus.view(),
         op: header.op,
-        commit: consensus.commit(),
+        commit: consensus.commit_max(),
         timestamp: header.timestamp,
         parent: header.parent,
         prepare_checksum: header.checksum,
@@ -673,7 +721,7 @@ mod tests {
         consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(2, 
10, 20));
         consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(3, 
20, 30));
 
-        consensus.advance_commit_number(3);
+        consensus.advance_commit_max(3);
 
         let drained = drain_committable_prefix(&consensus);
         let drained_ops: Vec<_> = drained.into_iter().map(|entry| 
entry.header.op).collect();
@@ -690,7 +738,7 @@ mod tests {
         consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(6, 
50, 60));
         consensus.pipeline_message(PlaneKind::Metadata, &prepare_message(7, 
60, 70));
 
-        consensus.advance_commit_number(6);
+        consensus.advance_commit_max(6);
         let drained = drain_committable_prefix(&consensus);
         let drained_ops: Vec<_> = drained.into_iter().map(|entry| 
entry.header.op).collect();
 
diff --git a/core/iobuf/src/lib.rs b/core/iobuf/src/lib.rs
index 0e57c7b79..4e1355457 100644
--- a/core/iobuf/src/lib.rs
+++ b/core/iobuf/src/lib.rs
@@ -23,7 +23,7 @@ use std::sync::atomic::{AtomicUsize, Ordering, fence};
 use aligned_vec::{AVec, ConstAlign};
 use compio_buf::IoBuf;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Owned<const ALIGN: usize = 4096> {
     inner: AVec<u8, ConstAlign<ALIGN>>,
 }
diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index 9c610f187..629a2e4ec 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -19,7 +19,7 @@ use std::io;
 use std::ops::{Deref, RangeInclusive};
 
 pub mod file_storage;
-pub mod metadata_journal;
+pub mod prepare_journal;
 
 pub trait Journal<S>
 where
diff --git a/core/journal/src/metadata_journal.rs 
b/core/journal/src/prepare_journal.rs
similarity index 95%
rename from core/journal/src/metadata_journal.rs
rename to core/journal/src/prepare_journal.rs
index 5520e4c8d..8f332ba2a 100644
--- a/core/journal/src/metadata_journal.rs
+++ b/core/journal/src/prepare_journal.rs
@@ -78,14 +78,14 @@ impl From<io::Error> for JournalError {
     }
 }
 
-/// Persistent metadata journal backed by an append-only WAL file.
+/// Persistent prepare journal backed by an append-only WAL file.
 ///
 /// Each WAL entry is a raw `Message<PrepareHeader>`:
 /// `[PrepareHeader: 256 bytes][body: header.size - 256 bytes]`
 ///
 /// The in-memory index is a fixed-size slot array indexed by
 /// `op % SLOT_COUNT`.
-pub struct MetadataJournal {
+pub struct PrepareJournal {
     /// File-backed append-only WAL.
     storage: FileStorage,
     /// In-memory slot array of entry headers, indexed by `op % SLOT_COUNT`.
@@ -103,9 +103,9 @@ pub struct MetadataJournal {
     snapshot_op: Cell<u64>,
 }
 
-impl fmt::Debug for MetadataJournal {
+impl fmt::Debug for PrepareJournal {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("MetadataJournal")
+        f.debug_struct("PrepareJournal")
             .field("write_offset", &self.storage.file_len())
             .field("last_op", &self.last_op.get())
             .finish_non_exhaustive()
@@ -118,7 +118,7 @@ const fn slot_for_op(op: u64) -> usize {
 }
 
 #[allow(clippy::cast_possible_truncation)]
-impl MetadataJournal {
+impl PrepareJournal {
     /// Open the WAL file, scanning forward to rebuild the in-memory index.
     ///
     /// `snapshot_op` is the highest op that has been durably snapshotted.
@@ -273,7 +273,7 @@ impl MetadataJournal {
     clippy::cast_sign_loss,
     clippy::future_not_send
 )]
-impl Journal<FileStorage> for MetadataJournal {
+impl Journal<FileStorage> for PrepareJournal {
     type Header = PrepareHeader;
     type Entry = Message<PrepareHeader>;
     type HeaderRef<'a> = Ref<'a, PrepareHeader>;
@@ -460,7 +460,7 @@ impl Journal<FileStorage> for MetadataJournal {
     }
 }
 
-impl JournalHandle for MetadataJournal {
+impl JournalHandle for PrepareJournal {
     type Storage = FileStorage;
     type Target = Self;
 
@@ -500,7 +500,7 @@ mod tests {
     async fn open_empty_wal() {
         let dir = tempdir().unwrap();
         let path = dir.path().join("journal.wal");
-        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+        let journal = PrepareJournal::open(&path, 0).await.unwrap();
 
         assert!(journal.last_op().is_none());
         assert!(journal.header(0).is_none());
@@ -510,7 +510,7 @@ mod tests {
     async fn append_and_read() {
         let dir = tempdir().unwrap();
         let path = dir.path().join("journal.wal");
-        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+        let journal = PrepareJournal::open(&path, 0).await.unwrap();
 
         let msg1 = make_prepare(1, 64);
         let msg2 = make_prepare(2, 32);
@@ -538,7 +538,7 @@ mod tests {
         let path = dir.path().join("journal.wal");
 
         {
-            let journal = MetadataJournal::open(&path, 0).await.unwrap();
+            let journal = PrepareJournal::open(&path, 0).await.unwrap();
             journal.append(make_prepare(1, 64)).await.unwrap();
             journal.append(make_prepare(2, 128)).await.unwrap();
             journal.append(make_prepare(3, 32)).await.unwrap();
@@ -546,7 +546,7 @@ mod tests {
         }
 
         // Reopen and verify index is rebuilt
-        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+        let journal = PrepareJournal::open(&path, 0).await.unwrap();
         assert_eq!(journal.last_op(), Some(3));
 
         for op in 1..=3 {
@@ -563,7 +563,7 @@ mod tests {
         let path = dir.path().join("journal.wal");
 
         {
-            let journal = MetadataJournal::open(&path, 0).await.unwrap();
+            let journal = PrepareJournal::open(&path, 0).await.unwrap();
             journal.append(make_prepare(1, 64)).await.unwrap();
             journal.append(make_prepare(2, 128)).await.unwrap();
             journal.storage.fsync().await.unwrap();
@@ -579,7 +579,7 @@ mod tests {
         }
 
         // Reopen, should recover only the first entry
-        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+        let journal = PrepareJournal::open(&path, 0).await.unwrap();
         assert_eq!(journal.last_op(), Some(1));
         assert!(journal.header(2).is_none());
 
@@ -592,7 +592,7 @@ mod tests {
     async fn iter_headers_from() {
         let dir = tempdir().unwrap();
         let path = dir.path().join("journal.wal");
-        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+        let journal = PrepareJournal::open(&path, 0).await.unwrap();
 
         journal.append(make_prepare(1, 32)).await.unwrap();
         journal.append(make_prepare(2, 32)).await.unwrap();
@@ -617,7 +617,7 @@ mod tests {
     async fn previous_header_navigation() {
         let dir = tempdir().unwrap();
         let path = dir.path().join("journal.wal");
-        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+        let journal = PrepareJournal::open(&path, 0).await.unwrap();
 
         journal.append(make_prepare(0, 32)).await.unwrap();
         journal.append(make_prepare(1, 32)).await.unwrap();
@@ -632,11 +632,11 @@ mod tests {
     async fn slot_wraparound_evicts_snapshotted_entry() {
         let dir = tempdir().unwrap();
         let path = dir.path().join("journal.wal");
-        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+        let journal = PrepareJournal::open(&path, 0).await.unwrap();
 
         // Op 3 goes to slot 3
         journal.append(make_prepare(3, 32)).await.unwrap();
-        // Mark op 3 as snapshotted — safe to evict
+        // Mark op 3 as snapshotted - safe to evict
         journal.set_snapshot_op(3);
         // Op 3 + SLOT_COUNT goes to the same slot, evicting op 3
         let wraparound_op = 3 + SLOT_COUNT as u64;
@@ -656,7 +656,7 @@ mod tests {
     async fn drain_shrinks_wal_and_preserves_live_entries() {
         let dir = tempdir().unwrap();
         let path = dir.path().join("journal.wal");
-        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+        let journal = PrepareJournal::open(&path, 0).await.unwrap();
 
         // Append 5 entries
         for op in 1..=5 {
@@ -696,7 +696,7 @@ mod tests {
 
         // Reopen and verify the drained WAL is valid
         drop(journal);
-        let journal = MetadataJournal::open(&path, 3).await.unwrap();
+        let journal = PrepareJournal::open(&path, 3).await.unwrap();
         assert_eq!(journal.last_op(), Some(5));
         for op in 4..=5 {
             let h = *journal.header(op as usize).unwrap();
@@ -711,7 +711,7 @@ mod tests {
     async fn append_panics_on_evicting_unsnapshotted_entry() {
         let dir = tempdir().unwrap();
         let path = dir.path().join("journal.wal");
-        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+        let journal = PrepareJournal::open(&path, 0).await.unwrap();
 
         journal.append(make_prepare(3, 32)).await.unwrap();
         // No snapshot taken, evicting op 3 must panic
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 484135f19..e0f65fc0d 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -21,7 +21,7 @@ use consensus::{
     ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, 
ack_preflight,
     ack_quorum_reached, build_reply_message, drain_committable_prefix, 
emit_sim_event,
     fence_old_prepare_by_commit, panic_if_hash_chain_would_break_in_same_view,
-    pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
+    pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain, 
request_preflight,
     send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_binary_protocol::{
@@ -291,6 +291,29 @@ where
 {
     async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::Message<RequestHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
+        let client_id = message.header().client;
+        let request = message.header().request;
+
+        // TODO: Add a bounded request queue instead of dropping here.
+        // When the prepare queue (8 max) is full, buffer
+        // incoming requests in a request queue. On commit, pop the next 
request
+        // from the request queue and begin preparing it. Only drop when both
+        // queues are full.
+        if consensus.pipeline().borrow().is_full() {
+            warn!(
+                target: "iggy.metadata.diag",
+                plane = "metadata",
+                replica_id = consensus.replica(),
+                client = client_id,
+                request = request,
+                "on_request: pipeline full, dropping request"
+            );
+            return;
+        }
+
+        let Some(_notify) = request_preflight(consensus, client_id, 
request).await else {
+            return;
+        };
 
         emit_sim_event(
             SimEventKind::ClientRequestReceived,
@@ -342,7 +365,7 @@ where
                 replica_id = consensus.replica(),
                 view = consensus.view(),
                 op = header.op,
-                commit = consensus.commit(),
+                commit = consensus.commit_max(),
                 operation = ?header.operation,
                 "received old prepare, skipping replication"
             );
@@ -355,9 +378,18 @@ where
 
         // TODO handle gap in ops.
 
+        // Verify hash chain integrity BEFORE checkpoint. 
`checkpoint_if_needed`
+        // can drain WAL entries, making previous_header return None.
+        if let Some(previous) = journal.handle().previous_header(&header) {
+            panic_if_hash_chain_would_break_in_same_view(&previous, &header);
+        }
+
         // Force a checkpoint if the journal is running low on capacity.
         if let Some(coordinator) = &self.coordinator {
-            let snap_op = consensus.commit();
+            // Use commit_min (locally executed), not commit_max. WAL entries
+            // between commit_min+1 and commit_max haven't been applied to the
+            // state machine yet, draining them would lose data on crash.
+            let snap_op = consensus.commit_min();
             match coordinator
                 .checkpoint_if_needed(&self.mux_stm, journal, snap_op)
                 .await
@@ -386,18 +418,25 @@ where
             }
         }
 
-        // Verify hash chain integrity.
-        if let Some(previous) = journal.handle().previous_header(&header) {
-            panic_if_hash_chain_would_break_in_same_view(&previous, &header);
+        // TODO: Restore hard assert_eq!(header.op, current_op + 1) once 
message repair
+        // is implemented. Without repair, the network can deliver prepares 
out of order
+        // and the replica has no way to request the missing ones.
+        if header.op != current_op + 1 {
+            warn!(
+                target: "iggy.metadata.diag",
+                plane = "metadata",
+                replica_id = consensus.replica(),
+                op = header.op,
+                expected = current_op + 1,
+                "on_replicate: dropping out-of-order prepare (gap)"
+            );
+            return;
         }
 
-        assert_eq!(header.op, current_op + 1);
-
-        consensus.sequencer().set_sequence(header.op);
-        consensus.set_last_prepare_checksum(header.checksum);
-
-        // Append to journal.
-        if let Err(e) = journal.handle().append(message).await {
+        // Append to journal first. Sequencer and checksum are updated AFTER
+        // successful append so a failed write doesn't leave consensus state
+        // pointing at a phantom entry.
+        if let Err(e) = journal.handle().append(message.clone()).await {
             error!(
                 target: "iggy.metadata.diag",
                 plane = "metadata",
@@ -410,15 +449,19 @@ where
             return;
         }
 
+        consensus.sequencer().set_sequence(header.op);
+        consensus.set_last_prepare_checksum(header.checksum);
+
         // After successful journal write, send prepare_ok to primary.
         self.send_prepare_ok(&header).await;
 
         // If follower, commit any newly committable entries.
         if consensus.is_follower() {
-            self.commit_journal();
+            self.commit_journal().await;
         }
     }
 
+    #[allow(clippy::too_many_lines)]
     async fn on_ack(&self, message: <VsrConsensus<B> as 
Consensus>::Message<PrepareOkHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
@@ -494,18 +537,15 @@ where
                         )
                     });
 
+                // Committed ops must be infallible — if the state machine 
cannot
+                // apply a committed op, replicas will diverge.
                 let response = 
self.mux_stm.update(prepare).unwrap_or_else(|err| {
-                    warn!(
-                        target: "iggy.metadata.diag",
-                        plane = "metadata",
-                        replica_id = consensus.replica(),
-                        op = prepare_header.op,
-                        operation = ?prepare_header.operation,
-                        error = %err,
-                        "state machine update failed for committed metadata 
entry"
+                    panic!(
+                        "on_ack: committed metadata op={} failed to apply: 
{err}",
+                        prepare_header.op
                     );
-                    bytes::Bytes::new()
                 });
+                consensus.advance_commit_min(prepare_header.op);
                 let pipeline_depth = consensus.pipeline().borrow().len();
                 let event = CommitLogEvent {
                     replica: ReplicaLogContext::from_consensus(consensus, 
PlaneKind::Metadata),
@@ -517,17 +557,27 @@ where
                 };
                 emit_sim_event(SimEventKind::OperationCommitted, &event);
 
-                let generic_reply =
-                    build_reply_message(consensus, &prepare_header, 
response).into_generic();
+                let reply = build_reply_message(consensus, &prepare_header, 
response);
+                // Cache reply for duplicate detection:
+                consensus
+                    .client_table()
+                    .borrow_mut()
+                    .commit_reply(prepare_header.client, reply.clone());
+
+                let generic_reply = reply.into_generic();
                 let reply_buffers = freeze_client_reply(generic_reply);
                 emit_sim_event(SimEventKind::ClientReplyEmitted, &event);
 
-                // TODO: Propagate send error instead of panicking; requires 
bus error design.
-                consensus
+                if let Err(e) = consensus
                     .message_bus()
                     .send_to_client(prepare_header.client, reply_buffers)
                     .await
-                    .unwrap();
+                {
+                    warn!(
+                        "on_ack: failed to send reply to client={}: {e}",
+                        prepare_header.client
+                    );
+                }
             }
         }
     }
@@ -559,7 +609,11 @@ where
     P: Pipeline<Entry = PipelineEntry>,
     J: JournalHandle,
     J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = 
PrepareHeader>,
-    M: StateMachine<Input = Message<PrepareHeader>>,
+    M: StateMachine<
+            Input = Message<PrepareHeader>,
+            Output = bytes::Bytes,
+            Error = iggy_common::IggyError,
+        >,
 {
     /// Replicate a prepare message to the next replica in the chain.
     ///
@@ -588,11 +642,46 @@ where
     // TODO: Implement jump_to_newer_op
     // fn jump_to_newer_op(&self, header: &PrepareHeader) {}
 
-    #[allow(clippy::unused_self)]
-    const fn commit_journal(&self) {
-        // TODO: Implement commit logic
-        // Walk through journal from last committed to current commit number
-        // Apply each entry to the state machine
+    /// Walk ops from `commit_min+1` to `commit_max`, applying the state 
machine
+    /// and updating the client table for each.
+    ///
+    /// The backup does NOT send replies to clients, only the primary does 
that.
+    #[allow(clippy::cast_possible_truncation)]
+    #[allow(clippy::future_not_send)]
+    async fn commit_journal(&self) {
+        let consensus = self.consensus.as_ref().unwrap();
+        let journal = self.journal.as_ref().unwrap();
+
+        while consensus.commit_min() < consensus.commit_max() {
+            let op = consensus.commit_min() + 1;
+
+            let Some(header) = journal.handle().header(op as usize) else {
+                // TODO: Implement message repair: request missing prepare from
+                // primary or other replicas. Until then, the backup stalls 
here.
+                break;
+            };
+            let header = *header;
+
+            let Some(prepare) = journal.handle().entry(&header).await else {
+                warn!("commit_journal: prepare body missing for op={op}, 
stopping");
+                break;
+            };
+
+            // Committed ops must be infallible (see on_ack comment).
+            let response = self.mux_stm.update(prepare).unwrap_or_else(|err| {
+                panic!("commit_journal: committed metadata op={op} failed to 
apply: {err}");
+            });
+
+            consensus.advance_commit_min(op);
+
+            let reply = build_reply_message(consensus, &header, response);
+            consensus
+                .client_table()
+                .borrow_mut()
+                .commit_reply(header.client, reply);
+
+            debug!("commit_journal: committed op={op}");
+        }
     }
 
     #[allow(clippy::future_not_send, clippy::cast_possible_truncation)]
diff --git a/core/metadata/src/impls/recovery.rs 
b/core/metadata/src/impls/recovery.rs
index d4f01dc6a..4d5606e4c 100644
--- a/core/metadata/src/impls/recovery.rs
+++ b/core/metadata/src/impls/recovery.rs
@@ -21,7 +21,7 @@ use crate::stm::snapshot::{MetadataSnapshot, RestoreSnapshot, 
Snapshot, Snapshot
 use iggy_binary_protocol::consensus::PrepareHeader;
 use iggy_binary_protocol::consensus::message::Message;
 use iggy_common::IggyError;
-use journal::metadata_journal::{JournalError, MetadataJournal};
+use journal::prepare_journal::{JournalError, PrepareJournal};
 use std::fmt;
 use std::path::Path;
 
@@ -82,7 +82,7 @@ impl From<std::io::Error> for RecoveryError {
 
 /// Result of a successful metadata recovery.
 pub struct RecoveredMetadata<M> {
-    pub journal: MetadataJournal,
+    pub journal: PrepareJournal,
     pub snapshot: IggySnapshot,
     pub mux_stm: M,
     /// `None` means no snapshot existed and no journal entries were replayed.
@@ -125,7 +125,7 @@ where
 
     // 3. Open journal, scan the WAL and build index
     let journal_path = metadata_dir.join("journal.wal");
-    let journal = MetadataJournal::open(&journal_path, 
snapshot.sequence_number()).await?;
+    let journal = PrepareJournal::open(&journal_path, 
snapshot.sequence_number()).await?;
 
     // 4. Replay journal entries after snapshot
     let headers_to_replay = journal.iter_headers_from(replay_from);
@@ -214,7 +214,7 @@ mod tests {
         std::fs::create_dir_all(&metadata_dir).unwrap();
 
         {
-            let journal = 
MetadataJournal::open(&metadata_dir.join("journal.wal"), 0)
+            let journal = 
PrepareJournal::open(&metadata_dir.join("journal.wal"), 0)
                 .await
                 .unwrap();
             journal.append(make_prepare(1, 32)).await.unwrap();
@@ -242,7 +242,7 @@ mod tests {
 
         // WAL has ops 1-10
         {
-            let journal = 
MetadataJournal::open(&metadata_dir.join("journal.wal"), 0)
+            let journal = 
PrepareJournal::open(&metadata_dir.join("journal.wal"), 0)
                 .await
                 .unwrap();
             for op in 1..=10 {
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 5d9d3d601..7134c84ff 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -29,9 +29,10 @@ use consensus::PlaneIdentity;
 use consensus::{
     CommitLogEvent, Consensus, NamespacedPipeline, Pipeline, PipelineEntry, 
Plane, PlaneKind,
     Project, ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, 
VsrConsensus,
-    ack_preflight, build_reply_message, emit_namespace_progress_event, 
emit_sim_event,
-    fence_old_prepare_by_commit, pipeline_prepare_common, replicate_preflight,
-    replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
+    ack_preflight, ack_quorum_reached, build_reply_message, 
drain_committable_prefix,
+    emit_namespace_progress_event, emit_sim_event, fence_old_prepare_by_commit,
+    pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain, 
request_preflight,
+    send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_binary_protocol::{
     Command2, ConsensusHeader, GenericHeader, Message, Operation, 
PrepareHeader, PrepareOkHeader,
@@ -43,7 +44,7 @@ use iggy_common::{
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
 use iobuf::Frozen;
-use journal::Journal as _;
+use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
 use std::cell::UnsafeCell;
 use std::collections::{HashMap, HashSet};
@@ -61,7 +62,7 @@ use tracing::{debug, warn};
 /// For example, shard 0 might have `partition_ids` [0, 2, 4] while shard 1
 /// has `partition_ids` [1, 3, 5]. The `LocalIdx` provides the actual index
 /// into the `partitions` Vec.
-pub struct IggyPartitions<C> {
+pub struct IggyPartitions<C, J = ()> {
     shard_id: ShardId,
     config: PartitionsConfig,
     /// Collection of partitions, the index of each partition isn't it's ID, 
but rather an local index (`LocalIdx`) which is used for lookups.
@@ -77,13 +78,19 @@ pub struct IggyPartitions<C> {
     partitions: UnsafeCell<Vec<IggyPartition>>,
     namespace_to_local: HashMap<IggyNamespace, LocalIdx>,
     consensus: Option<C>,
+    /// Consensus-level WAL for prepare messages.
+    ///
+    /// Stores full `Message<PrepareHeader>` keyed by op. Operations are 
applied
+    /// at commit time (not replicate time) by reading from this WAL in
+    /// `commit_journal` (backup) and `on_ack` (primary).
+    journal: Option<J>,
 }
 
 const fn freeze_client_reply(message: Message<GenericHeader>) -> 
Message<GenericHeader> {
     message
 }
 
-impl<C> IggyPartitions<C> {
+impl<C, J> IggyPartitions<C, J> {
     #[must_use]
     pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self {
         Self {
@@ -92,6 +99,7 @@ impl<C> IggyPartitions<C> {
             partitions: UnsafeCell::new(Vec::new()),
             namespace_to_local: HashMap::new(),
             consensus: None,
+            journal: None,
         }
     }
 
@@ -103,9 +111,18 @@ impl<C> IggyPartitions<C> {
             partitions: UnsafeCell::new(Vec::with_capacity(capacity)),
             namespace_to_local: HashMap::with_capacity(capacity),
             consensus: None,
+            journal: None,
         }
     }
 
+    pub fn set_journal(&mut self, journal: J) {
+        self.journal = Some(journal);
+    }
+
+    pub const fn journal(&self) -> Option<&J> {
+        self.journal.as_ref()
+    }
+
     pub const fn config(&self) -> &PartitionsConfig {
         &self.config
     }
@@ -389,15 +406,36 @@ impl<C> IggyPartitions<C> {
     }
 }
 
-impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, 
NamespacedPipeline>>
+impl<B, J> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, 
NamespacedPipeline>, J>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    J: JournalHandle,
+    J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = 
PrepareHeader>,
 {
     async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::Message<RequestHeader>) {
         let namespace = IggyNamespace::from_raw(message.header().namespace);
         let consensus = self
             .consensus()
             .expect("on_request: consensus not initialized");
+        let client_id = message.header().client;
+        let request = message.header().request;
+
+        // TODO: Add a bounded request queue instead of dropping here.
+        // When the prepare queue (8 max) is full, buffer
+        // incoming requests in a request queue. On commit, pop the next 
request
+        // from the request queue and begin preparing it. Only drop when both
+        // queues are full.
+        if consensus.pipeline().borrow().is_full() {
+            warn!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id = consensus.replica(),
+                client = client_id,
+                request = request,
+                "on_request: pipeline full, dropping request"
+            );
+            return;
+        }
 
         emit_sim_event(
             SimEventKind::ClientRequestReceived,
@@ -408,6 +446,7 @@ where
                 operation: message.header().operation,
             },
         );
+
         let message = if message.header().operation == Operation::SendMessages 
{
             match convert_request_message(namespace, message) {
                 Ok(message) => message,
@@ -427,6 +466,11 @@ where
         } else {
             message
         };
+
+        let Some(_notify) = request_preflight(consensus, client_id, 
request).await else {
+            return;
+        };
+
         let prepare = message.project(consensus);
         pipeline_prepare_common(consensus, PlaneKind::Partitions, prepare, 
|prepare| {
             self.on_replicate(prepare)
@@ -436,7 +480,6 @@ where
 
     async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::Message<PrepareHeader>) {
         let header = *message.header();
-        let namespace = IggyNamespace::from_raw(header.namespace);
         let consensus = self
             .consensus()
             .expect("on_replicate: consensus not initialized");
@@ -467,7 +510,7 @@ where
                 replica_id = consensus.replica(),
                 view = consensus.view(),
                 op = header.op,
-                commit = consensus.commit(),
+                commit = consensus.commit_max(),
                 namespace_raw = header.namespace,
                 operation = ?header.operation,
                 "received old prepare, skipping replication"
@@ -475,26 +518,41 @@ where
             return;
         }
 
-        // TODO: Figure out the flow of the partition operations.
-        // In metadata layer we assume that when an `on_request` or 
`on_replicate` is called, it's called from correct shard.
-        // I think we need to do the same here, which means that the code from 
below is unfallable, the partition should always exist by now!
-        let message = self.replicate(message).await;
-        if let Err(error) = self.apply_replicated_operation(&namespace, 
message).await {
+        // TODO: Restore hard assert_eq!(header.op, current_op + 1) once 
message repair
+        // is implemented. Without repair, the network can deliver prepares 
out of order
+        // and the replica has no way to request the missing ones.
+        if header.op != current_op + 1 {
+            // Out-of-order prepare: the network delivered a future op before
+            // we received the preceding ones. Drop it and rely on primary
+            // retransmission to re-send both this op and the missing ones.
             warn!(
                 target: "iggy.partitions.diag",
                 plane = "partitions",
                 replica_id = consensus.replica(),
                 op = header.op,
-                namespace_raw = namespace.inner(),
-                operation = ?header.operation,
-                %error,
-                "failed to apply replicated partition operation"
+                expected = current_op + 1,
+                "on_replicate: dropping out-of-order prepare (gap)"
+            );
+            return;
+        }
+
+        // Chain-replicate to the next replica before local journal append.
+        let message = self.replicate(message).await;
+
+        let journal = self
+            .journal
+            .as_ref()
+            .expect("on_replicate: journal not initialized");
+        if let Err(e) = journal.handle().append(message).await {
+            warn!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id = consensus.replica(),
+                "on_replicate: WAL append failed for op={}: {e}", header.op
             );
             return;
         }
 
-        // TODO: Make those assertions be toggleable through a feature flag, 
so they can be used only by simulator/tests.
-        debug_assert_eq!(header.op, current_op + 1);
         consensus.sequencer().set_sequence(header.op);
         consensus.set_last_prepare_checksum(header.checksum);
         emit_namespace_progress_event(
@@ -505,8 +563,13 @@ where
         );
 
         self.send_prepare_ok(&header).await;
+
+        if consensus.is_follower() {
+            self.commit_journal().await;
+        }
     }
 
+    #[allow(clippy::too_many_lines)]
     async fn on_ack(&self, message: <VsrConsensus<B> as 
Consensus>::Message<PrepareOkHeader>) {
         let header = message.header();
         let consensus = self.consensus().expect("on_ack: consensus not 
initialized");
@@ -542,50 +605,111 @@ where
             }
         }
 
-        consensus.handle_prepare_ok(PlaneKind::Partitions, header);
-
-        // SAFETY(IGGY-66): Per-namespace drain independent of global commit.
-        //
-        // drain_committable_all() drains each namespace queue independently by
-        // quorum flag, so ns_a ops can be drained and replied to clients while
-        // ns_b ops block the global commit (e.g., ns_a ops 1,3 drain while
-        // ns_b op 2 is pending). This is intentional for partition 
independence.
-        //
-        // View change risk: if a view change occurs before the global commit
-        // covers a drained op, the new primary replays from max_commit+1 and
-        // re-executes it. append_messages is NOT idempotent -- re-execution
-        // produces duplicate partition data.
-        //
-        // Before this path handles real traffic, two guards are required:
-        //   1. Op-based dedup in apply_replicated_operation: skip append if
-        //      the partition journal already contains data for this op.
-        //   2. Client reply dedup by (client_id, request_id): prevent
-        //      duplicate replies after view change re-execution.
-        let drained = {
-            let mut pipeline = consensus.pipeline().borrow_mut();
-            pipeline.drain_committable_all()
-        };
+        if !ack_quorum_reached(consensus, PlaneKind::Partitions, header) {
+            return;
+        }
+
+        let drained = drain_committable_prefix(consensus);
 
         if drained.is_empty() {
             return;
         }
 
-        self.handle_committed_entries(consensus, drained).await;
+        if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
+            debug!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id = consensus.replica(),
+                first_op = first.header.op,
+                last_op = last.header.op,
+                drained_count = drained.len(),
+                "draining committed partition ops"
+            );
+        }
 
-        let pipeline = consensus.pipeline().borrow();
-        let new_commit = pipeline.global_commit_frontier(consensus.commit());
-        drop(pipeline);
-        consensus.advance_commit_number(new_commit);
-        emit_namespace_progress_event(
-            SimEventKind::NamespaceProgressUpdated,
-            &ReplicaLogContext::from_consensus(consensus, 
PlaneKind::Partitions),
-            new_commit,
-            consensus.pipeline().borrow().len(),
-        );
+        let journal = self
+            .journal
+            .as_ref()
+            .expect("on_ack: journal not initialized");
+        for PipelineEntry {
+            header: prepare_header,
+            ..
+        } in drained
+        {
+            let entry_namespace = 
IggyNamespace::from_raw(prepare_header.namespace);
+
+            // Read full message from WAL and apply operation at commit time.
+            let prepare = journal
+                .handle()
+                .entry(&prepare_header)
+                .await
+                .unwrap_or_else(|| {
+                    panic!(
+                        "on_ack: committed prepare op={} must be in WAL",
+                        prepare_header.op
+                    )
+                });
+            // Committed ops must be infallible — if the state machine cannot
+            // apply a committed op, replicas will diverge.
+            if let Err(e) = self
+                .apply_replicated_operation(&entry_namespace, prepare)
+                .await
+            {
+                panic!(
+                    "on_ack: committed op={} failed to apply: {e}",
+                    prepare_header.op
+                );
+            }
+
+            // Flush after every op, each apply appends new data to the
+            // partition journal that must be durable before advancing 
commit_min.
+            if let Err(e) = self.commit_messages(&entry_namespace).await {
+                panic!(
+                    "on_ack: committed op={} failed to flush: {e}",
+                    prepare_header.op
+                );
+            }
+
+            consensus.advance_commit_min(prepare_header.op);
+
+            let pipeline_depth = consensus.pipeline().borrow().len();
+            let event = CommitLogEvent {
+                replica: ReplicaLogContext::from_consensus(consensus, 
PlaneKind::Partitions),
+                op: prepare_header.op,
+                client_id: prepare_header.client,
+                request_id: prepare_header.request,
+                operation: prepare_header.operation,
+                pipeline_depth,
+            };
+            emit_sim_event(SimEventKind::OperationCommitted, &event);
+
+            let reply = build_reply_message(consensus, &prepare_header, 
bytes::Bytes::new());
+            consensus
+                .client_table()
+                .borrow_mut()
+                .commit_reply(prepare_header.client, reply.clone());
+
+            let reply_buffers = freeze_client_reply(reply.into_generic());
+            emit_sim_event(SimEventKind::ClientReplyEmitted, &event);
+
+            if let Err(e) = consensus
+                .message_bus()
+                .send_to_client(prepare_header.client, reply_buffers)
+                .await
+            {
+                warn!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    client = prepare_header.client,
+                    op = prepare_header.op,
+                    "on_ack: failed to send reply to client: {e}"
+                );
+            }
+        }
     }
 }
 
-impl<B> PlaneIdentity<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, 
NamespacedPipeline>>
+impl<B, J> PlaneIdentity<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, 
NamespacedPipeline>, J>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
 {
@@ -601,9 +725,11 @@ where
     }
 }
 
-impl<B> IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
+impl<B, J> IggyPartitions<VsrConsensus<B, NamespacedPipeline>, J>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    J: JournalHandle,
+    J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = 
PrepareHeader>,
 {
     /// # Panics
     /// Panics if consensus is not initialized.
@@ -751,6 +877,56 @@ where
         replicate_to_next_in_chain(consensus, message).await
     }
 
+    /// Walk ops from `commit_min+1` to `commit_max`, applying operations and
+    /// updating the client table for each.
+    ///
+    /// The backup does NOT send replies to clients, only the primary does 
that.
+    #[allow(clippy::cast_possible_truncation)]
+    async fn commit_journal(&self) {
+        let consensus = self
+            .consensus()
+            .expect("commit_journal: consensus not initialized");
+        let journal = self
+            .journal
+            .as_ref()
+            .expect("commit_journal: journal not initialized");
+
+        while consensus.commit_min() < consensus.commit_max() {
+            let op = consensus.commit_min() + 1;
+
+            let Some(header) = journal.handle().header(op as usize) else {
+                // TODO: Implement message repair: request missing prepare from
+                // primary or other replicas. Until then, the backup stalls 
here.
+                break;
+            };
+            let header = *header;
+
+            let Some(prepare) = journal.handle().entry(&header).await else {
+                warn!("commit_journal: prepare body missing for op={op}, 
stopping");
+                break;
+            };
+
+            let ns = IggyNamespace::from_raw(header.namespace);
+
+            // Committed ops must be infallible (see on_ack comment).
+            if let Err(e) = self.apply_replicated_operation(&ns, 
prepare).await {
+                panic!("commit_journal: committed op={op} failed to apply: 
{e}");
+            }
+            if let Err(e) = self.commit_messages(&ns).await {
+                panic!("commit_journal: committed op={op} failed to flush: 
{e}");
+            }
+
+            consensus.advance_commit_min(op);
+
+            let reply = build_reply_message(consensus, &header, 
bytes::Bytes::new());
+            consensus
+                .client_table()
+                .borrow_mut()
+                .commit_reply(header.client, reply);
+            debug!("commit_journal: committed op={op}");
+        }
+    }
+
     /// Persist prepared messages to segment storage and advance the durable 
frontier.
     ///
     /// Updates segment metadata, stats, flushes journal to disk if thresholds
@@ -941,9 +1117,14 @@ where
                 pipeline_depth,
             );
 
-            let generic_reply =
-                build_reply_message(consensus, &prepare_header, bytes::Bytes::new()).into_generic();
-            let reply_buffers = freeze_client_reply(generic_reply);
+            let reply = build_reply_message(consensus, &prepare_header, bytes::Bytes::new());
+            // Cache reply for duplicate detection:
+            consensus
+                .client_table()
+                .borrow_mut()
+                .commit_reply(prepare_header.client, reply.clone());
+
+            let reply_buffers = freeze_client_reply(reply.into_generic());
             emit_sim_event(SimEventKind::ClientReplyEmitted, &event);
 
             if let Err(error) = consensus
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 3e17f4ba2..a77fd3742 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -18,7 +18,9 @@
 mod router;
 pub mod shards_table;
 
-use consensus::{MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, VsrConsensus};
+use consensus::{
+    MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, PlaneKind, Sequencer, VsrConsensus,
+};
 use iggy_binary_protocol::{
     GenericHeader, Message, MessageBag, PrepareHeader, PrepareOkHeader, RequestHeader,
 };
@@ -31,10 +33,12 @@ use metadata::stm::StateMachine;
 use partitions::IggyPartitions;
 use shards_table::ShardsTable;
 
-pub type ShardPlane<B, J, S, M> = MuxPlane<
+// MJ - Metadata Journal
+// PJ - Partitions Journal
+pub type ShardPlane<B, MJ, S, M, PJ> = MuxPlane<
     variadic!(
-        IggyMetadata<VsrConsensus<B>, J, S, M>,
-        IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
+        IggyMetadata<VsrConsensus<B>, MJ, S, M>,
+        IggyPartitions<VsrConsensus<B, NamespacedPipeline>, PJ>
     ),
 >;
 
@@ -89,13 +93,13 @@ impl<R: Send + 'static> ShardFrame<R> {
     }
 }
 
-pub struct IggyShard<B, J, S, M, T = (), R: Send + 'static = ()>
+pub struct IggyShard<B, MJ, S, M, PJ = (), T = (), R: Send + 'static = ()>
 where
     B: MessageBus,
 {
     pub id: u16,
     pub name: String,
-    pub plane: ShardPlane<B, J, S, M>,
+    pub plane: ShardPlane<B, MJ, S, M, PJ>,
 
     /// Channel senders to every shard, indexed by shard id.
     /// Includes a sender to self so that local routing goes through the
@@ -110,7 +114,7 @@ where
     shards_table: T,
 }
 
-impl<B, J, S, M, T, R: Send + 'static> IggyShard<B, J, S, M, T, R>
+impl<B, MJ, S, M, PJ, T, R: Send + 'static> IggyShard<B, MJ, S, M, PJ, T, R>
 where
     B: MessageBus,
     T: ShardsTable,
@@ -124,8 +128,8 @@ where
     pub const fn new(
         id: u16,
         name: String,
-        metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
-        partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
+        metadata: IggyMetadata<VsrConsensus<B>, MJ, S, M>,
+        partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>, PJ>,
         senders: Vec<Sender<ShardFrame<R>>>,
         inbox: Receiver<ShardFrame<R>>,
         shards_table: T,
@@ -149,8 +153,8 @@ where
     pub fn without_inbox(
         id: u16,
         name: String,
-        metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
-        partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
+        metadata: IggyMetadata<VsrConsensus<B>, MJ, S, M>,
+        partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>, PJ>,
         shards_table: T,
     ) -> Self {
         // TODO: previously we used unbounded channel with flume,
@@ -176,7 +180,7 @@ where
 
 /// Local message processing — these methods handle messages that have been
 /// routed to this shard via the message pump.
-impl<B, J, S, M, T, R: Send + 'static> IggyShard<B, J, S, M, T, R>
+impl<B, MJ, S, M, PJ, T, R: Send + 'static> IggyShard<B, MJ, S, M, PJ, T, R>
 where
     B: MessageBus,
 {
@@ -188,9 +192,15 @@ where
     pub async fn on_message(&self, message: Message<GenericHeader>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
-        J: JournalHandle,
-        <J as JournalHandle>::Target: Journal<
-                <J as JournalHandle>::Storage,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
@@ -214,9 +224,15 @@ where
     pub async fn on_request(&self, request: Message<RequestHeader>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
-        J: JournalHandle,
-        <J as JournalHandle>::Target: Journal<
-                <J as JournalHandle>::Storage,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
@@ -233,9 +249,15 @@ where
     pub async fn on_replicate(&self, prepare: Message<PrepareHeader>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
-        J: JournalHandle,
-        <J as JournalHandle>::Target: Journal<
-                <J as JournalHandle>::Storage,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
@@ -252,9 +274,15 @@ where
     pub async fn on_ack(&self, prepare_ok: Message<PrepareOkHeader>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
-        J: JournalHandle,
-        <J as JournalHandle>::Target: Journal<
-                <J as JournalHandle>::Storage,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
@@ -282,9 +310,15 @@ where
     pub async fn process_loopback(&self, buf: &mut Vec<Message<GenericHeader>>) -> usize
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
-        J: JournalHandle,
-        <J as JournalHandle>::Target: Journal<
-                <J as JournalHandle>::Storage,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
@@ -333,9 +367,60 @@ where
                 Data = iggy_binary_protocol::Message<iggy_binary_protocol::GenericHeader>,
                 Client = u128,
             >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
     {
         let partitions = self.plane.partitions_mut();
         partitions.init_partition_in_memory(namespace);
         partitions.register_namespace_in_pipeline(namespace.inner());
     }
+
+    /// Tick the partitions consensus and dispatch any resulting actions.
+    ///
+    /// Currently handles `RetransmitPrepares` (primary retransmits
+    /// uncommitted prepares when the prepare timeout fires).
+    #[allow(clippy::future_not_send)]
+    pub async fn tick_partitions(&self)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+    {
+        let partitions = self.plane.partitions();
+        let Some(consensus) = partitions.consensus() else {
+            return;
+        };
+        let Some(journal) = partitions.journal() else {
+            return;
+        };
+
+        let current_op = consensus.sequencer().current_sequence();
+        let current_commit = consensus.commit_min();
+        let actions = consensus.tick(PlaneKind::Partitions, current_op, 
current_commit);
+
+        for action in actions {
+            if let consensus::VsrAction::RetransmitPrepares { targets } = action {
+                for (header, replicas) in targets {
+                    let Some(prepare) = journal.handle().entry(&header).await else {
+                        continue;
+                    };
+                    for replica in replicas {
+                        let _ = consensus
+                            .message_bus()
+                            .send_to_replica(replica, prepare.clone().into_generic())
+                            .await;
+                    }
+                }
+            }
+            // TODO: Dispatch other VsrActions (view change messages, etc.)
+        }
+    }
 }
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 8965a6a05..177cddaa0 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -30,7 +30,7 @@ use metadata::stm::StateMachine;
 /// through the channel into the target shard's message pump.  This ensures
 /// that every mutation on a shard is serialized through a single point (the
 /// pump), preventing concurrent access from independent async tasks.
-impl<B, J, S, M, T, R> IggyShard<B, J, S, M, T, R>
+impl<B, MJ, S, M, PJ, T, R> IggyShard<B, MJ, S, M, PJ, T, R>
 where
     B: MessageBus,
     T: ShardsTable,
@@ -144,9 +144,15 @@ where
     pub async fn run_message_pump(&self, stop: Receiver<()>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
-        J: JournalHandle,
-        <J as JournalHandle>::Target: Journal<
-                <J as JournalHandle>::Storage,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
@@ -178,9 +184,15 @@ where
     async fn process_frame(&self, frame: ShardFrame<R>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
-        J: JournalHandle,
-        <J as JournalHandle>::Target: Journal<
-                <J as JournalHandle>::Storage,
+        MJ: JournalHandle,
+        <MJ as JournalHandle>::Target: Journal<
+                <MJ as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        PJ: JournalHandle,
+        <PJ as JournalHandle>::Target: Journal<
+                <PJ as JournalHandle>::Storage,
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index a6a6b09cb..d9d9029a3 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -226,6 +226,17 @@ impl Simulator {
         self.crashed.contains(&replica_index)
     }
 
+    /// Advance consensus timeouts and dispatch actions on every replica.
+    ///
+    /// Call this once per simulation tick. Handles prepare retransmission
+    /// (and eventually view-change messages) via the timeout system.
+    #[allow(clippy::future_not_send)]
+    pub async fn tick(&self) {
+        for replica in &self.replicas {
+            replica.tick_partitions().await;
+        }
+    }
+
     /// Poll messages directly from a replica's partition.
     ///
     /// # Errors
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 730e0af18..c9373d3ef 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -33,8 +33,13 @@ const CLUSTER_ID: u128 = 1;
 
 // For now there is only one shard per replica,
 // we will add support for multiple shards per replica in the future.
-pub type Replica =
-    shard::IggyShard<SharedSimOutbox, SimJournal<MemStorage>, SimSnapshot, SimMuxStateMachine>;
+pub type Replica = shard::IggyShard<
+    SharedSimOutbox,
+    SimJournal<MemStorage>, // MJ: metadata journal
+    SimSnapshot,
+    SimMuxStateMachine,
+    SimJournal<MemStorage>, // PJ: partitions journal
+>;
 
 pub fn new_replica(id: u8, name: String, bus: &Arc<SimOutbox>, replica_count: u8) -> Replica {
     let users: Users = UsersInner::new().into();
@@ -69,6 +74,7 @@ pub fn new_replica(id: u8, name: String, bus: &Arc<SimOutbox>, replica_count: u8
     };
 
     let mut partitions = IggyPartitions::new(ShardId::new(u16::from(id)), partitions_config);
+    partitions.set_journal(SimJournal::<MemStorage>::default());
 
     // TODO: namespace=0 collides with metadata consensus. Safe for now because the simulator
     // routes by Operation type, but a shared view change bus would produce namespace collisions.

Reply via email to