krishvishal commented on code in PR #3071:
URL: https://github.com/apache/iggy/pull/3071#discussion_r3037225386


##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -642,49 +726,36 @@ where
                 );
                 Ok(())
             }
-            Operation::StoreConsumerOffset => {
-                let total_size = header.size() as usize;
-                let body = 
&message.as_slice()[std::mem::size_of::<PrepareHeader>()..total_size];
-                let consumer_kind = 
*body.first().ok_or(IggyError::InvalidCommand)?;
-                let consumer_id =
-                    body.get(1..5)
-                        .ok_or(IggyError::InvalidCommand)
-                        .and_then(|bytes| {
-                            <[u8; 4]>::try_from(bytes)
-                                .map(u32::from_le_bytes)
-                                .map(|value| value as usize)
-                                .map_err(|_| IggyError::InvalidCommand)
-                        })?;
-                let offset =
-                    body.get(5..13)
-                        .ok_or(IggyError::InvalidCommand)
-                        .and_then(|bytes| {
-                            <[u8; 8]>::try_from(bytes)
-                                .map(u64::from_le_bytes)
-                                .map_err(|_| IggyError::InvalidCommand)
-                        })?;
-                let consumer = match consumer_kind {
-                    1 => PollingConsumer::Consumer(consumer_id, 0),
-                    2 => PollingConsumer::ConsumerGroup(consumer_id, 0),
-                    _ => {
-                        warn!(
-                            target: "iggy.partitions.diag",
-                            plane = "partitions",
-                            replica = consensus.replica(),
-                            op = header.op,
-                            namespace_raw = namespace.inner(),
-                            operation = ?header.operation,
-                            consumer_kind,
-                            "unknown consumer kind while applying replicated 
offset update"
-                        );
-                        return Err(IggyError::InvalidCommand);
-                    }
-                };
-
-                let partition = self
+            Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset 
=> {
+                let (kind, consumer_id, offset) =
+                    
Self::parse_staged_consumer_offset_commit(header.operation, &message)?;
+                let write_lock = self
                     .get_by_ns(namespace)
+                    .expect("store_consumer_offset: partition not found for 
namespace")

Review Comment:
   Return an `IggyError` instead of panicking via `expect`.



##########
core/partitions/src/iggy_partition.rs:
##########
@@ -67,6 +120,158 @@ impl IggyPartition {
             revision_id: 0,
             should_increment_offset: false,
             write_lock: Arc::new(TokioMutex::new(())),
+            consumer_offsets_path: None,
+            consumer_group_offsets_path: None,
+            pending_consumer_offset_commits: HashMap::new(),
+        }
+    }
+
+    pub fn configure_consumer_offset_storage(
+        &mut self,
+        consumer_offsets_path: String,
+        consumer_group_offsets_path: String,
+        consumer_offsets: ConsumerOffsets,
+        consumer_group_offsets: ConsumerGroupOffsets,
+    ) {
+        self.consumer_offsets = Arc::new(consumer_offsets);
+        self.consumer_group_offsets = Arc::new(consumer_group_offsets);
+        self.consumer_offsets_path = Some(consumer_offsets_path);
+        self.consumer_group_offsets_path = Some(consumer_group_offsets_path);
+    }
+
+    pub(crate) async fn persist_and_stage_consumer_offset_upsert(
+        &mut self,
+        op: u64,
+        kind: ConsumerKind,
+        consumer_id: u32,
+        offset: u64,
+    ) -> Result<(), IggyError> {
+        let pending = PendingConsumerOffsetCommit::upsert(kind, consumer_id, 
offset);
+        self.persist_consumer_offset_commit(pending).await?;
+        self.pending_consumer_offset_commits.insert(op, pending);
+        Ok(())
+    }
+
+    pub(crate) async fn persist_and_stage_consumer_offset_delete(
+        &mut self,
+        op: u64,
+        kind: ConsumerKind,
+        consumer_id: u32,
+    ) -> Result<(), IggyError> {
+        let pending = PendingConsumerOffsetCommit::delete(kind, consumer_id);
+        self.persist_consumer_offset_commit(pending).await?;
+        self.pending_consumer_offset_commits.insert(op, pending);
+        Ok(())
+    }
+
+    pub(crate) fn apply_staged_consumer_offset_commit(&mut self, op: u64) -> 
Result<(), IggyError> {
+        let pending = self
+            .pending_consumer_offset_commits
+            .remove(&op)
+            .ok_or(IggyError::InvalidCommand)?;
+        self.apply_consumer_offset_commit(pending);
+        Ok(())
+    }
+
+    async fn persist_consumer_offset_commit(
+        &self,
+        pending: PendingConsumerOffsetCommit,
+    ) -> Result<(), IggyError> {
+        let Some(path) = self.persisted_offset_path(pending.kind, 
pending.consumer_id) else {
+            return Ok(());
+        };
+        match pending.mutation {
+            PendingConsumerOffsetMutation::Upsert(offset) => 
persist_offset(&path, offset).await,
+            PendingConsumerOffsetMutation::Delete => 
delete_persisted_offset(&path).await,
+        }
+    }
+
+    fn apply_consumer_offset_commit(&self, pending: 
PendingConsumerOffsetCommit) {
+        match pending.mutation {
+            PendingConsumerOffsetMutation::Upsert(offset)
+                if pending.kind == ConsumerKind::Consumer =>
+            {
+                let id = pending.consumer_id;
+                let guard = self.consumer_offsets.pin();
+                let key = usize::try_from(id).expect("u32 consumer id must fit 
usize");
+                if let Some(existing) = guard.get(&key) {
+                    existing.offset.store(offset, Ordering::Relaxed);
+                } else {
+                    let created = 
self.consumer_offsets_path.as_deref().map_or_else(
+                        || ConsumerOffset::new(ConsumerKind::Consumer, id, 0, 
String::new()),
+                        |path| ConsumerOffset::default_for_consumer(id, path),
+                    );
+                    created.offset.store(offset, Ordering::Relaxed);
+                    guard.insert(key, created);
+                }
+            }
+            PendingConsumerOffsetMutation::Upsert(offset)
+                if pending.kind == ConsumerKind::ConsumerGroup =>
+            {
+                let group_id = pending.consumer_id;
+                let guard = self.consumer_group_offsets.pin();
+                let key = ConsumerGroupId(
+                    usize::try_from(group_id).expect("u32 group id must fit 
usize"),
+                );
+                if let Some(existing) = guard.get(&key) {
+                    existing.offset.store(offset, Ordering::Relaxed);
+                } else {
+                    let created = 
self.consumer_group_offsets_path.as_deref().map_or_else(
+                        || {
+                            ConsumerOffset::new(
+                                ConsumerKind::ConsumerGroup,
+                                group_id,
+                                0,
+                                String::new(),
+                            )
+                        },
+                        |path| ConsumerOffset::default_for_consumer_group(key, 
path),
+                    );
+                    created.offset.store(offset, Ordering::Relaxed);
+                    guard.insert(key, created);
+                }
+            }
+            PendingConsumerOffsetMutation::Delete if pending.kind == 
ConsumerKind::Consumer => {
+                let id = pending.consumer_id;
+                let guard = self.consumer_offsets.pin();
+                let key = usize::try_from(id).expect("u32 consumer id must fit 
usize");
+                let _ = guard.remove(&key);
+            }
+            PendingConsumerOffsetMutation::Delete
+                if pending.kind == ConsumerKind::ConsumerGroup =>
+            {
+                let group_id = pending.consumer_id;
+                let guard = self.consumer_group_offsets.pin();
+                let key = ConsumerGroupId(
+                    usize::try_from(group_id).expect("u32 group id must fit 
usize"),
+                );
+                let _ = guard.remove(&key);
+            }
+            _ => {}

Review Comment:
   Instead of a silent catch-all, make it `unreachable!`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to