hubcio commented on code in PR #3023:
URL: https://github.com/apache/iggy/pull/3023#discussion_r3011219870


##########
core/consensus/src/clients_table.rs:
##########
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_binary_protocol::{Message, ReplyHeader};
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::future::Future;
+use std::rc::Rc;
+use std::task::Waker;
+
+/// Identifies a specific request from a specific client.
+/// Used as the key for the pending-commit waiter map.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct ClientRequest {
+    pub client_id: u128,
+    pub request: u64,
+}
+
+/// Inner state shared between `Notify` clones via `Rc`.
+#[derive(Debug)]
+struct NotifyInner {
+    waker: RefCell<Option<Waker>>,
+    notified: std::cell::Cell<bool>,
+}
+
+/// Lightweight, single-threaded async notification primitive.
+///
+/// ## Usage
+///
+/// ```ignore
+/// let notify = Notify::new();
+/// let waiter = notify.clone();
+///
+/// // Producer side (in commit_reply):
+/// notify.notify();
+///
+/// // Consumer side (caller awaiting the commit):
+/// waiter.notified().await;
+/// ```
+#[derive(Debug, Clone)]
+pub struct Notify {
+    inner: Rc<NotifyInner>,
+}
+
+impl Notify {
+    /// Create a new `Notify` in the un-notified state.
+    #[must_use]
+    pub fn new() -> Self {
+        Self {
+            inner: Rc::new(NotifyInner {
+                waker: RefCell::new(None),
+                notified: std::cell::Cell::new(false),
+            }),
+        }
+    }
+
+    /// Wake the waiter, if any. If `notified()` is polled later, it will
+    /// resolve immediately.
+    pub fn notify(&self) {
+        self.inner.notified.set(true);
+        if let Some(waker) = self.inner.waker.borrow_mut().take() {
+            waker.wake();
+        }
+    }
+
+    /// Returns a future that resolves when [`notify()`](Self::notify) is 
called.
+    ///
+    /// If `notify()` was already called before this future is polled, it
+    /// resolves immediately (permit is consumed).
+    #[allow(clippy::future_not_send)]
+    pub fn notified(&self) -> impl Future<Output = ()> + '_ {
+        std::future::poll_fn(move |cx| {
+            if self.inner.notified.get() {
+                self.inner.notified.set(false);
+                std::task::Poll::Ready(())
+            } else {
+                *self.inner.waker.borrow_mut() = Some(cx.waker().clone());
+                std::task::Poll::Pending
+            }
+        })
+    }
+}
+
+impl Default for Notify {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Per-client entry in the clients table (VR paper Section 4, Figure 2).
+///
+/// Stores the reply for the client's latest committed request. The client ID,
+/// request number, and commit number are all read from `reply.header()`.
+#[derive(Debug)]
+pub struct ClientEntry {
+    /// The cached reply for the client's latest committed request (header + 
body).
+    pub reply: Message<ReplyHeader>,
+}
+
+/// Result of checking a request against the clients table.
+pub enum RequestStatus {
+    /// Request not seen before — proceed with consensus.
+    New,
+    /// Request already committed — re-send cached reply.
+    Duplicate(Message<ReplyHeader>),
+    /// Request is in the pipeline awaiting commit — drop (client should wait).
+    InProgress,
+}
+
+/// VSR client-table: tracks per-client request state for duplicate detection,
+/// reply caching, and async commit notification.
+///
+/// Uses a fixed-size slot array as the source of truth, with a `HashMap`
+/// as a secondary index for O(1) lookups by client ID.
+///
+/// ## Committed state (`slots` + `index`)
+///
+/// Always contains a valid `ClientEntry` with a non-optional reply.
+/// Updated by `commit_reply` when a request commits through consensus.
+///
+/// ## Pending state (`pending`)
+///
+/// Tracks in-flight requests that have been accepted for consensus but not yet
+/// committed. Each entry holds a [`Notify`] that is fired when the 
corresponding
+/// `commit_reply` arrives. Keyed by `ClientRequest` to support future
+/// request pipelining (currently at most one per client).
+///
+/// The `pending` map is local notification state not replicated, not
+/// serialized, not part of the deterministic committed state.
+///
+/// ## TODO
+///   Checkpoint serialization: the slot array is laid out for deterministic
+///   encode/decode to disk.
+#[derive(Debug)]
+pub struct ClientsTable {

Review Comment:
   the VR paper (section 4, figure 2) mandates the client-table as replicated 
state, transferred during view changes. in this PR, only the primary populates 
the table (`on_request` + `on_ack`). backups never call `commit_reply` because 
`ack_preflight` rejects non-primaries. `complete_view_change_as_primary()` and 
`handle_start_view()` don't rebuild or transfer the table.
   
   after any view change, the new primary's client-table is empty. client 
retransmissions are treated as `New`, causing duplicate execution. for 
`SendMessages` this means duplicate message appends to the partition log.
   
   <img width="653" height="790" alt="image" 
src="https://github.com/user-attachments/assets/8c0aca38-9ef4-444b-8029-f35cab36f2a7"
 />
   
   if this PR is scaffolding, please add a SAFETY comment at 
`complete_view_change_as_primary` and `handle_start_view` documenting the gap 
with a tracking issue number.



##########
core/metadata/src/impls/metadata.rs:
##########
@@ -286,8 +286,45 @@ where
 {
     async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::Message<RequestHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
+        let client_id = message.header().client;
+        let request = message.header().request;
+
+        // Duplicate detection via client-table
+        let status = consensus
+            .clients_table()
+            .borrow()
+            .check_request(client_id, request);
+        match status {
+            RequestStatus::Duplicate(cached_reply) => {
+                debug!(
+                    "on_request: duplicate request={request} for 
client={client_id}, re-sending cached reply"
+                );
+                consensus
+                    .message_bus()
+                    .send_to_client(client_id, cached_reply.into_generic())
+                    .await

Review Comment:
   also at `core/partitions/src/iggy_partitions.rs:366`.
   
   `.unwrap()` on `send_to_client` in the duplicate-reply path - if the client 
disconnects between sending the request and receiving the cached reply, this 
panics the shard. the pre-existing `on_ack` paths have the same `.unwrap()` 
with a `// TODO: Propagate send error` comment. these new call sites should at 
minimum have the same TODO, or better, use `if let Err(e) = ... { warn!(...) }`.



##########
core/consensus/src/clients_table.rs:
##########
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_binary_protocol::{Message, ReplyHeader};
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::future::Future;
+use std::rc::Rc;
+use std::task::Waker;
+
+/// Identifies a specific request from a specific client.
+/// Used as the key for the pending-commit waiter map.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct ClientRequest {
+    pub client_id: u128,
+    pub request: u64,
+}
+
+/// Inner state shared between `Notify` clones via `Rc`.
+#[derive(Debug)]
+struct NotifyInner {
+    waker: RefCell<Option<Waker>>,
+    notified: std::cell::Cell<bool>,
+}
+
+/// Lightweight, single-threaded async notification primitive.
+///
+/// ## Usage
+///
+/// ```ignore
+/// let notify = Notify::new();
+/// let waiter = notify.clone();
+///
+/// // Producer side (in commit_reply):
+/// notify.notify();
+///
+/// // Consumer side (caller awaiting the commit):
+/// waiter.notified().await;
+/// ```
+#[derive(Debug, Clone)]
+pub struct Notify {
+    inner: Rc<NotifyInner>,
+}
+
+impl Notify {
+    /// Create a new `Notify` in the un-notified state.
+    #[must_use]
+    pub fn new() -> Self {
+        Self {
+            inner: Rc::new(NotifyInner {
+                waker: RefCell::new(None),
+                notified: std::cell::Cell::new(false),
+            }),
+        }
+    }
+
+    /// Wake the waiter, if any. If `notified()` is polled later, it will
+    /// resolve immediately.
+    pub fn notify(&self) {
+        self.inner.notified.set(true);
+        if let Some(waker) = self.inner.waker.borrow_mut().take() {
+            waker.wake();
+        }
+    }
+
+    /// Returns a future that resolves when [`notify()`](Self::notify) is 
called.
+    ///
+    /// If `notify()` was already called before this future is polled, it
+    /// resolves immediately (permit is consumed).
+    #[allow(clippy::future_not_send)]
+    pub fn notified(&self) -> impl Future<Output = ()> + '_ {
+        std::future::poll_fn(move |cx| {
+            if self.inner.notified.get() {
+                self.inner.notified.set(false);
+                std::task::Poll::Ready(())
+            } else {
+                *self.inner.waker.borrow_mut() = Some(cx.waker().clone());
+                std::task::Poll::Pending
+            }
+        })
+    }
+}
+
+impl Default for Notify {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Per-client entry in the clients table (VR paper Section 4, Figure 2).
+///
+/// Stores the reply for the client's latest committed request. The client ID,
+/// request number, and commit number are all read from `reply.header()`.
+#[derive(Debug)]
+pub struct ClientEntry {
+    /// The cached reply for the client's latest committed request (header + 
body).
+    pub reply: Message<ReplyHeader>,
+}
+
+/// Result of checking a request against the clients table.
+pub enum RequestStatus {
+    /// Request not seen before — proceed with consensus.
+    New,
+    /// Request already committed — re-send cached reply.
+    Duplicate(Message<ReplyHeader>),
+    /// Request is in the pipeline awaiting commit — drop (client should wait).
+    InProgress,
+}
+
+/// VSR client-table: tracks per-client request state for duplicate detection,
+/// reply caching, and async commit notification.
+///
+/// Uses a fixed-size slot array as the source of truth, with a `HashMap`
+/// as a secondary index for O(1) lookups by client ID.
+///
+/// ## Committed state (`slots` + `index`)
+///
+/// Always contains a valid `ClientEntry` with a non-optional reply.
+/// Updated by `commit_reply` when a request commits through consensus.
+///
+/// ## Pending state (`pending`)
+///
+/// Tracks in-flight requests that have been accepted for consensus but not yet
+/// committed. Each entry holds a [`Notify`] that is fired when the 
corresponding
+/// `commit_reply` arrives. Keyed by `ClientRequest` to support future
+/// request pipelining (currently at most one per client).
+///
+/// The `pending` map is local notification state not replicated, not
+/// serialized, not part of the deterministic committed state.
+///
+/// ## TODO
+///   Checkpoint serialization: the slot array is laid out for deterministic
+///   encode/decode to disk.
+#[derive(Debug)]
+pub struct ClientsTable {
+    /// `None` means the slot is free.
+    /// Deterministic iteration order for eviction and serialization.
+    slots: Vec<Option<ClientEntry>>,
+    /// Secondary index: `client_id` → slot index. Rebuilt on decode.
+    index: HashMap<u128, usize>,
+    /// Pending commit waiters, keyed by `(client_id, request)`.
+    /// Keyed by request number (not just client) to support future pipelining.
+    /// Currently at most one per client.
+    pending: HashMap<ClientRequest, Notify>,
+}
+
+impl ClientsTable {
+    #[must_use]
+    pub fn new(max_clients: usize) -> Self {
+        let mut slots = Vec::with_capacity(max_clients);
+        slots.resize_with(max_clients, || None);
+        Self {
+            slots,
+            index: HashMap::with_capacity(max_clients),
+            pending: HashMap::new(),
+        }
+    }
+
+    /// Check a request against the table.
+    ///
+    /// Returns:
+    /// - [`RequestStatus::New`] — not seen before, proceed with consensus
+    /// - [`RequestStatus::Duplicate`] — already committed, re-send cached 
reply
+    /// - [`RequestStatus::InProgress`] — stale, already pending, or already 
committed
+    ///
+    /// # Panics
+    /// Panics if the internal index points to an empty slot (invariant 
violation).
+    #[must_use]
+    pub fn check_request(&self, client_id: u128, request: u64) -> 
RequestStatus {
+        // Check if already pending in the pipeline.
+        let key = ClientRequest { client_id, request };
+        if self.pending.contains_key(&key) {
+            return RequestStatus::InProgress;
+        }
+
+        let Some(&slot_idx) = self.index.get(&client_id) else {
+            return RequestStatus::New;
+        };
+        let entry = self.slots[slot_idx].as_ref().expect("index/slot 
mismatch");
+        let committed_request = entry.reply.header().request;
+
+        if request < committed_request {

Review Comment:
   `request < committed_request` returns `RequestStatus::InProgress`, but these 
requests are already committed (stale), not in-progress. callers currently 
handle this correctly (drop), but a future caller could reasonably try to await 
an `InProgress` result, which would block forever for stale requests. consider 
a `Stale` variant or rename to `AlreadyHandled`.



##########
core/metadata/src/impls/metadata.rs:
##########
@@ -286,8 +286,45 @@ where
 {
     async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::Message<RequestHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
+        let client_id = message.header().client;
+        let request = message.header().request;
+
+        // Duplicate detection via client-table

Review Comment:
   also at `core/partitions/src/iggy_partitions.rs:353-390`.
   
   the `check_request` -> match -> `register_pending` block in `on_request` and 
the `commit_reply` + `send_to_client` block in `on_ack` are nearly identical 
between the metadata and partitions planes (~25 lines each). extracting into a 
helper in `plane_helpers.rs` (following the existing `pipeline_prepare_common` 
/ `ack_preflight` pattern) would reduce drift risk - a fix applied in one plane 
but not the other would be a subtle bug.



##########
core/consensus/src/clients_table.rs:
##########
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_binary_protocol::{Message, ReplyHeader};
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::future::Future;
+use std::rc::Rc;
+use std::task::Waker;
+
+/// Identifies a specific request from a specific client.
+/// Used as the key for the pending-commit waiter map.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct ClientRequest {
+    pub client_id: u128,
+    pub request: u64,
+}
+
+/// Inner state shared between `Notify` clones via `Rc`.
+#[derive(Debug)]
+struct NotifyInner {
+    waker: RefCell<Option<Waker>>,
+    notified: std::cell::Cell<bool>,
+}
+
+/// Lightweight, single-threaded async notification primitive.
+///
+/// ## Usage
+///
+/// ```ignore
+/// let notify = Notify::new();
+/// let waiter = notify.clone();
+///
+/// // Producer side (in commit_reply):
+/// notify.notify();
+///
+/// // Consumer side (caller awaiting the commit):
+/// waiter.notified().await;
+/// ```
+#[derive(Debug, Clone)]
+pub struct Notify {
+    inner: Rc<NotifyInner>,
+}
+
+impl Notify {
+    /// Create a new `Notify` in the un-notified state.
+    #[must_use]
+    pub fn new() -> Self {
+        Self {
+            inner: Rc::new(NotifyInner {
+                waker: RefCell::new(None),
+                notified: std::cell::Cell::new(false),
+            }),
+        }
+    }
+
+    /// Wake the waiter, if any. If `notified()` is polled later, it will
+    /// resolve immediately.
+    pub fn notify(&self) {
+        self.inner.notified.set(true);
+        if let Some(waker) = self.inner.waker.borrow_mut().take() {
+            waker.wake();
+        }
+    }
+
+    /// Returns a future that resolves when [`notify()`](Self::notify) is 
called.
+    ///
+    /// If `notify()` was already called before this future is polled, it
+    /// resolves immediately (permit is consumed).
+    #[allow(clippy::future_not_send)]
+    pub fn notified(&self) -> impl Future<Output = ()> + '_ {
+        std::future::poll_fn(move |cx| {
+            if self.inner.notified.get() {
+                self.inner.notified.set(false);
+                std::task::Poll::Ready(())
+            } else {
+                *self.inner.waker.borrow_mut() = Some(cx.waker().clone());
+                std::task::Poll::Pending
+            }
+        })
+    }
+}
+
+impl Default for Notify {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Per-client entry in the clients table (VR paper Section 4, Figure 2).
+///
+/// Stores the reply for the client's latest committed request. The client ID,
+/// request number, and commit number are all read from `reply.header()`.
+#[derive(Debug)]
+pub struct ClientEntry {
+    /// The cached reply for the client's latest committed request (header + 
body).
+    pub reply: Message<ReplyHeader>,
+}
+
+/// Result of checking a request against the clients table.
+pub enum RequestStatus {
+    /// Request not seen before — proceed with consensus.
+    New,
+    /// Request already committed — re-send cached reply.
+    Duplicate(Message<ReplyHeader>),
+    /// Request is in the pipeline awaiting commit — drop (client should wait).
+    InProgress,
+}
+
+/// VSR client-table: tracks per-client request state for duplicate detection,
+/// reply caching, and async commit notification.
+///
+/// Uses a fixed-size slot array as the source of truth, with a `HashMap`
+/// as a secondary index for O(1) lookups by client ID.
+///
+/// ## Committed state (`slots` + `index`)
+///
+/// Always contains a valid `ClientEntry` with a non-optional reply.
+/// Updated by `commit_reply` when a request commits through consensus.
+///
+/// ## Pending state (`pending`)
+///
+/// Tracks in-flight requests that have been accepted for consensus but not yet
+/// committed. Each entry holds a [`Notify`] that is fired when the 
corresponding
+/// `commit_reply` arrives. Keyed by `ClientRequest` to support future
+/// request pipelining (currently at most one per client).
+///
+/// The `pending` map is local notification state not replicated, not
+/// serialized, not part of the deterministic committed state.
+///
+/// ## TODO
+///   Checkpoint serialization: the slot array is laid out for deterministic
+///   encode/decode to disk.
+#[derive(Debug)]
+pub struct ClientsTable {
+    /// `None` means the slot is free.
+    /// Deterministic iteration order for eviction and serialization.
+    slots: Vec<Option<ClientEntry>>,
+    /// Secondary index: `client_id` → slot index. Rebuilt on decode.
+    index: HashMap<u128, usize>,
+    /// Pending commit waiters, keyed by `(client_id, request)`.
+    /// Keyed by request number (not just client) to support future pipelining.
+    /// Currently at most one per client.
+    pending: HashMap<ClientRequest, Notify>,

Review Comment:
   `reset_view_change_state()` clears the pipeline and loopback queue but never 
touches `clients_table.pending`. if the same replica becomes primary again 
(e.g. replica 0 in view 0, then again in view 3 with 3 replicas since `3 % 3 == 
0`), stale pending entries from the old view survive. `check_request` returns 
`InProgress` for the orphaned `(client_id, request)` key, silently dropping 
valid client retries.
   
   fix: add `self.clients_table.borrow_mut().pending.clear()` to 
`reset_view_change_state()`, or expose a `clear_pending()` method on 
`ClientsTable`.



##########
core/consensus/src/clients_table.rs:
##########
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_binary_protocol::{Message, ReplyHeader};
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::future::Future;
+use std::rc::Rc;
+use std::task::Waker;
+
+/// Identifies a specific request from a specific client.
+/// Used as the key for the pending-commit waiter map.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct ClientRequest {
+    pub client_id: u128,
+    pub request: u64,
+}
+
+/// Inner state shared between `Notify` clones via `Rc`.
+#[derive(Debug)]
+struct NotifyInner {
+    waker: RefCell<Option<Waker>>,
+    notified: std::cell::Cell<bool>,
+}
+
+/// Lightweight, single-threaded async notification primitive.
+///
+/// ## Usage
+///
+/// ```ignore
+/// let notify = Notify::new();
+/// let waiter = notify.clone();
+///
+/// // Producer side (in commit_reply):
+/// notify.notify();
+///
+/// // Consumer side (caller awaiting the commit):
+/// waiter.notified().await;
+/// ```
+#[derive(Debug, Clone)]
+pub struct Notify {
+    inner: Rc<NotifyInner>,
+}
+
+impl Notify {
+    /// Create a new `Notify` in the un-notified state.
+    #[must_use]
+    pub fn new() -> Self {
+        Self {
+            inner: Rc::new(NotifyInner {
+                waker: RefCell::new(None),
+                notified: std::cell::Cell::new(false),
+            }),
+        }
+    }
+
+    /// Wake the waiter, if any. If `notified()` is polled later, it will
+    /// resolve immediately.
+    pub fn notify(&self) {
+        self.inner.notified.set(true);
+        if let Some(waker) = self.inner.waker.borrow_mut().take() {
+            waker.wake();
+        }
+    }
+
+    /// Returns a future that resolves when [`notify()`](Self::notify) is 
called.
+    ///
+    /// If `notify()` was already called before this future is polled, it
+    /// resolves immediately (permit is consumed).
+    #[allow(clippy::future_not_send)]
+    pub fn notified(&self) -> impl Future<Output = ()> + '_ {
+        std::future::poll_fn(move |cx| {
+            if self.inner.notified.get() {
+                self.inner.notified.set(false);
+                std::task::Poll::Ready(())
+            } else {
+                *self.inner.waker.borrow_mut() = Some(cx.waker().clone());
+                std::task::Poll::Pending
+            }
+        })
+    }
+}
+
+impl Default for Notify {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Per-client entry in the clients table (VR paper Section 4, Figure 2).
+///
+/// Stores the reply for the client's latest committed request. The client ID,
+/// request number, and commit number are all read from `reply.header()`.
+#[derive(Debug)]
+pub struct ClientEntry {
+    /// The cached reply for the client's latest committed request (header + 
body).
+    pub reply: Message<ReplyHeader>,
+}
+
+/// Result of checking a request against the clients table.
+pub enum RequestStatus {
+    /// Request not seen before — proceed with consensus.
+    New,
+    /// Request already committed — re-send cached reply.
+    Duplicate(Message<ReplyHeader>),
+    /// Request is in the pipeline awaiting commit — drop (client should wait).
+    InProgress,
+}
+
+/// VSR client-table: tracks per-client request state for duplicate detection,
+/// reply caching, and async commit notification.
+///
+/// Uses a fixed-size slot array as the source of truth, with a `HashMap`
+/// as a secondary index for O(1) lookups by client ID.
+///
+/// ## Committed state (`slots` + `index`)
+///
+/// Always contains a valid `ClientEntry` with a non-optional reply.
+/// Updated by `commit_reply` when a request commits through consensus.
+///
+/// ## Pending state (`pending`)
+///
+/// Tracks in-flight requests that have been accepted for consensus but not yet
+/// committed. Each entry holds a [`Notify`] that is fired when the 
corresponding
+/// `commit_reply` arrives. Keyed by `ClientRequest` to support future
+/// request pipelining (currently at most one per client).
+///
+/// The `pending` map is local notification state not replicated, not
+/// serialized, not part of the deterministic committed state.
+///
+/// ## TODO
+///   Checkpoint serialization: the slot array is laid out for deterministic
+///   encode/decode to disk.
+#[derive(Debug)]
+pub struct ClientsTable {
+    /// `None` means the slot is free.
+    /// Deterministic iteration order for eviction and serialization.
+    slots: Vec<Option<ClientEntry>>,
+    /// Secondary index: `client_id` → slot index. Rebuilt on decode.
+    index: HashMap<u128, usize>,
+    /// Pending commit waiters, keyed by `(client_id, request)`.
+    /// Keyed by request number (not just client) to support future pipelining.
+    /// Currently at most one per client.
+    pending: HashMap<ClientRequest, Notify>,
+}
+
+impl ClientsTable {
+    #[must_use]
+    pub fn new(max_clients: usize) -> Self {
+        let mut slots = Vec::with_capacity(max_clients);
+        slots.resize_with(max_clients, || None);
+        Self {
+            slots,
+            index: HashMap::with_capacity(max_clients),
+            pending: HashMap::new(),
+        }
+    }
+
+    /// Check a request against the table.
+    ///
+    /// Returns:
+    /// - [`RequestStatus::New`] — not seen before, proceed with consensus
+    /// - [`RequestStatus::Duplicate`] — already committed, re-send cached 
reply
+    /// - [`RequestStatus::InProgress`] — stale, already pending, or already 
committed
+    ///
+    /// # Panics
+    /// Panics if the internal index points to an empty slot (invariant 
violation).
+    #[must_use]
+    pub fn check_request(&self, client_id: u128, request: u64) -> 
RequestStatus {
+        // Check if already pending in the pipeline.
+        let key = ClientRequest { client_id, request };
+        if self.pending.contains_key(&key) {
+            return RequestStatus::InProgress;
+        }
+
+        let Some(&slot_idx) = self.index.get(&client_id) else {
+            return RequestStatus::New;
+        };
+        let entry = self.slots[slot_idx].as_ref().expect("index/slot 
mismatch");
+        let committed_request = entry.reply.header().request;
+
+        if request < committed_request {
+            return RequestStatus::InProgress;
+        }
+        if request == committed_request {
+            return RequestStatus::Duplicate(entry.reply.clone());
+        }
+
+        RequestStatus::New
+    }
+
+    /// Register interest in a pending request's commit.
+    ///
+    /// Returns a [`Notify`] the caller can `.notified().await` on. The 
`Notify`
+    /// is cloned via `Rc`, so the caller can hold it across `.await` points
+    /// without borrowing the `ClientsTable`.
+    ///
+    /// Called after `check_request` returns `New`, before submitting the 
request
+    /// to the consensus pipeline.
+    ///
+    /// # Panics
+    /// Panics if there is already a pending waiter for this `(client_id, 
request)`.
+    pub fn register_pending(&mut self, client_id: u128, request: u64) -> 
Notify {
+        let notify = Notify::new();
+        let key = ClientRequest { client_id, request };
+        let prev = self.pending.insert(key, notify.clone());
+        assert!(
+            prev.is_none(),
+            "client {client_id} request {request} already has a pending waiter"
+        );
+        notify
+    }
+
+    /// Record a committed reply and cache it.
+    ///
+    /// If the client already has a slot, updates it in place. Otherwise 
allocates
+    /// a free slot, evicting the client with the oldest commit number if the 
table
+    /// is full.
+    ///
+    /// Wakes the pending [`Notify`] for this `(client_id, request)` if one 
exists.
+    ///
+    /// Called in `on_ack` after `build_reply_message`.
+    ///
+    /// # Panics
+    /// Panics if the internal index points to an empty slot (invariant 
violation).
+    pub fn commit_reply(&mut self, client_id: u128, reply: 
Message<ReplyHeader>) {
+        let request = reply.header().request;
+
+        if let Some(&slot_idx) = self.index.get(&client_id) {
+            // Update existing slot in place.
+            let slot = self.slots[slot_idx].as_mut().expect("index/slot 
mismatch");
+            slot.reply = reply;
+        } else {
+            // Need a free slot. Evict if full.
+            if self.index.len() >= self.slots.len() {
+                self.evict_oldest();
+            }
+
+            let slot_idx = self.first_free_slot().expect("eviction must free a 
slot");
+            self.slots[slot_idx] = Some(ClientEntry { reply });
+            self.index.insert(client_id, slot_idx);
+        }
+
+        // Wake the waiter, if any.
+        let key = ClientRequest { client_id, request };
+        if let Some(notify) = self.pending.remove(&key) {
+            notify.notify();
+        }
+    }
+
+    /// Evict the client with the oldest commit number.
+    ///
+    /// Iterates the fixed-size slot array (deterministic order), so all 
replicas
+    /// with the same committed state evict the same client. Ties on commit 
number
+    /// are broken by slot index (lowest index wins), which is also 
deterministic.
+    fn evict_oldest(&mut self) {

Review Comment:
   when the 8192-slot table is full, `evict_oldest()` removes the client with 
the lowest commit number. if that evicted client retransmits a committed 
request, `check_request` returns `New` (since `index.get(&client_id)` is now 
`None`), causing re-execution. this violates VSR's at-most-once semantics.
   
   the VR paper addresses this via checkpoint inclusion of the client-table 
(which isn't implemented yet - acknowledged in the TODO at line 147). worth a 
comment here noting that eviction can break the dedup invariant for the evicted 
client.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to