hubcio commented on code in PR #3020:
URL: https://github.com/apache/iggy/pull/3020#discussion_r2996007386


##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -688,104 +678,258 @@ where
 
         if is_full || unsaved_messages_count_exceeded || 
unsaved_messages_size_exceeded {
             // Freeze journal batches.
-            let frozen_batches = {
-                let batches = partition.log.journal_mut().inner.commit();
+            let (frozen_batches, index_bytes, batch_count) = {
+                let entries = partition.log.journal().inner.entries();
+                let segment = partition.log.active_segment();
+                let mut file_position = segment.size.as_bytes_u64();
                 partition.log.ensure_indexes();
-                
batches.append_indexes_to(partition.log.active_indexes_mut().unwrap());
-
-                let frozen: Vec<_> = batches
-                    .into_inner()
-                    .into_iter()
-                    .map(|mut b| b.freeze())
-                    .collect();
-                partition.log.set_in_flight(frozen.clone());
-                frozen
+                let indexes = partition.log.active_indexes_mut().unwrap();
+                let mut flush_index = None;
+                let mut frozen = Vec::with_capacity(entries.len());
+                let mut batch_count = 0u32;
+
+                for entry in entries {
+                    let Ok(batch) = decode_prepare_slice(entry.as_slice()) 
else {
+                        continue;
+                    };
+                    let message_count = batch.message_count();
+                    if message_count == 0 {
+                        continue;
+                    }
+
+                    let index = crate::iggy_index::IggyIndex::new(
+                        batch.header.base_offset,
+                        batch.header.base_timestamp,
+                        file_position,
+                    );
+                    if flush_index.is_none() {
+                        indexes.insert(index.offset, index.timestamp, 
index.position);
+                        flush_index = Some(index);
+                    }
+                    file_position += batch.header.total_size() as u64;
+                    batch_count += message_count;
+                    frozen.push(entry);
+                }
+
+                let index_bytes =
+                    flush_index.map(|index| 
crate::iggy_index::IggyIndexCache::serialize(&index));
+
+                (frozen, index_bytes, batch_count)
+            };
+
+            let Some(index_bytes) = index_bytes else {
+                warn!(
+                    ?namespace,
+                    "commit_messages: failed to build a sparse index entry 
from pending journal batches"
+                );
+                return Err(IggyError::InvalidCommand);
             };
 
             // Persist to disk.
-            self.persist_frozen_batches_to_disk(namespace, frozen_batches)
-                .await;
+            self.persist_frozen_batches_to_disk(
+                namespace,
+                frozen_batches,
+                index_bytes,
+                batch_count,
+            )
+            .await?;
 
             if is_full {
-                self.rotate_segment(namespace).await;
+                self.rotate_segment(namespace).await?;
             }
 
             // Reset journal info after drain.
             let partition = self
                 .get_mut_by_ns(namespace)
                 .expect("commit_messages: partition not found");
+            let _ = partition.log.journal_mut().inner.commit();
             partition.log.journal_mut().info = JournalInfo::default();
         }
 
-        // 4. Advance committed offset (last, so consumers only see offset 
after data is durable).
+        // 2. Update segment metadata from journal state.
+        // Note: segment.current_position is already updated in append_batch 
(prepare phase).
         let partition = self
             .get_mut_by_ns(namespace)
             .expect("commit_messages: partition not found");
+        let segment_index = partition.log.segments().len() - 1;
+        let segment = &mut partition.log.segments_mut()[segment_index];
+
+        if segment.end_offset == 0 && journal_info.first_timestamp != 0 {
+            segment.start_timestamp = journal_info.first_timestamp;
+        }
+        segment.end_timestamp = journal_info.end_timestamp;
+        segment.max_timestamp = 
segment.max_timestamp.max(journal_info.max_timestamp);
+        segment.end_offset = journal_info.current_offset;
+
+        // 3. Update stats.
+        partition
+            .stats
+            .increment_size_bytes(journal_info.size.as_bytes_u64());
+        partition
+            .stats
+            .increment_messages_count(u64::from(journal_info.messages_count));
+
+        // 4. Advance committed offset (last, so consumers only see offset 
after data is durable).
         let committed_offset = journal_info.current_offset;
         partition.offset.store(committed_offset, Ordering::Relaxed);

Review Comment:
   the comment at line 772 says "consumers only see offset after data is 
