This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch store_consumer_offset
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit df6f1756898b03e502408a8eee44944fd772a217
Author: numinex <[email protected]>
AuthorDate: Thu Apr 2 14:21:42 2026 +0200

    feat(partitions): implement StoreConsumerOffset and DeleteConsumerOffset
---
 core/binary_protocol/src/consensus/operation.rs |  13 +-
 core/binary_protocol/src/dispatch.rs            |   8 +-
 core/consensus/src/observability.rs             |   1 +
 core/partitions/src/iggy_partition.rs           | 245 +++++++++++++++++++----
 core/partitions/src/iggy_partitions.rs          | 256 ++++++++++++++++++------
 core/partitions/src/lib.rs                      |   1 +
 core/partitions/src/offset_storage.rs           | 202 +++++++++++++++++++
 core/partitions/src/types.rs                    |  55 ++++-
 core/simulator/src/client.rs                    |  13 ++
 9 files changed, 695 insertions(+), 99 deletions(-)

diff --git a/core/binary_protocol/src/consensus/operation.rs 
b/core/binary_protocol/src/consensus/operation.rs
index c17733deb..d275269ae 100644
--- a/core/binary_protocol/src/consensus/operation.rs
+++ b/core/binary_protocol/src/consensus/operation.rs
@@ -37,7 +37,7 @@ pub enum Operation {
     CreatePartitions = 136,
     DeletePartitions = 137,
     // TODO: DeleteSegments is a partition operation (is_partition() == true) 
but its
-    // discriminant sits in the metadata range (128-147). Should be moved to 
162 once
+    // discriminant sits in the metadata range (128-147). Should be moved to 
163 once
     // iggy_common's Operation enum is removed and wire compat is no longer a 
concern.
     DeleteSegments = 138,
     CreateConsumerGroup = 139,
@@ -53,6 +53,7 @@ pub enum Operation {
     // Partition operations (routed by namespace)
     SendMessages = 160,
     StoreConsumerOffset = 161,
+    DeleteConsumerOffset = 162,
 }
 
 impl Operation {
@@ -90,7 +91,10 @@ impl Operation {
     pub const fn is_partition(&self) -> bool {
         matches!(
             self,
-            Self::SendMessages | Self::StoreConsumerOffset | 
Self::DeleteSegments
+            Self::SendMessages
+                | Self::StoreConsumerOffset
+                | Self::DeleteConsumerOffset
+                | Self::DeleteSegments
         )
     }
 
@@ -122,7 +126,8 @@ impl Operation {
             | Self::CreatePersonalAccessToken
             | Self::DeletePersonalAccessToken
             | Self::SendMessages
-            | Self::StoreConsumerOffset => match 
crate::dispatch::lookup_by_operation(*self) {
+            | Self::StoreConsumerOffset
+            | Self::DeleteConsumerOffset => match 
crate::dispatch::lookup_by_operation(*self) {
                 Some(meta) => Some(meta.code),
                 None => None,
             },
@@ -170,6 +175,7 @@ mod tests {
             Operation::DeletePersonalAccessToken,
             Operation::SendMessages,
             Operation::StoreConsumerOffset,
+            Operation::DeleteConsumerOffset,
         ];
         for op in ops {
             let code = op
@@ -202,5 +208,6 @@ mod tests {
         assert!(Operation::SendMessages.is_partition());
         assert!(!Operation::SendMessages.is_metadata());
         assert!(Operation::DeleteSegments.is_partition());
+        assert!(Operation::DeleteConsumerOffset.is_partition());
     }
 }
diff --git a/core/binary_protocol/src/dispatch.rs 
b/core/binary_protocol/src/dispatch.rs
index fcd1dcb69..1aa9eb828 100644
--- a/core/binary_protocol/src/dispatch.rs
+++ b/core/binary_protocol/src/dispatch.rs
@@ -120,7 +120,11 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[
         "consumer_offset.store",
         Operation::StoreConsumerOffset,
     ),
-    CommandMeta::non_replicated(DELETE_CONSUMER_OFFSET_CODE, 
"consumer_offset.delete"),
+    CommandMeta::replicated(
+        DELETE_CONSUMER_OFFSET_CODE,
+        "consumer_offset.delete",
+        Operation::DeleteConsumerOffset,
+    ),
     // Streams
     CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"),
     CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"),
@@ -260,6 +264,7 @@ pub const fn lookup_by_operation(op: Operation) -> 
Option<&'static CommandMeta>
         Operation::DeletePersonalAccessToken => 18,
         Operation::SendMessages => 21,
         Operation::StoreConsumerOffset => 24,
+        Operation::DeleteConsumerOffset => 25,
         Operation::Reserved => return None,
     };
     Some(&COMMAND_TABLE[idx])
@@ -378,6 +383,7 @@ mod tests {
             Operation::DeletePersonalAccessToken,
             Operation::SendMessages,
             Operation::StoreConsumerOffset,
+            Operation::DeleteConsumerOffset,
         ];
         for op in replicated_ops {
             let meta = lookup_by_operation(op)
diff --git a/core/consensus/src/observability.rs 
b/core/consensus/src/observability.rs
index 865935faf..3944ef86c 100644
--- a/core/consensus/src/observability.rs
+++ b/core/consensus/src/observability.rs
@@ -372,6 +372,7 @@ pub const fn operation_as_str(operation: Operation) -> 
&'static str {
         Operation::DeletePersonalAccessToken => "delete_personal_access_token",
         Operation::SendMessages => "send_messages",
         Operation::StoreConsumerOffset => "store_consumer_offset",
+        Operation::DeleteConsumerOffset => "delete_consumer_offset",
     }
 }
 
diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
index c49911b3d..3ca31d0a0 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -19,6 +19,7 @@ use crate::journal::{
     MessageLookup, PartitionJournal, PartitionJournalMemStorage, 
QueryableJournal,
 };
 use crate::log::SegmentedLog;
+use crate::offset_storage::{delete_persisted_offset, persist_offset};
 use crate::{
     AppendResult, Partition, PartitionOffsets, PollFragments, PollQueryResult, 
PollingArgs,
     PollingConsumer,
@@ -30,6 +31,7 @@ use iggy_common::{
     send_messages2::stamp_prepare_for_persistence,
 };
 use journal::Journal as _;
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU64, Ordering};
 use tokio::sync::Mutex as TokioMutex;
@@ -52,6 +54,72 @@ pub struct IggyPartition {
     pub revision_id: u64,
     pub should_increment_offset: bool,
     pub write_lock: Arc<TokioMutex<()>>,
+    consumer_offsets_path: Option<String>,
+    consumer_group_offsets_path: Option<String>,
+    pending_consumer_offset_commits: HashMap<u64, PendingConsumerOffsetCommit>,
+}
+
+#[allow(clippy::redundant_pub_crate)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum PendingConsumerOffsetOwner {
+    Consumer(u32),
+    ConsumerGroup(u32),
+}
+
+#[allow(clippy::redundant_pub_crate)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) struct PendingConsumerOffsetCommit {
+    pub(crate) mutation: PendingConsumerOffsetMutation,
+}
+
+#[allow(clippy::redundant_pub_crate)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum PendingConsumerOffsetMutation {
+    Upsert {
+        owner: PendingConsumerOffsetOwner,
+        offset: u64,
+    },
+    Delete {
+        owner: PendingConsumerOffsetOwner,
+    },
+}
+
+impl PendingConsumerOffsetCommit {
+    pub(crate) const fn upsert(owner: PendingConsumerOffsetOwner, offset: u64) 
-> Self {
+        Self {
+            mutation: PendingConsumerOffsetMutation::Upsert { owner, offset },
+        }
+    }
+
+    pub(crate) const fn delete(owner: PendingConsumerOffsetOwner) -> Self {
+        Self {
+            mutation: PendingConsumerOffsetMutation::Delete { owner },
+        }
+    }
+
+    pub(crate) fn try_from_polling_consumer(
+        consumer: PollingConsumer,
+        offset: u64,
+    ) -> Result<Self, IggyError> {
+        let owner = match consumer {
+            PollingConsumer::Consumer(id, _) => 
PendingConsumerOffsetOwner::Consumer(
+                u32::try_from(id).map_err(|_| IggyError::InvalidCommand)?,
+            ),
+            PollingConsumer::ConsumerGroup(group_id, _) => {
+                PendingConsumerOffsetOwner::ConsumerGroup(
+                    u32::try_from(group_id).map_err(|_| 
IggyError::InvalidCommand)?,
+                )
+            }
+        };
+        Ok(Self::upsert(owner, offset))
+    }
+
+    pub(crate) const fn owner(self) -> PendingConsumerOffsetOwner {
+        match self.mutation {
+            PendingConsumerOffsetMutation::Upsert { owner, .. }
+            | PendingConsumerOffsetMutation::Delete { owner } => owner,
+        }
+    }
 }
 
 impl IggyPartition {
@@ -67,6 +135,141 @@ impl IggyPartition {
             revision_id: 0,
             should_increment_offset: false,
             write_lock: Arc::new(TokioMutex::new(())),
+            consumer_offsets_path: None,
+            consumer_group_offsets_path: None,
+            pending_consumer_offset_commits: HashMap::new(),
+        }
+    }
+
+    pub fn configure_consumer_offset_storage(
+        &mut self,
+        consumer_offsets_path: String,
+        consumer_group_offsets_path: String,
+        consumer_offsets: ConsumerOffsets,
+        consumer_group_offsets: ConsumerGroupOffsets,
+    ) {
+        self.consumer_offsets = Arc::new(consumer_offsets);
+        self.consumer_group_offsets = Arc::new(consumer_group_offsets);
+        self.consumer_offsets_path = Some(consumer_offsets_path);
+        self.consumer_group_offsets_path = Some(consumer_group_offsets_path);
+    }
+
+    pub(crate) fn stage_consumer_offset_commit(
+        &mut self,
+        op: u64,
+        pending: PendingConsumerOffsetCommit,
+    ) {
+        self.pending_consumer_offset_commits.insert(op, pending);
+    }
+
+    #[must_use]
+    pub(crate) fn take_staged_consumer_offset_commit(
+        &mut self,
+        op: u64,
+    ) -> Option<PendingConsumerOffsetCommit> {
+        self.pending_consumer_offset_commits.remove(&op)
+    }
+
+    pub(crate) async fn persist_consumer_offset_commit(
+        &self,
+        pending: PendingConsumerOffsetCommit,
+    ) -> Result<(), IggyError> {
+        let Some(path) = self.persisted_offset_path(pending.owner()) else {
+            return Ok(());
+        };
+        match pending.mutation {
+            PendingConsumerOffsetMutation::Upsert { offset, .. } => {
+                persist_offset(&path, offset).await
+            }
+            PendingConsumerOffsetMutation::Delete { .. } => 
delete_persisted_offset(&path).await,
+        }
+    }
+
+    pub(crate) fn apply_consumer_offset_commit(&self, pending: 
PendingConsumerOffsetCommit) {
+        match pending.mutation {
+            PendingConsumerOffsetMutation::Upsert {
+                owner: PendingConsumerOffsetOwner::Consumer(id),
+                offset,
+            } => {
+                let guard = self.consumer_offsets.pin();
+                let key = usize::try_from(id).expect("u32 consumer id must fit 
usize");
+                if let Some(existing) = guard.get(&key) {
+                    existing.offset.store(offset, Ordering::Relaxed);
+                } else {
+                    let created = 
self.consumer_offsets_path.as_deref().map_or_else(
+                        || ConsumerOffset::new(ConsumerKind::Consumer, id, 0, 
String::new()),
+                        |path| ConsumerOffset::default_for_consumer(id, path),
+                    );
+                    created.offset.store(offset, Ordering::Relaxed);
+                    guard.insert(key, created);
+                }
+            }
+            PendingConsumerOffsetMutation::Upsert {
+                owner: PendingConsumerOffsetOwner::ConsumerGroup(group_id),
+                offset,
+            } => {
+                let guard = self.consumer_group_offsets.pin();
+                let key = ConsumerGroupId(
+                    usize::try_from(group_id).expect("u32 group id must fit 
usize"),
+                );
+                if let Some(existing) = guard.get(&key) {
+                    existing.offset.store(offset, Ordering::Relaxed);
+                } else {
+                    let created = 
self.consumer_group_offsets_path.as_deref().map_or_else(
+                        || {
+                            ConsumerOffset::new(
+                                ConsumerKind::ConsumerGroup,
+                                group_id,
+                                0,
+                                String::new(),
+                            )
+                        },
+                        |path| ConsumerOffset::default_for_consumer_group(key, 
path),
+                    );
+                    created.offset.store(offset, Ordering::Relaxed);
+                    guard.insert(key, created);
+                }
+            }
+            PendingConsumerOffsetMutation::Delete {
+                owner: PendingConsumerOffsetOwner::Consumer(id),
+            } => {
+                let guard = self.consumer_offsets.pin();
+                let key = usize::try_from(id).expect("u32 consumer id must fit 
usize");
+                let _ = guard.remove(&key);
+            }
+            PendingConsumerOffsetMutation::Delete {
+                owner: PendingConsumerOffsetOwner::ConsumerGroup(group_id),
+            } => {
+                let guard = self.consumer_group_offsets.pin();
+                let key = ConsumerGroupId(
+                    usize::try_from(group_id).expect("u32 group id must fit 
usize"),
+                );
+                let _ = guard.remove(&key);
+            }
+        }
+    }
+
+    async fn store_consumer_offset_and_persist(
+        &self,
+        consumer: PollingConsumer,
+        offset: u64,
+    ) -> Result<(), IggyError> {
+        let pending = 
PendingConsumerOffsetCommit::try_from_polling_consumer(consumer, offset)?;
+        self.persist_consumer_offset_commit(pending).await?;
+        self.apply_consumer_offset_commit(pending);
+        Ok(())
+    }
+
+    fn persisted_offset_path(&self, owner: PendingConsumerOffsetOwner) -> 
Option<String> {
+        match owner {
+            PendingConsumerOffsetOwner::Consumer(id) => self
+                .consumer_offsets_path
+                .as_ref()
+                .map(|path| format!("{path}/{id}")),
+            PendingConsumerOffsetOwner::ConsumerGroup(group_id) => self
+                .consumer_group_offsets_path
+                .as_ref()
+                .map(|path| format!("{path}/{group_id}")),
         }
     }
 }
@@ -191,7 +394,10 @@ impl Partition for IggyPartition {
         if args.auto_commit && !fragments.is_empty() {
             let last_offset =
                 last_matching_offset.expect("non-empty poll result must have a 
last offset");
-            if let Err(err) = self.store_consumer_offset(consumer, 
last_offset) {
+            if let Err(err) = self
+                .store_consumer_offset_and_persist(consumer, last_offset)
+                .await
+            {
                 // warning for now.
                 warn!(
                     target: "iggy.partitions.diag",
@@ -212,41 +418,8 @@ impl Partition for IggyPartition {
         consumer: PollingConsumer,
         offset: u64,
     ) -> Result<(), IggyError> {
-        match consumer {
-            PollingConsumer::Consumer(id, _) => {
-                let guard = self.consumer_offsets.pin();
-                if let Some(existing) = guard.get(&id) {
-                    existing.offset.store(offset, Ordering::Relaxed);
-                } else {
-                    guard.insert(
-                        id,
-                        ConsumerOffset::new(
-                            ConsumerKind::Consumer,
-                            id as u32,
-                            offset,
-                            String::new(),
-                        ),
-                    );
-                }
-            }
-            PollingConsumer::ConsumerGroup(group_id, _) => {
-                let guard = self.consumer_group_offsets.pin();
-                let key = ConsumerGroupId(group_id);
-                if let Some(existing) = guard.get(&key) {
-                    existing.offset.store(offset, Ordering::Relaxed);
-                } else {
-                    guard.insert(
-                        key,
-                        ConsumerOffset::new(
-                            ConsumerKind::ConsumerGroup,
-                            group_id as u32,
-                            offset,
-                            String::new(),
-                        ),
-                    );
-                }
-            }
-        }
+        let pending = 
PendingConsumerOffsetCommit::try_from_polling_consumer(consumer, offset)?;
+        self.apply_consumer_offset_commit(pending);
         Ok(())
     }
 
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 5d9d3d601..33a8c4226 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -19,10 +19,13 @@
 
 use crate::IggyPartition;
 use crate::Partition;
-use crate::PollingConsumer;
 use crate::iggy_index_writer::IggyIndexWriter;
+use crate::iggy_partition::{PendingConsumerOffsetCommit, 
PendingConsumerOffsetOwner};
 use crate::log::JournalInfo;
 use crate::messages_writer::MessagesWriter;
+use crate::offset_storage::{
+    create_offset_file_hierarchy, load_consumer_group_offsets, 
load_consumer_offsets,
+};
 use crate::segment::Segment;
 use crate::types::PartitionsConfig;
 use consensus::PlaneIdentity;
@@ -38,7 +41,8 @@ use iggy_binary_protocol::{
     RequestHeader,
 };
 use iggy_common::{
-    IggyByteSize, IggyError, PartitionStats, SegmentStorage,
+    ConsumerGroupOffsets, ConsumerOffset, ConsumerOffsets, IggyByteSize, 
IggyError, PartitionStats,
+    SegmentStorage,
     send_messages2::{convert_request_message, decode_prepare_slice},
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
@@ -316,6 +320,25 @@ impl<C> IggyPartitions<C> {
         // Create initial segment with storage
         let start_offset = 0;
         let segment = Segment::new(start_offset, self.config.segment_size);
+        create_offset_file_hierarchy(&self.config, namespace).await?;
+
+        let consumer_offsets_path = self.config.get_consumer_offsets_path(
+            namespace.stream_id(),
+            namespace.topic_id(),
+            namespace.partition_id(),
+        );
+        let consumer_group_offsets_path = 
self.config.get_consumer_group_offsets_path(
+            namespace.stream_id(),
+            namespace.topic_id(),
+            namespace.partition_id(),
+        );
+        let consumer_offsets =
+            Self::load_partition_consumer_offsets(&consumer_offsets_path, 
start_offset, namespace)?;
+        let consumer_group_offsets = 
Self::load_partition_consumer_group_offsets(
+            &consumer_group_offsets_path,
+            start_offset,
+            namespace,
+        )?;
 
         // TODO: Waiting for issue to move server config to shared module.
         // Once complete, paths will come from proper 
base_path/streams_path/etc config fields.
@@ -372,6 +395,12 @@ impl<C> IggyPartitions<C> {
         // Create partition with initialized log
         let stats = Arc::new(PartitionStats::default());
         let mut partition = IggyPartition::new(stats);
+        partition.configure_consumer_offset_storage(
+            consumer_offsets_path,
+            consumer_group_offsets_path,
+            consumer_offsets,
+            consumer_group_offsets,
+        );
         partition.log.add_persisted_segment(
             segment,
             storage,
@@ -387,6 +416,62 @@ impl<C> IggyPartitions<C> {
 
         Ok(self.insert(namespace, partition))
     }
+
+    fn load_partition_consumer_offsets(
+        path: &str,
+        current_offset: u64,
+        namespace: IggyNamespace,
+    ) -> Result<ConsumerOffsets, IggyError> {
+        let offsets = load_consumer_offsets(path)?
+            .into_iter()
+            .map(|offset| {
+                let offset = Self::clamp_loaded_offset(namespace, 
current_offset, offset);
+                (
+                    usize::try_from(offset.consumer_id).expect("u32 consumer 
id must fit usize"),
+                    offset,
+                )
+            })
+            .collect::<Vec<_>>();
+        Ok(ConsumerOffsets::from(offsets))
+    }
+
+    fn load_partition_consumer_group_offsets(
+        path: &str,
+        current_offset: u64,
+        namespace: IggyNamespace,
+    ) -> Result<ConsumerGroupOffsets, IggyError> {
+        let offsets = load_consumer_group_offsets(path)?
+            .into_iter()
+            .map(|(group_id, offset)| {
+                let offset = Self::clamp_loaded_offset(namespace, 
current_offset, offset);
+                (group_id, offset)
+            })
+            .collect::<Vec<_>>();
+        Ok(ConsumerGroupOffsets::from(offsets))
+    }
+
+    fn clamp_loaded_offset(
+        namespace: IggyNamespace,
+        current_offset: u64,
+        offset: ConsumerOffset,
+    ) -> ConsumerOffset {
+        let persisted_offset = offset.offset.load(Ordering::Relaxed);
+        if persisted_offset <= current_offset {
+            return offset;
+        }
+
+        warn!(
+            target: "iggy.partitions.diag",
+            namespace_raw = namespace.inner(),
+            consumer_id = offset.consumer_id,
+            persisted_offset,
+            current_offset,
+            path = %offset.path,
+            "clamping recovered consumer offset to current partition offset"
+        );
+        offset.offset.store(current_offset, Ordering::Relaxed);
+        offset
+    }
 }
 
 impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, 
NamespacedPipeline>>
@@ -642,49 +727,23 @@ where
                 );
                 Ok(())
             }
-            Operation::StoreConsumerOffset => {
-                let total_size = header.size() as usize;
-                let body = 
&message.as_slice()[std::mem::size_of::<PrepareHeader>()..total_size];
-                let consumer_kind = 
*body.first().ok_or(IggyError::InvalidCommand)?;
-                let consumer_id =
-                    body.get(1..5)
-                        .ok_or(IggyError::InvalidCommand)
-                        .and_then(|bytes| {
-                            <[u8; 4]>::try_from(bytes)
-                                .map(u32::from_le_bytes)
-                                .map(|value| value as usize)
-                                .map_err(|_| IggyError::InvalidCommand)
-                        })?;
-                let offset =
-                    body.get(5..13)
-                        .ok_or(IggyError::InvalidCommand)
-                        .and_then(|bytes| {
-                            <[u8; 8]>::try_from(bytes)
-                                .map(u64::from_le_bytes)
-                                .map_err(|_| IggyError::InvalidCommand)
-                        })?;
-                let consumer = match consumer_kind {
-                    1 => PollingConsumer::Consumer(consumer_id, 0),
-                    2 => PollingConsumer::ConsumerGroup(consumer_id, 0),
-                    _ => {
-                        warn!(
-                            target: "iggy.partitions.diag",
-                            plane = "partitions",
-                            replica = consensus.replica(),
-                            op = header.op,
-                            namespace_raw = namespace.inner(),
-                            operation = ?header.operation,
-                            consumer_kind,
-                            "unknown consumer kind while applying replicated 
offset update"
-                        );
-                        return Err(IggyError::InvalidCommand);
-                    }
-                };
-
+            Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset 
=> {
+                let pending =
+                    
Self::parse_staged_consumer_offset_commit(header.operation, &message)?;
+                let write_lock = self
+                    .get_by_ns(namespace)
+                    .expect("store_consumer_offset: partition not found for 
namespace")
+                    .write_lock
+                    .clone();
+                let _guard = write_lock.lock().await;
                 let partition = self
                     .get_by_ns(namespace)
                     .expect("store_consumer_offset: partition not found for 
namespace");
-                let _ = partition.store_consumer_offset(consumer, offset);
+                partition.persist_consumer_offset_commit(pending).await?;
+                let partition = self
+                    .get_mut_by_ns(namespace)
+                    .expect("store_consumer_offset: partition not found for 
namespace");
+                partition.stage_consumer_offset_commit(header.op, pending);
 
                 debug!(
                     target: "iggy.partitions.diag",
@@ -693,10 +752,9 @@ where
                     op = header.op,
                     namespace_raw = namespace.inner(),
                     operation = ?header.operation,
-                    consumer_kind,
-                    consumer_id,
-                    offset,
-                    "replicated consumer offset stored"
+                    consumer = ?pending.owner(),
+                    pending = ?pending.mutation,
+                    "replicated consumer offset persisted and staged"
                 );
                 Ok(())
             }
@@ -1055,17 +1113,14 @@ where
 
                 !failed_ns.contains(&entry_namespace)
             }
-            Operation::StoreConsumerOffset => {
-                // TODO: Commit consumer offset update.
-                debug!(
-                    target: "iggy.partitions.diag",
-                    plane = "partitions",
-                    replica_id = consensus.replica(),
-                    op = prepare_header.op,
-                    namespace_raw = entry_namespace.inner(),
-                    "consumer offset committed"
-                );
-                true
+            Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset 
=> {
+                self.commit_consumer_offset_entry(
+                    consensus,
+                    prepare_header,
+                    entry_namespace,
+                    failed_ns,
+                )
+                .await
             }
             _ => {
                 warn!(
@@ -1105,6 +1160,93 @@ where
         ))
     }
 
+    fn parse_staged_consumer_offset_commit(
+        operation: Operation,
+        message: &Message<PrepareHeader>,
+    ) -> Result<PendingConsumerOffsetCommit, IggyError> {
+        let total_size = message.header().size() as usize;
+        let body = 
&message.as_slice()[std::mem::size_of::<PrepareHeader>()..total_size];
+        let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?;
+        let consumer_id = body
+            .get(1..5)
+            .ok_or(IggyError::InvalidCommand)
+            .and_then(|bytes| {
+                <[u8; 4]>::try_from(bytes)
+                    .map(u32::from_le_bytes)
+                    .map_err(|_| IggyError::InvalidCommand)
+            })?;
+        let owner = match consumer_kind {
+            1 => PendingConsumerOffsetOwner::Consumer(consumer_id),
+            2 => PendingConsumerOffsetOwner::ConsumerGroup(consumer_id),
+            _ => return Err(IggyError::InvalidCommand),
+        };
+        match operation {
+            Operation::StoreConsumerOffset => {
+                let offset =
+                    body.get(5..13)
+                        .ok_or(IggyError::InvalidCommand)
+                        .and_then(|bytes| {
+                            <[u8; 8]>::try_from(bytes)
+                                .map(u64::from_le_bytes)
+                                .map_err(|_| IggyError::InvalidCommand)
+                        })?;
+                Ok(PendingConsumerOffsetCommit::upsert(owner, offset))
+            }
+            Operation::DeleteConsumerOffset => 
Ok(PendingConsumerOffsetCommit::delete(owner)),
+            _ => Err(IggyError::InvalidCommand),
+        }
+    }
+
+    async fn commit_consumer_offset_entry(
+        &self,
+        consensus: &VsrConsensus<B, NamespacedPipeline>,
+        prepare_header: PrepareHeader,
+        entry_namespace: IggyNamespace,
+        failed_ns: &mut HashSet<IggyNamespace>,
+    ) -> bool {
+        let write_lock = self
+            .get_by_ns(&entry_namespace)
+            .expect("commit_partition_entry: partition not found")
+            .write_lock
+            .clone();
+        let _guard = write_lock.lock().await;
+
+        let pending = {
+            let partition = self
+                .get_mut_by_ns(&entry_namespace)
+                .expect("commit_partition_entry: partition not found");
+            partition.take_staged_consumer_offset_commit(prepare_header.op)
+        };
+        let Some(pending) = pending else {
+            failed_ns.insert(entry_namespace);
+            warn!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id = consensus.replica(),
+                op = prepare_header.op,
+                namespace_raw = entry_namespace.inner(),
+                "missing staged consumer offset commit for committed prepare"
+            );
+            return false;
+        };
+
+        let partition = self
+            .get_by_ns(&entry_namespace)
+            .expect("commit_partition_entry: partition not found");
+        partition.apply_consumer_offset_commit(pending);
+        debug!(
+            target: "iggy.partitions.diag",
+            plane = "partitions",
+            replica_id = consensus.replica(),
+            op = prepare_header.op,
+            namespace_raw = entry_namespace.inner(),
+            consumer = ?pending.owner(),
+            mutation = ?pending.mutation,
+            "consumer offset committed"
+        );
+        true
+    }
+
     /// Persist frozen batches to disk and update segment bookkeeping.
     async fn persist_frozen_batches_to_disk(
         &self,
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index fd5bb0234..be2b18647 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -24,6 +24,7 @@ mod iggy_partitions;
 mod journal;
 mod log;
 mod messages_writer;
+mod offset_storage;
 mod segment;
 mod types;
 
diff --git a/core/partitions/src/offset_storage.rs 
b/core/partitions/src/offset_storage.rs
new file mode 100644
index 000000000..9f62b7d08
--- /dev/null
+++ b/core/partitions/src/offset_storage.rs
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PartitionsConfig;
+use compio::{
+    fs::{OpenOptions, create_dir_all, remove_file},
+    io::AsyncWriteAtExt,
+};
+use iggy_common::{
+    ConsumerGroupId, ConsumerKind, ConsumerOffset, IggyError, 
sharding::IggyNamespace,
+};
+use std::{io::Read, path::Path};
+
+pub async fn create_offset_file_hierarchy(
+    config: &PartitionsConfig,
+    namespace: IggyNamespace,
+) -> Result<(), IggyError> {
+    let stream_id = namespace.stream_id();
+    let topic_id = namespace.topic_id();
+    let partition_id = namespace.partition_id();
+    let partition_path = config.get_partition_path(stream_id, topic_id, 
partition_id);
+
+    if !Path::new(&partition_path).exists() {
+        create_dir_all(&partition_path).await.map_err(|_| {
+            IggyError::CannotCreatePartitionDirectory(partition_id, stream_id, 
topic_id)
+        })?;
+    }
+
+    let consumer_offsets_path = config.get_consumer_offsets_path(stream_id, 
topic_id, partition_id);
+    if !Path::new(&consumer_offsets_path).exists() {
+        create_dir_all(&consumer_offsets_path).await.map_err(|_| {
+            
IggyError::CannotCreateConsumerOffsetsDirectory(consumer_offsets_path.clone())
+        })?;
+    }
+
+    let consumer_group_offsets_path =
+        config.get_consumer_group_offsets_path(stream_id, topic_id, 
partition_id);
+    if !Path::new(&consumer_group_offsets_path).exists() {
+        create_dir_all(&consumer_group_offsets_path)
+            .await
+            .map_err(|_| {
+                
IggyError::CannotCreateConsumerOffsetsDirectory(consumer_group_offsets_path.clone())
+            })?;
+    }
+
+    Ok(())
+}
+
+pub async fn persist_offset(path: &str, offset: u64) -> Result<(), IggyError> {
+    if let Some(parent) = Path::new(path).parent()
+        && !parent.exists()
+    {
+        create_dir_all(parent).await.map_err(|_| {
+            
IggyError::CannotCreateConsumerOffsetsDirectory(parent.display().to_string())
+        })?;
+    }
+
+    let mut file = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .open(path)
+        .await
+        .map_err(|_| 
IggyError::CannotOpenConsumerOffsetsFile(path.to_owned()))?;
+    let buf = offset.to_le_bytes();
+    file.write_all_at(buf, 0)
+        .await
+        .0
+        .map_err(|_| IggyError::CannotWriteToFile)?;
+    Ok(())
+}
+
+pub async fn delete_persisted_offset(path: &str) -> Result<(), IggyError> {
+    if !Path::new(path).exists() {
+        return Ok(());
+    }
+
+    remove_file(path)
+        .await
+        .map_err(|_| 
IggyError::CannotDeleteConsumerOffsetFile(path.to_owned()))
+}
+
+pub fn load_consumer_offsets(path: &str) -> Result<Vec<ConsumerOffset>, 
IggyError> {
+    if !Path::new(path).exists() {
+        return Ok(Vec::new());
+    }
+
+    let dir_entries = std::fs::read_dir(path)
+        .map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?;
+    let mut consumer_offsets = Vec::new();
+
+    for dir_entry in dir_entries {
+        let dir_entry =
+            dir_entry.map_err(|_| 
IggyError::CannotReadConsumerOffsets(path.to_owned()))?;
+        let metadata = dir_entry
+            .metadata()
+            .map_err(|_| 
IggyError::CannotReadConsumerOffsets(path.to_owned()))?;
+        if metadata.is_dir() {
+            continue;
+        }
+
+        let file_name = dir_entry
+            .file_name()
+            .into_string()
+            .map_err(|_| 
IggyError::CannotReadConsumerOffsets(path.to_owned()))?;
+        let consumer_id = file_name
+            .parse::<u32>()
+            .map_err(|_| 
IggyError::CannotReadConsumerOffsets(path.to_owned()))?;
+        let offset_path = dir_entry
+            .path()
+            .to_str()
+            .ok_or_else(|| 
IggyError::CannotReadConsumerOffsets(path.to_owned()))?
+            .to_owned();
+        let offset = read_offset(&offset_path)?;
+
+        consumer_offsets.push(ConsumerOffset::new(
+            ConsumerKind::Consumer,
+            consumer_id,
+            offset,
+            offset_path,
+        ));
+    }
+
+    consumer_offsets.sort_by_key(|offset| offset.consumer_id);
+    Ok(consumer_offsets)
+}
+
+pub fn load_consumer_group_offsets(
+    path: &str,
+) -> Result<Vec<(ConsumerGroupId, ConsumerOffset)>, IggyError> {
+    if !Path::new(path).exists() {
+        return Ok(Vec::new());
+    }
+
+    let dir_entries = std::fs::read_dir(path)
+        .map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?;
+    let mut consumer_group_offsets = Vec::new();
+
+    for dir_entry in dir_entries {
+        let dir_entry =
+            dir_entry.map_err(|_| 
IggyError::CannotReadConsumerOffsets(path.to_owned()))?;
+        let metadata = dir_entry
+            .metadata()
+            .map_err(|_| 
IggyError::CannotReadConsumerOffsets(path.to_owned()))?;
+        if metadata.is_dir() {
+            continue;
+        }
+
+        let file_name = dir_entry
+            .file_name()
+            .into_string()
+            .map_err(|_| 
IggyError::CannotReadConsumerOffsets(path.to_owned()))?;
+        let consumer_group_id = file_name
+            .parse::<u32>()
+            .map_err(|_| 
IggyError::CannotReadConsumerOffsets(path.to_owned()))?;
+        let offset_path = dir_entry
+            .path()
+            .to_str()
+            .ok_or_else(|| 
IggyError::CannotReadConsumerOffsets(path.to_owned()))?
+            .to_owned();
+        let offset = read_offset(&offset_path)?;
+
+        consumer_group_offsets.push((
+            ConsumerGroupId(
+                usize::try_from(consumer_group_id).expect("u32 group id must 
fit usize"),
+            ),
+            ConsumerOffset::new(
+                ConsumerKind::ConsumerGroup,
+                consumer_group_id,
+                offset,
+                offset_path,
+            ),
+        ));
+    }
+
+    consumer_group_offsets.sort_by_key(|(group_id, _)| group_id.0);
+    Ok(consumer_group_offsets)
+}
+
+fn read_offset(path: &str) -> Result<u64, IggyError> {
+    let file = std::fs::File::open(path).map_err(|_| 
IggyError::CannotReadFile)?;
+    let mut cursor = std::io::Cursor::new(file);
+    let mut offset = [0; 8];
+    cursor
+        .get_mut()
+        .read_exact(&mut offset)
+        .map_err(|_| IggyError::CannotReadFile)?;
+    Ok(u64::from_le_bytes(offset))
+}
diff --git a/core/partitions/src/types.rs b/core/partitions/src/types.rs
index dbaf57a59..178922755 100644
--- a/core/partitions/src/types.rs
+++ b/core/partitions/src/types.rs
@@ -219,6 +219,16 @@ pub struct PartitionsConfig {
 }
 
 impl PartitionsConfig {
+    #[must_use]
+    pub fn get_partition_path(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+    ) -> String {
+        
format!("/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}",)
+    }
+
     /// Constructs the file path for segment messages.
     ///
     /// TODO: This is a stub waiting for completion of issue to move server 
config
@@ -233,7 +243,8 @@ impl PartitionsConfig {
         start_offset: u64,
     ) -> String {
         format!(
-            
"/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}/{start_offset:0>20}.log",
+            "{}/{start_offset:0>20}.log",
+            self.get_partition_path(stream_id, topic_id, partition_id)
         )
     }
 
@@ -251,7 +262,47 @@ impl PartitionsConfig {
         start_offset: u64,
     ) -> String {
         format!(
-            
"/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}/{start_offset:0>20}.index",
+            "{}/{start_offset:0>20}.index",
+            self.get_partition_path(stream_id, topic_id, partition_id)
+        )
+    }
+
+    #[must_use]
+    pub fn get_offsets_path(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+    ) -> String {
+        format!(
+            "{}/offsets",
+            self.get_partition_path(stream_id, topic_id, partition_id)
+        )
+    }
+
+    #[must_use]
+    pub fn get_consumer_offsets_path(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+    ) -> String {
+        format!(
+            "{}/consumers",
+            self.get_offsets_path(stream_id, topic_id, partition_id)
+        )
+    }
+
+    #[must_use]
+    pub fn get_consumer_group_offsets_path(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+    ) -> String {
+        format!(
+            "{}/groups",
+            self.get_offsets_path(stream_id, topic_id, partition_id)
         )
     }
 }
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index 2d64d0f40..fce752b62 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -114,6 +114,19 @@ impl SimClient {
         self.build_request_with_namespace(Operation::StoreConsumerOffset, 
&payload, namespace)
     }
 
+    pub fn delete_consumer_offset(
+        &self,
+        namespace: IggyNamespace,
+        consumer_kind: u8,
+        consumer_id: u32,
+    ) -> Message<RequestHeader> {
+        let mut payload = Vec::with_capacity(5);
+        payload.push(consumer_kind);
+        payload.extend_from_slice(&consumer_id.to_le_bytes());
+
+        self.build_request_with_namespace(Operation::DeleteConsumerOffset, 
&payload, namespace)
+    }
+
     #[allow(clippy::cast_possible_truncation)]
     fn build_request_with_namespace(
         &self,


Reply via email to