numinnex commented on code in PR #3071:
URL: https://github.com/apache/iggy/pull/3071#discussion_r3052801230


##########
core/partitions/src/iggy_partition.rs:
##########
@@ -67,11 +145,193 @@ impl IggyPartition {
             revision_id: 0,
             should_increment_offset: false,
             write_lock: Arc::new(TokioMutex::new(())),
+            consumer_offsets_path: None,
+            consumer_group_offsets_path: None,
+            pending_consumer_offset_commits: HashMap::new(),
+        }
+    }
+
+    #[must_use]
+    pub const fn consensus(&self) -> &VsrConsensus<B> {
+        &self.consensus
+    }
+
+    #[must_use]
+    pub fn with_in_memory_storage(
+        stats: Arc<PartitionStats>,
+        consensus: VsrConsensus<B>,
+        segment_size: IggyByteSize,
+    ) -> Self {
+        let mut partition = Self::new(stats, consensus);
+        let start_offset = 0;
+        let segment = Segment::new(start_offset, segment_size);
+        let storage = SegmentStorage::default();
+        partition
+            .log
+            .add_persisted_segment(segment, storage, None, None);
+        partition.offset.store(start_offset, Ordering::Release);
+        partition
+            .dirty_offset
+            .store(start_offset, Ordering::Relaxed);
+        partition.should_increment_offset = false;
+        partition.stats.increment_segments_count(1);
+        partition
+    }
+
+    pub fn configure_consumer_offset_storage(
+        &mut self,
+        consumer_offsets_path: String,
+        consumer_group_offsets_path: String,
+        consumer_offsets: ConsumerOffsets,
+        consumer_group_offsets: ConsumerGroupOffsets,
+    ) {
+        self.consumer_offsets = Arc::new(consumer_offsets);
+        self.consumer_group_offsets = Arc::new(consumer_group_offsets);
+        self.consumer_offsets_path = Some(consumer_offsets_path);
+        self.consumer_group_offsets_path = Some(consumer_group_offsets_path);
+    }
+
+    pub(crate) async fn persist_and_stage_consumer_offset_upsert(
+        &mut self,
+        op: u64,
+        kind: ConsumerKind,
+        consumer_id: u32,
+        offset: u64,
+    ) -> Result<(), IggyError> {
+        let pending = PendingConsumerOffsetCommit::upsert(kind, consumer_id, 
offset);
+        self.persist_consumer_offset_commit(pending).await?;

Review Comment:
   I think the behavior of treating written offset as `committed` by default is 
fine, as in scenario where we restart a replica after crash we perform state 
transfer, which would overwrite the currently commited offset for that 
particular consumer. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to