This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch store_consumer_offset
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/store_consumer_offset by this
push:
new 3608257f8 reuse code
3608257f8 is described below
commit 3608257f8985f30ad670a54211930f2e866c61c5
Author: numinex <[email protected]>
AuthorDate: Thu Apr 2 15:02:29 2026 +0200
reuse code
---
core/partitions/src/iggy_partition.rs | 152 +++++++++++++++++----------------
core/partitions/src/iggy_partitions.rs | 65 +++++++-------
2 files changed, 111 insertions(+), 106 deletions(-)
diff --git a/core/partitions/src/iggy_partition.rs
b/core/partitions/src/iggy_partition.rs
index 3ca31d0a0..db71b37a2 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -59,66 +59,51 @@ pub struct IggyPartition {
pending_consumer_offset_commits: HashMap<u64, PendingConsumerOffsetCommit>,
}
-#[allow(clippy::redundant_pub_crate)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub(crate) enum PendingConsumerOffsetOwner {
- Consumer(u32),
- ConsumerGroup(u32),
+#[derive(Debug, Clone, Copy, PartialEq)]
+struct PendingConsumerOffsetCommit {
+ kind: ConsumerKind,
+ consumer_id: u32,
+ mutation: PendingConsumerOffsetMutation,
}
-#[allow(clippy::redundant_pub_crate)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub(crate) struct PendingConsumerOffsetCommit {
- pub(crate) mutation: PendingConsumerOffsetMutation,
-}
-
-#[allow(clippy::redundant_pub_crate)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub(crate) enum PendingConsumerOffsetMutation {
- Upsert {
- owner: PendingConsumerOffsetOwner,
- offset: u64,
- },
- Delete {
- owner: PendingConsumerOffsetOwner,
- },
+#[derive(Debug, Clone, Copy, PartialEq)]
+enum PendingConsumerOffsetMutation {
+ Upsert(u64),
+ Delete,
}
impl PendingConsumerOffsetCommit {
- pub(crate) const fn upsert(owner: PendingConsumerOffsetOwner, offset: u64)
-> Self {
+ const fn upsert(kind: ConsumerKind, consumer_id: u32, offset: u64) -> Self
{
Self {
- mutation: PendingConsumerOffsetMutation::Upsert { owner, offset },
+ kind,
+ consumer_id,
+ mutation: PendingConsumerOffsetMutation::Upsert(offset),
}
}
- pub(crate) const fn delete(owner: PendingConsumerOffsetOwner) -> Self {
+ const fn delete(kind: ConsumerKind, consumer_id: u32) -> Self {
Self {
- mutation: PendingConsumerOffsetMutation::Delete { owner },
+ kind,
+ consumer_id,
+ mutation: PendingConsumerOffsetMutation::Delete,
}
}
- pub(crate) fn try_from_polling_consumer(
+ fn try_from_polling_consumer(
consumer: PollingConsumer,
offset: u64,
) -> Result<Self, IggyError> {
- let owner = match consumer {
- PollingConsumer::Consumer(id, _) =>
PendingConsumerOffsetOwner::Consumer(
+ let (kind, consumer_id) = match consumer {
+ PollingConsumer::Consumer(id, _) => (
+ ConsumerKind::Consumer,
u32::try_from(id).map_err(|_| IggyError::InvalidCommand)?,
),
- PollingConsumer::ConsumerGroup(group_id, _) => {
- PendingConsumerOffsetOwner::ConsumerGroup(
- u32::try_from(group_id).map_err(|_|
IggyError::InvalidCommand)?,
- )
- }
+ PollingConsumer::ConsumerGroup(group_id, _) => (
+ ConsumerKind::ConsumerGroup,
+ u32::try_from(group_id).map_err(|_|
IggyError::InvalidCommand)?,
+ ),
};
- Ok(Self::upsert(owner, offset))
- }
-
- pub(crate) const fn owner(self) -> PendingConsumerOffsetOwner {
- match self.mutation {
- PendingConsumerOffsetMutation::Upsert { owner, .. }
- | PendingConsumerOffsetMutation::Delete { owner } => owner,
- }
+ Ok(Self::upsert(kind, consumer_id, offset))
}
}
@@ -154,43 +139,59 @@ impl IggyPartition {
self.consumer_group_offsets_path = Some(consumer_group_offsets_path);
}
- pub(crate) fn stage_consumer_offset_commit(
+ pub(crate) async fn persist_and_stage_consumer_offset_upsert(
&mut self,
op: u64,
- pending: PendingConsumerOffsetCommit,
- ) {
+ kind: ConsumerKind,
+ consumer_id: u32,
+ offset: u64,
+ ) -> Result<(), IggyError> {
+ let pending = PendingConsumerOffsetCommit::upsert(kind, consumer_id,
offset);
+ self.persist_consumer_offset_commit(pending).await?;
self.pending_consumer_offset_commits.insert(op, pending);
+ Ok(())
}
- #[must_use]
- pub(crate) fn take_staged_consumer_offset_commit(
+ pub(crate) async fn persist_and_stage_consumer_offset_delete(
&mut self,
op: u64,
- ) -> Option<PendingConsumerOffsetCommit> {
- self.pending_consumer_offset_commits.remove(&op)
+ kind: ConsumerKind,
+ consumer_id: u32,
+ ) -> Result<(), IggyError> {
+ let pending = PendingConsumerOffsetCommit::delete(kind, consumer_id);
+ self.persist_consumer_offset_commit(pending).await?;
+ self.pending_consumer_offset_commits.insert(op, pending);
+ Ok(())
}
- pub(crate) async fn persist_consumer_offset_commit(
+ pub(crate) fn apply_staged_consumer_offset_commit(&mut self, op: u64) ->
Result<(), IggyError> {
+ let pending = self
+ .pending_consumer_offset_commits
+ .remove(&op)
+ .ok_or(IggyError::InvalidCommand)?;
+ self.apply_consumer_offset_commit(pending);
+ Ok(())
+ }
+
+ async fn persist_consumer_offset_commit(
&self,
pending: PendingConsumerOffsetCommit,
) -> Result<(), IggyError> {
- let Some(path) = self.persisted_offset_path(pending.owner()) else {
+ let Some(path) = self.persisted_offset_path(pending.kind,
pending.consumer_id) else {
return Ok(());
};
match pending.mutation {
- PendingConsumerOffsetMutation::Upsert { offset, .. } => {
- persist_offset(&path, offset).await
- }
- PendingConsumerOffsetMutation::Delete { .. } =>
delete_persisted_offset(&path).await,
+ PendingConsumerOffsetMutation::Upsert(offset) =>
persist_offset(&path, offset).await,
+ PendingConsumerOffsetMutation::Delete =>
delete_persisted_offset(&path).await,
}
}
- pub(crate) fn apply_consumer_offset_commit(&self, pending:
PendingConsumerOffsetCommit) {
+ fn apply_consumer_offset_commit(&self, pending:
PendingConsumerOffsetCommit) {
match pending.mutation {
- PendingConsumerOffsetMutation::Upsert {
- owner: PendingConsumerOffsetOwner::Consumer(id),
- offset,
- } => {
+ PendingConsumerOffsetMutation::Upsert(offset)
+ if pending.kind == ConsumerKind::Consumer =>
+ {
+ let id = pending.consumer_id;
let guard = self.consumer_offsets.pin();
let key = usize::try_from(id).expect("u32 consumer id must fit
usize");
if let Some(existing) = guard.get(&key) {
@@ -204,10 +205,10 @@ impl IggyPartition {
guard.insert(key, created);
}
}
- PendingConsumerOffsetMutation::Upsert {
- owner: PendingConsumerOffsetOwner::ConsumerGroup(group_id),
- offset,
- } => {
+ PendingConsumerOffsetMutation::Upsert(offset)
+ if pending.kind == ConsumerKind::ConsumerGroup =>
+ {
+ let group_id = pending.consumer_id;
let guard = self.consumer_group_offsets.pin();
let key = ConsumerGroupId(
usize::try_from(group_id).expect("u32 group id must fit
usize"),
@@ -230,22 +231,23 @@ impl IggyPartition {
guard.insert(key, created);
}
}
- PendingConsumerOffsetMutation::Delete {
- owner: PendingConsumerOffsetOwner::Consumer(id),
- } => {
+ PendingConsumerOffsetMutation::Delete if pending.kind ==
ConsumerKind::Consumer => {
+ let id = pending.consumer_id;
let guard = self.consumer_offsets.pin();
let key = usize::try_from(id).expect("u32 consumer id must fit
usize");
let _ = guard.remove(&key);
}
- PendingConsumerOffsetMutation::Delete {
- owner: PendingConsumerOffsetOwner::ConsumerGroup(group_id),
- } => {
+ PendingConsumerOffsetMutation::Delete
+ if pending.kind == ConsumerKind::ConsumerGroup =>
+ {
+ let group_id = pending.consumer_id;
let guard = self.consumer_group_offsets.pin();
let key = ConsumerGroupId(
usize::try_from(group_id).expect("u32 group id must fit
usize"),
);
let _ = guard.remove(&key);
}
+ _ => {}
}
}
@@ -260,16 +262,16 @@ impl IggyPartition {
Ok(())
}
- fn persisted_offset_path(&self, owner: PendingConsumerOffsetOwner) ->
Option<String> {
- match owner {
- PendingConsumerOffsetOwner::Consumer(id) => self
+ fn persisted_offset_path(&self, kind: ConsumerKind, consumer_id: u32) ->
Option<String> {
+ match kind {
+ ConsumerKind::Consumer => self
.consumer_offsets_path
.as_ref()
- .map(|path| format!("{path}/{id}")),
- PendingConsumerOffsetOwner::ConsumerGroup(group_id) => self
+ .map(|path| format!("{path}/{consumer_id}")),
+ ConsumerKind::ConsumerGroup => self
.consumer_group_offsets_path
.as_ref()
- .map(|path| format!("{path}/{group_id}")),
+ .map(|path| format!("{path}/{consumer_id}")),
}
}
}
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index 33a8c4226..eeaf83e9a 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -20,7 +20,6 @@
use crate::IggyPartition;
use crate::Partition;
use crate::iggy_index_writer::IggyIndexWriter;
-use crate::iggy_partition::{PendingConsumerOffsetCommit,
PendingConsumerOffsetOwner};
use crate::log::JournalInfo;
use crate::messages_writer::MessagesWriter;
use crate::offset_storage::{
@@ -41,8 +40,8 @@ use iggy_binary_protocol::{
RequestHeader,
};
use iggy_common::{
- ConsumerGroupOffsets, ConsumerOffset, ConsumerOffsets, IggyByteSize,
IggyError, PartitionStats,
- SegmentStorage,
+ ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets,
IggyByteSize, IggyError,
+ PartitionStats, SegmentStorage,
send_messages2::{convert_request_message, decode_prepare_slice},
sharding::{IggyNamespace, LocalIdx, ShardId},
};
@@ -728,7 +727,7 @@ where
Ok(())
}
Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset
=> {
- let pending =
+ let (kind, consumer_id, offset) =
Self::parse_staged_consumer_offset_commit(header.operation, &message)?;
let write_lock = self
.get_by_ns(namespace)
@@ -736,14 +735,27 @@ where
.write_lock
.clone();
let _guard = write_lock.lock().await;
- let partition = self
- .get_by_ns(namespace)
- .expect("store_consumer_offset: partition not found for
namespace");
- partition.persist_consumer_offset_commit(pending).await?;
let partition = self
.get_mut_by_ns(namespace)
.expect("store_consumer_offset: partition not found for
namespace");
- partition.stage_consumer_offset_commit(header.op, pending);
+ match header.operation {
+ Operation::StoreConsumerOffset => {
+ partition
+ .persist_and_stage_consumer_offset_upsert(
+ header.op,
+ kind,
+ consumer_id,
+ offset.expect("store_consumer_offset must
include offset"),
+ )
+ .await?;
+ }
+ Operation::DeleteConsumerOffset => {
+ partition
+
.persist_and_stage_consumer_offset_delete(header.op, kind, consumer_id)
+ .await?;
+ }
+ _ => unreachable!(),
+ }
debug!(
target: "iggy.partitions.diag",
@@ -752,8 +764,9 @@ where
op = header.op,
namespace_raw = namespace.inner(),
operation = ?header.operation,
- consumer = ?pending.owner(),
- pending = ?pending.mutation,
+ consumer_kind = ?kind,
+ consumer_id,
+ offset = ?offset,
"replicated consumer offset persisted and staged"
);
Ok(())
@@ -1163,7 +1176,7 @@ where
fn parse_staged_consumer_offset_commit(
operation: Operation,
message: &Message<PrepareHeader>,
- ) -> Result<PendingConsumerOffsetCommit, IggyError> {
+ ) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
let total_size = message.header().size() as usize;
let body =
&message.as_slice()[std::mem::size_of::<PrepareHeader>()..total_size];
let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?;
@@ -1175,11 +1188,7 @@ where
.map(u32::from_le_bytes)
.map_err(|_| IggyError::InvalidCommand)
})?;
- let owner = match consumer_kind {
- 1 => PendingConsumerOffsetOwner::Consumer(consumer_id),
- 2 => PendingConsumerOffsetOwner::ConsumerGroup(consumer_id),
- _ => return Err(IggyError::InvalidCommand),
- };
+ let kind = ConsumerKind::from_code(consumer_kind)?;
match operation {
Operation::StoreConsumerOffset => {
let offset =
@@ -1190,9 +1199,9 @@ where
.map(u64::from_le_bytes)
.map_err(|_| IggyError::InvalidCommand)
})?;
- Ok(PendingConsumerOffsetCommit::upsert(owner, offset))
+ Ok((kind, consumer_id, Some(offset)))
}
- Operation::DeleteConsumerOffset =>
Ok(PendingConsumerOffsetCommit::delete(owner)),
+ Operation::DeleteConsumerOffset => Ok((kind, consumer_id, None)),
_ => Err(IggyError::InvalidCommand),
}
}
@@ -1211,13 +1220,13 @@ where
.clone();
let _guard = write_lock.lock().await;
- let pending = {
+ let apply_result = {
let partition = self
.get_mut_by_ns(&entry_namespace)
.expect("commit_partition_entry: partition not found");
- partition.take_staged_consumer_offset_commit(prepare_header.op)
+ partition.apply_staged_consumer_offset_commit(prepare_header.op)
};
- let Some(pending) = pending else {
+ if let Err(error) = apply_result {
failed_ns.insert(entry_namespace);
warn!(
target: "iggy.partitions.diag",
@@ -1225,23 +1234,17 @@ where
replica_id = consensus.replica(),
op = prepare_header.op,
namespace_raw = entry_namespace.inner(),
- "missing staged consumer offset commit for committed prepare"
+ %error,
+ "failed to apply staged consumer offset commit"
);
return false;
- };
-
- let partition = self
- .get_by_ns(&entry_namespace)
- .expect("commit_partition_entry: partition not found");
- partition.apply_consumer_offset_commit(pending);
+ }
debug!(
target: "iggy.partitions.diag",
plane = "partitions",
replica_id = consensus.replica(),
op = prepare_header.op,
namespace_raw = entry_namespace.inner(),
- consumer = ?pending.owner(),
- mutation = ?pending.mutation,
"consumer offset committed"
);
true