This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch replica_bootstrap
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/replica_bootstrap by this push:
     new cc31a42c3 address code review nits
cc31a42c3 is described below

commit cc31a42c3465a78d3779e739356435c2a6a09965
Author: numinex <[email protected]>
AuthorDate: Wed May 6 10:38:35 2026 +0200

    address code review nits
---
 core/metadata/src/impls/recovery.rs      |  5 ++-
 core/partitions/src/iggy_index_writer.rs |  9 +++-
 core/partitions/src/messages_writer.rs   | 10 +++--
 core/server-ng/src/bootstrap.rs          | 76 +++++++++++++++++++++++++++++---
 core/server-ng/src/server_error.rs       | 24 ++++++++++
 core/shard/src/router.rs                 | 38 +++++++---------
 6 files changed, 127 insertions(+), 35 deletions(-)

diff --git a/core/metadata/src/impls/recovery.rs b/core/metadata/src/impls/recovery.rs
index a75eca7c7..3d8d0d436 100644
--- a/core/metadata/src/impls/recovery.rs
+++ b/core/metadata/src/impls/recovery.rs
@@ -135,7 +135,10 @@ where
     )
     .await?;
 
-    // 4. Replay journal entries after snapshot
+    // 4. Replay journal entries after snapshot.
+    // Intentional fail-fast for now: if replay hits a bad entry, recovery
+    // aborts and the operator must repair or truncate the WAL before the
+    // node can boot again.
     let headers_to_replay = journal.iter_headers_from(replay_from);
 
     let mut last_applied_op: Option<u64> = None;
