numinnex commented on code in PR #3223:
URL: https://github.com/apache/iggy/pull/3223#discussion_r3218234064
##########
core/consensus/src/impls.rs:
##########
@@ -84,6 +84,10 @@ impl Sequencer for LocalSequencer {
/// Maximum number of prepares that can be in-flight in the pipeline.
pub const PIPELINE_PREPARE_QUEUE_MAX: usize = 8;
+/// Max accepted-but-not-yet-prepared requests buffered behind a full
+/// prepare queue. Beyond this, requests drop and the client retries.
+pub const PIPELINE_REQUEST_QUEUE_MAX: usize = 64;
Review Comment:
I am not sure whether we want to have such queue tho, I think once we reach
the PIPELINE_PREPARE_QUEUE_MAX depth, we should simply reject those requests
with some error indicating backpressure.
##########
core/consensus/src/impls.rs:
##########
@@ -130,27 +164,33 @@ impl PipelineEntry {
}
}
-/// A request message waiting to be prepared.
+/// Accepted request waiting in `request_queue` for a prepare slot.
+#[derive(Debug)]
pub struct RequestEntry {
pub message: Message<RequestHeader>,
- /// Timestamp when the request was received (for ordering/timeout).
- pub received_at: i64, //TODO figure the correct way to do this
+ // TODO: populate from monotonic clock at push, promote to `pub` for
+ // age-based filtering. Currently `0`; `pub(crate)` blocks sort-on-stub.
+ #[allow(dead_code)]
+ pub(crate) received_at: i64,
}
impl RequestEntry {
#[must_use]
pub const fn new(message: Message<RequestHeader>) -> Self {
Self {
message,
- received_at: 0, //TODO figure the correct way to do this
+ received_at: 0,
}
}
}
+/// Two-queue pipeline: in-flight prepares + buffered requests.
#[derive(Debug)]
pub struct LocalPipeline {
- /// Messages being prepared (uncommitted and being replicated).
+ /// Uncommitted prepares; cap [`PIPELINE_PREPARE_QUEUE_MAX`].
prepare_queue: VecDeque<PipelineEntry>,
+ /// Requests awaiting a prepare slot; cap [`PIPELINE_REQUEST_QUEUE_MAX`].
+ request_queue: VecDeque<RequestEntry>,
Review Comment:
As mentioned above, not sure about introducing this, especially considering
the consensus per partition design, where each partition has it's own
`LocalPipeline`.
##########
core/binary_protocol/src/consensus/header.rs:
##########
@@ -303,6 +308,188 @@ impl ConsensusHeader for ReplyHeader {
}
}
+// EvictionReason, wire-level reason in EvictionHeader.
+//
+// Discriminants pinned: any reorder/reuse breaks SDK decoders. New
+// variants also break old SDKs (CheckedBitPattern fails). Coordinate
+// SDK releases on every extension.
+
+/// Wire reason on [`EvictionHeader`]. Session-terminal; never transient.
+/// No `Default`: callers must name reason so `..default()` can't ship
+/// `Reserved`.
+///
+/// **Wire-version pinned.**
+#[derive(Debug, Clone, Copy, PartialEq, Eq, NoUninit, CheckedBitPattern)]
+#[repr(u8)]
+pub enum EvictionReason {
+ /// Sentinel; rejected on wire.
+ Reserved = 0,
+
+ /// No session for `client_id`.
+ NoSession = 1,
+ /// Client release < cluster min.
+ ClientReleaseTooLow = 2,
+ /// Client release > cluster max.
+ ClientReleaseTooHigh = 3,
+ /// Invalid operation discriminant.
+ InvalidRequestOperation = 4,
+ /// Body failed state-machine validation.
+ InvalidRequestBody = 5,
Review Comment:
I am not sure if we want to evict clients on failed validation of the
`Command`. Currently our server doesn't do that, instead it just sends an error
response. @hubcio What do you think ?
##########
core/consensus/src/client_table.rs:
##########
@@ -15,207 +15,160 @@
// specific language governing permissions and limitations
// under the License.
+use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
+use iggy_binary_protocol::consensus::iobuf::Frozen;
use iggy_binary_protocol::{Message, ReplyHeader};
-use std::cell::RefCell;
use std::collections::HashMap;
-use std::future::Future;
-use std::rc::Rc;
-use std::task::Waker;
-
-/// Identifies a specific request from a specific client.
-/// Used as the key for the pending-commit waiter map.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub struct ClientRequest {
- pub client_id: u128,
- pub request: u64,
-}
+use std::mem::size_of;
+use tracing::trace;
-/// Inner state shared between `Notify` clones via `Rc`.
-#[derive(Debug)]
-struct NotifyInner {
- waker: RefCell<Option<Waker>>,
- notified: std::cell::Cell<bool>,
-}
-
-/// Lightweight, single-threaded async notification primitive.
-///
-/// ## Usage
+/// Refcounted wrapper around a committed reply.
///
-/// ```ignore
-/// let notify = Notify::new();
-/// let waiter = notify.clone();
+/// Bytes are deterministic across replicas: `build_reply_message` reads
+/// only from the prepare header, so a backup-promoted primary replays
+/// the exact bytes the original primary produced.
///
-/// // Producer side (in commit_reply):
-/// notify.notify();
-///
-/// // Consumer side (caller awaiting the commit):
-/// waiter.notified().await;
-/// ```
+/// Immutable by construction: [`Frozen`] has no mutable accessor.
#[derive(Debug, Clone)]
-pub struct Notify {
- inner: Rc<NotifyInner>,
+pub struct CachedReply {
+ bytes: Frozen<MESSAGE_ALIGN>,
}
-impl Notify {
- /// Create a new `Notify` in the un-notified state.
+impl CachedReply {
+ /// Reply header view.
+ ///
+ /// # Panics
+ /// Unreachable: prefix validated by [`Message::try_from`] at construction;
+ /// `Frozen` has no mutable accessor.
#[must_use]
- pub fn new() -> Self {
- Self {
- inner: Rc::new(NotifyInner {
- waker: RefCell::new(None),
- notified: std::cell::Cell::new(false),
- }),
- }
+ pub fn header(&self) -> &ReplyHeader {
+
bytemuck::checked::try_from_bytes(&self.bytes.as_slice()[..size_of::<ReplyHeader>()])
+ .expect("cached reply bytes contain a valid ReplyHeader (validated
at storage time)")
}
- /// Wake the waiter, if any. If `notified()` is polled later, it will
- /// resolve immediately.
- pub fn notify(&self) {
- self.inner.notified.set(true);
- if let Some(waker) = self.inner.waker.borrow_mut().take() {
- waker.wake();
- }
- }
-
- /// Returns a future that resolves when [`notify()`](Self::notify) is
called.
+ /// Consume into wire-shareable [`Frozen`] buffer.
///
- /// If `notify()` was already called before this future is polled, it
- /// resolves immediately (permit is consumed).
- #[allow(clippy::future_not_send)]
- pub fn notified(&self) -> impl Future<Output = ()> + '_ {
- std::future::poll_fn(move |cx| {
- if self.inner.notified.get() {
- self.inner.notified.set(false);
- std::task::Poll::Ready(())
- } else {
- *self.inner.waker.borrow_mut() = Some(cx.waker().clone());
- std::task::Poll::Pending
- }
- })
+ /// `MessageBus::send_to_client` takes `Frozen<MESSAGE_ALIGN>` directly.
+ /// To retain the cached entry, `.clone()` (Arc bump) first.
+ #[must_use]
+ pub fn into_wire_bytes(self) -> Frozen<MESSAGE_ALIGN> {
+ self.bytes
}
}
-impl Default for Notify {
- fn default() -> Self {
- Self::new()
+impl CachedReply {
+ /// Freeze owned buffer in place; no alloc. Subsequent `Clone`s are Arc
bumps.
+ ///
+ /// `pub(crate)` so [`Self::header`]'s validity invariant cannot be
+ /// bypassed by an unvalidated buffer from outside the crate.
+ pub(crate) fn from_message(msg: Message<ReplyHeader>) -> Self {
+ Self {
+ bytes: msg.into_generic().into_frozen(),
+ }
}
}
-/// Per-client entry in the clients table (VR paper Section 4, Figure 2).
+/// Reserved request number for
[`Operation::Register`](iggy_binary_protocol::Operation::Register).
+/// Real requests start at 1 (header validation enforces `request > 0`).
+pub const REGISTER_REQUEST_ID: u64 = 0;
+
+/// Per-client entry (VR paper §4, Fig. 2): session + latest committed reply.
///
-/// Stores the session number and the reply for the client's latest committed
-/// request. The session number is assigned once at registration (from the
-/// commit op number) and never changes for the lifetime of the entry.
+/// `session` is assigned at registration and fixed for the entry's lifetime.
#[derive(Debug)]
pub struct ClientEntry {
- /// Session number assigned at registration time (= commit op number of the
- /// register operation). Monotonically increasing across registrations.
- /// Used to distinguish between successive registrations from different
- /// client processes, a new register always gets a higher session.
+ /// Session number = commit op of the register. Monotonic across
+ /// registrations; new register always gets a higher session.
pub session: u64,
- /// The cached reply for the client's latest committed request (header +
body).
- pub reply: Message<ReplyHeader>,
+ /// Cached reply for client's latest committed request.
+ pub reply: CachedReply,
}
/// Result of checking a request against the client table.
+///
+/// In-progress dedup is the caller's job, preflights consult
+/// `pipeline.has_message_from_client(client_id)`. `ClientTable` only sees
+/// committed state.
#[derive(Debug)]
pub enum RequestStatus {
- /// Request not seen before, proceed with consensus.
+ /// Not seen; proceed with consensus.
New,
- /// Exact request already committed, re-send cached reply.
- Duplicate(Message<ReplyHeader>),
- /// Request is in the pipeline awaiting commit, drop (client should wait).
- InProgress,
- /// Request number is older than the client's latest committed request.
- /// Already handled in a prior commit cycle, drop silently.
+ /// Exact request already committed; re-send cached reply.
+ Duplicate(CachedReply),
Review Comment:
We've been talking about it that I am not sure whether our clients table
should cache the responses or only function as an deduplicator, that would send
an error/empty response on duplicated `request_id`.
There are potential issues with caching the response.
1) Our most common request path on the server is Send/Poll messages, in the
SendMessages those cached reply are very small, on the PollMessages path that
is not the case. Additionally we would have duplicated data in the memory, as
we are already caching messages in the partition layer.
2) Non idempotent clients: Imagine a scenario in which our client as it's
own internal logic stores `stream_id` or id of any other server resources in
it's own data strucutres. Client creates stream with id 1, the request gets
retried on timeout due to flaky network, the first request that got sent is
handled by the server, the response is sent - client reacts to it by storing
that Stream with id 1 has been created by storing it in its own DS, then the
second request (the retried one) arrives at the server, the server sees that it
is a duplicate sends back the cached response clients reacts to it by storing
the information that Stream with stream id 1 has been created, essentially
duplicating the information.
##########
core/server-ng/src/login_register.rs:
##########
@@ -17,71 +17,74 @@
* under the License.
*/
-//! Combined login + register handler for server-ng.
+//! Combined login + register handler.
//!
-//! One client-facing command, two internal phases:
-//! 1. Verify credentials locally (Argon2). NOT through consensus
-//! 2. Submit `Operation::Register` through consensus. All replicas create
`ClientTable` entry
+//! One client command, two phases:
+//! 1. Local credential verify (Argon2). Not consensus.
+//! 2. `Operation::Register` through consensus -> `ClientTable` entry on all
replicas.
//!
-//! The handler is trait-based so it can be tested via mocking.
+//! Trait-based for mocking.
use crate::session_manager::{SessionError, SessionManager};
+use iggy_binary_protocol::EvictionReason;
use iggy_binary_protocol::requests::users::{LoginRegisterRequest,
LoginRegisterWithPatRequest};
use iggy_binary_protocol::responses::users::LoginRegisterResponse;
+use metadata::RegisterSubmitError;
use secrecy::ExposeSecret;
-/// Credential verification abstraction.
-///
-/// The real implementation delegates to the shard's metadata user store
-/// and Argon2 password hashing. Test implementations return fixed results.
+/// Credential verifier. Real impl: metadata user store + Argon2.
pub trait CredentialVerifier {
- /// Verify username/password. Returns `user_id` on success.
+ /// Verify username/password. Returns `user_id`.
///
/// # Errors
- /// Returns `LoginRegisterError` if credentials are invalid.
+ /// `LoginRegisterError` on invalid credentials.
fn verify(&self, username: &str, password: &str) -> Result<u32,
LoginRegisterError>;
}
-/// Personal access token verification abstraction.
-///
-/// The real implementation looks up the PAT by hash in the user store,
-/// checks expiry, and returns the owning user's ID.
+/// PAT verifier. Real impl: hash lookup + expiry check.
pub trait TokenVerifier {
- /// Verify a personal access token. Returns `user_id` on success.
+ /// Verify PAT. Returns `user_id`.
///
/// # Errors
- /// Returns `LoginRegisterError` if the token is invalid or expired.
+ /// `LoginRegisterError` on invalid/expired token.
fn verify_token(&self, token: &str) -> Result<u32, LoginRegisterError>;
}
-/// Consensus register submission abstraction.
+/// Submit `Register` through consensus.
+///
+/// # Runtime
+/// `Future` is **not `Send`**. Production
+/// ([`crate::register_submitter::IggyRegisterSubmitter`]) wraps
+/// `IggyMetadata::submit_register_in_process`, whose state is `RefCell`/`Cell`
+/// on single-threaded `compio`. Multi-threaded embedders need a shim or
+/// custom transport; constraining `Send` would tax every call site.
///
-/// The real implementation builds a `RequestHeader { operation: Register }`,
-/// calls `check_register` on the `ClientTable`, submits through the consensus
-/// pipeline, and awaits the `Notify` for commit. Returns the session number
-/// (commit op number).
+/// # Failures
+/// Transient (pipeline-full, view-change cancel, primary-not-caught-up,
+/// in-flight register) MUST eventually be absorbed via bounded retry. Until
+/// then they surface as [`LoginRegisterError::Transient`] → `NotEvictable`,
+/// so network can't ship a wire-terminal `Eviction` for a recoverable
+/// failure. SDK read-timeout replays.
+///
+/// Terminal → [`EvictionReason`] in `EvictionHeader`; SDK invokes its
+/// eviction callback and stops.
pub trait RegisterSubmitter {
Review Comment:
Do we need to have this behind a trait ? Couldn't this go through the
`IggyMetadata` path ? and just simply be yet another command that is handled by
the `MuxStateMachine` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]