krishvishal commented on code in PR #3023:
URL: https://github.com/apache/iggy/pull/3023#discussion_r3043918776
##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -389,15 +406,40 @@ impl<C> IggyPartitions<C> {
}
}
-impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
+impl<B, J> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, NamespacedPipeline>, J>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ J: JournalHandle,
+ J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>,
{
async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::Message<RequestHeader>) {
let namespace = IggyNamespace::from_raw(message.header().namespace);
let consensus = self
.consensus()
.expect("on_request: consensus not initialized");
+ let client_id = message.header().client;
+ let request = message.header().request;
+
+ // TODO: Add a bounded request queue instead of dropping here.
+ // When the prepare queue (8 max) is full, buffer
+ // incoming requests in a request queue. On commit, pop the next request
+ // from the request queue and begin preparing it. Only drop when both
+ // queues are full.
+ if consensus.pipeline().borrow().is_full() {
+ warn!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ replica_id = consensus.replica(),
+ client = client_id,
+ request = request,
+ "on_request: pipeline full, dropping request"
+ );
+ return;
+ }
+
+ let Some(_notify) = request_preflight(consensus, client_id, request).await else {
Review Comment:
Done.
##########
core/metadata/src/impls/metadata.rs:
##########
@@ -357,7 +380,10 @@ where
// Force a checkpoint if the journal is running low on capacity.
if let Some(coordinator) = &self.coordinator {
Review Comment:
Done.
##########
core/consensus/src/client_table.rs:
##########
@@ -0,0 +1,576 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_binary_protocol::{Message, ReplyHeader};
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::future::Future;
+use std::rc::Rc;
+use std::task::Waker;
+
+/// Identifies a specific request from a specific client.
+/// Used as the key for the pending-commit waiter map.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct ClientRequest {
+ pub client_id: u128,
+ pub request: u64,
+}
+
+/// Inner state shared between `Notify` clones via `Rc`.
+#[derive(Debug)]
+struct NotifyInner {
+ waker: RefCell<Option<Waker>>,
+ notified: std::cell::Cell<bool>,
+}
+
+/// Lightweight, single-threaded async notification primitive.
+///
+/// ## Usage
+///
+/// ```ignore
+/// let notify = Notify::new();
+/// let waiter = notify.clone();
+///
+/// // Producer side (in commit_reply):
+/// notify.notify();
+///
+/// // Consumer side (caller awaiting the commit):
+/// waiter.notified().await;
+/// ```
+#[derive(Debug, Clone)]
+pub struct Notify {
+ inner: Rc<NotifyInner>,
+}
+
+impl Notify {
+ /// Create a new `Notify` in the un-notified state.
+ #[must_use]
+ pub fn new() -> Self {
+ Self {
+ inner: Rc::new(NotifyInner {
+ waker: RefCell::new(None),
+ notified: std::cell::Cell::new(false),
+ }),
+ }
+ }
+
+ /// Wake the waiter, if any. If `notified()` is polled later, it will
+ /// resolve immediately.
+ pub fn notify(&self) {
+ self.inner.notified.set(true);
+ if let Some(waker) = self.inner.waker.borrow_mut().take() {
+ waker.wake();
+ }
+ }
+
+ /// Returns a future that resolves when [`notify()`](Self::notify) is called.
+ ///
+ /// If `notify()` was already called before this future is polled, it
+ /// resolves immediately (permit is consumed).
+ #[allow(clippy::future_not_send)]
+ pub fn notified(&self) -> impl Future<Output = ()> + '_ {
+ std::future::poll_fn(move |cx| {
+ if self.inner.notified.get() {
+ self.inner.notified.set(false);
+ std::task::Poll::Ready(())
+ } else {
+ *self.inner.waker.borrow_mut() = Some(cx.waker().clone());
+ std::task::Poll::Pending
+ }
+ })
+ }
+}
+
+impl Default for Notify {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Per-client entry in the clients table (VR paper Section 4, Figure 2).
+///
+/// Stores the reply for the client's latest committed request. The client ID,
+/// request number, and commit number are all read from `reply.header()`.
+#[derive(Debug)]
+pub struct ClientEntry {
+ /// The cached reply for the client's latest committed request (header + body).
+ pub reply: Message<ReplyHeader>,
+}
+
+/// Result of checking a request against the client table.
Review Comment:
Done.
##########
core/consensus/src/impls.rs:
##########
@@ -403,7 +408,7 @@ pub enum Status {
}
/// Actions to be taken by the caller after processing a VSR event.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug)]
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]