This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch store_consumer_offset
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/store_consumer_offset by this push:
     new 3608257f8 reuse code
3608257f8 is described below

commit 3608257f8985f30ad670a54211930f2e866c61c5
Author: numinex <[email protected]>
AuthorDate: Thu Apr 2 15:02:29 2026 +0200

    reuse code
---
 core/partitions/src/iggy_partition.rs  | 152 +++++++++++++++++----------------
 core/partitions/src/iggy_partitions.rs |  65 +++++++-------
 2 files changed, 111 insertions(+), 106 deletions(-)

diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
index 3ca31d0a0..db71b37a2 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -59,66 +59,51 @@ pub struct IggyPartition {
     pending_consumer_offset_commits: HashMap<u64, PendingConsumerOffsetCommit>,
 }
 
-#[allow(clippy::redundant_pub_crate)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub(crate) enum PendingConsumerOffsetOwner {
-    Consumer(u32),
-    ConsumerGroup(u32),
+#[derive(Debug, Clone, Copy, PartialEq)]
+struct PendingConsumerOffsetCommit {
+    kind: ConsumerKind,
+    consumer_id: u32,
+    mutation: PendingConsumerOffsetMutation,
 }
 
-#[allow(clippy::redundant_pub_crate)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub(crate) struct PendingConsumerOffsetCommit {
-    pub(crate) mutation: PendingConsumerOffsetMutation,
-}
-
-#[allow(clippy::redundant_pub_crate)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub(crate) enum PendingConsumerOffsetMutation {
-    Upsert {
-        owner: PendingConsumerOffsetOwner,
-        offset: u64,
-    },
-    Delete {
-        owner: PendingConsumerOffsetOwner,
-    },
+#[derive(Debug, Clone, Copy, PartialEq)]
+enum PendingConsumerOffsetMutation {
+    Upsert(u64),
+    Delete,
 }
 
 impl PendingConsumerOffsetCommit {
-    pub(crate) const fn upsert(owner: PendingConsumerOffsetOwner, offset: u64) 
-> Self {
+    const fn upsert(kind: ConsumerKind, consumer_id: u32, offset: u64) -> Self 
{
         Self {
-            mutation: PendingConsumerOffsetMutation::Upsert { owner, offset },
+            kind,
+            consumer_id,
+            mutation: PendingConsumerOffsetMutation::Upsert(offset),
         }
     }
 
-    pub(crate) const fn delete(owner: PendingConsumerOffsetOwner) -> Self {
+    const fn delete(kind: ConsumerKind, consumer_id: u32) -> Self {
         Self {
-            mutation: PendingConsumerOffsetMutation::Delete { owner },
+            kind,
+            consumer_id,
+            mutation: PendingConsumerOffsetMutation::Delete,
         }
     }
 
-    pub(crate) fn try_from_polling_consumer(
+    fn try_from_polling_consumer(
         consumer: PollingConsumer,
         offset: u64,
     ) -> Result<Self, IggyError> {
-        let owner = match consumer {
-            PollingConsumer::Consumer(id, _) => 
PendingConsumerOffsetOwner::Consumer(
+        let (kind, consumer_id) = match consumer {
+            PollingConsumer::Consumer(id, _) => (
+                ConsumerKind::Consumer,
                 u32::try_from(id).map_err(|_| IggyError::InvalidCommand)?,
             ),
-            PollingConsumer::ConsumerGroup(group_id, _) => {
-                PendingConsumerOffsetOwner::ConsumerGroup(
-                    u32::try_from(group_id).map_err(|_| 
IggyError::InvalidCommand)?,
-                )
-            }
+            PollingConsumer::ConsumerGroup(group_id, _) => (
+                ConsumerKind::ConsumerGroup,
+                u32::try_from(group_id).map_err(|_| 
IggyError::InvalidCommand)?,
+            ),
         };
-        Ok(Self::upsert(owner, offset))
-    }
-
-    pub(crate) const fn owner(self) -> PendingConsumerOffsetOwner {
-        match self.mutation {
-            PendingConsumerOffsetMutation::Upsert { owner, .. }
-            | PendingConsumerOffsetMutation::Delete { owner } => owner,
-        }
+        Ok(Self::upsert(kind, consumer_id, offset))
     }
 }
 
@@ -154,43 +139,59 @@ impl IggyPartition {
         self.consumer_group_offsets_path = Some(consumer_group_offsets_path);
     }
 
-    pub(crate) fn stage_consumer_offset_commit(
+    pub(crate) async fn persist_and_stage_consumer_offset_upsert(
         &mut self,
         op: u64,
-        pending: PendingConsumerOffsetCommit,
-    ) {
+        kind: ConsumerKind,
+        consumer_id: u32,
+        offset: u64,
+    ) -> Result<(), IggyError> {
+        let pending = PendingConsumerOffsetCommit::upsert(kind, consumer_id, 
offset);
+        self.persist_consumer_offset_commit(pending).await?;
         self.pending_consumer_offset_commits.insert(op, pending);
+        Ok(())
     }
 
-    #[must_use]
-    pub(crate) fn take_staged_consumer_offset_commit(
+    pub(crate) async fn persist_and_stage_consumer_offset_delete(
         &mut self,
         op: u64,
-    ) -> Option<PendingConsumerOffsetCommit> {
-        self.pending_consumer_offset_commits.remove(&op)
+        kind: ConsumerKind,
+        consumer_id: u32,
+    ) -> Result<(), IggyError> {
+        let pending = PendingConsumerOffsetCommit::delete(kind, consumer_id);
+        self.persist_consumer_offset_commit(pending).await?;
+        self.pending_consumer_offset_commits.insert(op, pending);
+        Ok(())
     }
 
-    pub(crate) async fn persist_consumer_offset_commit(
+    pub(crate) fn apply_staged_consumer_offset_commit(&mut self, op: u64) -> 
Result<(), IggyError> {
+        let pending = self
+            .pending_consumer_offset_commits
+            .remove(&op)
+            .ok_or(IggyError::InvalidCommand)?;
+        self.apply_consumer_offset_commit(pending);
+        Ok(())
+    }
+
+    async fn persist_consumer_offset_commit(
         &self,
         pending: PendingConsumerOffsetCommit,
     ) -> Result<(), IggyError> {
-        let Some(path) = self.persisted_offset_path(pending.owner()) else {
+        let Some(path) = self.persisted_offset_path(pending.kind, 
pending.consumer_id) else {
             return Ok(());
         };
         match pending.mutation {
-            PendingConsumerOffsetMutation::Upsert { offset, .. } => {
-                persist_offset(&path, offset).await
-            }
-            PendingConsumerOffsetMutation::Delete { .. } => 
delete_persisted_offset(&path).await,
+            PendingConsumerOffsetMutation::Upsert(offset) => 
persist_offset(&path, offset).await,
+            PendingConsumerOffsetMutation::Delete => 
delete_persisted_offset(&path).await,
         }
     }
 
-    pub(crate) fn apply_consumer_offset_commit(&self, pending: 
PendingConsumerOffsetCommit) {
+    fn apply_consumer_offset_commit(&self, pending: 
PendingConsumerOffsetCommit) {
         match pending.mutation {
-            PendingConsumerOffsetMutation::Upsert {
-                owner: PendingConsumerOffsetOwner::Consumer(id),
-                offset,
-            } => {
+            PendingConsumerOffsetMutation::Upsert(offset)
+                if pending.kind == ConsumerKind::Consumer =>
+            {
+                let id = pending.consumer_id;
                 let guard = self.consumer_offsets.pin();
                 let key = usize::try_from(id).expect("u32 consumer id must fit 
usize");
                 if let Some(existing) = guard.get(&key) {
@@ -204,10 +205,10 @@ impl IggyPartition {
                     guard.insert(key, created);
                 }
             }
-            PendingConsumerOffsetMutation::Upsert {
-                owner: PendingConsumerOffsetOwner::ConsumerGroup(group_id),
-                offset,
-            } => {
+            PendingConsumerOffsetMutation::Upsert(offset)
+                if pending.kind == ConsumerKind::ConsumerGroup =>
+            {
+                let group_id = pending.consumer_id;
                 let guard = self.consumer_group_offsets.pin();
                 let key = ConsumerGroupId(
                     usize::try_from(group_id).expect("u32 group id must fit 
usize"),
@@ -230,22 +231,23 @@ impl IggyPartition {
                     guard.insert(key, created);
                 }
             }
-            PendingConsumerOffsetMutation::Delete {
-                owner: PendingConsumerOffsetOwner::Consumer(id),
-            } => {
+            PendingConsumerOffsetMutation::Delete if pending.kind == 
ConsumerKind::Consumer => {
+                let id = pending.consumer_id;
                 let guard = self.consumer_offsets.pin();
                 let key = usize::try_from(id).expect("u32 consumer id must fit 
usize");
                 let _ = guard.remove(&key);
             }
-            PendingConsumerOffsetMutation::Delete {
-                owner: PendingConsumerOffsetOwner::ConsumerGroup(group_id),
-            } => {
+            PendingConsumerOffsetMutation::Delete
+                if pending.kind == ConsumerKind::ConsumerGroup =>
+            {
+                let group_id = pending.consumer_id;
                 let guard = self.consumer_group_offsets.pin();
                 let key = ConsumerGroupId(
                     usize::try_from(group_id).expect("u32 group id must fit 
usize"),
                 );
                 let _ = guard.remove(&key);
             }
+            _ => {}
         }
     }
 
@@ -260,16 +262,16 @@ impl IggyPartition {
         Ok(())
     }
 
-    fn persisted_offset_path(&self, owner: PendingConsumerOffsetOwner) -> 
Option<String> {
-        match owner {
-            PendingConsumerOffsetOwner::Consumer(id) => self
+    fn persisted_offset_path(&self, kind: ConsumerKind, consumer_id: u32) -> 
Option<String> {
+        match kind {
+            ConsumerKind::Consumer => self
                 .consumer_offsets_path
                 .as_ref()
-                .map(|path| format!("{path}/{id}")),
-            PendingConsumerOffsetOwner::ConsumerGroup(group_id) => self
+                .map(|path| format!("{path}/{consumer_id}")),
+            ConsumerKind::ConsumerGroup => self
                 .consumer_group_offsets_path
                 .as_ref()
-                .map(|path| format!("{path}/{group_id}")),
+                .map(|path| format!("{path}/{consumer_id}")),
         }
     }
 }
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 33a8c4226..eeaf83e9a 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -20,7 +20,6 @@
 use crate::IggyPartition;
 use crate::Partition;
 use crate::iggy_index_writer::IggyIndexWriter;
-use crate::iggy_partition::{PendingConsumerOffsetCommit, 
PendingConsumerOffsetOwner};
 use crate::log::JournalInfo;
 use crate::messages_writer::MessagesWriter;
 use crate::offset_storage::{
@@ -41,8 +40,8 @@ use iggy_binary_protocol::{
     RequestHeader,
 };
 use iggy_common::{
-    ConsumerGroupOffsets, ConsumerOffset, ConsumerOffsets, IggyByteSize, 
IggyError, PartitionStats,
-    SegmentStorage,
+    ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets, 
IggyByteSize, IggyError,
+    PartitionStats, SegmentStorage,
     send_messages2::{convert_request_message, decode_prepare_slice},
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
@@ -728,7 +727,7 @@ where
                 Ok(())
             }
             Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset 
=> {
-                let pending =
+                let (kind, consumer_id, offset) =
                     
Self::parse_staged_consumer_offset_commit(header.operation, &message)?;
                 let write_lock = self
                     .get_by_ns(namespace)
@@ -736,14 +735,27 @@ where
                     .write_lock
                     .clone();
                 let _guard = write_lock.lock().await;
-                let partition = self
-                    .get_by_ns(namespace)
-                    .expect("store_consumer_offset: partition not found for 
namespace");
-                partition.persist_consumer_offset_commit(pending).await?;
                 let partition = self
                     .get_mut_by_ns(namespace)
                     .expect("store_consumer_offset: partition not found for 
namespace");
-                partition.stage_consumer_offset_commit(header.op, pending);
+                match header.operation {
+                    Operation::StoreConsumerOffset => {
+                        partition
+                            .persist_and_stage_consumer_offset_upsert(
+                                header.op,
+                                kind,
+                                consumer_id,
+                                offset.expect("store_consumer_offset must 
include offset"),
+                            )
+                            .await?;
+                    }
+                    Operation::DeleteConsumerOffset => {
+                        partition
+                            
.persist_and_stage_consumer_offset_delete(header.op, kind, consumer_id)
+                            .await?;
+                    }
+                    _ => unreachable!(),
+                }
 
                 debug!(
                     target: "iggy.partitions.diag",
@@ -752,8 +764,9 @@ where
                     op = header.op,
                     namespace_raw = namespace.inner(),
                     operation = ?header.operation,
-                    consumer = ?pending.owner(),
-                    pending = ?pending.mutation,
+                    consumer_kind = ?kind,
+                    consumer_id,
+                    offset = ?offset,
                     "replicated consumer offset persisted and staged"
                 );
                 Ok(())
@@ -1163,7 +1176,7 @@ where
     fn parse_staged_consumer_offset_commit(
         operation: Operation,
         message: &Message<PrepareHeader>,
-    ) -> Result<PendingConsumerOffsetCommit, IggyError> {
+    ) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
         let total_size = message.header().size() as usize;
         let body = 
&message.as_slice()[std::mem::size_of::<PrepareHeader>()..total_size];
         let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?;
@@ -1175,11 +1188,7 @@ where
                     .map(u32::from_le_bytes)
                     .map_err(|_| IggyError::InvalidCommand)
             })?;
-        let owner = match consumer_kind {
-            1 => PendingConsumerOffsetOwner::Consumer(consumer_id),
-            2 => PendingConsumerOffsetOwner::ConsumerGroup(consumer_id),
-            _ => return Err(IggyError::InvalidCommand),
-        };
+        let kind = ConsumerKind::from_code(consumer_kind)?;
         match operation {
             Operation::StoreConsumerOffset => {
                 let offset =
@@ -1190,9 +1199,9 @@ where
                                 .map(u64::from_le_bytes)
                                 .map_err(|_| IggyError::InvalidCommand)
                         })?;
-                Ok(PendingConsumerOffsetCommit::upsert(owner, offset))
+                Ok((kind, consumer_id, Some(offset)))
             }
-            Operation::DeleteConsumerOffset => 
Ok(PendingConsumerOffsetCommit::delete(owner)),
+            Operation::DeleteConsumerOffset => Ok((kind, consumer_id, None)),
             _ => Err(IggyError::InvalidCommand),
         }
     }
@@ -1211,13 +1220,13 @@ where
             .clone();
         let _guard = write_lock.lock().await;
 
-        let pending = {
+        let apply_result = {
             let partition = self
                 .get_mut_by_ns(&entry_namespace)
                 .expect("commit_partition_entry: partition not found");
-            partition.take_staged_consumer_offset_commit(prepare_header.op)
+            partition.apply_staged_consumer_offset_commit(prepare_header.op)
         };
-        let Some(pending) = pending else {
+        if let Err(error) = apply_result {
             failed_ns.insert(entry_namespace);
             warn!(
                 target: "iggy.partitions.diag",
@@ -1225,23 +1234,17 @@ where
                 replica_id = consensus.replica(),
                 op = prepare_header.op,
                 namespace_raw = entry_namespace.inner(),
-                "missing staged consumer offset commit for committed prepare"
+                %error,
+                "failed to apply staged consumer offset commit"
             );
             return false;
-        };
-
-        let partition = self
-            .get_by_ns(&entry_namespace)
-            .expect("commit_partition_entry: partition not found");
-        partition.apply_consumer_offset_commit(pending);
+        }
         debug!(
             target: "iggy.partitions.diag",
             plane = "partitions",
             replica_id = consensus.replica(),
             op = prepare_header.op,
             namespace_raw = entry_namespace.inner(),
-            consumer = ?pending.owner(),
-            mutation = ?pending.mutation,
             "consumer offset committed"
         );
         true

Reply via email to