This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch replica_bootstrap
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit f1279e46c00ae1111dd06eaeed170ddc745a84c8
Author: numinex <[email protected]>
AuthorDate: Thu Apr 23 14:29:06 2026 +0200

    tmp v2
---
 core/common/src/types/streaming_stats.rs |  28 +++
 core/consensus/src/impls.rs              |  27 +++
 core/metadata/src/lib.rs                 |   2 -
 core/metadata/src/stats/mod.rs           | 381 -------------------------------
 core/metadata/src/stm/stream.rs          |   5 +-
 core/partitions/src/iggy_index_writer.rs |  15 ++
 core/partitions/src/messages_writer.rs   |  16 ++
 core/server-ng/src/bootstrap.rs          |  97 +++++---
 core/server-ng/src/lib.rs                |   2 +
 core/server-ng/src/main.rs               |  30 ++-
 core/server-ng/src/server_error.rs       |  12 +
 11 files changed, 199 insertions(+), 416 deletions(-)

diff --git a/core/common/src/types/streaming_stats.rs b/core/common/src/types/streaming_stats.rs
index e1dbbc061..2c5829d13 100644
--- a/core/common/src/types/streaming_stats.rs
+++ b/core/common/src/types/streaming_stats.rs
@@ -85,6 +85,20 @@ impl StreamStats {
         self.zero_out_messages_count();
         self.zero_out_segments_count();
     }
+
+    pub fn load_for_snapshot(&self) -> (u64, u64, u32) {
+        (
+            self.size_bytes.load(Ordering::Relaxed),
+            self.messages_count.load(Ordering::Relaxed),
+            self.segments_count.load(Ordering::Relaxed),
+        )
+    }
+
+    pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, segments_count: u32) {
+        self.size_bytes.store(size_bytes, Ordering::Relaxed);
+        self.messages_count.store(messages_count, Ordering::Relaxed);
+        self.segments_count.store(segments_count, Ordering::Relaxed);
+    }
 }
 
 #[derive(Default, Debug)]
@@ -199,6 +213,20 @@ impl TopicStats {
         self.zero_out_messages_count();
         self.zero_out_segments_count();
     }
+
+    pub fn load_for_snapshot(&self) -> (u64, u64, u32) {
+        (
+            self.size_bytes.load(Ordering::Relaxed),
+            self.messages_count.load(Ordering::Relaxed),
+            self.segments_count.load(Ordering::Relaxed),
+        )
+    }
+
+    pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, segments_count: u32) {
+        self.size_bytes.store(size_bytes, Ordering::Relaxed);
+        self.messages_count.store(messages_count, Ordering::Relaxed);
+        self.segments_count.store(segments_count, Ordering::Relaxed);
+    }
 }
 
 #[derive(Default, Debug)]
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 8cc8d1c2a..8b45286cb 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -662,6 +662,33 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
         assert!(self.commit_max.get() >= self.commit_min.get());
     }
 
+    /// Restore local commit progress from already-applied state during bootstrap.
+    ///
+    /// Unlike `advance_commit_min`, this is intended for recovery paths where the
+    /// state machine has already been restored up to the supplied commit point.
+    ///
+    /// # Panics
+    /// - If `commit_min > commit_max`.
+    /// - If commit progress has already been initialized on this consensus instance.
+    pub fn restore_commit_state(&self, commit_min: u64, commit_max: u64) {
+        assert!(
+            commit_min <= commit_max,
+            "commit_min ({commit_min}) must be <= commit_max ({commit_max})"
+        );
+        assert_eq!(
+            self.commit_min.get(),
+            0,
+            "restore_commit_state must only be used on a fresh consensus instance"
+        );
+        assert_eq!(
+            self.commit_max.get(),
+            0,
+            "restore_commit_state must only be used on a fresh consensus instance"
+        );
+        self.commit_max.set(commit_max);
+        self.commit_min.set(commit_min);
+    }
+
     /// Maximum number of faulty replicas that can be tolerated.
     /// For a cluster of 2f+1 replicas, this returns f.
     #[must_use]
diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs
index 7ae61f217..a50cf4f27 100644
--- a/core/metadata/src/lib.rs
+++ b/core/metadata/src/lib.rs
@@ -21,8 +21,6 @@ pub mod impls;
 pub mod permissioner;
 pub mod stm;
 
-mod stats;
-
 // Re-export IggyMetadata for use in other modules
 pub use impls::metadata::IggyMetadata;
 
diff --git a/core/metadata/src/stats/mod.rs b/core/metadata/src/stats/mod.rs
deleted file mode 100644
index 28dd5c5a4..000000000
--- a/core/metadata/src/stats/mod.rs
+++ /dev/null
@@ -1,381 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#![expect(
-    unused,
-    reason = "Methods are part of the state API and will be used once the implementation is complete"
-)]
-use std::sync::{
-    Arc,
-    atomic::{AtomicU32, AtomicU64, Ordering},
-};
-
-#[derive(Default, Debug)]
-pub struct StreamStats {
-    size_bytes: AtomicU64,
-    messages_count: AtomicU64,
-    segments_count: AtomicU32,
-}
-
-impl StreamStats {
-    pub fn increment_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
-    }
-
-    pub fn increment_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_add(messages_count, Ordering::AcqRel);
-    }
-
-    pub fn increment_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_add(segments_count, Ordering::AcqRel);
-    }
-
-    pub fn decrement_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
-    }
-
-    pub fn decrement_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_sub(messages_count, Ordering::AcqRel);
-    }
-
-    pub fn decrement_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_sub(segments_count, Ordering::AcqRel);
-    }
-
-    pub fn size_bytes_inconsistent(&self) -> u64 {
-        self.size_bytes.load(Ordering::Relaxed)
-    }
-
-    pub fn messages_count_inconsistent(&self) -> u64 {
-        self.messages_count.load(Ordering::Relaxed)
-    }
-
-    pub fn segments_count_inconsistent(&self) -> u32 {
-        self.segments_count.load(Ordering::Relaxed)
-    }
-
-    pub fn zero_out_size_bytes(&self) {
-        self.size_bytes.store(0, Ordering::Relaxed);
-    }
-
-    pub fn zero_out_messages_count(&self) {
-        self.messages_count.store(0, Ordering::Relaxed);
-    }
-
-    pub fn zero_out_segments_count(&self) {
-        self.segments_count.store(0, Ordering::Relaxed);
-    }
-
-    pub fn zero_out_all(&self) {
-        self.zero_out_size_bytes();
-        self.zero_out_messages_count();
-        self.zero_out_segments_count();
-    }
-
-    pub fn load_for_snapshot(&self) -> (u64, u64, u32) {
-        (
-            self.size_bytes.load(Ordering::Relaxed),
-            self.messages_count.load(Ordering::Relaxed),
-            self.segments_count.load(Ordering::Relaxed),
-        )
-    }
-
-    pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, segments_count: u32) {
-        self.size_bytes.store(size_bytes, Ordering::Relaxed);
-        self.messages_count.store(messages_count, Ordering::Relaxed);
-        self.segments_count.store(segments_count, Ordering::Relaxed);
-    }
-}
-
-#[derive(Default, Debug)]
-pub struct TopicStats {
-    parent: Arc<StreamStats>,
-    size_bytes: AtomicU64,
-    messages_count: AtomicU64,
-    segments_count: AtomicU32,
-}
-
-impl TopicStats {
-    pub const fn new(parent: Arc<StreamStats>) -> Self {
-        Self {
-            parent,
-            size_bytes: AtomicU64::new(0),
-            messages_count: AtomicU64::new(0),
-            segments_count: AtomicU32::new(0),
-        }
-    }
-
-    pub fn parent(&self) -> Arc<StreamStats> {
-        self.parent.clone()
-    }
-
-    pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
-        self.parent.increment_size_bytes(size_bytes);
-    }
-
-    pub fn increment_parent_messages_count(&self, messages_count: u64) {
-        self.parent.increment_messages_count(messages_count);
-    }
-
-    pub fn increment_parent_segments_count(&self, segments_count: u32) {
-        self.parent.increment_segments_count(segments_count);
-    }
-
-    pub fn increment_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
-        self.increment_parent_size_bytes(size_bytes);
-    }
-
-    pub fn increment_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_add(messages_count, Ordering::AcqRel);
-        self.increment_parent_messages_count(messages_count);
-    }
-
-    pub fn increment_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_add(segments_count, Ordering::AcqRel);
-        self.increment_parent_segments_count(segments_count);
-    }
-
-    pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
-        self.parent.decrement_size_bytes(size_bytes);
-    }
-
-    pub fn decrement_parent_messages_count(&self, messages_count: u64) {
-        self.parent.decrement_messages_count(messages_count);
-    }
-
-    pub fn decrement_parent_segments_count(&self, segments_count: u32) {
-        self.parent.decrement_segments_count(segments_count);
-    }
-
-    pub fn decrement_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
-        self.decrement_parent_size_bytes(size_bytes);
-    }
-
-    pub fn decrement_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_sub(messages_count, Ordering::AcqRel);
-        self.decrement_parent_messages_count(messages_count);
-    }
-
-    pub fn decrement_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_sub(segments_count, Ordering::AcqRel);
-        self.decrement_parent_segments_count(segments_count);
-    }
-
-    pub fn size_bytes_inconsistent(&self) -> u64 {
-        self.size_bytes.load(Ordering::Relaxed)
-    }
-
-    pub fn messages_count_inconsistent(&self) -> u64 {
-        self.messages_count.load(Ordering::Relaxed)
-    }
-
-    pub fn segments_count_inconsistent(&self) -> u32 {
-        self.segments_count.load(Ordering::Relaxed)
-    }
-
-    pub fn zero_out_parent_size_bytes(&self) {
-        self.parent.zero_out_size_bytes();
-    }
-
-    pub fn zero_out_parent_messages_count(&self) {
-        self.parent.zero_out_messages_count();
-    }
-
-    pub fn zero_out_parent_segments_count(&self) {
-        self.parent.zero_out_segments_count();
-    }
-
-    pub fn zero_out_parent_all(&self) {
-        self.parent.zero_out_all();
-    }
-
-    pub fn zero_out_size_bytes(&self) {
-        self.size_bytes.store(0, Ordering::Relaxed);
-        self.zero_out_parent_size_bytes();
-    }
-
-    pub fn zero_out_messages_count(&self) {
-        self.messages_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_messages_count();
-    }
-
-    pub fn zero_out_segments_count(&self) {
-        self.segments_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_segments_count();
-    }
-
-    pub fn zero_out_all(&self) {
-        self.zero_out_size_bytes();
-        self.zero_out_messages_count();
-        self.zero_out_segments_count();
-    }
-
-    pub fn load_for_snapshot(&self) -> (u64, u64, u32) {
-        (
-            self.size_bytes.load(Ordering::Relaxed),
-            self.messages_count.load(Ordering::Relaxed),
-            self.segments_count.load(Ordering::Relaxed),
-        )
-    }
-
-    pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, segments_count: u32) {
-        self.size_bytes.store(size_bytes, Ordering::Relaxed);
-        self.messages_count.store(messages_count, Ordering::Relaxed);
-        self.segments_count.store(segments_count, Ordering::Relaxed);
-    }
-}
-
-#[derive(Default, Debug)]
-pub struct PartitionStats {
-    parent: Arc<TopicStats>,
-    messages_count: AtomicU64,
-    size_bytes: AtomicU64,
-    segments_count: AtomicU32,
-}
-
-impl PartitionStats {
-    pub const fn new(parent_stats: Arc<TopicStats>) -> Self {
-        Self {
-            parent: parent_stats,
-            messages_count: AtomicU64::new(0),
-            size_bytes: AtomicU64::new(0),
-            segments_count: AtomicU32::new(0),
-        }
-    }
-
-    pub fn parent(&self) -> Arc<TopicStats> {
-        self.parent.clone()
-    }
-
-    pub fn increment_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
-        self.increment_parent_size_bytes(size_bytes);
-    }
-
-    pub fn increment_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_add(messages_count, Ordering::AcqRel);
-        self.increment_parent_messages_count(messages_count);
-    }
-
-    pub fn increment_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_add(segments_count, Ordering::AcqRel);
-        self.increment_parent_segments_count(segments_count);
-    }
-
-    pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
-        self.parent.increment_size_bytes(size_bytes);
-    }
-
-    pub fn increment_parent_messages_count(&self, messages_count: u64) {
-        self.parent.increment_messages_count(messages_count);
-    }
-
-    pub fn increment_parent_segments_count(&self, segments_count: u32) {
-        self.parent.increment_segments_count(segments_count);
-    }
-
-    pub fn decrement_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
-        self.decrement_parent_size_bytes(size_bytes);
-    }
-
-    pub fn decrement_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_sub(messages_count, Ordering::AcqRel);
-        self.decrement_parent_messages_count(messages_count);
-    }
-
-    pub fn decrement_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_sub(segments_count, Ordering::AcqRel);
-        self.decrement_parent_segments_count(segments_count);
-    }
-
-    pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
-        self.parent.decrement_size_bytes(size_bytes);
-    }
-
-    pub fn decrement_parent_messages_count(&self, messages_count: u64) {
-        self.parent.decrement_messages_count(messages_count);
-    }
-
-    pub fn decrement_parent_segments_count(&self, segments_count: u32) {
-        self.parent.decrement_segments_count(segments_count);
-    }
-
-    pub fn size_bytes_inconsistent(&self) -> u64 {
-        self.size_bytes.load(Ordering::Relaxed)
-    }
-
-    pub fn messages_count_inconsistent(&self) -> u64 {
-        self.messages_count.load(Ordering::Relaxed)
-    }
-
-    pub fn segments_count_inconsistent(&self) -> u32 {
-        self.segments_count.load(Ordering::Relaxed)
-    }
-
-    pub fn zero_out_parent_size_bytes(&self) {
-        self.parent.zero_out_size_bytes();
-    }
-
-    pub fn zero_out_parent_messages_count(&self) {
-        self.parent.zero_out_messages_count();
-    }
-
-    pub fn zero_out_parent_segments_count(&self) {
-        self.parent.zero_out_segments_count();
-    }
-
-    pub fn zero_out_parent_all(&self) {
-        self.parent.zero_out_all();
-    }
-
-    pub fn zero_out_size_bytes(&self) {
-        self.size_bytes.store(0, Ordering::Relaxed);
-        self.zero_out_parent_size_bytes();
-    }
-
-    pub fn zero_out_messages_count(&self) {
-        self.messages_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_messages_count();
-    }
-
-    pub fn zero_out_segments_count(&self) {
-        self.segments_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_segments_count();
-    }
-
-    pub fn zero_out_all(&self) {
-        self.zero_out_size_bytes();
-        self.zero_out_messages_count();
-        self.zero_out_segments_count();
-        self.zero_out_parent_all();
-    }
-}
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 0e2f574c2..d91f67df6 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::stats::{StreamStats, TopicStats};
 use crate::stm::StateHandler;
 use crate::stm::snapshot::Snapshotable;
 use crate::{collect_handlers, define_state, impl_fill_restore};
