This is an automated email from the ASF dual-hosted git repository. hubcio pushed a commit to branch feat/message-bus-transports in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 440994de444ff7f79d296b2e56eca0dee50c12fd Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Apr 24 11:42:59 2026 +0200 feat(server-ng): at-most-once request dedup window (IGGY-112, P0-T3) Add a per-shard Dedup store indexed by (client_id, request) pairs that the existing RequestHeader already carries. The P0-T1 design note found that RequestHeader has client: u128, request: u64, and request_checksum: u128 today, so the phase-0 plan's "add request_id: u128" wire widening is not needed; this commit skips it. Dedup entries are stored in a fixed-capacity per-client ring (DEFAULT_PER_CLIENT_CAPACITY = 64). Memory caps at clients * capacity * avg_reply_size without needing a purge tick. Completed entries carry inserted_at_ns; lookup treats entries older than DEFAULT_ENTRY_TTL_NS (30 s, matching the replica-plane handshake window) as absent so a stale replay reports Fresh. evict_client drops all state for a (client_id, *) on disconnect; higher layers call it from SessionManager on bind-eviction. The API is lookup / mark_in_flight / complete / evict_client. Callers see LookupResult { Fresh, InFlight, Cached(&Bytes) }. The `&Bytes` return lets the fast-path cache hit clone a reference-counted handle without copying the reply payload. Not wired into a dispatcher yet. server-ng currently has no request dispatch loop; the module ships as a standalone primitive ready for when the loop lands. Benchmark against the 100 ns/request target in the phase-0 acceptance is deferred to a follow-up commit. Tests: dedup unit tests exercising state transitions, TTL boundaries, ring eviction, cross-client isolation, the refresh-on-overwrite timestamp path, and zero-capacity rejection. Refs: Documents/silverhand/iggy/message_bus/transport-plan/ (P0-T3, designs/message-bus-auth.md).
---
 core/server-ng/src/dedup.rs | 442 ++++++++++++++++++++++++++++++++++++++++++++
 core/server-ng/src/lib.rs   |   1 +
 2 files changed, 443 insertions(+)