durable" - but this line runs unconditionally, even when the flush-threshold 
condition at line 679 is false and the persist block (lines 680-747) is skipped 
entirely. when thresholds aren't met, data exists only in the in-memory journal 
but the committed offset tells consumers it's available. crash before the next 
flush cycle = silent data loss for messages consumers believe they successfully 
processed.
   
   combined with follower `commit_journal` being a no-op (line 648), data can 
be "committed" without being durable on any replica.
   
   fix: only advance `partition.offset` after data is persisted, or restructure 
so the offset tracks what's actually durable.



##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -542,7 +548,7 @@ where
         &self,

Review Comment:
   also at the comment block lines 488-498.
   
   `apply_replicated_operation` unconditionally calls 
`append_send_messages_to_journal` with no dedup check. the comment at line 491 
explicitly says `append_messages` is NOT idempotent and lists two guards 
required before production traffic:
   1. op-based dedup: check 
`journal.op_to_storage_offset.contains_key(&header.op)` before appending
   2. client reply dedup by `(client_id, request_id)`
   
   the `op_to_storage_offset` BTreeMap already exists in journal.rs:114 and is 
populated on every append - the dedup check is a one-liner. after a VSR view 
change, the new primary replays from `max_commit+1` and re-executes, producing 
duplicate partition data with different offsets.



##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -688,104 +678,258 @@ where
 
         if is_full || unsaved_messages_count_exceeded || 
