This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch partition_remaster
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/partition_remaster by this
push:
new 64e51ff6f clippy fixes
64e51ff6f is described below
commit 64e51ff6f3ae96f65425ea5685325a5edcbb7984
Author: numinex <[email protected]>
AuthorDate: Mon Mar 30 15:33:05 2026 +0200
clippy fixes
---
core/partitions/src/iggy_partitions.rs | 83 ++++++++++++++++++++++++++++++++++
core/partitions/src/journal.rs | 2 +-
core/simulator/src/client.rs | 4 ++
3 files changed, 88 insertions(+), 1 deletion(-)
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index 2fdc461c2..5d9d3d601 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -43,6 +43,7 @@ use iggy_common::{
sharding::{IggyNamespace, LocalIdx, ShardId},
};
use iobuf::Frozen;
+use journal::Journal as _;
use message_bus::MessageBus;
use std::cell::UnsafeCell;
use std::collections::{HashMap, HashSet};
@@ -899,6 +900,9 @@ where
let mut committed_ns: HashSet<IggyNamespace> = HashSet::new();
let mut failed_ns: HashSet<IggyNamespace> = HashSet::new();
+ let committed_visible_offsets = self
+ .resolve_committed_visible_offsets(consensus, &drained, &mut failed_ns)
+ .await;
for PipelineEntry {
header: prepare_header,
@@ -912,6 +916,7 @@ where
prepare_header,
entry_namespace,
&mut committed_ns,
+ &committed_visible_offsets,
&mut failed_ns,
)
.await
@@ -968,12 +973,54 @@ where
}
}
+ async fn resolve_committed_visible_offsets(
+ &self,
+ consensus: &VsrConsensus<B, NamespacedPipeline>,
+ drained: &[PipelineEntry],
+ failed_ns: &mut HashSet<IggyNamespace>,
+ ) -> HashMap<IggyNamespace, u64> {
+ let mut committed_visible_offsets = HashMap::new();
+
+ for entry in drained {
+ if entry.header.operation != Operation::SendMessages {
+ continue;
+ }
+
+ let entry_namespace = IggyNamespace::from_raw(entry.header.namespace);
+ match self
+ .committed_end_offset_for_prepare(&entry_namespace, &entry.header)
+ .await
+ {
+ Ok(Some(end_offset)) => {
+ committed_visible_offsets.insert(entry_namespace, end_offset);
+ }
+ Ok(None) => {}
+ Err(error) => {
+ failed_ns.insert(entry_namespace);
+ warn!(
+ target: "iggy.partitions.diag",
+ plane = "partitions",
+ replica_id = consensus.replica(),
+ namespace_raw = entry_namespace.inner(),
+ op = entry.header.op,
+ operation = ?entry.header.operation,
+ %error,
+ "failed to resolve committed visible offset for partition entry"
+ );
+ }
+ }
+ }
+
+ committed_visible_offsets
+ }
+
async fn commit_partition_entry(
&self,
consensus: &VsrConsensus<B, NamespacedPipeline>,
prepare_header: PrepareHeader,
entry_namespace: IggyNamespace,
committed_ns: &mut HashSet<IggyNamespace>,
+ committed_visible_offsets: &HashMap<IggyNamespace, u64>,
failed_ns: &mut HashSet<IggyNamespace>,
) -> bool {
match prepare_header.operation {
@@ -993,6 +1040,19 @@ where
"failed to commit partition messages"
);
}
+
+ if committed_ns.contains(&entry_namespace)
+ && !failed_ns.contains(&entry_namespace)
+ && let Some(visible_offset) = committed_visible_offsets.get(&entry_namespace).copied()
+ {
+ let partition = self
+ .get_by_ns(&entry_namespace)
+ .expect("commit_partition_entry: partition not found");
+ partition.offset.store(visible_offset, Ordering::Release);
+ partition.stats.set_current_offset(visible_offset);
+ }
+
!failed_ns.contains(&entry_namespace)
}
Operation::StoreConsumerOffset => {
@@ -1022,6 +1082,29 @@ where
}
}
+ async fn committed_end_offset_for_prepare(
+ &self,
+ namespace: &IggyNamespace,
+ prepare_header: &PrepareHeader,
+ ) -> Result<Option<u64>, IggyError> {
+ let partition = self
+ .get_by_ns(namespace)
+ .expect("committed_end_offset_for_prepare: partition not found");
+ let Some(entry) = partition.log.journal().inner.entry(prepare_header).await else {
+ return Err(IggyError::InvalidCommand);
+ };
+ let batch =
+ decode_prepare_slice(entry.as_slice()).map_err(|_| IggyError::InvalidCommand)?;
+ let message_count = batch.message_count();
+ if message_count == 0 {
+ return Ok(None);
+ }
+
+ Ok(Some(
+ batch.header.base_offset + u64::from(message_count) - 1,
+ ))
+ }
+
/// Persist frozen batches to disk and update segment bookkeeping.
async fn persist_frozen_batches_to_disk(
&self,
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index fd0b0fceb..dc020b492 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -219,7 +219,7 @@ where
S: Storage<Buffer = JournalBuffer>,
{
#[must_use]
- pub fn with_storage(storage: S) -> Self {
+ pub const fn with_storage(storage: S) -> Self {
Self {
op_to_storage_offset: UnsafeCell::new(BTreeMap::new()),
offset_to_op: UnsafeCell::new(BTreeMap::new()),
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index 0aac8dd57..2d64d0f40 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -69,6 +69,10 @@ impl SimClient {
}
#[allow(clippy::cast_possible_truncation)]
+ /// # Panics
+ ///
+ /// Panics if the simulator cannot encode the provided messages into a valid
+ /// `SendMessages2` request.
pub fn send_messages(
&self,
namespace: IggyNamespace,