This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch store_consumer_offset in repository https://gitbox.apache.org/repos/asf/iggy.git
commit df6f1756898b03e502408a8eee44944fd772a217 Author: numinex <[email protected]> AuthorDate: Thu Apr 2 14:21:42 2026 +0200 feat(partitions): implement StoreConsumerOffset and DeleteConsumerOffset --- core/binary_protocol/src/consensus/operation.rs | 13 +- core/binary_protocol/src/dispatch.rs | 8 +- core/consensus/src/observability.rs | 1 + core/partitions/src/iggy_partition.rs | 245 +++++++++++++++++++---- core/partitions/src/iggy_partitions.rs | 256 ++++++++++++++++++------ core/partitions/src/lib.rs | 1 + core/partitions/src/offset_storage.rs | 202 +++++++++++++++++++ core/partitions/src/types.rs | 55 ++++- core/simulator/src/client.rs | 13 ++ 9 files changed, 695 insertions(+), 99 deletions(-) diff --git a/core/binary_protocol/src/consensus/operation.rs b/core/binary_protocol/src/consensus/operation.rs index c17733deb..d275269ae 100644 --- a/core/binary_protocol/src/consensus/operation.rs +++ b/core/binary_protocol/src/consensus/operation.rs @@ -37,7 +37,7 @@ pub enum Operation { CreatePartitions = 136, DeletePartitions = 137, // TODO: DeleteSegments is a partition operation (is_partition() == true) but its - // discriminant sits in the metadata range (128-147). Should be moved to 162 once + // discriminant sits in the metadata range (128-147). Should be moved to 163 once // iggy_common's Operation enum is removed and wire compat is no longer a concern. DeleteSegments = 138, CreateConsumerGroup = 139, @@ -53,6 +53,7 @@ pub enum Operation { // Partition operations (routed by namespace) SendMessages = 160, StoreConsumerOffset = 161, + DeleteConsumerOffset = 162, } impl Operation { @@ -90,7 +91,10 @@ impl Operation { pub const fn is_partition(&self) -> bool { matches!( self, - Self::SendMessages | Self::StoreConsumerOffset | Self::DeleteSegments + Self::SendMessages + | Self::StoreConsumerOffset + | Self::DeleteConsumerOffset + | Self::DeleteSegments ) } @@ -122,7 +126,8 @@ impl Operation { | Self::CreatePersonalAccessToken | Self::DeletePersonalAccessToken | Self::SendMessages - | Self::StoreConsumerOffset => match crate::dispatch::lookup_by_operation(*self) { + | Self::StoreConsumerOffset + | Self::DeleteConsumerOffset => match crate::dispatch::lookup_by_operation(*self) { Some(meta) => Some(meta.code), None => None, }, @@ -170,6 +175,7 @@ mod tests { Operation::DeletePersonalAccessToken, Operation::SendMessages, Operation::StoreConsumerOffset, + Operation::DeleteConsumerOffset, ]; for op in ops { let code = op @@ -202,5 +208,6 @@ mod tests { assert!(Operation::SendMessages.is_partition()); assert!(!Operation::SendMessages.is_metadata()); assert!(Operation::DeleteSegments.is_partition()); + assert!(Operation::DeleteConsumerOffset.is_partition()); } } diff --git a/core/binary_protocol/src/dispatch.rs b/core/binary_protocol/src/dispatch.rs index fcd1dcb69..1aa9eb828 100644 --- a/core/binary_protocol/src/dispatch.rs +++ b/core/binary_protocol/src/dispatch.rs @@ -120,7 +120,11 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[ "consumer_offset.store", Operation::StoreConsumerOffset, ), - CommandMeta::non_replicated(DELETE_CONSUMER_OFFSET_CODE, "consumer_offset.delete"), + CommandMeta::replicated( + DELETE_CONSUMER_OFFSET_CODE, + "consumer_offset.delete", + Operation::DeleteConsumerOffset, + ), // Streams CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"), CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"), @@ -260,6 +264,7 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> Operation::DeletePersonalAccessToken => 18, Operation::SendMessages => 21, Operation::StoreConsumerOffset => 24, + Operation::DeleteConsumerOffset => 25, Operation::Reserved => return None, }; Some(&COMMAND_TABLE[idx]) @@ -378,6 +383,7 @@ mod tests { Operation::DeletePersonalAccessToken, Operation::SendMessages, Operation::StoreConsumerOffset, + Operation::DeleteConsumerOffset, ]; for op in replicated_ops { let meta = lookup_by_operation(op) diff --git a/core/consensus/src/observability.rs b/core/consensus/src/observability.rs index 865935faf..3944ef86c 100644 --- a/core/consensus/src/observability.rs +++ b/core/consensus/src/observability.rs @@ -372,6 +372,7 @@ pub const fn operation_as_str(operation: Operation) -> &'static str { Operation::DeletePersonalAccessToken => "delete_personal_access_token", Operation::SendMessages => "send_messages", Operation::StoreConsumerOffset => "store_consumer_offset", + Operation::DeleteConsumerOffset => "delete_consumer_offset", } } diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index c49911b3d..3ca31d0a0 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -19,6 +19,7 @@ use crate::journal::{ MessageLookup, PartitionJournal, PartitionJournalMemStorage, QueryableJournal, }; use crate::log::SegmentedLog; +use crate::offset_storage::{delete_persisted_offset, persist_offset}; use crate::{ AppendResult, Partition, PartitionOffsets, PollFragments, PollQueryResult, PollingArgs, PollingConsumer, @@ -30,6 +31,7 @@ use iggy_common::{ send_messages2::stamp_prepare_for_persistence, }; use journal::Journal as _; +use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::sync::Mutex as TokioMutex; @@ -52,6 +54,72 @@ pub struct IggyPartition { pub revision_id: u64, pub should_increment_offset: bool, pub write_lock: Arc<TokioMutex<()>>, + consumer_offsets_path: Option<String>, + consumer_group_offsets_path: Option<String>, + pending_consumer_offset_commits: HashMap<u64, PendingConsumerOffsetCommit>, +} + +#[allow(clippy::redundant_pub_crate)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum PendingConsumerOffsetOwner { + Consumer(u32), + ConsumerGroup(u32), +} + +#[allow(clippy::redundant_pub_crate)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct PendingConsumerOffsetCommit { + pub(crate) mutation: PendingConsumerOffsetMutation, +} + +#[allow(clippy::redundant_pub_crate)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum PendingConsumerOffsetMutation { + Upsert { + owner: PendingConsumerOffsetOwner, + offset: u64, + }, + Delete { + owner: PendingConsumerOffsetOwner, + }, +} + +impl PendingConsumerOffsetCommit { + pub(crate) const fn upsert(owner: PendingConsumerOffsetOwner, offset: u64) -> Self { + Self { + mutation: PendingConsumerOffsetMutation::Upsert { owner, offset }, + } + } + + pub(crate) const fn delete(owner: PendingConsumerOffsetOwner) -> Self { + Self { + mutation: PendingConsumerOffsetMutation::Delete { owner }, + } + } + + pub(crate) fn try_from_polling_consumer( + consumer: PollingConsumer, + offset: u64, + ) -> Result<Self, IggyError> { + let owner = match consumer { + PollingConsumer::Consumer(id, _) => PendingConsumerOffsetOwner::Consumer( + u32::try_from(id).map_err(|_| IggyError::InvalidCommand)?, + ), + PollingConsumer::ConsumerGroup(group_id, _) => { + PendingConsumerOffsetOwner::ConsumerGroup( + u32::try_from(group_id).map_err(|_| IggyError::InvalidCommand)?, + ) + } + }; + Ok(Self::upsert(owner, offset)) + } + + pub(crate) const fn owner(self) -> PendingConsumerOffsetOwner { + match self.mutation { + PendingConsumerOffsetMutation::Upsert { owner, .. } + | PendingConsumerOffsetMutation::Delete { owner } => owner, + } + } } impl IggyPartition { @@ -67,6 +135,141 @@ impl IggyPartition { revision_id: 0, should_increment_offset: false, write_lock: Arc::new(TokioMutex::new(())), + consumer_offsets_path: None, + consumer_group_offsets_path: None, + pending_consumer_offset_commits: HashMap::new(), + } + } + + pub fn configure_consumer_offset_storage( + &mut self, + consumer_offsets_path: String, + consumer_group_offsets_path: String, + consumer_offsets: ConsumerOffsets, + consumer_group_offsets: ConsumerGroupOffsets, + ) { + self.consumer_offsets = Arc::new(consumer_offsets); + self.consumer_group_offsets = Arc::new(consumer_group_offsets); + self.consumer_offsets_path = Some(consumer_offsets_path); + self.consumer_group_offsets_path = Some(consumer_group_offsets_path); + } + + pub(crate) fn stage_consumer_offset_commit( + &mut self, + op: u64, + pending: PendingConsumerOffsetCommit, + ) { + self.pending_consumer_offset_commits.insert(op, pending); + } + + #[must_use] + pub(crate) fn take_staged_consumer_offset_commit( + &mut self, + op: u64, + ) -> Option<PendingConsumerOffsetCommit> { + self.pending_consumer_offset_commits.remove(&op) + } + + pub(crate) async fn persist_consumer_offset_commit( + &self, + pending: PendingConsumerOffsetCommit, + ) -> Result<(), IggyError> { + let Some(path) = self.persisted_offset_path(pending.owner()) else { + return Ok(()); + }; + match pending.mutation { + PendingConsumerOffsetMutation::Upsert { offset, .. } => { + persist_offset(&path, offset).await + } + PendingConsumerOffsetMutation::Delete { .. } => delete_persisted_offset(&path).await, + } + } + + pub(crate) fn apply_consumer_offset_commit(&self, pending: PendingConsumerOffsetCommit) { + match pending.mutation { + PendingConsumerOffsetMutation::Upsert { + owner: PendingConsumerOffsetOwner::Consumer(id), + offset, + } => { + let guard = self.consumer_offsets.pin(); + let key = usize::try_from(id).expect("u32 consumer id must fit usize"); + if let Some(existing) = guard.get(&key) { + existing.offset.store(offset, Ordering::Relaxed); + } else { + let created = self.consumer_offsets_path.as_deref().map_or_else( + || ConsumerOffset::new(ConsumerKind::Consumer, id, 0, String::new()), + |path| ConsumerOffset::default_for_consumer(id, path), + ); + created.offset.store(offset, Ordering::Relaxed); + guard.insert(key, created); + } + } + PendingConsumerOffsetMutation::Upsert { + owner: PendingConsumerOffsetOwner::ConsumerGroup(group_id), + offset, + } => { + let guard = self.consumer_group_offsets.pin(); + let key = ConsumerGroupId( + usize::try_from(group_id).expect("u32 group id must fit usize"), + ); + if let Some(existing) = guard.get(&key) { + existing.offset.store(offset, Ordering::Relaxed); + } else { + let created = self.consumer_group_offsets_path.as_deref().map_or_else( + || { + ConsumerOffset::new( + ConsumerKind::ConsumerGroup, + group_id, + 0, + String::new(), + ) + }, + |path| ConsumerOffset::default_for_consumer_group(key, path), + ); + created.offset.store(offset, Ordering::Relaxed); + guard.insert(key, created); + } + } + PendingConsumerOffsetMutation::Delete { + owner: PendingConsumerOffsetOwner::Consumer(id), + } => { + let guard = self.consumer_offsets.pin(); + let key = usize::try_from(id).expect("u32 consumer id must fit usize"); + let _ = guard.remove(&key); + } + PendingConsumerOffsetMutation::Delete { + owner: PendingConsumerOffsetOwner::ConsumerGroup(group_id), + } => { + let guard = self.consumer_group_offsets.pin(); + let key = ConsumerGroupId( + usize::try_from(group_id).expect("u32 group id must fit usize"), + ); + let _ = guard.remove(&key); + } + } + } + + async fn store_consumer_offset_and_persist( + &self, + consumer: PollingConsumer, + offset: u64, + ) -> Result<(), IggyError> { + let pending = PendingConsumerOffsetCommit::try_from_polling_consumer(consumer, offset)?; + self.persist_consumer_offset_commit(pending).await?; + self.apply_consumer_offset_commit(pending); + Ok(()) + } + + fn persisted_offset_path(&self, owner: PendingConsumerOffsetOwner) -> Option<String> { + match owner { + PendingConsumerOffsetOwner::Consumer(id) => self + .consumer_offsets_path + .as_ref() + .map(|path| format!("{path}/{id}")), + PendingConsumerOffsetOwner::ConsumerGroup(group_id) => self + .consumer_group_offsets_path + .as_ref() + .map(|path| format!("{path}/{group_id}")), } } } @@ -191,7 +394,10 @@ impl Partition for IggyPartition { if args.auto_commit && !fragments.is_empty() { let last_offset = last_matching_offset.expect("non-empty poll result must have a last offset"); - if let Err(err) = self.store_consumer_offset(consumer, last_offset) { + if let Err(err) = self + .store_consumer_offset_and_persist(consumer, last_offset) + .await + { // warning for now. warn!( target: "iggy.partitions.diag", @@ -212,41 +418,8 @@ impl Partition for IggyPartition { consumer: PollingConsumer, offset: u64, ) -> Result<(), IggyError> { - match consumer { - PollingConsumer::Consumer(id, _) => { - let guard = self.consumer_offsets.pin(); - if let Some(existing) = guard.get(&id) { - existing.offset.store(offset, Ordering::Relaxed); - } else { - guard.insert( - id, - ConsumerOffset::new( - ConsumerKind::Consumer, - id as u32, - offset, - String::new(), - ), - ); - } - } - PollingConsumer::ConsumerGroup(group_id, _) => { - let guard = self.consumer_group_offsets.pin(); - let key = ConsumerGroupId(group_id); - if let Some(existing) = guard.get(&key) { - existing.offset.store(offset, Ordering::Relaxed); - } else { - guard.insert( - key, - ConsumerOffset::new( - ConsumerKind::ConsumerGroup, - group_id as u32, - offset, - String::new(), - ), - ); - } - } - } + let pending = PendingConsumerOffsetCommit::try_from_polling_consumer(consumer, offset)?; + self.apply_consumer_offset_commit(pending); Ok(()) } diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 5d9d3d601..33a8c4226 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -19,10 +19,13 @@ use crate::IggyPartition; use crate::Partition; -use crate::PollingConsumer; use crate::iggy_index_writer::IggyIndexWriter; +use crate::iggy_partition::{PendingConsumerOffsetCommit, PendingConsumerOffsetOwner}; use crate::log::JournalInfo; use crate::messages_writer::MessagesWriter; +use crate::offset_storage::{ + create_offset_file_hierarchy, load_consumer_group_offsets, load_consumer_offsets, +}; use crate::segment::Segment; use crate::types::PartitionsConfig; use consensus::PlaneIdentity; @@ -38,7 +41,8 @@ use iggy_binary_protocol::{ RequestHeader, }; use iggy_common::{ - IggyByteSize, IggyError, PartitionStats, SegmentStorage, + ConsumerGroupOffsets, ConsumerOffset, ConsumerOffsets, IggyByteSize, IggyError, PartitionStats, + SegmentStorage, send_messages2::{convert_request_message, decode_prepare_slice}, sharding::{IggyNamespace, LocalIdx, ShardId}, }; @@ -316,6 +320,25 @@ impl<C> IggyPartitions<C> { // Create initial segment with storage let start_offset = 0; let segment = Segment::new(start_offset, self.config.segment_size); + create_offset_file_hierarchy(&self.config, namespace).await?; + + let consumer_offsets_path = self.config.get_consumer_offsets_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + ); + let consumer_group_offsets_path = self.config.get_consumer_group_offsets_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + ); + let consumer_offsets = + Self::load_partition_consumer_offsets(&consumer_offsets_path, start_offset, namespace)?; + let consumer_group_offsets = Self::load_partition_consumer_group_offsets( + &consumer_group_offsets_path, + start_offset, + namespace, + )?; // TODO: Waiting for issue to move server config to shared module. // Once complete, paths will come from proper base_path/streams_path/etc config fields. @@ -372,6 +395,12 @@ impl<C> IggyPartitions<C> { // Create partition with initialized log let stats = Arc::new(PartitionStats::default()); let mut partition = IggyPartition::new(stats); + partition.configure_consumer_offset_storage( + consumer_offsets_path, + consumer_group_offsets_path, + consumer_offsets, + consumer_group_offsets, + ); partition.log.add_persisted_segment( segment, storage, @@ -387,6 +416,62 @@ impl<C> IggyPartitions<C> { Ok(self.insert(namespace, partition)) } + + fn load_partition_consumer_offsets( + path: &str, + current_offset: u64, + namespace: IggyNamespace, + ) -> Result<ConsumerOffsets, IggyError> { + let offsets = load_consumer_offsets(path)? + .into_iter() + .map(|offset| { + let offset = Self::clamp_loaded_offset(namespace, current_offset, offset); + ( + usize::try_from(offset.consumer_id).expect("u32 consumer id must fit usize"), + offset, + ) + }) + .collect::<Vec<_>>(); + Ok(ConsumerOffsets::from(offsets)) + } + + fn load_partition_consumer_group_offsets( + path: &str, + current_offset: u64, + namespace: IggyNamespace, + ) -> Result<ConsumerGroupOffsets, IggyError> { + let offsets = load_consumer_group_offsets(path)? + .into_iter() + .map(|(group_id, offset)| { + let offset = Self::clamp_loaded_offset(namespace, current_offset, offset); + (group_id, offset) + }) + .collect::<Vec<_>>(); + Ok(ConsumerGroupOffsets::from(offsets)) + } + + fn clamp_loaded_offset( + namespace: IggyNamespace, + current_offset: u64, + offset: ConsumerOffset, + ) -> ConsumerOffset { + let persisted_offset = offset.offset.load(Ordering::Relaxed); + if persisted_offset <= current_offset { + return offset; + } + + warn!( + target: "iggy.partitions.diag", + namespace_raw = namespace.inner(), + consumer_id = offset.consumer_id, + persisted_offset, + current_offset, + path = %offset.path, + "clamping recovered consumer offset to current partition offset" + ); + offset.offset.store(current_offset, Ordering::Relaxed); + offset + } } impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, NamespacedPipeline>> @@ -642,49 +727,23 @@ where ); Ok(()) } - Operation::StoreConsumerOffset => { - let total_size = header.size() as usize; - let body = &message.as_slice()[std::mem::size_of::<PrepareHeader>()..total_size]; - let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?; - let consumer_id = - body.get(1..5) - .ok_or(IggyError::InvalidCommand) - .and_then(|bytes| { - <[u8; 4]>::try_from(bytes) - .map(u32::from_le_bytes) - .map(|value| value as usize) - .map_err(|_| IggyError::InvalidCommand) - })?; - let offset = - body.get(5..13) - .ok_or(IggyError::InvalidCommand) - .and_then(|bytes| { - <[u8; 8]>::try_from(bytes) - .map(u64::from_le_bytes) - .map_err(|_| IggyError::InvalidCommand) - })?; - let consumer = match consumer_kind { - 1 => PollingConsumer::Consumer(consumer_id, 0), - 2 => PollingConsumer::ConsumerGroup(consumer_id, 0), - _ => { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - replica = consensus.replica(), - op = header.op, - namespace_raw = namespace.inner(), - operation = ?header.operation, - consumer_kind, - "unknown consumer kind while applying replicated offset update" - ); - return Err(IggyError::InvalidCommand); - } - }; - + Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset => { + let pending = + Self::parse_staged_consumer_offset_commit(header.operation, &message)?; + let write_lock = self + .get_by_ns(namespace) + .expect("store_consumer_offset: partition not found for namespace") + .write_lock + .clone(); + let _guard = write_lock.lock().await; let partition = self .get_by_ns(namespace) .expect("store_consumer_offset: partition not found for namespace"); - let _ = partition.store_consumer_offset(consumer, offset); + partition.persist_consumer_offset_commit(pending).await?; + let partition = self + .get_mut_by_ns(namespace) + .expect("store_consumer_offset: partition not found for namespace"); + partition.stage_consumer_offset_commit(header.op, pending); debug!( target: "iggy.partitions.diag", @@ -693,10 +752,9 @@ where op = header.op, namespace_raw = namespace.inner(), operation = ?header.operation, - consumer_kind, - consumer_id, - offset, - "replicated consumer offset stored" + consumer = ?pending.owner(), + pending = ?pending.mutation, + "replicated consumer offset persisted and staged" ); Ok(()) } @@ -1055,17 +1113,14 @@ where !failed_ns.contains(&entry_namespace) } - Operation::StoreConsumerOffset => { - // TODO: Commit consumer offset update. - debug!( - target: "iggy.partitions.diag", - plane = "partitions", - replica_id = consensus.replica(), - op = prepare_header.op, - namespace_raw = entry_namespace.inner(), - "consumer offset committed" - ); - true + Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset => { + self.commit_consumer_offset_entry( + consensus, + prepare_header, + entry_namespace, + failed_ns, + ) + .await } _ => { warn!( @@ -1105,6 +1160,93 @@ where )) } + fn parse_staged_consumer_offset_commit( + operation: Operation, + message: &Message<PrepareHeader>, + ) -> Result<PendingConsumerOffsetCommit, IggyError> { + let total_size = message.header().size() as usize; + let body = &message.as_slice()[std::mem::size_of::<PrepareHeader>()..total_size]; + let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?; + let consumer_id = body + .get(1..5) + .ok_or(IggyError::InvalidCommand) + .and_then(|bytes| { + <[u8; 4]>::try_from(bytes) + .map(u32::from_le_bytes) + .map_err(|_| IggyError::InvalidCommand) + })?; + let owner = match consumer_kind { + 1 => PendingConsumerOffsetOwner::Consumer(consumer_id), + 2 => PendingConsumerOffsetOwner::ConsumerGroup(consumer_id), + _ => return Err(IggyError::InvalidCommand), + }; + match operation { + Operation::StoreConsumerOffset => { + let offset = + body.get(5..13) + .ok_or(IggyError::InvalidCommand) + .and_then(|bytes| { + <[u8; 8]>::try_from(bytes) + .map(u64::from_le_bytes) + .map_err(|_| IggyError::InvalidCommand) + })?; + Ok(PendingConsumerOffsetCommit::upsert(owner, offset)) + } + Operation::DeleteConsumerOffset => Ok(PendingConsumerOffsetCommit::delete(owner)), + _ => Err(IggyError::InvalidCommand), + } + } + + async fn commit_consumer_offset_entry( + &self, + consensus: &VsrConsensus<B, NamespacedPipeline>, + prepare_header: PrepareHeader, + entry_namespace: IggyNamespace, + failed_ns: &mut HashSet<IggyNamespace>, + ) -> bool { + let write_lock = self + .get_by_ns(&entry_namespace) + .expect("commit_partition_entry: partition not found") + .write_lock + .clone(); + let _guard = write_lock.lock().await; + + let pending = { + let partition = self + .get_mut_by_ns(&entry_namespace) + .expect("commit_partition_entry: partition not found"); + partition.take_staged_consumer_offset_commit(prepare_header.op) + }; + let Some(pending) = pending else { + failed_ns.insert(entry_namespace); + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = consensus.replica(), + op = prepare_header.op, + namespace_raw = entry_namespace.inner(), + "missing staged consumer offset commit for committed prepare" + ); + return false; + }; + + let partition = self + .get_by_ns(&entry_namespace) + .expect("commit_partition_entry: partition not found"); + partition.apply_consumer_offset_commit(pending); + debug!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = consensus.replica(), + op = prepare_header.op, + namespace_raw = entry_namespace.inner(), + consumer = ?pending.owner(), + mutation = ?pending.mutation, + "consumer offset committed" + ); + true + } + /// Persist frozen batches to disk and update segment bookkeeping. async fn persist_frozen_batches_to_disk( &self, diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs index fd5bb0234..be2b18647 100644 --- a/core/partitions/src/lib.rs +++ b/core/partitions/src/lib.rs @@ -24,6 +24,7 @@ mod iggy_partitions; mod journal; mod log; mod messages_writer; +mod offset_storage; mod segment; mod types; diff --git a/core/partitions/src/offset_storage.rs b/core/partitions/src/offset_storage.rs new file mode 100644 index 000000000..9f62b7d08 --- /dev/null +++ b/core/partitions/src/offset_storage.rs @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PartitionsConfig; +use compio::{ + fs::{OpenOptions, create_dir_all, remove_file}, + io::AsyncWriteAtExt, +}; +use iggy_common::{ + ConsumerGroupId, ConsumerKind, ConsumerOffset, IggyError, sharding::IggyNamespace, +}; +use std::{io::Read, path::Path}; + +pub async fn create_offset_file_hierarchy( + config: &PartitionsConfig, + namespace: IggyNamespace, +) -> Result<(), IggyError> { + let stream_id = namespace.stream_id(); + let topic_id = namespace.topic_id(); + let partition_id = namespace.partition_id(); + let partition_path = config.get_partition_path(stream_id, topic_id, partition_id); + + if !Path::new(&partition_path).exists() { + create_dir_all(&partition_path).await.map_err(|_| { + IggyError::CannotCreatePartitionDirectory(partition_id, stream_id, topic_id) + })?; + } + + let consumer_offsets_path = config.get_consumer_offsets_path(stream_id, topic_id, partition_id); + if !Path::new(&consumer_offsets_path).exists() { + create_dir_all(&consumer_offsets_path).await.map_err(|_| { + IggyError::CannotCreateConsumerOffsetsDirectory(consumer_offsets_path.clone()) + })?; + } + + let consumer_group_offsets_path = + config.get_consumer_group_offsets_path(stream_id, topic_id, partition_id); + if !Path::new(&consumer_group_offsets_path).exists() { + create_dir_all(&consumer_group_offsets_path) + .await + .map_err(|_| { + IggyError::CannotCreateConsumerOffsetsDirectory(consumer_group_offsets_path.clone()) + })?; + } + + Ok(()) +} + +pub async fn persist_offset(path: &str, offset: u64) -> Result<(), IggyError> { + if let Some(parent) = Path::new(path).parent() + && !parent.exists() + { + create_dir_all(parent).await.map_err(|_| { + IggyError::CannotCreateConsumerOffsetsDirectory(parent.display().to_string()) + })?; + } + + let mut file = OpenOptions::new() + .write(true) + .create(true) + .open(path) + .await + .map_err(|_| IggyError::CannotOpenConsumerOffsetsFile(path.to_owned()))?; + let buf = offset.to_le_bytes(); + file.write_all_at(buf, 0) + .await + .0 + .map_err(|_| IggyError::CannotWriteToFile)?; + Ok(()) +} + +pub async fn delete_persisted_offset(path: &str) -> Result<(), IggyError> { + if !Path::new(path).exists() { + return Ok(()); + } + + remove_file(path) + .await + .map_err(|_| IggyError::CannotDeleteConsumerOffsetFile(path.to_owned())) +} + +pub fn load_consumer_offsets(path: &str) -> Result<Vec<ConsumerOffset>, IggyError> { + if !Path::new(path).exists() { + return Ok(Vec::new()); + } + + let dir_entries = std::fs::read_dir(path) + .map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?; + let mut consumer_offsets = Vec::new(); + + for dir_entry in dir_entries { + let dir_entry = + dir_entry.map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?; + let metadata = dir_entry + .metadata() + .map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?; + if metadata.is_dir() { + continue; + } + + let file_name = dir_entry + .file_name() + .into_string() + .map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?; + let consumer_id = file_name + .parse::<u32>() + .map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?; + let offset_path = dir_entry + .path() + .to_str() + .ok_or_else(|| IggyError::CannotReadConsumerOffsets(path.to_owned()))? + .to_owned(); + let offset = read_offset(&offset_path)?; + + consumer_offsets.push(ConsumerOffset::new( + ConsumerKind::Consumer, + consumer_id, + offset, + offset_path, + )); + } + + consumer_offsets.sort_by_key(|offset| offset.consumer_id); + Ok(consumer_offsets) +} + +pub fn load_consumer_group_offsets( + path: &str, +) -> Result<Vec<(ConsumerGroupId, ConsumerOffset)>, IggyError> { + if !Path::new(path).exists() { + return Ok(Vec::new()); + } + + let dir_entries = std::fs::read_dir(path) + .map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?; + let mut consumer_group_offsets = Vec::new(); + + for dir_entry in dir_entries { + let dir_entry = + dir_entry.map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?; + let metadata = dir_entry + .metadata() + .map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?; + if metadata.is_dir() { + continue; + } + + let file_name = dir_entry + .file_name() + .into_string() + .map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?; + let consumer_group_id = file_name + .parse::<u32>() + .map_err(|_| IggyError::CannotReadConsumerOffsets(path.to_owned()))?; + let offset_path = dir_entry + .path() + .to_str() + .ok_or_else(|| IggyError::CannotReadConsumerOffsets(path.to_owned()))? + .to_owned(); + let offset = read_offset(&offset_path)?; + + consumer_group_offsets.push(( + ConsumerGroupId( + usize::try_from(consumer_group_id).expect("u32 group id must fit usize"), + ), + ConsumerOffset::new( + ConsumerKind::ConsumerGroup, + consumer_group_id, + offset, + offset_path, + ), + )); + } + + consumer_group_offsets.sort_by_key(|(group_id, _)| group_id.0); + Ok(consumer_group_offsets) +} + +fn read_offset(path: &str) -> Result<u64, IggyError> { + let file = std::fs::File::open(path).map_err(|_| IggyError::CannotReadFile)?; + let mut cursor = std::io::Cursor::new(file); + let mut offset = [0; 8]; + cursor + .get_mut() + .read_exact(&mut offset) + .map_err(|_| IggyError::CannotReadFile)?; + Ok(u64::from_le_bytes(offset)) +} diff --git a/core/partitions/src/types.rs b/core/partitions/src/types.rs index dbaf57a59..178922755 100644 --- a/core/partitions/src/types.rs +++ b/core/partitions/src/types.rs @@ -219,6 +219,16 @@ pub struct PartitionsConfig { } impl PartitionsConfig { + #[must_use] + pub fn get_partition_path( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> String { + format!("/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}",) + } + /// Constructs the file path for segment messages. /// /// TODO: This is a stub waiting for completion of issue to move server config @@ -233,7 +243,8 @@ impl PartitionsConfig { start_offset: u64, ) -> String { format!( - "/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}/{start_offset:0>20}.log", + "{}/{start_offset:0>20}.log", + self.get_partition_path(stream_id, topic_id, partition_id) ) } @@ -251,7 +262,47 @@ impl PartitionsConfig { start_offset: u64, ) -> String { format!( - "/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}/{start_offset:0>20}.index", + "{}/{start_offset:0>20}.index", + self.get_partition_path(stream_id, topic_id, partition_id) + ) + } + + #[must_use] + pub fn get_offsets_path( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> String { + format!( + "{}/offsets", + self.get_partition_path(stream_id, topic_id, partition_id) + ) + } + + #[must_use] + pub fn get_consumer_offsets_path( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> String { + format!( + "{}/consumers", + self.get_offsets_path(stream_id, topic_id, partition_id) + ) + } + + #[must_use] + pub fn get_consumer_group_offsets_path( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> String { + format!( + "{}/groups", + self.get_offsets_path(stream_id, topic_id, partition_id) ) } } diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs index 2d64d0f40..fce752b62 100644 --- a/core/simulator/src/client.rs +++ b/core/simulator/src/client.rs @@ -114,6 +114,19 @@ impl SimClient { self.build_request_with_namespace(Operation::StoreConsumerOffset, &payload, namespace) } + pub fn delete_consumer_offset( + &self, + namespace: IggyNamespace, + consumer_kind: u8, + consumer_id: u32, + ) -> Message<RequestHeader> { + let mut payload = Vec::with_capacity(5); + payload.push(consumer_kind); + payload.extend_from_slice(&consumer_id.to_le_bytes()); + + self.build_request_with_namespace(Operation::DeleteConsumerOffset, &payload, namespace) + } + #[allow(clippy::cast_possible_truncation)] fn build_request_with_namespace( &self,
