This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch partition_remaster
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/partition_remaster by this push:
     new 64e51ff6f clippy fixes
64e51ff6f is described below

commit 64e51ff6f3ae96f65425ea5685325a5edcbb7984
Author: numinex <[email protected]>
AuthorDate: Mon Mar 30 15:33:05 2026 +0200

    clippy fixes
---
 core/partitions/src/iggy_partitions.rs | 83 ++++++++++++++++++++++++++++++++++
 core/partitions/src/journal.rs         |  2 +-
 core/simulator/src/client.rs           |  4 ++
 3 files changed, 88 insertions(+), 1 deletion(-)

diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index 2fdc461c2..5d9d3d601 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -43,6 +43,7 @@ use iggy_common::{
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
 use iobuf::Frozen;
+use journal::Journal as _;
 use message_bus::MessageBus;
 use std::cell::UnsafeCell;
 use std::collections::{HashMap, HashSet};
@@ -899,6 +900,9 @@ where
 
         let mut committed_ns: HashSet<IggyNamespace> = HashSet::new();
         let mut failed_ns: HashSet<IggyNamespace> = HashSet::new();
+        let committed_visible_offsets = self
+            .resolve_committed_visible_offsets(consensus, &drained, &mut failed_ns)
+            .await;
 
         for PipelineEntry {
             header: prepare_header,
@@ -912,6 +916,7 @@ where
                     prepare_header,
                     entry_namespace,
                     &mut committed_ns,
+                    &committed_visible_offsets,
                     &mut failed_ns,
                 )
                 .await
@@ -968,12 +973,54 @@ where
         }
     }
 
+    async fn resolve_committed_visible_offsets(
+        &self,
+        consensus: &VsrConsensus<B, NamespacedPipeline>,
+        drained: &[PipelineEntry],
+        failed_ns: &mut HashSet<IggyNamespace>,
+    ) -> HashMap<IggyNamespace, u64> {
+        let mut committed_visible_offsets = HashMap::new();
+
+        for entry in drained {
+            if entry.header.operation != Operation::SendMessages {
+                continue;
+            }
+
+            let entry_namespace = IggyNamespace::from_raw(entry.header.namespace);
+            match self
+                .committed_end_offset_for_prepare(&entry_namespace, &entry.header)
+                .await
+            {
+                Ok(Some(end_offset)) => {
+                    committed_visible_offsets.insert(entry_namespace, end_offset);
+                }
+                Ok(None) => {}
+                Err(error) => {
+                    failed_ns.insert(entry_namespace);
+                    warn!(
+                        target: "iggy.partitions.diag",
+                        plane = "partitions",
+                        replica_id = consensus.replica(),
+                        namespace_raw = entry_namespace.inner(),
+                        op = entry.header.op,
+                        operation = ?entry.header.operation,
+                        %error,
+                        "failed to resolve committed visible offset for partition entry"
+                    );
+                }
+            }
+        }
+
+        committed_visible_offsets
+    }
+
     async fn commit_partition_entry(
         &self,
         consensus: &VsrConsensus<B, NamespacedPipeline>,
         prepare_header: PrepareHeader,
         entry_namespace: IggyNamespace,
         committed_ns: &mut HashSet<IggyNamespace>,
+        committed_visible_offsets: &HashMap<IggyNamespace, u64>,
         failed_ns: &mut HashSet<IggyNamespace>,
     ) -> bool {
         match prepare_header.operation {
@@ -993,6 +1040,19 @@ where
                         "failed to commit partition messages"
                     );
                 }
+
+                if committed_ns.contains(&entry_namespace)
+                    && !failed_ns.contains(&entry_namespace)
+                    && let Some(visible_offset) =
+                        committed_visible_offsets.get(&entry_namespace).copied()
+                {
+                    let partition = self
+                        .get_by_ns(&entry_namespace)
+                        .expect("commit_partition_entry: partition not found");
+                    partition.offset.store(visible_offset, Ordering::Release);
+                    partition.stats.set_current_offset(visible_offset);
+                }
+
                 !failed_ns.contains(&entry_namespace)
             }
             Operation::StoreConsumerOffset => {
@@ -1022,6 +1082,29 @@ where
         }
     }
 
+    async fn committed_end_offset_for_prepare(
+        &self,
+        namespace: &IggyNamespace,
+        prepare_header: &PrepareHeader,
+    ) -> Result<Option<u64>, IggyError> {
+        let partition = self
+            .get_by_ns(namespace)
+            .expect("committed_end_offset_for_prepare: partition not found");
+        let Some(entry) = partition.log.journal().inner.entry(prepare_header).await else {
+            return Err(IggyError::InvalidCommand);
+        };
+        let batch =
+            decode_prepare_slice(entry.as_slice()).map_err(|_| IggyError::InvalidCommand)?;
+        let message_count = batch.message_count();
+        if message_count == 0 {
+            return Ok(None);
+        }
+
+        Ok(Some(
+            batch.header.base_offset + u64::from(message_count) - 1,
+        ))
+    }
+
     /// Persist frozen batches to disk and update segment bookkeeping.
     async fn persist_frozen_batches_to_disk(
         &self,
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index fd0b0fceb..dc020b492 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -219,7 +219,7 @@ where
     S: Storage<Buffer = JournalBuffer>,
 {
     #[must_use]
-    pub fn with_storage(storage: S) -> Self {
+    pub const fn with_storage(storage: S) -> Self {
         Self {
             op_to_storage_offset: UnsafeCell::new(BTreeMap::new()),
             offset_to_op: UnsafeCell::new(BTreeMap::new()),
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index 0aac8dd57..2d64d0f40 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -69,6 +69,10 @@ impl SimClient {
     }
 
     #[allow(clippy::cast_possible_truncation)]
+    /// # Panics
+    ///
+    /// Panics if the simulator cannot encode the provided messages into a valid
+    /// `SendMessages2` request.
     pub fn send_messages(
         &self,
         namespace: IggyNamespace,

Reply via email to