hubcio commented on code in PR #3023:
URL: https://github.com/apache/iggy/pull/3023#discussion_r3043819591
##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -389,15 +406,40 @@ impl<C> IggyPartitions<C> {
}
}
-impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
+impl<B, J> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, NamespacedPipeline>, J>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ J: JournalHandle,
+ J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>,
{
async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::Message<RequestHeader>) {
let namespace = IggyNamespace::from_raw(message.header().namespace);
let consensus = self
.consensus()
.expect("on_request: consensus not initialized");
+ let client_id = message.header().client;
+ let request = message.header().request;
+
+ // TODO: Add a bounded request queue instead of dropping here.
+ // When the prepare queue (8 max) is full, buffer
+ // incoming requests in a request queue. On commit, pop the next request
+ // from the request queue and begin preparing it. Only drop when both
+ // queues are full.
+ if consensus.pipeline().borrow().is_full() {
+ warn!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ replica_id = consensus.replica(),
+ client = client_id,
+ request = request,
+ "on_request: pipeline full, dropping request"
+ );
+ return;
+ }
+
+ let Some(_notify) = request_preflight(consensus, client_id, request).await else {
Review Comment:
`request_preflight` calls `register_pending` here, which inserts into
`client_table.pending`. If `convert_request_message` fails at line 454, the
function returns early without removing the pending entry. After that, every
retry from the client for the same `(client_id, request)` pair hits `check_request`
-> `RequestStatus::InProgress` and is silently dropped, so the client is stuck
until a view change calls `clear_pending()`.
Fix: either move `register_pending` after `convert_request_message`
succeeds, or remove the pending entry on the error path. The metadata plane
doesn't have this bug because there's no fallible step between `register_pending`
and `pipeline_prepare_common`.
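Here is a minimal, synchronous sketch of both fix options. It is an illustration only and does not reflect the PR's actual types. `ClientTable`, `remove_pending`, the `HashSet` pending set, and the stand-in `convert_request_message` are all hypothetical. The real code is async, and its `register_pending` hands back a `Notify`.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified client table: only the pending set is modeled.
#[derive(Default)]
struct ClientTable {
    pending: HashSet<(u128, u64)>,
}

#[derive(Debug, PartialEq, Eq)]
enum RequestStatus {
    New,
    InProgress,
}

impl ClientTable {
    fn check_request(&self, client_id: u128, request: u64) -> RequestStatus {
        if self.pending.contains(&(client_id, request)) {
            RequestStatus::InProgress
        } else {
            RequestStatus::New
        }
    }

    fn register_pending(&mut self, client_id: u128, request: u64) {
        self.pending.insert((client_id, request));
    }

    /// Hypothetical helper that undoes `register_pending`.
    fn remove_pending(&mut self, client_id: u128, request: u64) {
        self.pending.remove(&(client_id, request));
    }
}

/// Hypothetical fallible step standing in for `convert_request_message`.
fn convert_request_message(body: &[u8]) -> Result<Vec<u8>, &'static str> {
    if body.is_empty() {
        Err("malformed request body")
    } else {
        Ok(body.to_vec())
    }
}

/// Option 1: run the fallible conversion first, register only on success.
fn on_request_register_late(
    table: &mut ClientTable,
    client_id: u128,
    request: u64,
    body: &[u8],
) -> Result<Vec<u8>, &'static str> {
    if table.check_request(client_id, request) == RequestStatus::InProgress {
        return Err("request already in progress");
    }
    let converted = convert_request_message(body)?;
    table.register_pending(client_id, request);
    Ok(converted)
}

/// Option 2: keep the current order, but undo the registration on error.
fn on_request_cleanup(
    table: &mut ClientTable,
    client_id: u128,
    request: u64,
    body: &[u8],
) -> Result<Vec<u8>, &'static str> {
    if table.check_request(client_id, request) == RequestStatus::InProgress {
        return Err("request already in progress");
    }
    table.register_pending(client_id, request);
    convert_request_message(body).map_err(|e| {
        table.remove_pending(client_id, request);
        e
    })
}

fn main() {
    let mut table = ClientTable::default();
    // A malformed request fails conversion but leaves no stale pending entry...
    assert!(on_request_cleanup(&mut table, 7, 1, &[]).is_err());
    assert_eq!(table.check_request(7, 1), RequestStatus::New);
    // ...so the client's retry is accepted instead of being dropped forever.
    assert!(on_request_cleanup(&mut table, 7, 1, b"retry").is_ok());
    assert_eq!(table.check_request(7, 1), RequestStatus::InProgress);

    let mut table = ClientTable::default();
    assert!(on_request_register_late(&mut table, 8, 1, &[]).is_err());
    assert_eq!(table.check_request(8, 1), RequestStatus::New);
}
```

Option 1 is simpler if nothing between preflight and conversion depends on the pending entry. Option 2 keeps the existing call order.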
##########
core/consensus/src/impls.rs:
##########
@@ -403,7 +408,7 @@ pub enum Status {
}
/// Actions to be taken by the caller after processing a VSR event.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug)]
Review Comment:
`VsrAction` previously derived `Clone, PartialEq, Eq` on master. The new
`RetransmitPrepares` variant holds a `Vec<(PrepareHeader, Vec<u8>)>`, which is
`Clone`, so `#[derive(Debug, Clone)]` should still work here. `PartialEq`/`Eq`
can't be restored without also adding those derives to `PrepareHeader`.
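Here is a small sketch that checks the claim with stand-in types. The `PrepareHeader` fields and the `SendStartView` variant are invented placeholders, not the real definitions. It shows that `Clone` derives fine without `PartialEq` on the header, and that tests can still assert through pattern matching.

```rust
/// Hypothetical stand-in for `PrepareHeader`: `Clone` and `Debug`, but
/// deliberately without `PartialEq`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct PrepareHeader {
    op: u64,
    view: u32,
}

/// Hypothetical subset of `VsrAction`. `SendStartView` is a placeholder;
/// only `RetransmitPrepares` comes from the review.
#[allow(dead_code)]
#[derive(Debug, Clone)]
enum VsrAction {
    SendStartView,
    RetransmitPrepares(Vec<(PrepareHeader, Vec<u8>)>),
}

// Adding `PartialEq` to the derive above would not compile, because the
// generated `==` requires `PrepareHeader: PartialEq`.

fn main() {
    let action = VsrAction::RetransmitPrepares(vec![(PrepareHeader { op: 1, view: 0 }, vec![1, 2, 3])]);
    let copy = action.clone();
    // Without `PartialEq`, assert on the variant via pattern matching.
    match &copy {
        VsrAction::RetransmitPrepares(prepares) => {
            assert_eq!(prepares.len(), 1);
            assert_eq!(prepares[0].0.op, 1);
            assert_eq!(prepares[0].1, vec![1u8, 2, 3]);
        }
        other => panic!("unexpected action: {other:?}"),
    }
}
```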
##########
core/metadata/src/impls/metadata.rs:
##########
@@ -357,7 +380,10 @@ where
// Force a checkpoint if the journal is running low on capacity.
if let Some(coordinator) = &self.coordinator {
Review Comment:
`checkpoint_if_needed` here can call `journal.drain()`, which clears slot
headers up to `commit_min`. The hash chain integrity check at line 416 then
calls `journal.handle().previous_header(&header)`, which returns `None` for the
just-drained entry, so verification is silently skipped.
Fix: move the hash chain verification (lines 416-418) before this checkpoint
block so the previous header is still available when it is checked.
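A toy model of the ordering problem follows. `Journal`, `Header`, `ChainCheck`, and the `drain` semantics are simplified, hypothetical stand-ins for the real journal API. Each header carries its parent's checksum. Draining before verifying turns a corrupt chain into a silent skip, while verifying first catches it.

```rust
use std::collections::BTreeMap;

/// Hypothetical header: `parent` is the checksum of the previous op's header.
#[derive(Debug, Clone)]
struct Header {
    op: u64,
    parent: u128,
    checksum: u128,
}

/// Hypothetical journal keeping headers keyed by op number.
#[derive(Default)]
struct Journal {
    headers: BTreeMap<u64, Header>,
}

impl Journal {
    fn append(&mut self, header: Header) {
        self.headers.insert(header.op, header);
    }

    /// Header of the op right before `header`, if it is still in the journal.
    fn previous_header(&self, header: &Header) -> Option<&Header> {
        header.op.checked_sub(1).and_then(|op| self.headers.get(&op))
    }

    /// Models a checkpoint drain: forget every header with `op <= commit_min`.
    fn drain(&mut self, commit_min: u64) {
        self.headers.retain(|&op, _| op > commit_min);
    }
}

#[derive(Debug, PartialEq, Eq)]
enum ChainCheck {
    Verified,
    Broken,
    /// Previous header unavailable, so nothing was actually checked.
    Skipped,
}

fn verify_chain(journal: &Journal, header: &Header) -> ChainCheck {
    match journal.previous_header(header) {
        Some(prev) if prev.checksum == header.parent => ChainCheck::Verified,
        Some(_) => ChainCheck::Broken,
        None => ChainCheck::Skipped,
    }
}

/// Current order: checkpoint (drain) first, then verify.
fn drain_then_verify(journal: &mut Journal, header: &Header, commit_min: u64) -> ChainCheck {
    journal.drain(commit_min);
    verify_chain(journal, header)
}

/// Suggested order: verify while the previous header is still present.
fn verify_then_drain(journal: &mut Journal, header: &Header, commit_min: u64) -> ChainCheck {
    let result = verify_chain(journal, header);
    journal.drain(commit_min);
    result
}

/// Builds a journal holding op 1 (checksum 0xA1) plus the given tip.
fn journal_with(tip: &Header) -> Journal {
    let mut journal = Journal::default();
    journal.append(Header { op: 1, parent: 0, checksum: 0xA1 });
    journal.append(tip.clone());
    journal
}

fn main() {
    // op 2's parent does NOT match op 1's checksum: the chain is corrupt.
    let corrupt = Header { op: 2, parent: 0xFF, checksum: 0xB2 };
    // Draining op 1 first hides the corruption...
    assert_eq!(drain_then_verify(&mut journal_with(&corrupt), &corrupt, 1), ChainCheck::Skipped);
    // ...verifying first detects it.
    assert_eq!(verify_then_drain(&mut journal_with(&corrupt), &corrupt, 1), ChainCheck::Broken);
}
```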
##########
core/consensus/src/client_table.rs:
##########
@@ -0,0 +1,576 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_binary_protocol::{Message, ReplyHeader};
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::future::Future;
+use std::rc::Rc;
+use std::task::Waker;
+
+/// Identifies a specific request from a specific client.
+/// Used as the key for the pending-commit waiter map.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct ClientRequest {
+ pub client_id: u128,
+ pub request: u64,
+}
+
+/// Inner state shared between `Notify` clones via `Rc`.
+#[derive(Debug)]
+struct NotifyInner {
+ waker: RefCell<Option<Waker>>,
+ notified: std::cell::Cell<bool>,
+}
+
+/// Lightweight, single-threaded async notification primitive.
+///
+/// ## Usage
+///
+/// ```ignore
+/// let notify = Notify::new();
+/// let waiter = notify.clone();
+///
+/// // Producer side (in commit_reply):
+/// notify.notify();
+///
+/// // Consumer side (caller awaiting the commit):
+/// waiter.notified().await;
+/// ```
+#[derive(Debug, Clone)]
+pub struct Notify {
+ inner: Rc<NotifyInner>,
+}
+
+impl Notify {
+ /// Create a new `Notify` in the un-notified state.
+ #[must_use]
+ pub fn new() -> Self {
+ Self {
+ inner: Rc::new(NotifyInner {
+ waker: RefCell::new(None),
+ notified: std::cell::Cell::new(false),
+ }),
+ }
+ }
+
+ /// Wake the waiter, if any. If `notified()` is polled later, it will
+ /// resolve immediately.
+ pub fn notify(&self) {
+ self.inner.notified.set(true);
+ if let Some(waker) = self.inner.waker.borrow_mut().take() {
+ waker.wake();
+ }
+ }
+
+ /// Returns a future that resolves when [`notify()`](Self::notify) is called.
+ ///
+ /// If `notify()` was already called before this future is polled, it
+ /// resolves immediately (permit is consumed).
+ #[allow(clippy::future_not_send)]
+ pub fn notified(&self) -> impl Future<Output = ()> + '_ {
+ std::future::poll_fn(move |cx| {
+ if self.inner.notified.get() {
+ self.inner.notified.set(false);
+ std::task::Poll::Ready(())
+ } else {
+ *self.inner.waker.borrow_mut() = Some(cx.waker().clone());
+ std::task::Poll::Pending
+ }
+ })
+ }
+}
+
+impl Default for Notify {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Per-client entry in the clients table (VR paper Section 4, Figure 2).
+///
+/// Stores the reply for the client's latest committed request. The client ID,
+/// request number, and commit number are all read from `reply.header()`.
+#[derive(Debug)]
+pub struct ClientEntry {
+ /// The cached reply for the client's latest committed request (header + body).
+ pub reply: Message<ReplyHeader>,
+}
+
+/// Result of checking a request against the client table.
Review Comment:
`RequestStatus` is missing `#[derive(Debug)]`. `Message<ReplyHeader>` in the
`Duplicate` variant already implements `Debug`, so the derive is trivially
possible and would make test failures more informative.
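Here is a sketch of what the derive buys in a test. `Reply` is a hypothetical stand-in for `Message<ReplyHeader>`. The `New` variant and the simplified `check_request` are assumptions; only `InProgress` and `Duplicate` appear in the review.

```rust
/// Hypothetical stand-in for `Message<ReplyHeader>`, which already
/// implements `Debug`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct Reply {
    client: u128,
    request: u64,
    commit: u64,
}

/// Sketch of `RequestStatus` with the suggested derive (`New` is assumed).
#[derive(Debug)]
enum RequestStatus {
    New,
    InProgress,
    Duplicate(Reply),
}

/// Hypothetical, simplified `check_request`.
fn check_request(latest: Option<&Reply>, pending: bool, request: u64) -> RequestStatus {
    if pending {
        return RequestStatus::InProgress;
    }
    match latest {
        Some(reply) if reply.request == request => RequestStatus::Duplicate(reply.clone()),
        _ => RequestStatus::New,
    }
}

fn main() {
    let reply = Reply { client: 1, request: 5, commit: 42 };
    let status = check_request(Some(&reply), false, 5);
    // With `Debug`, a failing assertion can print the full status,
    // including the cached reply.
    assert!(
        matches!(status, RequestStatus::Duplicate(ref r) if r.commit == 42),
        "unexpected status: {status:?}"
    );
    println!("{status:?}"); // prints: Duplicate(Reply { client: 1, request: 5, commit: 42 })
}
```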
--
This is an automated message from the Apache Git Service.