krishvishal commented on code in PR #3023: URL: https://github.com/apache/iggy/pull/3023#discussion_r3036826454
########## core/consensus/src/client_table.rs: ########## @@ -0,0 +1,530 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iggy_binary_protocol::{Message, ReplyHeader}; +use std::cell::RefCell; +use std::collections::HashMap; +use std::future::Future; +use std::rc::Rc; +use std::task::Waker; + +/// Identifies a specific request from a specific client. +/// Used as the key for the pending-commit waiter map. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ClientRequest { + pub client_id: u128, + pub request: u64, +} + +/// Inner state shared between `Notify` clones via `Rc`. +#[derive(Debug)] +struct NotifyInner { + waker: RefCell<Option<Waker>>, + notified: std::cell::Cell<bool>, +} + +/// Lightweight, single-threaded async notification primitive. +/// +/// ## Usage +/// +/// ```ignore +/// let notify = Notify::new(); +/// let waiter = notify.clone(); +/// +/// // Producer side (in commit_reply): +/// notify.notify(); +/// +/// // Consumer side (caller awaiting the commit): +/// waiter.notified().await; +/// ``` +#[derive(Debug, Clone)] +pub struct Notify { + inner: Rc<NotifyInner>, +} + +impl Notify { + /// Create a new `Notify` in the un-notified state. 
+ #[must_use] + pub fn new() -> Self { + Self { + inner: Rc::new(NotifyInner { + waker: RefCell::new(None), + notified: std::cell::Cell::new(false), + }), + } + } + + /// Wake the waiter, if any. If `notified()` is polled later, it will + /// resolve immediately. + pub fn notify(&self) { + self.inner.notified.set(true); + if let Some(waker) = self.inner.waker.borrow_mut().take() { + waker.wake(); + } + } + + /// Returns a future that resolves when [`notify()`](Self::notify) is called. + /// + /// If `notify()` was already called before this future is polled, it + /// resolves immediately (permit is consumed). + #[allow(clippy::future_not_send)] + pub fn notified(&self) -> impl Future<Output = ()> + '_ { + std::future::poll_fn(move |cx| { + if self.inner.notified.get() { + self.inner.notified.set(false); + std::task::Poll::Ready(()) + } else { + *self.inner.waker.borrow_mut() = Some(cx.waker().clone()); + std::task::Poll::Pending + } + }) + } +} + +impl Default for Notify { + fn default() -> Self { + Self::new() + } +} + +/// Per-client entry in the clients table (VR paper Section 4, Figure 2). +/// +/// Stores the reply for the client's latest committed request. The client ID, +/// request number, and commit number are all read from `reply.header()`. +#[derive(Debug)] +pub struct ClientEntry { + /// The cached reply for the client's latest committed request (header + body). + pub reply: Message<ReplyHeader>, +} + +/// Result of checking a request against the clients table. +pub enum RequestStatus { + /// Request not seen before — proceed with consensus. + New, + /// Request already committed — re-send cached reply. + Duplicate(Message<ReplyHeader>), + /// Request is in the pipeline awaiting commit — drop (client should wait). + InProgress, +} + +/// VSR client-table: tracks per-client request state for duplicate detection, +/// reply caching, and async commit notification. 
+/// +/// Uses a fixed-size slot array as the source of truth, with a `HashMap` +/// as a secondary index for O(1) lookups by client ID. +/// +/// ## Committed state (`slots` + `index`) +/// +/// Always contains a valid `ClientEntry` with a non-optional reply. +/// Updated by `commit_reply` when a request commits through consensus. +/// +/// ## Pending state (`pending`) +/// +/// Tracks in-flight requests that have been accepted for consensus but not yet +/// committed. Each entry holds a [`Notify`] that is fired when the corresponding +/// `commit_reply` arrives. Keyed by `ClientRequest` to support future +/// request pipelining (currently at most one per client). +/// +/// The `pending` map is local notification state: not replicated, not +/// serialized, and not part of the deterministic committed state. +/// +/// ## TODO +/// Checkpoint serialization: the slot array is laid out for deterministic +/// encode/decode to disk. +#[derive(Debug)] +pub struct ClientsTable { + /// `None` means the slot is free. + /// Deterministic iteration order for eviction and serialization. + slots: Vec<Option<ClientEntry>>, + /// Secondary index: `client_id` → slot index. Rebuilt on decode. + index: HashMap<u128, usize>, + /// Pending commit waiters, keyed by `(client_id, request)`. + /// Keyed by request number (not just client) to support future pipelining. + /// Currently at most one per client. + pending: HashMap<ClientRequest, Notify>, +} + +impl ClientsTable { + #[must_use] + pub fn new(max_clients: usize) -> Self { + let mut slots = Vec::with_capacity(max_clients); + slots.resize_with(max_clients, || None); + Self { + slots, + index: HashMap::with_capacity(max_clients), + pending: HashMap::new(), + } + } + + /// Check a request against the table. 
+ /// + /// Returns: + /// - [`RequestStatus::New`] — not seen before, proceed with consensus + /// - [`RequestStatus::Duplicate`] — already committed, re-send cached reply + /// - [`RequestStatus::InProgress`] — stale, already pending, or already committed + /// + /// # Panics + /// Panics if the internal index points to an empty slot (invariant violation). + #[must_use] + pub fn check_request(&self, client_id: u128, request: u64) -> RequestStatus { + // Check if already pending in the pipeline. + let key = ClientRequest { client_id, request }; + if self.pending.contains_key(&key) { + return RequestStatus::InProgress; + } + + let Some(&slot_idx) = self.index.get(&client_id) else { + return RequestStatus::New; + }; + let entry = self.slots[slot_idx].as_ref().expect("index/slot mismatch"); + let committed_request = entry.reply.header().request; + + if request < committed_request { Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