diff --git a/core/server-ng/src/dedup.rs b/core/server-ng/src/dedup.rs
new file mode 100644
index 000000000..0022e9d85
--- /dev/null
+++ b/core/server-ng/src/dedup.rs
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! At-most-once request dedup window (IGGY-112, P0-T3).
+//!
+//! Indexed by `(client_id, request)` pairs carried in the existing
+//! `RequestHeader.client` (u128) and `RequestHeader.request` (u64) fields.
+//! No wire-format change on the server-ng client plane was needed: the
+//! P0-T1 design note found the two fields already in place.
+//!
+//! Per-client state is a fixed-capacity ring holding the last N
+//! `(request, Entry)` pairs.
+//!
+//! A ring avoids unbounded growth under a chatty client without
+//! requiring TTL-driven purge ticks. Done entries also carry an
+//! `inserted_at_ns` timestamp; `lookup` treats entries older than
+//! `ttl_ns` as absent so a replay from far in the past sees `Fresh`
+//! rather than a stale cache hit. `mark_in_flight_at` applies the same
+//! rule, so a request reported as `Fresh` can be re-marked in place.
+//! In-flight entries carry no timestamp and never TTL-expire: they stay
+//! until `complete`, ring overwrite, or `evict_client`.
+//!
+//! The store is NOT thread-safe. Each server-ng shard owns a `Dedup`
+//! on its single-threaded compio runtime. Cross-shard routing happens
+//! above this layer.
+
+use ahash::AHashMap;
+use bytes::Bytes;
+
+/// Per-client ring capacity on the dedup window. Sized for a pipelining
+/// client issuing tens of outstanding requests; small enough to cap
+/// memory at `clients * 64 * avg_reply_size`.
+pub const DEFAULT_PER_CLIENT_CAPACITY: usize = 64;
+
+/// Entry TTL.
+///
+/// An entry older than this is invisible to `lookup`; a replay past
+/// the window is treated as `Fresh`. 30 s matches the replica-plane
+/// handshake window (`core/message_bus/src/auth.rs`) so operators see
+/// one number in their threat model, not two.
+pub const DEFAULT_ENTRY_TTL_NS: u128 = 30_000_000_000;
+
+/// Result of `Dedup::lookup`.
+///
+/// Returned as a borrowed view into the cache: `Cached` hands the caller
+/// a `&Bytes` it can `clone()` (cheap, reference-counted) before
+/// releasing the borrow. Avoids copying the reply payload on the
+/// fast-path cache hit.
+#[derive(Debug)]
+pub enum LookupResult<'a> {
+    /// No (live) entry for this `(client, request)`. Caller should call
+    /// [`Dedup::mark_in_flight_at`] and run the handler.
+    Fresh,
+    /// The same request is currently being processed by an earlier
+    /// invocation. Caller policy decides: drop (client retry will find
+    /// the cached reply once handler completes), or buffer.
+    InFlight,
+    /// Handler already ran; return the cached reply.
+    Cached(&'a Bytes),
+}
+
+/// State of one tracked request.
+#[derive(Debug)]
+enum Entry {
+    /// Handler started, reply not yet recorded.
+    InFlight,
+    /// Handler finished; `inserted_at_ns` is the completion timestamp
+    /// used for TTL checks.
+    Done { reply: Bytes, inserted_at_ns: u128 },
+}
+
+impl Entry {
+    /// `true` for a `Done` entry whose age at `now_ns` exceeds `ttl_ns`.
+    ///
+    /// `InFlight` entries carry no timestamp and never expire. A clock
+    /// that reads earlier than `inserted_at_ns` saturates to age zero.
+    fn is_expired(&self, now_ns: u128, ttl_ns: u128) -> bool {
+        match self {
+            Self::InFlight => false,
+            Self::Done { inserted_at_ns, .. } => now_ns.saturating_sub(*inserted_at_ns) > ttl_ns,
+        }
+    }
+}
+
+/// One ring slot: a request id and its state.
+#[derive(Debug)]
+struct Slot {
+    request: u64,
+    entry: Entry,
+}
+
+/// Fixed-capacity ring of slots for a single client.
+///
+/// `slots` grows up to `capacity`; after that `cursor` names the next
+/// slot to overwrite.
+#[derive(Debug)]
+struct PerClient {
+    slots: Vec<Slot>,
+    cursor: usize,
+    capacity: usize,
+}
+
+impl PerClient {
+    /// Empty ring with room for `capacity` slots.
+    fn with_capacity(capacity: usize) -> Self {
+        assert!(capacity > 0, "dedup ring capacity must be > 0");
+        Self {
+            slots: Vec::with_capacity(capacity),
+            cursor: 0,
+            capacity,
+        }
+    }
+
+    /// First slot holding `request`, if any.
+    fn find(&self, request: u64) -> Option<&Slot> {
+        self.slots.iter().find(|s| s.request == request)
+    }
+
+    /// Mutable variant of [`Self::find`].
+    fn find_mut(&mut self, request: u64) -> Option<&mut Slot> {
+        self.slots.iter_mut().find(|s| s.request == request)
+    }
+
+    /// Append while there is room; once full, overwrite the slot at
+    /// `cursor` and advance it, wrapping at `capacity`.
+    fn push(&mut self, slot: Slot) {
+        if self.slots.len() < self.capacity {
+            self.slots.push(slot);
+        } else {
+            self.slots[self.cursor] = slot;
+            self.cursor = (self.cursor + 1) % self.capacity;
+        }
+    }
+}
+
+/// At-most-once request dedup window.
+///
+/// Single-shard, single-threaded. Callers hold a `&mut Dedup` and drive
+/// the lookup / mark / complete flow synchronously on the hot path.
+#[derive(Debug)]
+pub struct Dedup {
+    per_client: AHashMap<u128, PerClient>,
+    per_client_capacity: usize,
+    ttl_ns: u128,
+}
+
+impl Dedup {
+    /// Build a window with defaults: 64-entry per-client ring, 30 s TTL.
+    #[must_use]
+    pub fn new() -> Self {
+        Self::with_config(DEFAULT_PER_CLIENT_CAPACITY, DEFAULT_ENTRY_TTL_NS)
+    }
+
+    /// Build a window with an explicit per-client ring capacity and TTL.
+    ///
+    /// # Panics
+    /// Panics if `per_client_capacity == 0`. Every producing path in
+    /// the crate uses [`DEFAULT_PER_CLIENT_CAPACITY`]; the assertion
+    /// guards misconfiguration in tests or future callers.
+    #[must_use]
+    pub fn with_config(per_client_capacity: usize, ttl_ns: u128) -> Self {
+        assert!(per_client_capacity > 0, "capacity must be > 0");
+        Self {
+            per_client: AHashMap::new(),
+            per_client_capacity,
+            ttl_ns,
+        }
+    }
+
+    /// Inspect the state for `(client, request)` at `now_ns`.
+    ///
+    /// Done entries older than the TTL are treated as absent; the window
+    /// still holds the slot but does not report it. On eviction by a
+    /// later `push`, the slot is overwritten silently.
+    #[must_use]
+    pub fn lookup(&self, client: u128, request: u64, now_ns: u128) -> LookupResult<'_> {
+        let Some(state) = self.per_client.get(&client) else {
+            return LookupResult::Fresh;
+        };
+        let Some(slot) = state.find(request) else {
+            return LookupResult::Fresh;
+        };
+        match &slot.entry {
+            Entry::InFlight => LookupResult::InFlight,
+            Entry::Done { reply, .. } => {
+                if slot.entry.is_expired(now_ns, self.ttl_ns) {
+                    LookupResult::Fresh
+                } else {
+                    LookupResult::Cached(reply)
+                }
+            }
+        }
+    }
+
+    /// Mark the handler as starting, ignoring the TTL.
+    ///
+    /// Returns `true` if the slot was inserted; `false` if an entry for
+    /// `(client, request)` already existed (caller must consult
+    /// [`Self::lookup`] for the specific state).
+    ///
+    /// This variant has no clock, so a TTL-expired `Done` entry that
+    /// [`Self::lookup`] reports as `Fresh` still counts as existing and
+    /// yields `false`. Callers that hold a timestamp should use
+    /// [`Self::mark_in_flight_at`] instead.
+    pub fn mark_in_flight(&mut self, client: u128, request: u64) -> bool {
+        // With `now_ns == 0` every entry has age zero, so nothing expires.
+        self.mark_in_flight_at(client, request, 0)
+    }
+
+    /// Mark the handler as starting at `now_ns`.
+    ///
+    /// Returns `true` if the request is now in flight on behalf of this
+    /// call: either a new slot was pushed, or a TTL-expired `Done` slot
+    /// was reclaimed in place. Returns `false` if a live entry (in
+    /// flight, or `Done` within the TTL) already exists; the caller must
+    /// consult [`Self::lookup`] for the specific state.
+    ///
+    /// Expiry uses the same rule as [`Self::lookup`], so a request that
+    /// `lookup` reports as `Fresh` at `now_ns` is always accepted here.
+    #[must_use]
+    pub fn mark_in_flight_at(&mut self, client: u128, request: u64, now_ns: u128) -> bool {
+        let state = self
+            .per_client
+            .entry(client)
+            .or_insert_with(|| PerClient::with_capacity(self.per_client_capacity));
+        if let Some(slot) = state.find_mut(request) {
+            if !slot.entry.is_expired(now_ns, self.ttl_ns) {
+                return false;
+            }
+            // Reuse the stale slot in place: pushing a second slot with
+            // the same request id would make `find` ambiguous.
+            slot.entry = Entry::InFlight;
+            return true;
+        }
+        state.push(Slot {
+            request,
+            entry: Entry::InFlight,
+        });
+        true
+    }
+
+    /// Record a completed reply at `now_ns`. Transitions the slot from
+    /// `InFlight` to `Done`, or refreshes the reply and timestamp of an
+    /// existing `Done` slot. If no slot exists (e.g. the ring overwrote
+    /// it between `mark_in_flight` and `complete`, or the request was
+    /// never marked), a new `Done` slot is pushed.
+    pub fn complete(&mut self, client: u128, request: u64, reply: Bytes, now_ns: u128) {
+        let state = self
+            .per_client
+            .entry(client)
+            .or_insert_with(|| PerClient::with_capacity(self.per_client_capacity));
+        if let Some(slot) = state.find_mut(request) {
+            slot.entry = Entry::Done {
+                reply,
+                inserted_at_ns: now_ns,
+            };
+            return;
+        }
+        state.push(Slot {
+            request,
+            entry: Entry::Done {
+                reply,
+                inserted_at_ns: now_ns,
+            },
+        });
+    }
+
+    /// Drop all state for `client`. Call on disconnect (or on
+    /// `bind_session` eviction inside `SessionManager`) so the
+    /// per-client ring is released deterministically rather than
+    /// waiting for TTL expiry of individual entries.
+    pub fn evict_client(&mut self, client: u128) {
+        self.per_client.remove(&client);
+    }
+
+    /// Current number of tracked clients. Test / metrics helper.
+    #[must_use]
+    pub fn client_count(&self) -> usize {
+        self.per_client.len()
+    }
+}
+
+impl Default for Dedup {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn bytes(s: &str) -> Bytes {
+        Bytes::copy_from_slice(s.as_bytes())
+    }
+
+    #[test]
+    fn fresh_lookup_on_empty_store() {
+        let d = Dedup::new();
+        assert!(matches!(d.lookup(1, 1, 0), LookupResult::Fresh));
+    }
+
+    #[test]
+    fn mark_then_lookup_returns_in_flight() {
+        let mut d = Dedup::new();
+        assert!(d.mark_in_flight(1, 1));
+        assert!(matches!(d.lookup(1, 1, 0), LookupResult::InFlight));
+    }
+
+    #[test]
+    fn double_mark_returns_false() {
+        let mut d = Dedup::new();
+        assert!(d.mark_in_flight(1, 1));
+        assert!(!d.mark_in_flight(1, 1));
+    }
+
+    #[test]
+    fn complete_transitions_to_cached() {
+        let mut d = Dedup::new();
+        d.mark_in_flight(1, 7);
+        d.complete(1, 7, bytes("ok"), 1_000);
+        match d.lookup(1, 7, 1_000) {
+            LookupResult::Cached(reply) => assert_eq!(&reply[..], b"ok"),
+            other => panic!("expected Cached, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn complete_without_mark_inserts_directly() {
+        let mut d = Dedup::new();
+        d.complete(1, 7, bytes("ok"), 0);
+        assert!(matches!(d.lookup(1, 7, 0), LookupResult::Cached(_)));
+    }
+
+    #[test]
+    fn ttl_expired_entry_reports_fresh() {
+        let mut d = Dedup::with_config(8, 1_000);
+        d.complete(1, 7, bytes("ok"), 0);
+        assert!(matches!(d.lookup(1, 7, 2_000), LookupResult::Fresh));
+    }
+
+    #[test]
+    fn ttl_at_boundary_still_cached() {
+        let mut d = Dedup::with_config(8, 1_000);
+        d.complete(1, 7, bytes("ok"), 0);
+        assert!(matches!(d.lookup(1, 7, 1_000), LookupResult::Cached(_)));
+    }
+
+    #[test]
+    fn ring_evicts_oldest_entry() {
+        let mut d = Dedup::with_config(3, DEFAULT_ENTRY_TTL_NS);
+        d.complete(1, 1, bytes("a"), 10);
+        d.complete(1, 2, bytes("b"), 20);
+        d.complete(1, 3, bytes("c"), 30);
+        d.complete(1, 4, bytes("d"), 40);
+        // Slot for request 1 was overwritten by request 4.
+        assert!(matches!(d.lookup(1, 1, 50), LookupResult::Fresh));
+        assert!(matches!(d.lookup(1, 2, 50), LookupResult::Cached(_)));
+        assert!(matches!(d.lookup(1, 3, 50), LookupResult::Cached(_)));
+        assert!(matches!(d.lookup(1, 4, 50), LookupResult::Cached(_)));
+    }
+
+    #[test]
+    fn evict_client_drops_all_entries() {
+        let mut d = Dedup::new();
+        d.complete(1, 1, bytes("a"), 10);
+        d.complete(1, 2, bytes("b"), 20);
+        d.evict_client(1);
+        assert!(matches!(d.lookup(1, 1, 30), LookupResult::Fresh));
+        assert_eq!(d.client_count(), 0);
+    }
+
+    #[test]
+    fn isolation_between_clients() {
+        let mut d = Dedup::new();
+        d.complete(1, 5, bytes("a"), 10);
+        d.complete(2, 5, bytes("b"), 20);
+        match d.lookup(1, 5, 30) {
+            LookupResult::Cached(r) => assert_eq!(&r[..], b"a"),
+            other => panic!("expected Cached a, got {other:?}"),
+        }
+        match d.lookup(2, 5, 30) {
+            LookupResult::Cached(r) => assert_eq!(&r[..], b"b"),
+            other => panic!("expected Cached b, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn complete_updates_inserted_at_on_refresh() {
+        let mut d = Dedup::with_config(8, 100);
+        d.complete(1, 1, bytes("v1"), 0);
+        // Refresh with new timestamp and payload.
+        d.complete(1, 1, bytes("v2"), 50);
+        // Would have been TTL-expired if inserted_at had stayed at 0.
+        match d.lookup(1, 1, 140) {
+            LookupResult::Cached(r) => assert_eq!(&r[..], b"v2"),
+            other => panic!("expected Cached v2, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn mark_at_reclaims_ttl_expired_entry() {
+        let mut d = Dedup::with_config(8, 1_000);
+        d.complete(1, 7, bytes("ok"), 0);
+        // `lookup` reports the stale entry as Fresh, so the mark must succeed.
+        assert!(matches!(d.lookup(1, 7, 2_000), LookupResult::Fresh));
+        assert!(d.mark_in_flight_at(1, 7, 2_000));
+        assert!(matches!(d.lookup(1, 7, 2_000), LookupResult::InFlight));
+        // A duplicate arriving while the handler runs is rejected.
+        assert!(!d.mark_in_flight_at(1, 7, 2_000));
+    }
+
+    #[test]
+    fn mark_at_rejects_live_done_entry() {
+        let mut d = Dedup::with_config(8, 1_000);
+        d.complete(1, 7, bytes("ok"), 0);
+        assert!(!d.mark_in_flight_at(1, 7, 1_000));
+        assert!(matches!(d.lookup(1, 7, 1_000), LookupResult::Cached(_)));
+    }
+
+    #[test]
+    #[should_panic(expected = "capacity must be > 0")]
+    fn zero_capacity_rejected() {
+        let _ = Dedup::with_config(0, DEFAULT_ENTRY_TTL_NS);
+    }
+}
diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/lib.rs
index caae94a3a..0aa424bba 100644
--- a/core/server-ng/src/lib.rs
+++ b/core/server-ng/src/lib.rs
@@ -17,5 +17,6 @@
  * under the License.
  */
 
+pub mod dedup;
 pub mod login_register;
 pub mod session_manager;