diff --git a/core/partitions/src/iggy_index_writer.rs b/core/partitions/src/iggy_index_writer.rs
index 8d2eb9c1b..0d9027bc5 100644
--- a/core/partitions/src/iggy_index_writer.rs
+++ b/core/partitions/src/iggy_index_writer.rs
@@ -43,14 +43,19 @@ impl IggyIndexWriter {
         file_exists: bool,
     ) -> Result<Self, IggyError> {
         let mut opts = OpenOptions::new();
-        opts.create(true).write(true);
+        opts.write(true);
+        if !file_exists {
+            opts.create(true);
+        }
         let file = opts
             .open(file_path)
             .await
             .map_err(|_| IggyError::CannotReadFile)?;
 
         if file_exists {
-            let _ = file.sync_all().await;
+            file.sync_all()
+                .await
+                .map_err(|_| IggyError::CannotWriteToFile)?;
 
             let actual_index_size = file
                 .metadata()
diff --git a/core/partitions/src/messages_writer.rs b/core/partitions/src/messages_writer.rs
index 517a036c9..d45f31146 100644
--- a/core/partitions/src/messages_writer.rs
+++ b/core/partitions/src/messages_writer.rs
@@ -50,17 +50,19 @@ impl MessagesWriter {
         file_exists: bool,
     ) -> Result<Self, IggyError> {
         let mut opts = OpenOptions::new();
-        opts.create(true).write(true);
+        opts.write(true);
+        if !file_exists {
+            opts.create(true);
+        }
         let file = opts
             .open(file_path)
             .await
             .map_err(|_| IggyError::CannotReadFile)?;
 
         if file_exists {
-            let _ = file
-                .sync_all()
+            file.sync_all()
                 .await
-                .map_err(|_| IggyError::CannotWriteToFile);
+                .map_err(|_| IggyError::CannotWriteToFile)?;
 
             let actual_messages_size = file
                 .metadata()
diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs
index ffbc0424f..24b9a9565 100644
--- a/core/server-ng/src/bootstrap.rs
+++ b/core/server-ng/src/bootstrap.rs
@@ -476,7 +476,7 @@ async fn load_partition(
         .any(|segment| segment.size > IggyByteSize::default());
     partition.stats.set_current_offset(current_offset);
 
-    configure_consumer_offsets(&mut partition, config, namespace, current_offset);
+    configure_consumer_offsets(&mut partition, config, namespace, current_offset)?;
     ensure_initial_segment(&mut partition, config, stream_id, topic_id, partition_id).await?;
 
     Ok(partition)
@@ -500,6 +500,17 @@ async fn hydrate_partition_log(
         .zip(loaded_log.storages().iter().cloned())
         .enumerate()
     {
+        validate_recovered_segment(
+            stream_id,
+            topic_id,
+            partition_id,
+            segment,
+            &storage,
+            loaded_log
+                .indexes()
+                .get(segment_index)
+                .and_then(|indexes| indexes.as_ref()),
+        )?;
         let max_timestamp = match loaded_log
             .indexes()
             .get(segment_index)
@@ -560,6 +571,34 @@ async fn hydrate_partition_log(
     Ok(())
 }
 
+fn validate_recovered_segment(
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+    segment: &iggy_common::Segment,
+    storage: &iggy_common::SegmentStorage,
+    indexes: Option<&server::streaming::segments::IggyIndexesMut>,
+) -> Result<(), ServerNgError> {
+    let messages_size_bytes = storage
+        .messages_reader
+        .as_ref()
+        .map_or(0, |reader| u64::from(reader.file_size()));
+    let indexed_size_bytes = indexes.map_or(0, |indexes| u64::from(indexes.messages_size()));
+    if messages_size_bytes == indexed_size_bytes {
+        return Ok(());
+    }
+
+    Err(ServerNgError::RecoveredSegmentSizeDivergence {
+        stream_id,
+        topic_id,
+        partition_id,
+        start_offset: segment.start_offset,
+        end_offset: segment.end_offset,
+        messages_size_bytes,
+        indexed_size_bytes,
+    })
+}
+
 fn convert_segment(segment: &iggy_common::Segment, max_timestamp: u64) -> Segment {
     Segment {
         sealed: segment.sealed,
@@ -612,7 +651,7 @@ fn configure_consumer_offsets(
     config: &ServerNgConfig,
     namespace: IggyNamespace,
     current_offset: u64,
-) {
+) -> Result<(), ServerNgError> {
     let stream_id = namespace.stream_id();
     let topic_id = namespace.topic_id();
     let partition_id = namespace.partition_id();
@@ -631,8 +670,17 @@ fn configure_consumer_offsets(
     {
         let guard = consumer_offsets.pin();
         for offset in loaded_consumer_offsets {
-            if offset.offset.load(Ordering::Relaxed) > current_offset {
-                offset.offset.store(current_offset, Ordering::Relaxed);
+            let recovered_offset = offset.offset.load(Ordering::Relaxed);
+            if recovered_offset > current_offset {
+                return Err(ServerNgError::RecoveredConsumerOffsetOutOfBounds {
+                    consumer_kind: "consumer",
+                    consumer_id: offset.consumer_id as usize,
+                    offset: recovered_offset,
+                    current_offset,
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                });
             }
             guard.insert(offset.consumer_id as usize, offset);
         }
@@ -645,8 +693,17 @@ fn configure_consumer_offsets(
     {
         let guard = consumer_group_offsets.pin();
         for (group_id, offset) in loaded_group_offsets {
-            if offset.offset.load(Ordering::Relaxed) > current_offset {
-                offset.offset.store(current_offset, Ordering::Relaxed);
+            let recovered_offset = offset.offset.load(Ordering::Relaxed);
+            if recovered_offset > current_offset {
+                return Err(ServerNgError::RecoveredConsumerOffsetOutOfBounds {
+                    consumer_kind: "consumer group",
+                    consumer_id: group_id.0,
+                    offset: recovered_offset,
+                    current_offset,
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                });
             }
             guard.insert(group_id, offset);
         }
@@ -659,6 +716,7 @@ fn configure_consumer_offsets(
         consumer_group_offsets,
         config.system.partition.enforce_fsync,
     );
+    Ok(())
 }
 
 async fn ensure_initial_segment(
@@ -740,6 +798,12 @@ fn resolve_tcp_topology(
     let default_quic_addr =
         resolve_optional_listener_addr(config.quic.enabled, "quic.address", &config.quic.address)?;
     if !config.cluster.enabled {
+        if let Some(replica_id) = current_replica_id {
+            warn!(
+                replica_id,
+                "cluster is disabled, ignoring --replica-id for single-node server-ng startup"
+            );
+        }
         return Ok(TcpTopology {
             // Keep parity with the current server binary and the integration
             // harness: `--replica-id` may be passed unconditionally, but in
diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs
index 67f574644..bfe848781 100644
--- a/core/server-ng/src/server_error.rs
+++ b/core/server-ng/src/server_error.rs
@@ -55,6 +55,30 @@ pub enum ServerNgError {
     MissingReplicaId,
     #[error("cluster node for replica {replica_id} is missing tcp_replica port")]
     ClusterReplicaPortMissing { replica_id: u8 },
+    #[error(
+        "recovered segment for stream {stream_id}, topic {topic_id}, partition {partition_id} at start_offset {start_offset} has message/index divergence (messages_size={messages_size_bytes}, indexed_size={indexed_size_bytes}, end_offset={end_offset})"
+    )]
+    RecoveredSegmentSizeDivergence {
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        start_offset: u64,
+        end_offset: u64,
+        messages_size_bytes: u64,
+        indexed_size_bytes: u64,
+    },
+    #[error(
+        "recovered {consumer_kind} offset {offset} for id {consumer_id} exceeds current_offset {current_offset} in stream {stream_id}, topic {topic_id}, partition {partition_id}"
+    )]
+    RecoveredConsumerOffsetOutOfBounds {
+        consumer_kind: &'static str,
+        consumer_id: usize,
+        offset: u64,
+        current_offset: u64,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+    },
     #[error(
         "recovered namespace stream {stream_id}, topic {topic_id}, partition {partition_id} exceeds configured limits (max_streams={max_streams}, max_topics={max_topics}, max_partitions={max_partitions})"
     )]
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 7fb31b214..c09ae2c6f 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -89,18 +89,17 @@ where
         let target = if operation.is_metadata() {
             0
         } else if operation.is_partition() {
-            self.shards_table
-                .shard_for(partition_namespace)
-                .unwrap_or_else(|| {
-                    tracing::warn!(
-                        shard = self.id,
-                        stream = partition_namespace.stream_id(),
-                        topic = partition_namespace.topic_id(),
-                        partition = partition_namespace.partition_id(),
-                        "namespace not found in shards_table, falling back to shard 0"
-                    );
-                    0
-                })
+            let Some(target) = self.shards_table.shard_for(partition_namespace) else {
+                tracing::error!(
+                    shard = self.id,
+                    stream = partition_namespace.stream_id(),
+                    topic = partition_namespace.topic_id(),
+                    partition = partition_namespace.partition_id(),
+                    "namespace not found in shards_table, dropping message"
+                );
+                return;
+            };
+            target
         } else {
             // View-change / Commit messages carry an opaque u64 consensus
             // namespace (not an `IggyNamespace`). Hash it with the same
@@ -196,16 +195,11 @@ where
         } else if operation.is_partition() {
             self.shards_table
                 .shard_for(partition_namespace)
-                .unwrap_or_else(|| {
-                    tracing::warn!(
-                        shard = self.id,
-                        stream = partition_namespace.stream_id(),
-                        topic = partition_namespace.topic_id(),
-                        partition = partition_namespace.partition_id(),
-                        "namespace not found in shards_table, falling back to shard 0"
-                    );
-                    0
-                })
+                .ok_or_else(|| {
+                    ConsensusError::InvalidField(format!(
+                        "namespace {raw_namespace} is not registered in shards_table"
+                    ))
+                })?
         } else {
             #[allow(clippy::cast_lossless)]
             let shard_count = u32::try_from(self.senders.len()).unwrap_or(u32::MAX);

Reply via email to