hubcio commented on code in PR #3071:
URL: https://github.com/apache/iggy/pull/3071#discussion_r3044653964


##########
core/partitions/src/offset_storage.rs:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use compio::{
+    fs::{OpenOptions, create_dir_all, remove_file},
+    io::AsyncWriteAtExt,
+};
+use iggy_common::IggyError;
+use std::path::Path;
+
+pub async fn persist_offset(path: &str, offset: u64) -> Result<(), IggyError> {
+    if let Some(parent) = Path::new(path).parent()
+        && !parent.exists()
+    {
+        create_dir_all(parent).await.map_err(|_| {
+            IggyError::CannotCreateConsumerOffsetsDirectory(parent.display().to_string())
+        })?;
+    }
+
+    let mut file = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .open(path)
+        .await
+        .map_err(|_| IggyError::CannotOpenConsumerOffsetsFile(path.to_owned()))?;
+    let buf = offset.to_le_bytes();
+    file.write_all_at(buf, 0)
+        .await
+        .0
+        .map_err(|_| IggyError::CannotWriteToFile)?;

Review Comment:
   `persist_offset` writes via `write_all_at` without `sync_data()` or 
`sync_all()`. the rest of the codebase (`messages_writer.rs:90-91`, 
`iggy_index_writer.rs:92-93`) conditionally calls `self.fsync().await` when 
`enforce_fsync` is true. `send_prepare_ok` is called with `Some(true)` at 
`iggy_partition.rs:1322`, which tells the primary this replica has durably 
persisted the write - but the data may still be only in the page cache. this is a false 
durability claim to the consensus layer.
   
   also: `OpenOptions` is missing `.truncate(true)` - safe today (always 8 
bytes at offset 0), but fragile if the format changes.
   
   fix: accept `enforce_fsync` from `PartitionsConfig` and call 
`file.sync_data().await` when enabled.
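   
   a minimal sketch of that shape, assuming compio's `File` exposes `sync_data()` the same way the writers above use it, and with `enforce_fsync` threaded in as a new parameter (hypothetical wiring, not the PR's code):
   
   ```rust
   pub async fn persist_offset(path: &str, offset: u64, enforce_fsync: bool) -> Result<(), IggyError> {
       // ... create parent directories as above ...
       let mut file = OpenOptions::new()
           .write(true)
           .create(true)
           .truncate(true) // defensive: avoids stale tail bytes if the format ever grows
           .open(path)
           .await
           .map_err(|_| IggyError::CannotOpenConsumerOffsetsFile(path.to_owned()))?;
       file.write_all_at(offset.to_le_bytes(), 0)
           .await
           .0
           .map_err(|_| IggyError::CannotWriteToFile)?;
       if enforce_fsync {
           // only after this returns is the durability claim to the consensus layer true
           file.sync_data()
               .await
               .map_err(|_| IggyError::CannotWriteToFile)?;
       }
       Ok(())
   }
   ```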



##########
core/partitions/src/iggy_partition.rs:
##########
@@ -67,11 +145,193 @@ impl IggyPartition {
             revision_id: 0,
             should_increment_offset: false,
             write_lock: Arc::new(TokioMutex::new(())),
+            consumer_offsets_path: None,
+            consumer_group_offsets_path: None,
+            pending_consumer_offset_commits: HashMap::new(),
+        }
+    }
+
+    #[must_use]
+    pub const fn consensus(&self) -> &VsrConsensus<B> {
+        &self.consensus
+    }
+
+    #[must_use]
+    pub fn with_in_memory_storage(
+        stats: Arc<PartitionStats>,
+        consensus: VsrConsensus<B>,
+        segment_size: IggyByteSize,
+    ) -> Self {
+        let mut partition = Self::new(stats, consensus);
+        let start_offset = 0;
+        let segment = Segment::new(start_offset, segment_size);
+        let storage = SegmentStorage::default();
+        partition
+            .log
+            .add_persisted_segment(segment, storage, None, None);
+        partition.offset.store(start_offset, Ordering::Release);
+        partition
+            .dirty_offset
+            .store(start_offset, Ordering::Relaxed);
+        partition.should_increment_offset = false;
+        partition.stats.increment_segments_count(1);
+        partition
+    }
+
+    pub fn configure_consumer_offset_storage(
+        &mut self,
+        consumer_offsets_path: String,
+        consumer_group_offsets_path: String,
+        consumer_offsets: ConsumerOffsets,
+        consumer_group_offsets: ConsumerGroupOffsets,
+    ) {
+        self.consumer_offsets = Arc::new(consumer_offsets);
+        self.consumer_group_offsets = Arc::new(consumer_group_offsets);
+        self.consumer_offsets_path = Some(consumer_offsets_path);
+        self.consumer_group_offsets_path = Some(consumer_group_offsets_path);
+    }
+
+    pub(crate) async fn persist_and_stage_consumer_offset_upsert(
+        &mut self,
+        op: u64,
+        kind: ConsumerKind,
+        consumer_id: u32,
+        offset: u64,
+    ) -> Result<(), IggyError> {
+        let pending = PendingConsumerOffsetCommit::upsert(kind, consumer_id, offset);
+        self.persist_consumer_offset_commit(pending).await?;

Review Comment:
   consumer offsets are persisted to disk here during the prepare phase (before 
quorum), unlike `SendMessages` which only writes to the in-memory journal 
during prepare and persists to disk during commit (`commit_messages`).
   
   if the prepare never commits (leader crash, view change), disk has 
uncommitted values with no rollback mechanism. on restart, the uncommitted 
offset file is loaded as if it were committed.
   
   recommended fix: move the `persist_consumer_offset_commit` call from 
`persist_and_stage_*` (prepare phase) to `commit_consumer_offset_entry` (commit 
phase). during prepare, only stage in-memory. this eliminates the rollback 
problem entirely and aligns with the SendMessages pattern.
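   
   roughly (helper names and signatures here are illustrative, not the PR's actual code):
   
   ```rust
   // prepare phase: stage in memory only - no disk I/O, mirroring SendMessages
   pub(crate) fn stage_consumer_offset_upsert(
       &mut self,
       op: u64,
       kind: ConsumerKind,
       consumer_id: u32,
       offset: u64,
   ) {
       self.pending_consumer_offset_commits
           .insert(op, PendingConsumerOffsetCommit::upsert(kind, consumer_id, offset));
   }
   
   // commit phase: quorum is reached, so persisting here can never leak an
   // uncommitted value onto disk
   async fn commit_consumer_offset_entry(&mut self, op: u64) -> Result<(), IggyError> {
       let Some(pending) = self.pending_consumer_offset_commits.remove(&op) else {
           return Ok(());
       };
       self.persist_consumer_offset_commit(&pending).await?; // disk write moved here
       self.apply_consumer_offset_commit(pending); // in-memory apply, as today
       Ok(())
   }
   ```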



##########
core/partitions/src/iggy_partition.rs:
##########
@@ -52,11 +75,66 @@ pub struct IggyPartition {
     pub revision_id: u64,
     pub should_increment_offset: bool,
     pub write_lock: Arc<TokioMutex<()>>,
+    consumer_offsets_path: Option<String>,
+    consumer_group_offsets_path: Option<String>,
+    pending_consumer_offset_commits: HashMap<u64, PendingConsumerOffsetCommit>,

Review Comment:
   `pending_consumer_offset_commits` has two compounding problems:
   
   1. **unbounded growth on followers**: entries are inserted during 
`on_replicate` (lines 203, 215) but only removed via `on_ack` -> 
`handle_committed_entries` -> `apply_staged_consumer_offset_commit` (line 222). 
followers never reach that path because `ack_preflight` 
(`plane_helpers.rs:161`) returns `Err(NotPrimary)`. the HashMap grows 
monotonically on every follower - ~88 bytes/entry, 1M consumer offset ops = 
~88MB leaked.
   
   2. **orphaned on view change**: `reset_view_change_state` clears the 
pipeline but has no hook into `IggyPartition`. after a view change, stale 
entries left in the map could collide with new prepares that reuse the same op numbers.
   
   this also means followers never apply consumer offset commits to their 
in-memory state (`consumer_offsets` / `consumer_group_offsets`), so follower 
reads are stale/missing. the metadata plane has `commit_journal()` called on 
followers (`metadata.rs:417-419`) but the partition plane has no equivalent 
follower-side commit path.
   
   fix: (a) followers should apply consumer offset commits directly during 
`on_replicate` (they don't need staging since they don't send client replies), 
(b) add a view-change cleanup hook that clears this HashMap.
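   
   sketch of (b), with the hook name and call site assumed rather than taken from the PR:
   
   ```rust
   // invoked by the consensus layer alongside reset_view_change_state
   // (hypothetical callback - no such hook exists in the PR today)
   pub(crate) fn on_view_change(&mut self) {
       // staged entries reference op numbers from the old view; dropping them
       // prevents both the follower-side leak and the op-number collisions
       self.pending_consumer_offset_commits.clear();
   }
   ```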



##########
core/partitions/src/iggy_partition.rs:
##########
@@ -191,7 +451,10 @@ impl Partition for IggyPartition {
         if args.auto_commit && !fragments.is_empty() {
             let last_offset =
                 last_matching_offset.expect("non-empty poll result must have a last offset");
-            if let Err(err) = self.store_consumer_offset(consumer, last_offset) {
+            if let Err(err) = self

Review Comment:
   `poll_messages` with `auto_commit=true` calls 
`store_consumer_offset_and_persist` which persists and applies directly without 
going through consensus replication. this creates three divergent write paths 
for consumer offsets:
   
   1. replicated via `StoreConsumerOffset` operation (new in this PR) - full 
prepare/commit cycle
   2. local-only persist + apply via auto-commit here - no replication
   3. in-memory-only via `Partition::store_consumer_offset` trait method (line 
473) - no persistence, no replication
   
   on failover, the new leader has no record of auto-committed offsets since 
they were never replicated. this is a pre-existing pattern, but now 
inconsistent since explicit `StoreConsumerOffset` IS replicated. worth 
documenting whether this is intentional (local optimization for consumer 
progress) or should be migrated to the consensus path.
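   
   if migration is the answer, the auto-commit branch would submit through the replicated path instead of persisting locally - a sketch, with the submit helper assumed rather than existing:
   
   ```rust
   if args.auto_commit && !fragments.is_empty() {
       let last_offset =
           last_matching_offset.expect("non-empty poll result must have a last offset");
       // hypothetical: turn the auto-commit into a StoreConsumerOffset prepare so
       // a new leader sees consumer progress after failover, at the cost of a
       // prepare/commit round trip per auto-committed poll
       if let Err(err) = self.replicate_store_consumer_offset(consumer, last_offset).await {
           warn!("failed to replicate auto-committed consumer offset: {err}");
       }
   }
   ```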



##########
core/partitions/src/iggy_partition.rs:
##########
@@ -272,3 +502,823 @@ impl Partition for IggyPartition {
         )
     }
 }
+
+impl<B> IggyPartition<B>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+{
+    #[must_use]
+    fn namespace(&self) -> IggyNamespace {
+        IggyNamespace::from_raw(self.consensus.namespace())
+    }
+
+    /// Handles a client request for this partition and turns it into a prepare.
+    ///
+    /// # Panics
+    /// Panics if called when this partition's consensus instance is not the
+    /// primary, is not in normal status, or is currently syncing.
+    #[allow(clippy::future_not_send)]
+    pub async fn on_request(&mut self, message: Message<RequestHeader>) {
+        let namespace = IggyNamespace::from_raw(message.header().namespace);
+        let prepare = {
+            let consensus = self.consensus();
+            emit_sim_event(
+                SimEventKind::ClientRequestReceived,
+                &RequestLogEvent {
+                    replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions),
+                    client_id: message.header().client,
+                    request_id: message.header().request,
+                    operation: message.header().operation,
+                },
+            );
+
+            let message = if message.header().operation == Operation::SendMessages {
+                match convert_request_message(namespace, message) {
+                    Ok(message) => message,
+                    Err(error) => {
+                        warn!(
+                            target: "iggy.partitions.diag",
+                            plane = "partitions",
+                            replica_id = consensus.replica(),
+                            namespace_raw = namespace.inner(),
+                            operation = ?Operation::SendMessages,
+                            error = %error,
+                            "failed to convert send_messages request"
+                        );
+                        return;
+                    }
+                }
+            } else {
+                message
+            };
+
+            assert!(!consensus.is_follower(), "on_request: primary only");
+            assert!(consensus.is_normal(), "on_request: status must be normal");
+            assert!(!consensus.is_syncing(), "on_request: must not be syncing");
+
+            let prepare = message.project(consensus);
+            consensus.verify_pipeline();
+            consensus.pipeline_message(PlaneKind::Partitions, &prepare);
+            prepare
+        };
+        self.on_replicate(prepare).await;
+    }
+
+    #[allow(clippy::future_not_send)]
+    pub async fn on_replicate(&mut self, message: Message<PrepareHeader>) {
+        let header = *message.header();
+        let current_op = {
+            let consensus = self.consensus();
+            let current_op = match replicate_preflight(consensus, &header) {
+                Ok(current_op) => current_op,
+                Err(reason) => {
+                    warn!(
+                        target: "iggy.partitions.diag",
+                        plane = "partitions",
+                        replica_id = consensus.replica(),
+                        view = consensus.view(),
+                        op = header.op,
+                        namespace_raw = header.namespace,
+                        operation = ?header.operation,
+                        reason = reason.as_str(),
+                        "ignoring prepare during replicate preflight"
+                    );
+                    return;
+                }
+            };
+
+            if fence_old_prepare_by_commit(consensus, &header) {
+                warn!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica_id = consensus.replica(),
+                    view = consensus.view(),
+                    op = header.op,
+                    commit = consensus.commit(),
+                    namespace_raw = header.namespace,
+                    operation = ?header.operation,
+                    "received old prepare, skipping replication"
+                );
+                return;
+            }
+
+            current_op
+        };
+
+        let message = {
+            let consensus = self.consensus();
+            replicate_to_next_in_chain(consensus, message).await
+        };
+        if let Err(error) = self.apply_replicated_operation(message).await {
+            let replica_id = self.consensus.replica();
+            warn!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id,
+                op = header.op,
+                namespace_raw = self.namespace().inner(),
+                operation = ?header.operation,
+                %error,
+                "failed to apply replicated partition operation"
+            );
+            return;
+        }
+
+        debug_assert_eq!(header.op, current_op + 1);
+        {
+            let consensus = self.consensus();
+            consensus.sequencer().set_sequence(header.op);
+            consensus.set_last_prepare_checksum(header.checksum);
+            emit_namespace_progress_event(
+                SimEventKind::NamespaceProgressUpdated,
+                &ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions),
+                header.op,
+                consensus.pipeline().borrow().len(),
+            );
+        }
+
+        self.send_prepare_ok(&header).await;
+    }
+
+    #[allow(clippy::future_not_send)]
+    pub async fn on_ack(&mut self, message: Message<PrepareOkHeader>, config: &PartitionsConfig) {
+        let header = *message.header();
+        {
+            let consensus = self.consensus();
+            if let Err(reason) = ack_preflight(consensus) {
+                warn!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica_id = consensus.replica(),
+                    view = consensus.view(),
+                    op = header.op,
+                    reason = reason.as_str(),
+                    "ignoring ack during preflight"
+                );
+                return;
+            }
+
+            let pipeline = consensus.pipeline().borrow();
+            if pipeline
+                .entry_by_op_and_checksum(header.op, header.prepare_checksum)
+                .is_none()
+            {
+                debug!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica_id = consensus.replica(),
+                    op = header.op,
+                    prepare_checksum = header.prepare_checksum,
+                    "ack target prepare not in pipeline"
+                );
+                return;
+            }
+        }
+
+        if !ack_quorum_reached(self.consensus(), PlaneKind::Partitions, &header) {
+            return;
+        }
+
+        let drained = drain_committable_prefix(self.consensus());
+        if drained.is_empty() {
+            return;
+        }
+
+        self.handle_committed_entries(drained, config).await;
+        {
+            let consensus = self.consensus();
+            emit_namespace_progress_event(
+                SimEventKind::NamespaceProgressUpdated,
+                &ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions),
+                consensus.commit(),
+                consensus.pipeline().borrow().len(),
+            );
+        }
+    }
+
+    async fn apply_replicated_operation(
+        &mut self,
+        message: Message<PrepareHeader>,
+    ) -> Result<(), IggyError> {
+        let header = *message.header();
+        let replica_id = self.consensus.replica();
+        let namespace_raw = self.consensus.namespace();
+
+        match header.operation {
+            Operation::SendMessages => {
+                self.append_send_messages_to_journal(message).await?;
+                debug!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica = replica_id,
+                    op = header.op,
+                    namespace_raw,
+                    operation = ?header.operation,
+                    "replicated send_messages appended to partition journal"
+                );
+                Ok(())
+            }
+            Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset => {
+                let (kind, consumer_id, offset) =
+                    Self::parse_staged_consumer_offset_commit(header.operation, &message)?;
+                let write_lock = self.write_lock.clone();
+                let _guard = write_lock.lock().await;
+                match header.operation {
+                    Operation::StoreConsumerOffset => {
+                        self.persist_and_stage_consumer_offset_upsert(
+                            header.op,
+                            kind,
+                            consumer_id,
+                            offset.expect("store_consumer_offset must include offset"),
+                        )
+                        .await?;
+                    }
+                    Operation::DeleteConsumerOffset => {
+                        self.persist_and_stage_consumer_offset_delete(header.op, kind, consumer_id)
+                            .await?;
+                    }
+                    _ => unreachable!(),
+                }
+
+                debug!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica = replica_id,
+                    op = header.op,
+                    namespace_raw,
+                    operation = ?header.operation,
+                    consumer_kind = ?kind,
+                    consumer_id,
+                    offset = ?offset,
+                    "replicated consumer offset persisted and staged"
+                );
+                Ok(())
+            }
+            _ => {
+                warn!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica = replica_id,
+                    namespace_raw,
+                    op = header.op,
+                    operation = ?header.operation,
+                    "unexpected replicated partition operation"
+                );
+                Ok(())
+            }
+        }
+    }
+
+    async fn append_send_messages_to_journal(
+        &mut self,
+        message: Message<PrepareHeader>,
+    ) -> Result<(), IggyError> {
+        let write_lock = self.write_lock.clone();
+        let _guard = write_lock.lock().await;
+        self.append_messages(message).await.map(|_| ())
+    }
+
+    #[allow(clippy::too_many_lines)]
+    async fn commit_messages(&mut self, config: &PartitionsConfig) -> Result<(), IggyError> {
+        let write_lock = self.write_lock.clone();
+        let _guard = write_lock.lock().await;
+
+        let journal_info = self.log.journal().info;
+        if journal_info.messages_count == 0 {
+            return Ok(());
+        }
+
+        let is_full = self.log.active_segment().is_full();
+        let unsaved_messages_count_exceeded =
+            journal_info.messages_count >= config.messages_required_to_save;
+        let unsaved_messages_size_exceeded = journal_info.size.as_bytes_u64()
+            >= config.size_of_messages_required_to_save.as_bytes_u64();
+        let should_persist =
+            is_full || unsaved_messages_count_exceeded || unsaved_messages_size_exceeded;
+        if !should_persist {
+            return Ok(());
+        }
+
+        let (frozen_batches, index_bytes, batch_count) = {
+            let entries = self.log.journal().inner.entries();
+            let segment = self.log.active_segment();
+            let mut file_position = segment.size.as_bytes_u64();
+            self.log.ensure_indexes();
+            let indexes = self.log.active_indexes_mut().expect("indexes must exist");
+            let mut flush_index = None;
+            let mut frozen = Vec::with_capacity(entries.len());
+            let mut batch_count = 0u32;
+
+            for entry in entries {
+                let Ok(batch) = decode_prepare_slice(entry.as_slice()) else {
+                    continue;
+                };
+                let message_count = batch.message_count();
+                if message_count == 0 {
+                    continue;
+                }
+
+                let index = crate::iggy_index::IggyIndex::new(
+                    batch.header.base_offset,
+                    batch.header.base_timestamp,
+                    file_position,
+                );
+                if flush_index.is_none() {
+                    indexes.insert(index.offset, index.timestamp, index.position);
+                    flush_index = Some(index);
+                }
+                file_position += batch.header.total_size() as u64;
+                batch_count += message_count;
+                frozen.push(entry);
+            }
+
+            let index_bytes =
+                flush_index.map(|index| crate::iggy_index::IggyIndexCache::serialize(&index));
+
+            (frozen, index_bytes, batch_count)
+        };
+
+        let Some(index_bytes) = index_bytes else {
+            warn!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                namespace_raw = self.namespace().inner(),
+                "failed to build sparse index entry from pending journal 
batches"
+            );
+            return Err(IggyError::InvalidCommand);
+        };
+
+        self.persist_frozen_batches_to_disk(frozen_batches, index_bytes, batch_count)
+            .await?;
+
+        if is_full {
+            self.rotate_segment(config).await?;
+        }
+
+        let _ = self.log.journal_mut().inner.commit();
+        self.log.journal_mut().info = JournalInfo::default();
+
+        let segment_index = self.log.segments().len() - 1;
+        let segment = &mut self.log.segments_mut()[segment_index];
+        if segment.end_offset == 0 && journal_info.first_timestamp != 0 {
+            segment.start_timestamp = journal_info.first_timestamp;
+        }
+        segment.end_timestamp = journal_info.end_timestamp;
+        segment.max_timestamp = segment.max_timestamp.max(journal_info.max_timestamp);
+        segment.end_offset = journal_info.current_offset;
+
+        self.stats
+            .increment_size_bytes(journal_info.size.as_bytes_u64());
+        self.stats
+            .increment_messages_count(u64::from(journal_info.messages_count));
+
+        let durable_offset = journal_info.current_offset;
+        self.offset.store(durable_offset, Ordering::Release);
+        self.stats.set_current_offset(durable_offset);
+        Ok(())
+    }
+
+    async fn handle_committed_entries(
+        &mut self,
+        drained: Vec<PipelineEntry>,
+        config: &PartitionsConfig,
+    ) {
+        let replica_id = self.consensus.replica();
+        let namespace_raw = self.consensus.namespace();
+        if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
+            debug!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id,
+                first_op = first.header.op,
+                last_op = last.header.op,
+                drained_count = drained.len(),
+                "draining committed partition ops"
+            );
+        }
+
+        let mut failed_commit = false;
+        let committed_visible_offsets = self.resolve_committed_visible_offsets(&drained).await;
+        let mut messages_committed = false;
+
+        for PipelineEntry {
+            header: prepare_header,
+            ..
+        } in drained
+        {
+            if !self
+                .commit_partition_entry(
+                    prepare_header,
+                    &mut messages_committed,
+                    &committed_visible_offsets,
+                    &mut failed_commit,
+                    config,
+                )
+                .await
+            {
+                continue;
+            }
+
+            let pipeline_depth = self.consensus.pipeline().borrow().len();
+            let event = CommitLogEvent {
+                replica: ReplicaLogContext::from_consensus(&self.consensus, PlaneKind::Partitions),
+                op: prepare_header.op,
+                client_id: prepare_header.client,
+                request_id: prepare_header.request,
+                operation: prepare_header.operation,
+                pipeline_depth,
+            };
+            emit_sim_event(SimEventKind::OperationCommitted, &event);
+            emit_namespace_progress_event(
+                SimEventKind::NamespaceProgressUpdated,
+                &event.replica,
+                prepare_header.op,
+                pipeline_depth,
+            );
+
+            let reply_buffers =
+                build_reply_message(&self.consensus, &prepare_header, bytes::Bytes::new())
+                    .into_generic();
+            emit_sim_event(SimEventKind::ClientReplyEmitted, &event);
+
+            if let Err(error) = self
+                .consensus
+                .message_bus()
+                .send_to_client(prepare_header.client, reply_buffers)
+                .await
+            {
+                warn!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    client = prepare_header.client,
+                    op = prepare_header.op,
+                    namespace_raw,
+                    %error,
+                    "failed to send reply to client"
+                );
+            }
+        }
+
+        if failed_commit {
+            warn!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                replica_id,
+                namespace_raw,
+                "partition failed local commit handling for one or more ops"
+            );
+        }
+    }
+
+    async fn resolve_committed_visible_offsets(
+        &self,
+        drained: &[PipelineEntry],
+    ) -> HashMap<u64, u64> {
+        let mut committed_visible_offsets = HashMap::new();
+
+        for entry in drained {
+            if entry.header.operation != Operation::SendMessages {
+                continue;
+            }
+
+            match self.committed_end_offset_for_prepare(&entry.header).await {
+                Ok(Some(end_offset)) => {
+                    committed_visible_offsets.insert(entry.header.op, end_offset);
+                }
+                Ok(None) => {}
+                Err(error) => {
+                    warn!(
+                        target: "iggy.partitions.diag",
+                        plane = "partitions",
+                        replica_id = self.consensus.replica(),
+                        namespace_raw = self.namespace().inner(),
+                        op = entry.header.op,
+                        operation = ?entry.header.operation,
+                        %error,
+                        "failed to resolve committed visible offset for 
partition entry"
+                    );
+                }
+            }
+        }
+
+        committed_visible_offsets
+    }
+
+    async fn commit_partition_entry(
+        &mut self,
+        prepare_header: PrepareHeader,
+        messages_committed: &mut bool,
+        committed_visible_offsets: &HashMap<u64, u64>,
+        failed_commit: &mut bool,
+        config: &PartitionsConfig,
+    ) -> bool {
+        match prepare_header.operation {
+            Operation::SendMessages => {
+                if !*messages_committed {
+                    if let Err(error) = self.commit_messages(config).await {
+                        *failed_commit = true;
+                        warn!(
+                            target: "iggy.partitions.diag",
+                            plane = "partitions",
+                            replica_id = self.consensus.replica(),
+                            namespace_raw = self.namespace().inner(),
+                            op = prepare_header.op,
+                            operation = ?prepare_header.operation,
+                            %error,
+                            "failed to commit partition messages"
+                        );
+                        return false;
+                    }
+                    *messages_committed = true;
+                }
+
+                if let Some(visible_offset) = committed_visible_offsets.get(&prepare_header.op) {
+                    self.offset.store(*visible_offset, Ordering::Release);
+                    self.stats.set_current_offset(*visible_offset);
+                }
+                !*failed_commit
+            }
+            Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset => {
+                self.commit_consumer_offset_entry(prepare_header, failed_commit)
+                    .await
+            }
+            _ => {
+                warn!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica_id = self.consensus.replica(),
+                    op = prepare_header.op,
+                    namespace_raw = self.namespace().inner(),
+                    operation = ?prepare_header.operation,
+                    "unexpected committed partition operation"
+                );
+                true
+            }
+        }
+    }
+
+    async fn committed_end_offset_for_prepare(
+        &self,
+        prepare_header: &PrepareHeader,
+    ) -> Result<Option<u64>, IggyError> {
+        let Some(entry) = self.log.journal().inner.entry(prepare_header).await else {
+            return Err(IggyError::InvalidCommand);
+        };
+        let batch =
+            decode_prepare_slice(entry.as_slice()).map_err(|_| IggyError::InvalidCommand)?;
+        let message_count = batch.message_count();
+        if message_count == 0 {
+            return Ok(None);
+        }
+
+        Ok(Some(
+            batch.header.base_offset + u64::from(message_count) - 1,
+        ))
+    }
+
+    fn parse_staged_consumer_offset_commit(
+        operation: Operation,
+        message: &Message<PrepareHeader>,
+    ) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
+        let total_size = message.header().size as usize;

Review Comment:
   `header.size as usize` is used to slice `message.as_slice()` without bounds 
validation. if `total_size > message.as_slice().len()` this panics. the call 
chain is on the replication hot path: `on_replicate` -> 
`apply_replicated_operation` -> `parse_staged_consumer_offset_commit`. a 
malformed prepare from the primary (or protocol version mismatch during rolling 
upgrade) would crash every follower.
   
   note: the subsequent body parsing (lines 1085-1103) correctly uses checked 
access (`.first()`, `.get(1..5)`, `.get(5..13)`). only this initial slice 
creation lacks bounds checking.
   
   fix: `message.as_slice().get(std::mem::size_of::<PrepareHeader>()..total_size).ok_or(IggyError::InvalidCommand)?`
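   
   in context, the top of the function would read (sketch):
   
   ```rust
   let total_size = message.header().size as usize;
   // checked slice: a short or malformed prepare becomes InvalidCommand
   // instead of a panic that takes down every follower
   let payload = message
       .as_slice()
       .get(std::mem::size_of::<PrepareHeader>()..total_size)
       .ok_or(IggyError::InvalidCommand)?;
   ```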



##########
core/partitions/src/iggy_partition.rs:
##########
@@ -272,3 +502,823 @@ impl Partition for IggyPartition {
         )
     }
 }
+
+impl<B> IggyPartition<B>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+{
+    #[must_use]
+    fn namespace(&self) -> IggyNamespace {
+        IggyNamespace::from_raw(self.consensus.namespace())
+    }
+
+    /// Handles a client request for this partition and turns it into a prepare.
+    ///
+    /// # Panics
+    /// Panics if called when this partition's consensus instance is not the
+    /// primary, is not in normal status, or is currently syncing.
+    #[allow(clippy::future_not_send)]
+    pub async fn on_request(&mut self, message: Message<RequestHeader>) {
+        let namespace = IggyNamespace::from_raw(message.header().namespace);
+        let prepare = {
+            let consensus = self.consensus();
+            emit_sim_event(
+                SimEventKind::ClientRequestReceived,
+                &RequestLogEvent {
+                    replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions),
+                    client_id: message.header().client,
+                    request_id: message.header().request,
+                    operation: message.header().operation,
+                },
+            );
+
+            let message = if message.header().operation == Operation::SendMessages {
+                match convert_request_message(namespace, message) {
+                    Ok(message) => message,
+                    Err(error) => {
+                        warn!(
+                            target: "iggy.partitions.diag",
+                            plane = "partitions",
+                            replica_id = consensus.replica(),
+                            namespace_raw = namespace.inner(),
+                            operation = ?Operation::SendMessages,
+                            error = %error,
+                            "failed to convert send_messages request"
+                        );
+                        return;
+                    }
+                }
+            } else {
+                message
+            };
+
+            assert!(!consensus.is_follower(), "on_request: primary only");
+            assert!(consensus.is_normal(), "on_request: status must be normal");
+            assert!(!consensus.is_syncing(), "on_request: must not be syncing");
+
+            let prepare = message.project(consensus);
+            consensus.verify_pipeline();
+            consensus.pipeline_message(PlaneKind::Partitions, &prepare);
+            prepare
+        };
+        self.on_replicate(prepare).await;
+    }
+
+    #[allow(clippy::future_not_send)]
+    pub async fn on_replicate(&mut self, message: Message<PrepareHeader>) {
+        let header = *message.header();
+        let current_op = {
+            let consensus = self.consensus();
+            let current_op = match replicate_preflight(consensus, &header) {
+                Ok(current_op) => current_op,
+                Err(reason) => {
+                    warn!(
+                        target: "iggy.partitions.diag",
+                        plane = "partitions",
+                        replica_id = consensus.replica(),
+                        view = consensus.view(),
+                        op = header.op,
+                        namespace_raw = header.namespace,
+                        operation = ?header.operation,
+                        reason = reason.as_str(),
+                        "ignoring prepare during replicate preflight"
+                    );
+                    return;
+                }
+            };
+
+            if fence_old_prepare_by_commit(consensus, &header) {
+                warn!(
+                    target: "iggy.partitions.diag",
+                    plane = "partitions",
+                    replica_id = consensus.replica(),
+                    view = consensus.view(),
+                    op = header.op,
+                    commit = consensus.commit(),
+                    namespace_raw = header.namespace,
+                    operation = ?header.operation,
+                    "received old prepare, skipping replication"
+                );
+                return;
+            }
+
+            current_op
+        };
+
+        let message = {
+            let consensus = self.consensus();
+            replicate_to_next_in_chain(consensus, message).await
+        };
+        if let Err(error) = self.apply_replicated_operation(message).await {

Review Comment:
   if `apply_replicated_operation` fails here, the early `return` at line 625 
skips `sequencer().set_sequence(header.op)` at line 630 and `send_prepare_ok` 
at line 640. but `replicate_to_next_in_chain` at line 610 already forwarded the 
prepare to the next replica BEFORE this point.
   
   result: chain continues, quorum may be reached, op commits cluster-wide - 
but this replica permanently misses it. `debug_assert_eq!(header.op, current_op 
+ 1)` at line 627 fires in debug, but in release the replica silently diverges. 
subsequent prepares arrive with op numbers that no longer match the sequencer.
   
   this is a design divergence from the metadata plane, which advances the 
sequencer unconditionally before the journal append (`metadata.rs:394-396`).
   
   fix: advance the sequencer before `apply_replicated_operation` (matching the 
metadata plane), or advance it unconditionally regardless of apply outcome. if 
the operation truly can't be applied, the replica should request state transfer 
rather than silently diverging.
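   
   sketch of the unconditional-advance variant inside `on_replicate` (the state-transfer call is an assumed hook, not existing API):
   
   ```rust
   let message = {
       let consensus = self.consensus();
       replicate_to_next_in_chain(consensus, message).await
   };
   
   {
       // advance sequencing state before the local apply, matching the metadata
       // plane, so a failed apply cannot desynchronize this replica's op numbering
       let consensus = self.consensus();
       consensus.sequencer().set_sequence(header.op);
       consensus.set_last_prepare_checksum(header.checksum);
   }
   
   if let Err(error) = self.apply_replicated_operation(message).await {
       warn!(%error, "failed to apply replicated partition operation");
       // the op may still commit cluster-wide; recover explicitly rather than
       // silently diverging
       self.request_state_transfer(); // hypothetical
       return;
   }
   
   self.send_prepare_ok(&header).await;
   ```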



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