unsaved_messages_size_exceeded {
             // Freeze journal batches.
-            let frozen_batches = {
-                let batches = partition.log.journal_mut().inner.commit();
+            let (frozen_batches, index_bytes, batch_count) = {
+                let entries = partition.log.journal().inner.entries();

Review Comment:
   also at lines 745-746.
   
   `entries()` clones a snapshot of the journal here, then 
`persist_frozen_batches_to_disk().await` yields to the compio executor at line 
729. during the yield, the executor can poll another ready future - an incoming 
prepare triggers `on_replicate` -> `append_messages`, which pushes a new entry 
to this partition's journal. when persist completes, `commit()` at line 745 
drains ALL entries via `std::mem::take` (including the one appended during the 
await), and `journal_info` is reset to default at line 746. the entry appended 
during persist was never written to disk, is now gone from the journal, and no 
error is logged.
   
   this is silent permanent data loss during normal operation under load - no 
crash required.
   
   fix: replace `entries()` (clone) + `commit()` (drain-all) with a single 
atomic `drain()` before the async persist. new appends during persist go into a 
fresh journal and are committed in the next cycle.



##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -386,7 +439,16 @@ where
         // TODO: Figure out the flow of the partition operations.
         // In metadata layer we assume that when an `on_request` or 
`on_replicate` is called, it's called from correct shard.
         // I think we need to do the same here, which means that the code from 
below is unfallable, the partition should always exist by now!
-        self.apply_replicated_operation(&namespace, message).await;
+        if let Err(error) = self.apply_replicated_operation(&namespace, 
message).await {

Review Comment:
   when `apply_replicated_operation` fails here, `on_replicate` early-returns 
at line 450 without calling `send_prepare_ok`. but by this point:
   - line 432: prepare already forwarded to downstream replicas via 
`replicate_to_next_in_chain`
   - line 436: sequencer already advanced
   - line 437: checksum chain already updated
   
   downstream replicas process the prepare and send PrepareOk, but this replica 
doesn't. result: partial quorum that can never complete - the primary's 
pipeline entry for this op is stuck forever, blocking the namespace. no 
timeout, no retry, no NACK.
   
   worse: the sequencer advancement at line 436 means subsequent ops (N+1, 
N+2...) are accepted while there's a gap at op N, violating the sequential 
execution invariant.



##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -553,17 +559,18 @@ where
         match header.operation {
             Operation::SendMessages => {
                 self.append_send_messages_to_journal(namespace, message)
-                    .await;
+                    .await?;
                 debug!(
                     replica = consensus.replica(),
                     op = header.op,
                     ?namespace,
                     "on_replicate: send_messages appended to partition journal"
                 );
+                Ok(())
             }
             Operation::StoreConsumerOffset => {
-                let body = message.body_bytes();
-                let body = body.as_ref();
+                let total_size = header.size() as usize;

Review Comment:
   also at lines 574-575.
   
   `body[0]`, `body[1..5]`, `body[5..13]` are unchecked indexing and the 
`.try_into().unwrap()` calls will panic on malformed input. in a consensus 
system, a malformed `StoreConsumerOffset` prepare that achieves quorum would 
crash every replica that processes it (f+1 replicas panic = total cluster crash 
from a single message).
   
   fix: use `body.get(0..13)` with an early return on `None`, and replace 
`.unwrap()` with `.map_err()`.



##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -688,104 +678,258 @@ where
 
         if is_full || unsaved_messages_count_exceeded || 
unsaved_messages_size_exceeded {
             // Freeze journal batches.
-            let frozen_batches = {
-                let batches = partition.log.journal_mut().inner.commit();
+            let (frozen_batches, index_bytes, batch_count) = {
+                let entries = partition.log.journal().inner.entries();
+                let segment = partition.log.active_segment();
+                let mut file_position = segment.size.as_bytes_u64();
                 partition.log.ensure_indexes();
-                
batches.append_indexes_to(partition.log.active_indexes_mut().unwrap());
-
-                let frozen: Vec<_> = batches
-                    .into_inner()
-                    .into_iter()
-                    .map(|mut b| b.freeze())
-                    .collect();
-                partition.log.set_in_flight(frozen.clone());
-                frozen
+                let indexes = partition.log.active_indexes_mut().unwrap();
+                let mut flush_index = None;
+                let mut frozen = Vec::with_capacity(entries.len());
+                let mut batch_count = 0u32;
+
+                for entry in entries {
+                    let Ok(batch) = decode_prepare_slice(entry.as_slice()) 
else {
+                        continue;
+                    };
+                    let message_count = batch.message_count();
+                    if message_count == 0 {
+                        continue;
+                    }
+
+                    let index = crate::iggy_index::IggyIndex::new(
+                        batch.header.base_offset,
+                        batch.header.base_timestamp,
+                        file_position,
+                    );
+                    if flush_index.is_none() {
+                        indexes.insert(index.offset, index.timestamp, 
index.position);
+                        flush_index = Some(index);
+                    }
+                    file_position += batch.header.total_size() as u64;
+                    batch_count += message_count;
+                    frozen.push(entry);
+                }
+
+                let index_bytes =
+                    flush_index.map(|index| 
crate::iggy_index::IggyIndexCache::serialize(&index));
+
+                (frozen, index_bytes, batch_count)
+            };
+
+            let Some(index_bytes) = index_bytes else {
+                warn!(
+                    ?namespace,
+                    "commit_messages: failed to build a sparse index entry 
from pending journal batches"
+                );
+                return Err(IggyError::InvalidCommand);
             };
 
             // Persist to disk.
-            self.persist_frozen_batches_to_disk(namespace, frozen_batches)
-                .await;
+            self.persist_frozen_batches_to_disk(
+                namespace,
+                frozen_batches,
+                index_bytes,
+                batch_count,
+            )
+            .await?;
 
             if is_full {
-                self.rotate_segment(namespace).await;
+                self.rotate_segment(namespace).await?;
             }
 
             // Reset journal info after drain.
             let partition = self
                 .get_mut_by_ns(namespace)
                 .expect("commit_messages: partition not found");
+            let _ = partition.log.journal_mut().inner.commit();
             partition.log.journal_mut().info = JournalInfo::default();
         }
 
-        // 4. Advance committed offset (last, so consumers only see offset 
after data is durable).
+        // 2. Update segment metadata from journal state.
+        // Note: segment.current_position is already updated in append_batch 
(prepare phase).
         let partition = self
             .get_mut_by_ns(namespace)
             .expect("commit_messages: partition not found");
+        let segment_index = partition.log.segments().len() - 1;
+        let segment = &mut partition.log.segments_mut()[segment_index];
+
+        if segment.end_offset == 0 && journal_info.first_timestamp != 0 {
+            segment.start_timestamp = journal_info.first_timestamp;
+        }
+        segment.end_timestamp = journal_info.end_timestamp;
+        segment.max_timestamp = 
segment.max_timestamp.max(journal_info.max_timestamp);
+        segment.end_offset = journal_info.current_offset;
+
+        // 3. Update stats.
+        partition
+            .stats
+            .increment_size_bytes(journal_info.size.as_bytes_u64());
+        partition
+            .stats
+            .increment_messages_count(u64::from(journal_info.messages_count));
+
+        // 4. Advance committed offset (last, so consumers only see offset 
after data is durable).
         let committed_offset = journal_info.current_offset;
         partition.offset.store(committed_offset, Ordering::Relaxed);
         partition.stats.set_current_offset(committed_offset);
+        Ok(())
+    }
+
+    async fn handle_committed_entries(
+        &self,
+        consensus: &VsrConsensus<B, NamespacedPipeline>,
+        drained: Vec<PipelineEntry>,
+    ) -> bool {
+        if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
+            debug!(
+                "on_ack: draining committed ops=[{}..={}] count={}",
+                first.header.op,
+                last.header.op,
+                drained.len()
+            );
+        }
+
+        let mut committed_ns: HashSet<IggyNamespace> = HashSet::new();
+        let mut failed_ns: HashSet<IggyNamespace> = HashSet::new();
+
+        for PipelineEntry {
+            header: prepare_header,
+            ..
+        } in drained
+        {
+            let entry_namespace = 
IggyNamespace::from_raw(prepare_header.namespace);
+
+            match prepare_header.operation {
+                Operation::SendMessages => {
+                    if committed_ns.insert(entry_namespace)
+                        && let Err(error) = 
self.commit_messages(&entry_namespace).await
+                    {
+                        failed_ns.insert(entry_namespace);
+                        warn!(
+                            ?entry_namespace,
+                            op = prepare_header.op,
+                            %error,
+                            "on_ack: failed to commit partition messages"
+                        );
+                    }
+                    if failed_ns.contains(&entry_namespace) {
+                        continue;
+                    }
+                    debug!("on_ack: messages committed for op={}", 
prepare_header.op);
+                }
+                Operation::StoreConsumerOffset => {
+                    // TODO: Commit consumer offset update.
+                    debug!(
+                        "on_ack: consumer offset committed for op={}",
+                        prepare_header.op
+                    );
+                }
+                _ => {
+                    warn!(
+                        "on_ack: unexpected operation {:?} for op={}",
+                        prepare_header.operation, prepare_header.op
+                    );
+                }
+            }
+
+            let generic_reply =
+                build_reply_message(consensus, &prepare_header, 
bytes::Bytes::new()).into_generic();
+            let reply_buffers = freeze_client_reply(generic_reply);
+            debug!(
+                "on_ack: sending reply to client={} for op={}",
+                prepare_header.client, prepare_header.op
+            );
+
+            if let Err(error) = consensus
+                .message_bus()
+                .send_to_client(prepare_header.client, reply_buffers)
+                .await
+            {
+                warn!(
+                    client = prepare_header.client,
+                    op = prepare_header.op,
+                    %error,
+                    "on_ack: failed to send reply to client"
+                );
+            }
+        }
+
+        if failed_ns.is_empty() {

Review Comment:
   if any namespace fails to commit, `handle_committed_entries` returns `false` 
here, which causes `on_ack` (line 508) to skip `advance_commit_number` 
entirely. this blocks global commit advancement for ALL namespaces on the shard 
- one poisoned partition halts the entire shard's consensus progress. the 
pipeline fills up after ~8 more ops (PIPELINE_PREPARE_QUEUE_MAX), stalling all 
partitions.
   
   client replies are still sent inside the loop (lines 836-855), so clients 
see success while the shard is silently dying.
   
   this undermines the per-namespace independence that `drain_committable_all` 
was designed to provide.



##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -618,22 +627,22 @@ where
         &self,
         namespace: &IggyNamespace,
         message: Message<PrepareHeader>,
-    ) {
+    ) -> Result<(), IggyError> {
         let partition = self
             .get_mut_by_ns(namespace)
             .expect("append_send_messages_to_journal: partition not found for 
namespace");
-        let _ = partition.append_messages(message).await;
+        partition.append_messages(message).await.map(|_| ())
     }
 
     /// Replicate a prepare message to the next replica in the chain.
     ///
     /// Chain replication: primary -> first backup -> ... -> last backup.
     /// Stops when the next replica would be the primary.
-    async fn replicate(&self, message: Message<PrepareHeader>) {
+    async fn replicate(&self, message: Message<PrepareHeader>) -> 
Message<PrepareHeader> {
         let consensus = self
             .consensus()
             .expect("replicate: consensus not initialized");
-        replicate_to_next_in_chain(consensus, message).await;
+        replicate_to_next_in_chain(consensus, message).await
     }
 
     #[allow(clippy::unused_self, clippy::missing_const_for_fn)]

Review Comment:
   followers replicate data via `on_replicate` -> `append_messages` (in-memory 
journal only) but this function never persists anything. failover to a follower 
= complete data loss for that replica. combined with the 
committed-offset-without-durability issue at line 774, the system has no 
durable state on any replica when flush thresholds aren't met.



##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -845,33 +985,63 @@ where
             false, // file_exists (new segment)
         )
         .await
-        .expect("Failed to create segment storage");
-
-        // Clear old segment's indexes.
-        // TODO: Waiting for issue to move server config to shared module.
-        // Once complete, conditionally clear based on cache_indexes config:
-        // if !matches!(self.config.cache_indexes, CacheIndexesConfig::All) {
-        //     partition.log.indexes_mut()[old_segment_index] = None;
-        // }
-        partition.log.indexes_mut()[old_segment_index] = None;
+        .map_err(|_| 
IggyError::CannotCreateSegmentLogFile(messages_path.clone()))?;
+        let messages_size_bytes = storage
+            .messages_writer
+            .as_ref()
+            .ok_or_else(|| 
IggyError::CannotCreateSegmentLogFile(messages_path.clone()))?
+            .size_counter();
+        let messages_writer = Rc::new(
+            MessagesWriter::new(
+                &messages_path,
+                messages_size_bytes,
+                self.config.enforce_fsync,
+                false,
+            )
+            .await
+            .map_err(|_| 
IggyError::CannotCreateSegmentLogFile(messages_path.clone()))?,
+        );
+        let index_writer = Rc::new(
+            IggyIndexWriter::new(
+                &index_path,
+                Rc::new(std::sync::atomic::AtomicU64::new(0)),
+                self.config.enforce_fsync,
+                false,
+            )
+            .await
+            .map_err(|_| 
IggyError::CannotCreateSegmentIndexFile(index_path.clone()))?,
+        );
 
         // Close writers for the sealed segment.
         let old_storage = &mut partition.log.storages_mut()[old_segment_index];
         let _ = old_storage.shutdown();
-
-        partition.log.add_persisted_segment(segment, storage);
+        partition.log.messages_writers_mut()[old_segment_index] = None;
+        partition.log.index_writers_mut()[old_segment_index] = None;
+
+        partition.log.add_persisted_segment(
+            segment,
+            storage,
+            Some(messages_writer),
+            Some(index_writer),
+        );
         partition.stats.increment_segments_count(1);
 
         debug!(?namespace, start_offset, "rotated to new segment");
+        Ok(())
     }
 
     async fn send_prepare_ok(&self, header: &PrepareHeader) {

Review Comment:
   `send_prepare_ok` checks 
`partition.log.journal().inner.entry(header).await.is_some()` which verifies 
the entry exists in the in-memory journal - not that it's durable on disk. the 
journal backing is `PartitionJournalMemStorage` (a `Vec` behind `UnsafeCell`). 
in VSR, a replica sending PrepareOk promises durability. power loss after ack 
but before the next flush cycle means the replica promised durability it can't 
deliver.



##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -322,11 +333,41 @@ impl<C> IggyPartitions<C> {
         )
         .await
         .expect("Failed to create segment storage");

Review Comment:
   also at lines 339, 349, 359.
   
   four `.expect()` calls in `init_partition` will crash the shard on any 
filesystem error (disk full, permission denied, path not found). 
`rotate_segment` was correctly updated to return `Result` with `map_err` - this 
function should follow the same pattern.



##########
core/partitions/src/segment.rs:
##########
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::{IggyByteSize, IggyExpiry, IggyTimestamp};
+use std::fmt::Display;
+
+#[derive(Default, Debug, Clone)]
+pub struct Segment {
+    pub sealed: bool,
+    pub start_timestamp: u64,
+    pub end_timestamp: u64,
+    pub max_timestamp: u64,
+    pub current_position: u32,

Review Comment:
   `current_position` is `u32` but `max_size` is `IggyByteSize` (u64-backed). 
`current_position` is incremented in the prepare path (iggy_partition.rs:112) 
with `+= batch_messages_size` before the commit path checks `is_full()`. 
multiple prepares can accumulate past 4GB before a commit triggers rotation, 
causing silent overflow (wraps in release mode, panics in debug).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to