hubcio commented on code in PR #3192: URL: https://github.com/apache/iggy/pull/3192#discussion_r3179977095
########## core/server-ng/src/dedup.rs: ########## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// `Dedup` is consumed only by the `#[cfg(test)]` tests below today; the +// request dispatcher loop in server-ng is not yet wired up. Once the +// dispatcher lands and calls `lookup` / `mark_in_flight` / `complete` / +// `evict_client`, the `dead_code` warnings disappear without needing +// this allow. +#![allow(dead_code)] + +//! At-most-once **per-shard-lifetime** request dedup window. +//! +//! Indexed by `(client_id, request)` pairs carried in the existing +//! `RequestHeader.client` (u128) and `RequestHeader.request` (u64) fields. +//! No wire-format change on the server-ng client plane is needed: both +//! fields are already part of the request header. +//! +//! # Lifetime scope (load-bearing) +//! +//! "At-most-once" applies for the lifetime of THIS shard's `Dedup` +//! instance. Process restart drops every entry: a client that retries a +//! request after a server restart re-applies side effects unless the +//! request is idempotent at the handler level. Cross-process persistence +//! would need a checkpoint file backed by the consensus WAL and is +//! deferred. 
Operators sizing client-side retry policy must treat the +//! dedup window as a *crash-window* mitigation only. +//! +//! # Per-client ring +//! +//! Per-client state is a fixed-capacity ring holding the last N +//! `(request, Entry)` pairs. A ring avoids unbounded growth under a +//! chatty client without requiring TTL-driven purge ticks. Done entries +//! also carry an `inserted_at_ns` timestamp; `lookup` treats entries +//! older than `ttl_ns` as absent so a replay from far in the past sees +//! `Fresh` rather than a stale cache hit. +//! +//! # Threading +//! +//! The store is NOT thread-safe. Each server-ng shard owns a `Dedup` on +//! its single-threaded compio runtime. Cross-shard routing happens above +//! this layer. Caller MUST invoke [`Dedup::evict_client`] on session +//! disconnect / bind eviction so the per-client `AHashMap` entry is +//! released; without that hook the outer map grows unbounded across the +//! shard's session history. + +use ahash::AHashMap; +use bytes::Bytes; + +/// Per-client ring capacity on the dedup window. Sized for a pipelining +/// client issuing tens of outstanding requests; small enough to cap +/// memory at `clients * 64 * avg_reply_size`. +pub const DEFAULT_PER_CLIENT_CAPACITY: usize = 64; + +/// Entry TTL. +/// +/// An entry older than this is invisible to `lookup`; a replay past +/// the window is treated as `Fresh`. 30 s is comfortably larger than +/// any per-request retry timer the SDK uses, so a client retry that +/// follows a slow handler still sees the cached reply. +pub const DEFAULT_ENTRY_TTL_NS: u128 = 30_000_000_000; + +/// Result of `Dedup::lookup`. +/// +/// Returned as a borrowed view into the cache: `Cached` hands the caller +/// a `&Bytes` it can `clone()` (cheap, reference-counted) before +/// releasing the borrow. Avoids copying the reply payload on the +/// fast-path cache hit. +#[derive(Debug)] +pub enum LookupResult<'a> { + /// No (live) entry for this `(client, request)`. 
Caller should call + /// [`Dedup::mark_in_flight`] and run the handler. + Fresh, + /// The same request is currently being processed by an earlier + /// invocation. Caller policy decides: drop (client retry will find + /// the cached reply once handler completes), or buffer. + InFlight, + /// Handler already ran; return the cached reply. + /// + /// The borrow ties to `&Dedup`. Callers MUST `clone()` the `Bytes` + /// (cheap, ref-counted) before invoking any `&mut Dedup` method on + /// the same instance; the borrow checker enforces this at compile + /// time but the contract is worth naming explicitly. + Cached(&'a Bytes), +} + +#[derive(Debug)] +enum Entry { + InFlight, + Done { reply: Bytes, inserted_at_ns: u128 }, +} + +#[derive(Debug)] +struct Slot { + request: u64, + entry: Entry, +} + +#[derive(Debug)] +struct PerClient { + slots: Vec<Slot>, + cursor: usize, + capacity: usize, +} + +impl PerClient { + fn with_capacity(capacity: usize) -> Self { + // Capacity > 0 is validated at the public entry + // (`Dedup::with_config`); every callsite here goes through that + // path, so no second assert is needed. + debug_assert!(capacity > 0, "dedup ring capacity must be > 0"); + Self { + slots: Vec::with_capacity(capacity), + cursor: 0, + capacity, + } + } + + fn find(&self, request: u64) -> Option<&Slot> { + self.slots.iter().find(|s| s.request == request) + } + + fn find_mut(&mut self, request: u64) -> Option<&mut Slot> { + self.slots.iter_mut().find(|s| s.request == request) + } + + fn push(&mut self, slot: Slot) { + if self.slots.len() < self.capacity { + self.slots.push(slot); + } else { + self.slots[self.cursor] = slot; + self.cursor = (self.cursor + 1) % self.capacity; + } + } +} + +/// At-most-once request dedup window. +/// +/// Single-shard, single-threaded. Callers hold a `&mut Dedup` and drive +/// the lookup / mark / complete flow synchronously on the hot path. 
+#[derive(Debug)] +pub struct Dedup { + per_client: AHashMap<u128, PerClient>, + per_client_capacity: usize, + ttl_ns: u128, +} + +impl Dedup { + /// Build a window with defaults: 64-entry per-client ring, 30 s TTL. + #[must_use] + pub fn new() -> Self { + Self::with_config(DEFAULT_PER_CLIENT_CAPACITY, DEFAULT_ENTRY_TTL_NS) + } + + /// # Panics + /// Panics if `per_client_capacity == 0`. Every producing path in + /// the crate uses [`DEFAULT_PER_CLIENT_CAPACITY`]; the assertion + /// guards misconfiguration in tests or future callers. + #[must_use] + pub fn with_config(per_client_capacity: usize, ttl_ns: u128) -> Self { + assert!(per_client_capacity > 0, "capacity must be > 0"); + Self { + per_client: AHashMap::new(), + per_client_capacity, + ttl_ns, + } + } + + /// Inspect the state for `(client, request)` at `now_ns`. + /// + /// Done entries older than the TTL are treated as absent; the window + /// still holds the slot but does not report it. On eviction by a + /// later `push`, the slot is overwritten silently. + #[must_use] + pub fn lookup(&self, client: u128, request: u64, now_ns: u128) -> LookupResult<'_> { + let Some(state) = self.per_client.get(&client) else { + return LookupResult::Fresh; + }; + let Some(slot) = state.find(request) else { + return LookupResult::Fresh; + }; + match &slot.entry { + Entry::InFlight => LookupResult::InFlight, + Entry::Done { + reply, + inserted_at_ns, + } => { + if now_ns.saturating_sub(*inserted_at_ns) > self.ttl_ns { + LookupResult::Fresh + } else { + LookupResult::Cached(reply) + } + } + } + } + + /// Mark the handler as starting. Returns `true` if the slot was + /// inserted; `false` if an entry for `(client, request)` already + /// existed (caller must consult [`Self::lookup`] for the specific + /// state). 
+ pub fn mark_in_flight(&mut self, client: u128, request: u64) -> bool { + let state = self + .per_client + .entry(client) + .or_insert_with(|| PerClient::with_capacity(self.per_client_capacity)); + if state.find(request).is_some() { Review Comment: I removed all dedup-related code in this PR. It will be done later, so this comment is obsolete. ########## core/message_bus/src/transports/ws.rs: ########## @@ -0,0 +1,427 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Plaintext WS transport: `compio_ws::WebSocketStream<TcpStream>` +//! driven by a single connection task per peer. +//! +//! # Architecture +//! +//! Replaces the prior post-handshake unwrap-to-bare-TcpStream + manual +//! RFC 6455 frame parser with the compio-native stack: +//! +//! 1. The HTTP-Upgrade handshake runs externally +//! (`installer::install_client_ws_fd` calls +//! `compio_ws::accept_async`); the resulting +//! `WebSocketStream<TcpStream>` is handed to [`WsTransportConn::new_server`]. +//! 2. A single connection task owns the `WebSocketStream` and +//! `select!`s over the bus shutdown token, the outbound mailbox, +//! and `ws.read()`. +//! 3. On shutdown the task sends a WS Close frame, flushes, and drops. +//! +//! # Cancel safety +//!
+//! `compio_ws::WebSocketStream::read` (lib.rs:208-229) loops calling +//! `inner.read()` against a buffered `SyncStream`. The await points are +//! at `fill_read_buf` on the underlying stream; bytes already in the +//! buffer survive a future drop. Tungstenite's frame parser state lives +//! inside the WebSocket struct, also preserved across drops. +//! +//! Sends are run to completion outside any `select!`. The token signals +//! the *start* of close, never cancels an in-flight send. +//! +//! # Pings +//! +//! Inbound `Message::Ping` is queued by tungstenite as an auto-Pong +//! reply; the `WebSocketStream::read` flush before delivery drains the +//! queue, so the application code only needs to ignore Pings. No +//! explicit Pong send required. + +use super::{ActorContext, TransportConn}; +use crate::framing; +use crate::lifecycle::BusMessage; +use compio::net::TcpStream; +use compio::ws::WebSocketStream; +use compio::ws::tungstenite::{self, Message as WsMessage}; +use futures::FutureExt; +use iggy_binary_protocol::consensus::MESSAGE_ALIGN; +use iggy_binary_protocol::{GenericHeader, Message}; +use std::time::Duration; +use tracing::{debug, warn}; + +/// Default wall-clock bound on the WS Close + drop sequence. +const DEFAULT_CLOSE_GRACE: Duration = Duration::from_secs(2); + +/// Errors decoded from a WS `Message::Binary` payload. +#[derive(Debug)] +pub(in crate::transports) enum FrameDecodeError { + BadHeader, + BadSize, +} + +/// Decode one consensus `Message<GenericHeader>` from a raw WS Binary +/// payload. Reused by [`super::wss`] over the TLS-protected channel. 
+pub(in crate::transports) fn decode_consensus_frame( + body: &[u8], +) -> Result<Message<GenericHeader>, FrameDecodeError> { + if body.len() < iggy_binary_protocol::HEADER_SIZE { + return Err(FrameDecodeError::BadHeader); + } + let total_size = u32::from_le_bytes( + body[48..52] + .try_into() + .map_err(|_| FrameDecodeError::BadHeader)?, + ) as usize; + if !(iggy_binary_protocol::HEADER_SIZE..=framing::MAX_MESSAGE_SIZE).contains(&total_size) + || total_size != body.len() + { + return Err(FrameDecodeError::BadSize); + } + // Aligned MESSAGE_ALIGN backing memory is required by the + // `Message<GenericHeader>` invariant; the WS payload is a plain byte + // slice with no alignment guarantee, so this is a one-time copy. + let owned = + iggy_binary_protocol::consensus::iobuf::Owned::<MESSAGE_ALIGN>::copy_from_slice(body); + Message::<GenericHeader>::try_from(owned).map_err(|_| FrameDecodeError::BadHeader) +} + +/// A single WebSocket connection. +/// +/// Holds an already-upgraded `WebSocketStream<TcpStream>`. Construct +/// with [`Self::new_server`] for accepted server-role connections or +/// [`Self::new_client`] for outbound dialer paths / tests. +pub struct WsTransportConn { + stream: WebSocketStream<TcpStream>, + close_grace: Duration, +} + +impl WsTransportConn { + #[must_use] + pub const fn new_server(stream: WebSocketStream<TcpStream>) -> Self { + Self { + stream, + close_grace: DEFAULT_CLOSE_GRACE, + } + } + + #[must_use] + pub const fn new_client(stream: WebSocketStream<TcpStream>) -> Self { + Self { + stream, + close_grace: DEFAULT_CLOSE_GRACE, + } + } + + /// Override the wall-clock bound on the close sequence. 
+ #[must_use] + pub const fn with_close_grace(mut self, close_grace: Duration) -> Self { + self.close_grace = close_grace; + self + } +} + +impl TransportConn for WsTransportConn { + #[allow(clippy::future_not_send)] + async fn run(self, ctx: ActorContext) { + let label = ctx.label; + let peer = ctx.peer.clone(); + let mut ws = self.stream; + run_pump(&mut ws, ctx).await; + drive_close(&mut ws, self.close_grace, label, &peer).await; + } +} + +/// Per-iteration outcome of the single-task select. +enum PumpAction { + Shutdown, + Send(BusMessage), + Recv(Result<WsMessage, tungstenite::Error>), + MailboxClosed, +} + +/// Drive the WS connection until shutdown, peer Close, or an +/// unrecoverable error. +#[allow(clippy::future_not_send)] +async fn run_pump(ws: &mut WebSocketStream<TcpStream>, ctx: ActorContext) { + let ActorContext { + in_tx, + rx, + shutdown, + label, + peer, + .. + } = ctx; + let mut shutdown_fut = Box::pin(shutdown.wait().fuse()); + + loop { + let action = { + let read_fut = ws.read(); + let recv_fut = rx.recv(); + futures::pin_mut!(read_fut); + futures::pin_mut!(recv_fut); + + futures::select_biased! 
{ + () = shutdown_fut.as_mut() => PumpAction::Shutdown, + msg = recv_fut.fuse() => msg.map_or(PumpAction::MailboxClosed, PumpAction::Send), + res = read_fut.fuse() => PumpAction::Recv(res), + } + }; + + match action { + PumpAction::Shutdown => { + debug!(%label, %peer, "ws pump: shutdown observed"); + return; + } + PumpAction::MailboxClosed => { + debug!(%label, %peer, "ws pump: mailbox closed"); + return; + } + PumpAction::Send(msg) => { + let payload: Vec<u8> = msg.as_slice().to_vec(); + if let Err(e) = ws.send(WsMessage::Binary(payload.into())).await { + warn!(%label, %peer, error = ?e, "ws writer: send failed"); + return; + } + if let Err(e) = ws.flush().await { + warn!(%label, %peer, error = ?e, "ws writer: flush failed"); + return; + } + } + PumpAction::Recv(Ok(msg)) => match msg { + WsMessage::Binary(bytes) => match decode_consensus_frame(&bytes) { + Ok(frame) => { + if in_tx.send(frame).await.is_err() { + debug!(%label, %peer, "ws reader: inbound queue dropped"); + return; + } + } + Err(e) => { + warn!(%label, %peer, error = ?e, "ws reader: bad consensus frame"); + return; + } + }, + WsMessage::Ping(_) | WsMessage::Pong(_) => { + // Tungstenite queues an auto-Pong for inbound Pings; + // `compio_ws::WebSocketStream::read` flushes before + // delivery so no explicit reply needed. + } + WsMessage::Close(_) => { + debug!(%label, %peer, "ws reader: peer initiated close"); + return; + } + WsMessage::Text(_) | WsMessage::Frame(_) => { + warn!(%label, %peer, "ws reader: unexpected text/raw frame, closing"); + return; + } + }, + PumpAction::Recv(Err(e)) => { + debug!(%label, %peer, error = ?e, "ws reader: read error"); + return; + } + } + } +} + +/// Best-effort cooperative close: send WS Close frame, flush, then drop +/// the stream. Bounded by `close_grace`; on timeout the OS sends RST. 
+#[allow(clippy::future_not_send)] +async fn drive_close( + ws: &mut WebSocketStream<TcpStream>, + close_grace: Duration, + label: &'static str, + peer: &str, +) { + if compio::time::timeout(close_grace, async { + let _ = ws.close(None).await; + let _ = ws.flush().await; + }) + .await + .is_err() + { + warn!( + %label, + %peer, + grace_ms = close_grace.as_millis(), + "ws close: grace exceeded" + ); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::framing; + use crate::lifecycle::Shutdown; + use async_channel::{Receiver, Sender, bounded}; + use compio::net::TcpListener; + use iggy_binary_protocol::consensus::iobuf::Frozen; + use iggy_binary_protocol::{Command2, GenericHeader, HEADER_SIZE, Message}; + use std::time::Duration; + + #[allow(clippy::cast_possible_truncation)] + fn header_only(command: Command2) -> Frozen<MESSAGE_ALIGN> { + Message::<GenericHeader>::new(HEADER_SIZE) + .transmute_header(|_, h: &mut GenericHeader| { + h.command = command; + h.size = HEADER_SIZE as u32; + }) + .into_frozen() + } + + #[allow(clippy::cast_possible_truncation)] + fn padded(command: Command2, total_size: usize) -> Frozen<MESSAGE_ALIGN> { + Message::<GenericHeader>::new(total_size) + .transmute_header(|_, h: &mut GenericHeader| { + h.command = command; + h.size = total_size as u32; + }) + .into_frozen() + } + + #[allow(clippy::future_not_send)] + async fn ws_pair() -> (WebSocketStream<TcpStream>, WebSocketStream<TcpStream>) { + let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind"); + let addr = listener.local_addr().expect("local_addr"); + let connect = async { + let stream = TcpStream::connect(addr).await.expect("connect"); + let (ws, _resp) = compio_ws::client_async("ws://127.0.0.1/", stream) + .await + .expect("client_async"); + ws + }; + let accept = async { + let (stream, _peer) = listener.accept().await.expect("accept"); + compio_ws::accept_async(stream).await.expect("accept_async") + }; + let (client, server) = futures::join!(connect, 
accept); + (client, server) + } + + #[allow(clippy::future_not_send)] + fn drive( + conn: WsTransportConn, + ) -> ( + Sender<Frozen<MESSAGE_ALIGN>>, + Receiver<Message<GenericHeader>>, + Shutdown, + compio::runtime::JoinHandle<()>, + ) { + let (out_tx, out_rx) = bounded::<Frozen<MESSAGE_ALIGN>>(16); + let (in_tx, in_rx) = bounded::<Message<GenericHeader>>(16); + let (shutdown, token) = Shutdown::new(); + let ctx = ActorContext { + in_tx, + rx: out_rx, + shutdown: token, + max_batch: 16, + max_message_size: framing::MAX_MESSAGE_SIZE, + label: "test", + peer: "test".to_owned(), + }; + let handle = compio::runtime::spawn(async move { conn.run(ctx).await }); + (out_tx, in_rx, shutdown, handle) + } + + #[compio::test] + #[allow(clippy::future_not_send)] + async fn ws_loopback_round_trip() { + let (client_ws, server_ws) = ws_pair().await; + let server_conn = WsTransportConn::new_server(server_ws); + let client_conn = WsTransportConn::new_client(client_ws); + + let (server_out, server_in, server_shutdown, server_handle) = drive(server_conn); + let (client_out, client_in, client_shutdown, client_handle) = drive(client_conn); + + client_out Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