@@ -31,7 +30,9 @@ use iggy_binary_protocol::requests::streams::{
 use iggy_binary_protocol::requests::topics::{
     CreateTopicWithAssignmentsRequest, DeleteTopicRequest, PurgeTopicRequest, UpdateTopicRequest,
 };
-use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize};
+use iggy_common::{
+    CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize, StreamStats, TopicStats,
+};
 use serde::{Deserialize, Serialize};
 use slab::Slab;
 use std::sync::Arc;
diff --git a/core/partitions/src/iggy_index_writer.rs b/core/partitions/src/iggy_index_writer.rs
index 3aa05ac5b..8d2eb9c1b 100644
--- a/core/partitions/src/iggy_index_writer.rs
+++ b/core/partitions/src/iggy_index_writer.rs
@@ -31,6 +31,11 @@ pub struct IggyIndexWriter {
 }
 
 impl IggyIndexWriter {
+    /// Creates an index writer backed by the sparse index file at `file_path`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be opened, synchronized, or queried for metadata.
     pub async fn new(
         file_path: &str,
         index_size_bytes: Rc<AtomicU64>,
@@ -72,6 +77,11 @@ impl IggyIndexWriter {
         })
     }
 
+    /// Appends encoded sparse index bytes to the backing file.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the index bytes cannot be written or synced to disk.
     pub async fn save_indexes(&self, indexes: Vec<u8>) -> Result<(), IggyError> {
         if indexes.is_empty() {
             return Ok(());
@@ -103,6 +113,11 @@ impl IggyIndexWriter {
         Ok(())
     }
 
+    /// Flushes buffered index file contents to disk.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be synchronized.
     pub async fn fsync(&self) -> Result<(), IggyError> {
         self.file
             .sync_all()
diff --git a/core/partitions/src/messages_writer.rs b/core/partitions/src/messages_writer.rs
index 4af82d48a..517a036c9 100644
--- a/core/partitions/src/messages_writer.rs
+++ b/core/partitions/src/messages_writer.rs
@@ -38,6 +38,11 @@ pub struct MessagesWriter {
 }
 
 impl MessagesWriter {
+    /// Creates a messages writer backed by the segment file at `file_path`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be opened, synchronized, or queried for metadata.
     pub async fn new(
         file_path: &str,
         messages_size_bytes: Rc<AtomicU64>,
@@ -74,6 +79,11 @@ impl MessagesWriter {
         })
     }
 
+    /// Appends a batch of frozen message buffers to the segment file.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if any chunk cannot be written or synced to disk.
     pub async fn save_frozen_batches<const ALIGN: usize>(
         &self,
         buffers: &[Frozen<ALIGN>],
@@ -97,10 +107,16 @@ impl MessagesWriter {
         Ok(IggyByteSize::from(messages_size))
     }
 
+    #[must_use]
     pub fn path(&self) -> String {
         self.file_path.clone()
     }
 
+    /// Flushes buffered segment file contents to disk.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be synchronized.
     pub async fn fsync(&self) -> Result<(), IggyError> {
         self.file
             .sync_all()
diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs
index 1ddefecfb..a17dfd064 100644
--- a/core/server-ng/src/bootstrap.rs
+++ b/core/server-ng/src/bootstrap.rs
@@ -22,8 +22,8 @@ use configs::server::ServerConfig;
 use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus};
 use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId};
 use iggy_common::{
-    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, sharding::LocalIdx,
-    variadic,
+    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, TopicStats,
+    sharding::LocalIdx, variadic,
 };
 use journal::Journal;
 use journal::prepare_journal::PrepareJournal;
@@ -39,6 +39,7 @@ use metadata::stm::user::Users;
 use partitions::{
     IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, PartitionsConfig, Segment,
 };
+// TODO: decouple bootstrap/storage helpers and logging from the `server` crate.
 use server::bootstrap::create_directories;
 use server::log::logger::Logging;
 use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets};
@@ -54,6 +55,7 @@ use tracing::{info, warn};
 
 const CLUSTER_ID: u128 = 1;
 const SHARD_ID: u16 = 0;
+const SHARD_REPLICA_ID: u8 = 0;
 const SHARD_NAME: &str = "server-ng-shard-0";
 const DEFAULT_CONFIG_PATH: &str = "core/server-ng/config.toml";
 
@@ -90,10 +92,17 @@ impl RunServerNg for Rc<ServerNgShard> {
     }
 }
 
+/// Bootstraps `server-ng` from config and on-disk metadata/partition state.
+///
+/// # Errors
+///
+/// Returns an error if config loading, directory preparation, logging setup,
+/// metadata recovery, or partition hydration fails.
 pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, ServerNgError> {
     let config = ServerConfig::load_with_path(DEFAULT_CONFIG_PATH, include_str!("../config.toml"))
         .await
         .map_err(ServerNgError::Config)?;
+    // TODO: decouple directory bootstrap from the `server` crate.
     create_directories(&config.system)
         .await
         .map_err(ServerNgError::CreateDirectories)?;
@@ -133,7 +142,7 @@ fn restore_metadata_consensus(
 ) -> VsrConsensus<IggyMessageBus> {
     let mut consensus = VsrConsensus::new(
         CLUSTER_ID,
-        SHARD_ID as u8,
+        SHARD_REPLICA_ID,
         1,
         0,
         IggyMessageBus::new(1, SHARD_ID, 0),
@@ -142,17 +151,15 @@ fn restore_metadata_consensus(
 
     let last_header = journal
         .last_op()
-        .and_then(|op| journal.header(op as usize).map(|header| *header));
+        .and_then(|op| usize::try_from(op).ok())
+        .and_then(|op| journal.header(op).map(|header| *header));
     if let Some(header) = last_header {
         consensus.set_view(header.view);
     }
 
     consensus.init();
     consensus.sequencer().set_sequence(restored_op);
-    consensus.advance_commit_max(restored_op);
-    for op in 1..=restored_op {
-        consensus.advance_commit_min(op);
-    }
+    consensus.restore_commit_state(restored_op, restored_op);
     if let Some(header) = last_header {
         consensus.set_last_prepare_checksum(header.checksum);
         consensus.set_log_view(header.view);
@@ -194,20 +201,28 @@ async fn build_single_shard(
     );
     let shards_table = PapayaShardsTable::with_capacity(partition_count);
 
-    let mut namespaces = Vec::with_capacity(partition_count);
-    let _ = metadata.mux_stm.streams().read(|inner| {
-        for (_, stream) in inner.items.iter() {
-            for (topic_id, topic) in stream.topics.iter() {
+    let (topic_stats, namespaces) = metadata.mux_stm.streams().read(|inner| {
+        let mut topic_stats = Vec::new();
+        let mut namespaces = Vec::with_capacity(partition_count);
+        for (_, stream) in &inner.items {
+            for (topic_id, topic) in &stream.topics {
+                topic_stats.push(topic.stats.clone());
                 for partition in &topic.partitions {
-                    namespaces.push((stream.id, topic_id, partition.clone()));
+                    namespaces.push((stream.id, topic_id, topic.stats.clone(), partition.clone()));
                 }
             }
         }
+        (topic_stats, namespaces)
     });
 
-    for (stream_id, topic_id, partition_metadata) in namespaces {
+    for topic_stats in topic_stats {
+        topic_stats.zero_out_all();
+    }
+
+    for (stream_id, topic_id, topic_stats, partition_metadata) in namespaces {
+        validate_recovered_namespace(config, stream_id, topic_id, partition_metadata.id)?;
         let namespace = IggyNamespace::new(stream_id, topic_id, partition_metadata.id);
-        let partition = load_partition(config, namespace, &partition_metadata).await?;
+        let partition = load_partition(config, namespace, topic_stats, &partition_metadata).await?;
         let local_idx = partitions.insert(namespace, partition);
         shards_table.insert(
             namespace,
@@ -224,18 +239,43 @@ async fn build_single_shard(
     ))
 }
 
+const fn validate_recovered_namespace(
+    config: &ServerConfig,
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+) -> Result<(), ServerNgError> {
+    let namespace = &config.extra.namespace;
+    if stream_id < namespace.max_streams
+        && topic_id < namespace.max_topics
+        && partition_id < namespace.max_partitions
+    {
+        return Ok(());
+    }
+
+    Err(ServerNgError::RecoveredNamespaceOutOfBounds {
+        stream_id,
+        topic_id,
+        partition_id,
+        max_streams: namespace.max_streams,
+        max_topics: namespace.max_topics,
+        max_partitions: namespace.max_partitions,
+    })
+}
+
 async fn load_partition(
     config: &ServerConfig,
     namespace: IggyNamespace,
+    topic_stats: Arc<TopicStats>,
     partition_metadata: &Partition,
 ) -> Result<IggyPartition<IggyMessageBus>, ServerNgError> {
     let stream_id = namespace.stream_id();
     let topic_id = namespace.topic_id();
     let partition_id = namespace.partition_id();
-    let stats = Arc::new(PartitionStats::default());
+    let stats = Arc::new(PartitionStats::new(topic_stats));
     let consensus = VsrConsensus::new(
         CLUSTER_ID,
-        SHARD_ID as u8,
+        SHARD_REPLICA_ID,
         1,
         namespace.inner(),
         IggyMessageBus::new(1, SHARD_ID, namespace.inner()),
@@ -243,6 +283,8 @@ async fn load_partition(
     );
     consensus.init();
 
+    // TODO: decouple the loading logic from the `server` crate and load directly
+    // into the new `partitions` log/runtime types.
     let loaded_log = server::bootstrap::load_segments(
         &config.system,
         stream_id,
@@ -291,11 +333,8 @@ async fn load_partition(
         .iter()
         .any(|segment| segment.size > IggyByteSize::default());
     partition.stats.set_current_offset(current_offset);
-    partition
-        .stats
-        .increment_segments_count(partition.log.segments().len() as u32);
 
-    configure_consumer_offsets(&mut partition, config, namespace, current_offset)?;
+    configure_consumer_offsets(&mut partition, config, namespace, current_offset);
     ensure_initial_segment(&mut partition, config, stream_id, topic_id, partition_id).await?;
 
     Ok(partition)
@@ -311,10 +350,11 @@ async fn hydrate_partition_log(
         server::streaming::partitions::journal::MemoryMessageJournal,
     >,
 ) -> Result<(), ServerNgError> {
+    // TODO: decouple the loading logic from the `server` crate. This currently
+    // adapts the old server segmented log into the new `partitions` log.
     for (segment, storage) in loaded_log
         .segments()
         .iter()
-        .cloned()
         .zip(loaded_log.storages().iter().cloned())
     {
         partition
@@ -335,7 +375,7 @@ async fn hydrate_partition_log(
             partition.log.messages_writers_mut()[active_index] = Some(Rc::new(
                 MessagesWriter::new(
                     &messages_reader.path(),
-                    Rc::new(AtomicU64::new(messages_reader.file_size() as u64)),
+                    Rc::new(AtomicU64::new(u64::from(messages_reader.file_size()))),
                     config.system.partition.enforce_fsync,
                     true,
                 )
@@ -363,14 +403,12 @@ async fn hydrate_partition_log(
                 })?,
             ));
         }
-    } else {
-        let _ = (stream_id, topic_id, partition_id);
     }
 
     Ok(())
 }
 
-fn convert_segment(segment: iggy_common::Segment) -> Segment {
+fn convert_segment(segment: &iggy_common::Segment) -> Segment {
     Segment {
         sealed: segment.sealed,
         start_timestamp: segment.start_timestamp,
@@ -389,7 +427,7 @@ fn configure_consumer_offsets(
     config: &ServerConfig,
     namespace: IggyNamespace,
     current_offset: u64,
-) -> Result<(), ServerNgError> {
+) {
     let stream_id = namespace.stream_id();
     let topic_id = namespace.topic_id();
     let partition_id = namespace.partition_id();
@@ -402,6 +440,7 @@ fn configure_consumer_offsets(
             .system
             .get_consumer_group_offsets_path(stream_id, topic_id, partition_id);
 
+    // TODO: decouple consumer offset loading from the `server` crate.
     let loaded_consumer_offsets = load_consumer_offsets(&consumer_offsets_path).unwrap_or_default();
     let consumer_offsets = ConsumerOffsets::with_capacity(loaded_consumer_offsets.len());
     {
@@ -414,6 +453,7 @@ fn configure_consumer_offsets(
         }
     }
 
+    // TODO: decouple consumer group offset loading from the `server` crate.
     let loaded_group_offsets =
         load_consumer_group_offsets(&consumer_group_offsets_path).unwrap_or_default();
     let consumer_group_offsets = ConsumerGroupOffsets::with_capacity(loaded_group_offsets.len());
@@ -434,8 +474,6 @@ fn configure_consumer_offsets(
         consumer_group_offsets,
         config.system.partition.enforce_fsync,
     );
-
-    Ok(())
 }
 
 async fn ensure_initial_segment(
@@ -449,6 +487,7 @@ async fn ensure_initial_segment(
         return Ok(());
     }
 
+    // TODO: decouple segment storage creation from the `server` crate.
     let storage =
         create_segment_storage(&config.system, stream_id, topic_id, partition_id, 0, 0, 0)
             .await
diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/lib.rs
index c8c412834..be74cdd6e 100644
--- a/core/server-ng/src/lib.rs
+++ b/core/server-ng/src/lib.rs
@@ -17,6 +17,8 @@
  * under the License.
  */
 
+#![allow(clippy::future_not_send)]
+
 pub mod bootstrap;
 pub mod login_register;
 pub mod server_error;
diff --git a/core/server-ng/src/main.rs b/core/server-ng/src/main.rs
index b6a7ef885..e7223ff72 100644
--- a/core/server-ng/src/main.rs
+++ b/core/server-ng/src/main.rs
@@ -17,10 +17,35 @@
  * under the License.
  */
 
+#![allow(clippy::future_not_send)]
+
+use server_ng::bootstrap::RunServerNg;
 use server_ng::server_error::ServerNgError;
 
 fn main() -> Result<(), ServerNgError> {
-    let runtime = compio::runtime::Runtime::new().expect("failed to create compio runtime");
+    // TODO: decouple runtime creation from the `server` crate and move the shared
+    // compio executor setup into a lower-level crate/module used by both binaries.
+    let runtime = match server::bootstrap::create_shard_executor() {
+        Ok(rt) => rt,
+        Err(e) => {
+            match e.kind() {
+                std::io::ErrorKind::InvalidInput => {
+                    // TODO: decouple io_uring diagnostics from the `server` crate.
+                    server::diagnostics::print_invalid_io_uring_args_info();
+                }
+                std::io::ErrorKind::OutOfMemory => {
+                    // TODO: decouple io_uring diagnostics from the `server` crate.
+                    server::diagnostics::print_locked_memory_limit_info();
+                }
+                std::io::ErrorKind::PermissionDenied => {
+                    // TODO: decouple io_uring diagnostics from the `server` crate.
+                    server::diagnostics::print_io_uring_permission_info();
+                }
+                _ => {}
+            }
+            panic!("Cannot create server-ng executor: {e}");
+        }
+    };
     runtime.block_on(async {
         if let Ok(env_path) = std::env::var("IGGY_ENV_PATH") {
             let _ = dotenvy::from_path(&env_path);
@@ -28,7 +53,8 @@ fn main() -> Result<(), ServerNgError> {
             let _ = dotenvy::dotenv();
         }
 
-        use server_ng::bootstrap::RunServerNg;
+        // TODO: decouple logging from the `server` crate and move the shared
+        // logging bootstrap into a lower-level crate/module.
         let mut logging = server::log::logger::Logging::new();
         logging.early_init();
 
diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs
index c071f1608..1a667fc4b 100644
--- a/core/server-ng/src/server_error.rs
+++ b/core/server-ng/src/server_error.rs
@@ -18,6 +18,7 @@
  */
 
 use metadata::impls::recovery::RecoveryError;
+// TODO: decouple logging errors from the `server` crate.
 use server::server_error::LogError;
 use thiserror::Error;
 
@@ -31,6 +32,17 @@ pub enum ServerNgError {
     Logging(#[source] LogError),
     #[error("failed to recover metadata snapshot and journal")]
     MetadataRecovery(#[source] RecoveryError),
+    #[error(
+        "recovered namespace stream {stream_id}, topic {topic_id}, partition {partition_id} exceeds configured limits (max_streams={max_streams}, max_topics={max_topics}, max_partitions={max_partitions})"
+    )]
+    RecoveredNamespaceOutOfBounds {
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        max_streams: usize,
+        max_topics: usize,
+        max_partitions: usize,
+    },
     #[error(
         "failed to load partition log for stream {stream_id}, topic {topic_id}, partition {partition_id}"
     )]

Reply via email to