This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch replica_bootstrap
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit fbafdee10dcbde194a7249c9345adf77cf1020fe
Author: numinex <[email protected]>
AuthorDate: Tue Apr 21 13:52:26 2026 +0200

    temp
---
 Cargo.lock                                   |   7 +-
 Cargo.toml                                   |   1 +
 core/common/src/sharding/mod.rs              |   5 +-
 core/common/src/sharding/namespace.rs        | 118 +++++++
 core/configs/src/server_config/defaults.rs   |   3 +-
 core/configs/src/server_config/displays.rs   |  24 +-
 core/configs/src/server_config/server.rs     |  56 ++-
 core/configs/src/server_config/validators.rs |  27 +-
 core/metadata/src/stm/stream.rs              |   8 +
 core/partitions/src/lib.rs                   |   3 +
 core/server-ng/Cargo.toml                    |   7 +-
 core/server-ng/PLAN.md                       | 127 +++++++
 core/server-ng/config.toml                   |  11 +
 core/server-ng/src/bootstrap.rs              | 504 +++++++++++++++++++++++++++
 core/server-ng/src/lib.rs                    |   2 +
 core/server-ng/src/main.rs                   |  21 +-
 core/server-ng/src/server_error.rs           |  74 ++++
 17 files changed, 982 insertions(+), 16 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cb0d18d90..ec90a34c0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10165,7 +10165,6 @@ name = "server-ng"
 version = "0.8.0"
 dependencies = [
  "ahash 0.8.12",
- "anyhow",
  "argon2",
  "async-channel",
  "async_zip",
@@ -10176,6 +10175,7 @@ dependencies = [
  "clap",
  "compio",
  "configs",
+ "consensus",
  "ctrlc",
  "cyper",
  "cyper-axum",
@@ -10192,8 +10192,10 @@ dependencies = [
  "hwlocality",
  "iggy_binary_protocol",
  "iggy_common",
+ "journal",
  "jsonwebtoken",
  "left-right",
+ "message_bus",
  "metadata",
  "mimalloc",
  "mime_guess",
@@ -10204,6 +10206,7 @@ dependencies = [
  "opentelemetry-semantic-conventions",
  "opentelemetry_sdk",
  "papaya",
+ "partitions",
  "prometheus-client",
  "rand 0.10.1",
  "ringbuffer",
@@ -10215,6 +10218,8 @@ dependencies = [
  "secrecy",
  "send_wrapper",
  "serde",
+ "server",
+ "shard",
  "slab",
  "socket2 0.6.3",
  "strum 0.28.0",
diff --git a/Cargo.toml b/Cargo.toml
index 250c81acc..ba1f59ff7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -254,6 +254,7 @@ serde_yaml_ng = "0.10.0"
 serial_test = "3.4.0"
 server = { path = "core/server" }
 server-ng = { path = "core/server-ng" }
+shard = { path = "core/shard" }
 simd-json = { version = "0.17.0", features = ["serde_impl"] }
 slab = "0.4.12"
 smallvec = "1.15"
diff --git a/core/common/src/sharding/mod.rs b/core/common/src/sharding/mod.rs
index 460c04ec3..051802ecf 100644
--- a/core/common/src/sharding/mod.rs
+++ b/core/common/src/sharding/mod.rs
@@ -22,8 +22,9 @@ mod shard_id;
 
 pub use local_idx::LocalIdx;
 pub use namespace::{
-    IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, PARTITION_BITS, 
PARTITION_MASK,
-    PARTITION_SHIFT, STREAM_BITS, STREAM_MASK, STREAM_SHIFT, TOPIC_BITS, 
TOPIC_MASK, TOPIC_SHIFT,
+    IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, 
NamespaceCapacityError, PARTITION_BITS,
+    PARTITION_MASK, PARTITION_SHIFT, STREAM_BITS, STREAM_MASK, STREAM_SHIFT, 
TOPIC_BITS,
+    TOPIC_MASK, TOPIC_SHIFT,
 };
 pub use partition_location::PartitionLocation;
 pub use shard_id::ShardId;
diff --git a/core/common/src/sharding/namespace.rs 
b/core/common/src/sharding/namespace.rs
index a0a7c21f3..ceb8d918e 100644
--- a/core/common/src/sharding/namespace.rs
+++ b/core/common/src/sharding/namespace.rs
@@ -49,6 +49,47 @@ pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 1;
 pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1;
 pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1;
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum NamespaceCapacityError {
+    ZeroStreams,
+    ZeroTopics,
+    ZeroPartitions,
+    ExceedsU64 {
+        required_bits: u32,
+    },
+    ExceedsLayout {
+        stream_bits: u32,
+        topic_bits: u32,
+        partition_bits: u32,
+    },
+}
+
+impl std::fmt::Display for NamespaceCapacityError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::ZeroStreams => write!(f, "max_streams must be greater than 
0"),
+            Self::ZeroTopics => write!(f, "max_topics must be greater than 0"),
+            Self::ZeroPartitions => write!(f, "max_partitions must be greater 
than 0"),
+            Self::ExceedsU64 { required_bits } => write!(
+                f,
+                "namespace capacity requires {required_bits} bits, which does 
not fit in u64"
+            ),
+            Self::ExceedsLayout {
+                stream_bits,
+                topic_bits,
+                partition_bits,
+            } => write!(
+                f,
+                "namespace capacity requires stream/topic/partition bits of \
+                 {stream_bits}/{topic_bits}/{partition_bits}, but 
IggyNamespace supports \
+                 {STREAM_BITS}/{TOPIC_BITS}/{PARTITION_BITS}"
+            ),
+        }
+    }
+}
+
+impl std::error::Error for NamespaceCapacityError {}
+
 /// Packed namespace identifier for shard assignment.
 ///
 /// Encodes stream_id (12 bits), topic_id (12 bits), and partition_id (20 bits)
@@ -89,4 +130,81 @@ impl IggyNamespace {
             | ((partition as u64) & PARTITION_MASK) << PARTITION_SHIFT;
         Self(value)
     }
+
+    pub fn validate_capacity(
+        max_streams: usize,
+        max_topics: usize,
+        max_partitions: usize,
+    ) -> Result<(), NamespaceCapacityError> {
+        let stream_bits = if max_streams == 0 {
+            return Err(NamespaceCapacityError::ZeroStreams);
+        } else {
+            bits_required((max_streams - 1) as u64)
+        };
+        let topic_bits = if max_topics == 0 {
+            return Err(NamespaceCapacityError::ZeroTopics);
+        } else {
+            bits_required((max_topics - 1) as u64)
+        };
+        let partition_bits = if max_partitions == 0 {
+            return Err(NamespaceCapacityError::ZeroPartitions);
+        } else {
+            bits_required((max_partitions - 1) as u64)
+        };
+
+        let required_bits = stream_bits
+            .checked_add(topic_bits)
+            .and_then(|bits| bits.checked_add(partition_bits))
+            .ok_or(NamespaceCapacityError::ExceedsU64 {
+                required_bits: u32::MAX,
+            })?;
+        if required_bits > u64::BITS {
+            return Err(NamespaceCapacityError::ExceedsU64 { required_bits });
+        }
+
+        if stream_bits > STREAM_BITS || topic_bits > TOPIC_BITS || 
partition_bits > PARTITION_BITS {
+            return Err(NamespaceCapacityError::ExceedsLayout {
+                stream_bits,
+                topic_bits,
+                partition_bits,
+            });
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, 
NamespaceCapacityError};
+
+    #[test]
+    fn validates_default_namespace_capacity() {
+        assert!(IggyNamespace::validate_capacity(MAX_STREAMS, MAX_TOPICS, 
MAX_PARTITIONS).is_ok());
+    }
+
+    #[test]
+    fn rejects_zero_capacity_values() {
+        assert_eq!(
+            IggyNamespace::validate_capacity(0, MAX_TOPICS, MAX_PARTITIONS),
+            Err(NamespaceCapacityError::ZeroStreams)
+        );
+        assert_eq!(
+            IggyNamespace::validate_capacity(MAX_STREAMS, 0, MAX_PARTITIONS),
+            Err(NamespaceCapacityError::ZeroTopics)
+        );
+        assert_eq!(
+            IggyNamespace::validate_capacity(MAX_STREAMS, MAX_TOPICS, 0),
+            Err(NamespaceCapacityError::ZeroPartitions)
+        );
+    }
+
+    #[test]
+    fn rejects_capacity_that_exceeds_current_layout() {
+        let err = IggyNamespace::validate_capacity(MAX_STREAMS + 1, 
MAX_TOPICS, MAX_PARTITIONS);
+        assert!(matches!(
+            err,
+            Err(NamespaceCapacityError::ExceedsLayout { .. })
+        ));
+    }
 }
diff --git a/core/configs/src/server_config/defaults.rs 
b/core/configs/src/server_config/defaults.rs
index 25c6da929..98a03cbea 100644
--- a/core/configs/src/server_config/defaults.rs
+++ b/core/configs/src/server_config/defaults.rs
@@ -20,7 +20,7 @@ use super::cluster::{ClusterConfig, ClusterNodeConfig, 
TransportPorts};
 use super::http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, 
HttpMetricsConfig, HttpTlsConfig};
 use super::quic::{QuicCertificateConfig, QuicConfig, QuicSocketConfig};
 use super::server::{
-    ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, 
MemoryPoolConfig,
+    ConsumerGroupConfig, DataMaintenanceConfig, ExtraConfig, HeartbeatConfig, 
MemoryPoolConfig,
     MessageSaverConfig, MessagesMaintenanceConfig, 
PersonalAccessTokenCleanerConfig,
     PersonalAccessTokenConfig, ServerConfig, TelemetryConfig, 
TelemetryLogsConfig,
     TelemetryTracesConfig,
@@ -49,6 +49,7 @@ impl Default for ServerConfig {
         ServerConfig {
             consumer_group: ConsumerGroupConfig::default(),
             data_maintenance: DataMaintenanceConfig::default(),
+            extra: ExtraConfig::default(),
             heartbeat: HeartbeatConfig::default(),
             message_saver: MessageSaverConfig::default(),
             personal_access_token: PersonalAccessTokenConfig::default(),
diff --git a/core/configs/src/server_config/displays.rs 
b/core/configs/src/server_config/displays.rs
index 959d35476..9d61db3e4 100644
--- a/core/configs/src/server_config/displays.rs
+++ b/core/configs/src/server_config/displays.rs
@@ -19,8 +19,9 @@
 
 use super::quic::{QuicCertificateConfig, QuicConfig};
 use super::server::{
-    ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, 
MessagesMaintenanceConfig,
-    TelemetryConfig, TelemetryLogsConfig, TelemetryTracesConfig,
+    ConsumerGroupConfig, DataMaintenanceConfig, ExtraConfig, HeartbeatConfig,
+    MessagesMaintenanceConfig, NamespaceConfig, TelemetryConfig, 
TelemetryLogsConfig,
+    TelemetryTracesConfig,
 };
 use super::system::MessageDeduplicationConfig;
 use super::{
@@ -156,9 +157,10 @@ impl Display for ServerConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ consumer_group: {}, data_maintenance: {}, message_saver: {}, 
heartbeat: {}, system: {}, quic: {}, tcp: {}, http: {}, telemetry: {} }}",
+            "{{ consumer_group: {}, data_maintenance: {}, extra: {}, 
message_saver: {}, heartbeat: {}, system: {}, quic: {}, tcp: {}, http: {}, 
telemetry: {} }}",
             self.consumer_group,
             self.data_maintenance,
+            self.extra,
             self.message_saver,
             self.heartbeat,
             self.system,
@@ -170,6 +172,22 @@ impl Display for ServerConfig {
     }
 }
 
+impl Display for ExtraConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{{ namespace: {} }}", self.namespace)
+    }
+}
+
+impl Display for NamespaceConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{{ max_streams: {}, max_topics: {}, max_partitions: {} }}",
+            self.max_streams, self.max_topics, self.max_partitions
+        )
+    }
+}
+
 impl Display for ConsumerGroupConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
diff --git a/core/configs/src/server_config/server.rs 
b/core/configs/src/server_config/server.rs
index 8c005f599..0a35351ce 100644
--- a/core/configs/src/server_config/server.rs
+++ b/core/configs/src/server_config/server.rs
@@ -30,7 +30,10 @@ use err_trail::ErrContext;
 use figment::providers::{Format, Toml};
 use figment::value::Dict;
 use figment::{Metadata, Profile, Provider};
-use iggy_common::{IggyByteSize, IggyDuration, MemoryPoolConfigOther, 
Validatable};
+use iggy_common::{
+    IggyByteSize, IggyDuration, MemoryPoolConfigOther, Validatable,
+    sharding::{MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS},
+};
 use serde::{Deserialize, Serialize};
 use serde_with::DisplayFromStr;
 use serde_with::serde_as;
@@ -45,6 +48,8 @@ const DEFAULT_CONFIG_PATH: &str = "core/server/config.toml";
 pub struct ServerConfig {
     pub consumer_group: ConsumerGroupConfig,
     pub data_maintenance: DataMaintenanceConfig,
+    #[serde(default)]
+    pub extra: ExtraConfig,
     pub message_saver: MessageSaverConfig,
     pub personal_access_token: PersonalAccessTokenConfig,
     pub heartbeat: HeartbeatConfig,
@@ -57,6 +62,29 @@ pub struct ServerConfig {
     pub cluster: ClusterConfig,
 }
 
+// TODO: Rename this to something more sensible, once we figure out all of the 
extra configuration we need.
+#[derive(Debug, Default, Deserialize, Serialize, Clone, ConfigEnv)]
+pub struct ExtraConfig {
+    pub namespace: NamespaceConfig,
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
+pub struct NamespaceConfig {
+    pub max_streams: usize,
+    pub max_topics: usize,
+    pub max_partitions: usize,
+}
+
+impl Default for NamespaceConfig {
+    fn default() -> Self {
+        Self {
+            max_streams: MAX_STREAMS,
+            max_topics: MAX_TOPICS,
+            max_partitions: MAX_PARTITIONS,
+        }
+    }
+}
+
 /// Configuration for the memory pool.
 #[derive(Debug, Deserialize, Serialize, ConfigEnv)]
 pub struct MemoryPoolConfig {
@@ -183,9 +211,21 @@ impl ServerConfig {
     ///
     /// Uses compile-time generated env var mappings for unambiguous 
resolution.
     pub async fn load() -> Result<ServerConfig, ConfigurationError> {
+        Self::load_with_path(
+            DEFAULT_CONFIG_PATH,
+            include_str!("../../../server/config.toml"),
+        )
+        .await
+    }
+
+    pub async fn load_with_path(
+        default_config_path: &str,
+        default_config: &'static str,
+    ) -> Result<ServerConfig, ConfigurationError> {
         let config_path =
-            env::var("IGGY_CONFIG_PATH").unwrap_or_else(|_| 
DEFAULT_CONFIG_PATH.to_string());
-        let config_provider = ServerConfig::config_provider(&config_path);
+            env::var("IGGY_CONFIG_PATH").unwrap_or_else(|_| 
default_config_path.to_string());
+        let config_provider =
+            ServerConfig::config_provider_with_default(&config_path, 
default_config);
         let server_config: ServerConfig =
             config_provider
                 .load_config()
@@ -203,7 +243,15 @@ impl ServerConfig {
 
     /// Create a config provider using compile-time generated env var mappings.
     pub fn config_provider(config_path: &str) -> 
FileConfigProvider<ServerConfigEnvProvider> {
-        let default_config = 
Toml::string(include_str!("../../../server/config.toml"));
+        Self::config_provider_with_default(config_path, 
include_str!("../../../server/config.toml"))
+    }
+
+    /// Create a config provider using compile-time generated env var mappings.
+    pub fn config_provider_with_default(
+        config_path: &str,
+        default_config: &'static str,
+    ) -> FileConfigProvider<ServerConfigEnvProvider> {
+        let default_config = Toml::string(default_config);
         FileConfigProvider::new(
             config_path.to_string(),
             ServerConfigEnvProvider::default(),
diff --git a/core/configs/src/server_config/validators.rs 
b/core/configs/src/server_config/validators.rs
index 1512961d6..2e23e48a4 100644
--- a/core/configs/src/server_config/validators.rs
+++ b/core/configs/src/server_config/validators.rs
@@ -20,7 +20,8 @@
 use super::COMPONENT;
 use super::cluster::ClusterConfig;
 use super::server::{
-    DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, 
TelemetryConfig,
+    DataMaintenanceConfig, ExtraConfig, MessageSaverConfig, 
MessagesMaintenanceConfig,
+    NamespaceConfig, TelemetryConfig,
 };
 use super::server::{MemoryPoolConfig, PersonalAccessTokenConfig, ServerConfig};
 use super::sharding::{CpuAllocation, ShardingConfig};
@@ -32,6 +33,7 @@ use iggy_common::CompressionAlgorithm;
 use iggy_common::IggyExpiry;
 use iggy_common::MaxTopicSize;
 use iggy_common::Validatable;
+use iggy_common::sharding::IggyNamespace;
 use std::thread::available_parallelism;
 use tracing::warn;
 
@@ -76,6 +78,9 @@ impl Validatable<ConfigurationError> for ServerConfig {
                     "{COMPONENT} (error: {e}) - failed to validate personal 
access token config"
                 )
             })?;
+        self.extra.validate().error(|e: &ConfigurationError| {
+            format!("{COMPONENT} (error: {e}) - failed to validate extra 
config")
+        })?;
         self.system
             .segment
             .validate()
@@ -164,6 +169,26 @@ impl Validatable<ConfigurationError> for ServerConfig {
     }
 }
 
+impl Validatable<ConfigurationError> for ExtraConfig {
+    fn validate(&self) -> Result<(), ConfigurationError> {
+        self.namespace.validate().error(|e: &ConfigurationError| {
+            format!("{COMPONENT} (error: {e}) - failed to validate namespace 
config")
+        })?;
+        Ok(())
+    }
+}
+
+impl Validatable<ConfigurationError> for NamespaceConfig {
+    fn validate(&self) -> Result<(), ConfigurationError> {
+        IggyNamespace::validate_capacity(self.max_streams, self.max_topics, 
self.max_partitions)
+            .map_err(|error| {
+                eprintln!("extra.namespace is invalid: {error}");
+                ConfigurationError::InvalidConfigurationValue
+            })?;
+        Ok(())
+    }
+}
+
 impl Validatable<ConfigurationError> for CompressionConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         let compression_alg = &self.default_algorithm;
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 3aea30bd7..0e2f574c2 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -270,6 +270,14 @@ impl StreamsInner {
 }
 
 impl Streams {
+    #[must_use]
+    pub fn read<F, R>(&self, f: F) -> R
+    where
+        F: FnOnce(&StreamsInner) -> R,
+    {
+        self.inner.read(f)
+    }
+
     #[must_use]
     pub fn partition_count_context(
         &self,
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index f55e83bb0..3dd4c3f2e 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -31,8 +31,11 @@ mod types;
 use iggy_binary_protocol::{Message, PrepareHeader};
 use iggy_common::IggyError;
 pub use iggy_common::send_messages2::{IggyMessage2, IggyMessage2Header, 
IggyMessages2};
+pub use iggy_index_writer::IggyIndexWriter;
 pub use iggy_partition::IggyPartition;
 pub use iggy_partitions::IggyPartitions;
+pub use messages_writer::MessagesWriter;
+pub use segment::Segment;
 pub use types::{
     AppendResult, Fragment, PartitionOffsets, PartitionsConfig, PollFragments, 
PollQueryResult,
     PollingArgs, PollingConsumer, SendMessagesResult,
diff --git a/core/server-ng/Cargo.toml b/core/server-ng/Cargo.toml
index 79211fe7c..ce06b9dbb 100644
--- a/core/server-ng/Cargo.toml
+++ b/core/server-ng/Cargo.toml
@@ -105,7 +105,6 @@ iggy-web = ["dep:rust-embed", "dep:mime_guess"]
 
 [dependencies]
 ahash = { workspace = true }
-anyhow = { workspace = true }
 argon2 = { workspace = true }
 async-channel = { workspace = true }
 async_zip = { workspace = true }
@@ -116,6 +115,7 @@ chrono = { workspace = true }
 clap = { workspace = true }
 compio = { workspace = true }
 configs = { workspace = true }
+consensus = { workspace = true }
 ctrlc = { workspace = true }
 cyper = { workspace = true }
 cyper-axum = { workspace = true }
@@ -131,9 +131,11 @@ hash32 = { workspace = true }
 human-repr = { workspace = true }
 iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
+journal = { workspace = true }
 jsonwebtoken = { workspace = true }
 left-right = { workspace = true }
 metadata = { workspace = true }
+message_bus = { workspace = true }
 mimalloc = { workspace = true, optional = true }
 mime_guess = { workspace = true, optional = true }
 nix = { workspace = true }
@@ -143,6 +145,7 @@ opentelemetry-otlp = { workspace = true }
 opentelemetry-semantic-conventions = { workspace = true }
 opentelemetry_sdk = { workspace = true }
 papaya = { workspace = true }
+partitions = { workspace = true }
 prometheus-client = { workspace = true }
 rand = { workspace = true }
 ringbuffer = { workspace = true }
@@ -162,6 +165,8 @@ tempfile = { workspace = true }
 thiserror = { workspace = true }
 toml = { workspace = true }
 tower-http = { workspace = true }
+server = { workspace = true }
+shard = { workspace = true }
 tracing = { workspace = true }
 tracing-appender = { workspace = true }
 tracing-opentelemetry = { workspace = true }
diff --git a/core/server-ng/PLAN.md b/core/server-ng/PLAN.md
new file mode 100644
index 000000000..006b0a5bf
--- /dev/null
+++ b/core/server-ng/PLAN.md
@@ -0,0 +1,127 @@
+# Server-NG Bootstrap Plan
+
+## Goal
+
+Implement startup for the new `server-ng` binary using:
+
+- `partitions`
+- `consensus`
+- `message_bus`
+- `metadata`
+- `journal`
+
+and support:
+
+1. new `config.toml` handling in `server-ng`
+2. config loading/parsing
+3. metadata journal + snapshot loading
+4. `IggyMetadata` restoration
+5. partition + log loading (`IGGY-131`)
+6. single-shard bootstrap only, without transport/runtime infra for now
+
+## Plan
+
+### 1. Config surface
+
+- Reuse the existing server config surface instead of introducing a separate 
top-level `server-ng` config module.
+- Add an extra config section or extension on top of the current config for 
`server-ng`-specific additions.
+- The main new config requirement is broker capacity limits for:
+  - maximum streams
+  - maximum topics
+  - maximum partitions
+- Add validation that these three limits can be packed into a single `u64` 
namespace layout.
+- The validation must be tied directly to `IggyNamespace`, since it packs 
`stream_id/topic_id/partition_id` into `u64`.
+- Keep config loading through the existing config infrastructure, with the 
extra section layered on top.
+
+### 2. Server-NG config file
+
+- Create a dedicated `core/server-ng/config.toml`.
+- It should be exactly the same as the existing server config file, plus the 
extra `server-ng` additions.
+- Do not create a reduced or bootstrap-only config file.
+- Keep it structurally aligned with the current server config so existing 
parsing behavior can be reused as much as possible.
+
+### 3. Bootstrap entrypoint
+
+- Replace the placeholder `server-ng` main startup with a bootstrap flow.
+- Keep startup split into clear phases:
+  - initialize tracing/logging
+  - load config
+  - prepare filesystem paths
+  - restore metadata
+  - restore partitions
+  - build shard
+- Leave listeners/timers out for now.
+- Add an explicit `TODO` at the bootstrap boundary where transport/runtime 
services will be started once infrastructure exists.
+
+### 4. Metadata recovery
+
+- Reuse the existing recovery code from `core/metadata`.
+- Load:
+  - snapshot
+  - metadata WAL / journal
+- Restore the metadata state machine from snapshot.
+- Replay journal entries after the snapshot point.
+- Construct `IggyMetadata` from the recovered pieces.
+
+### 5. Consensus state restoration
+
+- Initialize it with recovered progress so sequence/commit state matches 
restored metadata.
+- Keep this scoped to single-replica / single-shard bootstrap first.
+
+### 6. Partition discovery
+
+- Read restored metadata to determine which partitions exist.
+- For each partition, resolve:
+  - stream id
+  - topic id
+  - partition id
+  - consensus group / namespace info
+  - persisted storage paths
+
+### 7. Partition + log loading
+
+- Reuse the existing partition and log loading code from the current server 
binary because the on-disk format and directory layout have not changed.
+- Use the current `server` binary startup path as the reference implementation.
+- In particular, trace the existing logic from the current server 
`main`/bootstrap flow and adapt that code path into `server-ng`.
+- Avoid inventing a parallel restore path unless extraction is required for 
reuse.
+- Scope this part of the work to wiring existing logic into the new bootstrap, 
not redesigning the storage format.
+
+### 8. Shard construction
+
+- Build one shard using:
+  - restored `IggyMetadata`
+  - restored `IggyPartitions`
+  - `message_bus`
+  - `consensus`
+- Keep the initial version fixed to one shard.
+- Ensure routing tables and namespace ownership are initialized consistently 
for that single shard.
+
+### 9. Runtime services
+
+- Skip runtime services for now.
+- Do not plan TCP listeners, timers, HTTP, QUIC, or related infra in this 
phase.
+- Leave a clear `TODO` in the bootstrap where these services will be attached 
later.
+
+### 10. Verification
+
+- Verify config parsing against `core/server-ng/config.toml`.
+- Run `cargo check -p server-ng`.
+- Smoke-test startup with:
+  - empty data directory
+  - existing metadata snapshot/journal
+  - existing partition logs
+
+## Suggested implementation order
+
+1. `core/server-ng/config.toml` aligned with existing server config plus extras
+2. metadata restore path
+3. partition restore path by reusing current server bootstrap logic
+4. shard construction
+5. compile + smoke verification
+
+## Notes from Feedback
+
+- Do not introduce a brand new top-level `server-ng` config model if the 
current config can be reused.
+- The only clearly identified new config need right now is namespace-capacity 
validation for packed `u64` `IggyNamespace`.
+- Do not scope this task around listeners or timer startup yet.
+- Prefer extracting or reusing existing server bootstrap code over 
reimplementing storage restore logic from scratch.
diff --git a/core/server-ng/config.toml b/core/server-ng/config.toml
index e24f73fce..3b8eebf22 100644
--- a/core/server-ng/config.toml
+++ b/core/server-ng/config.toml
@@ -126,6 +126,12 @@ decoding_secret = ""
 # `false` means the secret is in plain text.
 use_base64_secret = false
 
+# Trusted issuers for A2A (Application-to-Application) authentication
+[[http.jwt.trusted_issuers]]
+issuer = "test-issuer"
+jwks_url = "http://127.0.0.1:8081/.well-known/jwks.json";
+audience = "iggy.apache.org"
+
 # Metrics configuration for HTTP.
 [http.metrics]
 # Enable or disable the metrics endpoint.
@@ -583,3 +589,8 @@ enabled = false
 self_signed = true
 cert_file = "core/certs/iggy_cert.pem"
 key_file = "core/certs/iggy_key.pem"
+
+[extra.namespace]
+max_streams = 4096
+max_topics = 4096
+max_partitions = 1_000_000
diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs
new file mode 100644
index 000000000..1ddefecfb
--- /dev/null
+++ b/core/server-ng/src/bootstrap.rs
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server_error::ServerNgError;
+use configs::server::ServerConfig;
+use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus};
+use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId};
+use iggy_common::{
+    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, 
sharding::LocalIdx,
+    variadic,
+};
+use journal::Journal;
+use journal::prepare_journal::PrepareJournal;
+use message_bus::IggyMessageBus;
+use metadata::IggyMetadata;
+use metadata::MuxStateMachine;
+use metadata::impls::metadata::{IggySnapshot, StreamsFrontend};
+use metadata::impls::recovery::recover;
+use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::snapshot::Snapshot;
+use metadata::stm::stream::{Partition, Streams};
+use metadata::stm::user::Users;
+use partitions::{
+    IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, 
PartitionsConfig, Segment,
+};
+use server::bootstrap::create_directories;
+use server::log::logger::Logging;
+use server::streaming::partitions::storage::{load_consumer_group_offsets, 
load_consumer_offsets};
+use server::streaming::segments::storage::create_segment_storage;
+use shard::shards_table::PapayaShardsTable;
+use shard::{IggyShard, PartitionConsensusConfig, ShardIdentity};
+use std::future::pending;
+use std::path::{Path, PathBuf};
+use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use tracing::{info, warn};
+
+const CLUSTER_ID: u128 = 1;
+const SHARD_ID: u16 = 0;
+const SHARD_NAME: &str = "server-ng-shard-0";
+const DEFAULT_CONFIG_PATH: &str = "core/server-ng/config.toml";
+
+type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, 
ConsumerGroups)>;
+type ServerNgMetadata = IggyMetadata<
+    VsrConsensus<IggyMessageBus>,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+>;
+type ServerNgShard = IggyShard<
+    IggyMessageBus,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+    PapayaShardsTable,
+>;
+
+pub trait RunServerNg {
+    fn run(&self) -> impl Future<Output = Result<(), ServerNgError>>;
+}
+
+impl RunServerNg for Rc<ServerNgShard> {
+    async fn run(&self) -> Result<(), ServerNgError> {
+        info!(
+            shard = self.id,
+            partitions = self.plane.partitions().len(),
+            "server-ng shard initialized"
+        );
+        warn!("TODO: start listeners, timers, and runtime services once the 
new infra exists");
+        pending::<()>().await;
+        #[allow(unreachable_code)]
+        Ok(())
+    }
+}
+
+pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, 
ServerNgError> {
+    let config = ServerConfig::load_with_path(DEFAULT_CONFIG_PATH, 
include_str!("../config.toml"))
+        .await
+        .map_err(ServerNgError::Config)?;
+    create_directories(&config.system)
+        .await
+        .map_err(ServerNgError::CreateDirectories)?;
+    logging
+        .late_init(
+            config.system.get_system_path(),
+            &config.system.logging,
+            &config.telemetry,
+        )
+        .map_err(ServerNgError::Logging)?;
+
+    
iggy_common::MemoryPool::init_pool(&config.system.memory_pool.into_other());
+
+    let recovered = 
recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path))
+        .await
+        .map_err(ServerNgError::MetadataRecovery)?;
+    let restored_op = recovered
+        .last_applied_op
+        .unwrap_or_else(|| recovered.snapshot.sequence_number());
+
+    let metadata = ServerNgMetadata::new(
+        Some(restore_metadata_consensus(&recovered.journal, restored_op)),
+        Some(recovered.journal),
+        Some(recovered.snapshot),
+        recovered.mux_stm,
+        Some(PathBuf::from(&config.system.path)),
+    );
+    let shard = Rc::new(build_single_shard(&config, metadata).await?);
+    info!(shard = shard.id, "server-ng bootstrap complete");
+
+    Ok(shard)
+}
+
+fn restore_metadata_consensus(
+    journal: &PrepareJournal,
+    restored_op: u64,
+) -> VsrConsensus<IggyMessageBus> {
+    let mut consensus = VsrConsensus::new(
+        CLUSTER_ID,
+        SHARD_ID as u8,
+        1,
+        0,
+        IggyMessageBus::new(1, SHARD_ID, 0),
+        LocalPipeline::new(),
+    );
+
+    let last_header = journal
+        .last_op()
+        .and_then(|op| journal.header(op as usize).map(|header| *header));
+    if let Some(header) = last_header {
+        consensus.set_view(header.view);
+    }
+
+    consensus.init();
+    consensus.sequencer().set_sequence(restored_op);
+    consensus.advance_commit_max(restored_op);
+    for op in 1..=restored_op {
+        consensus.advance_commit_min(op);
+    }
+    if let Some(header) = last_header {
+        consensus.set_last_prepare_checksum(header.checksum);
+        consensus.set_log_view(header.view);
+    }
+
+    consensus
+}
+
+async fn build_single_shard(
+    config: &ServerConfig,
+    metadata: ServerNgMetadata,
+) -> Result<ServerNgShard, ServerNgError> {
+    let shard_id = ShardId::new(SHARD_ID);
+    let partition_count = metadata.mux_stm.streams().read(|inner| {
+        inner
+            .items
+            .iter()
+            .map(|(_, stream)| {
+                stream
+                    .topics
+                    .iter()
+                    .map(|(_, topic)| topic.partitions.len())
+                    .sum::<usize>()
+            })
+            .sum()
+    });
+    let mut partitions = IggyPartitions::with_capacity(
+        shard_id,
+        PartitionsConfig {
+            messages_required_to_save: 
config.system.partition.messages_required_to_save,
+            size_of_messages_required_to_save: config
+                .system
+                .partition
+                .size_of_messages_required_to_save,
+            enforce_fsync: config.system.partition.enforce_fsync,
+            segment_size: config.system.segment.size,
+        },
+        partition_count,
+    );
+    let shards_table = PapayaShardsTable::with_capacity(partition_count);
+
+    let mut namespaces = Vec::with_capacity(partition_count);
+    let _ = metadata.mux_stm.streams().read(|inner| {
+        for (_, stream) in inner.items.iter() {
+            for (topic_id, topic) in stream.topics.iter() {
+                for partition in &topic.partitions {
+                    namespaces.push((stream.id, topic_id, partition.clone()));
+                }
+            }
+        }
+    });
+
+    for (stream_id, topic_id, partition_metadata) in namespaces {
+        let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_metadata.id);
+        let partition = load_partition(config, namespace, 
&partition_metadata).await?;
+        let local_idx = partitions.insert(namespace, partition);
+        shards_table.insert(
+            namespace,
+            PartitionLocation::new(shard_id, LocalIdx::new(*local_idx)),
+        );
+    }
+
+    Ok(IggyShard::without_inbox(
+        ShardIdentity::new(SHARD_ID, SHARD_NAME.to_string()),
+        metadata,
+        partitions,
+        shards_table,
+        PartitionConsensusConfig::new(CLUSTER_ID, 1, IggyMessageBus::new(1, 
SHARD_ID, 0)),
+    ))
+}
+
+async fn load_partition(
+    config: &ServerConfig,
+    namespace: IggyNamespace,
+    partition_metadata: &Partition,
+) -> Result<IggyPartition<IggyMessageBus>, ServerNgError> {
+    let stream_id = namespace.stream_id();
+    let topic_id = namespace.topic_id();
+    let partition_id = namespace.partition_id();
+    let stats = Arc::new(PartitionStats::default());
+    let consensus = VsrConsensus::new(
+        CLUSTER_ID,
+        SHARD_ID as u8,
+        1,
+        namespace.inner(),
+        IggyMessageBus::new(1, SHARD_ID, namespace.inner()),
+        LocalPipeline::new(),
+    );
+    consensus.init();
+
+    let loaded_log = server::bootstrap::load_segments(
+        &config.system,
+        stream_id,
+        topic_id,
+        partition_id,
+        config
+            .system
+            .get_partition_path(stream_id, topic_id, partition_id),
+        stats.clone(),
+    )
+    .await
+    .map_err(|source| ServerNgError::PartitionLogLoad {
+        stream_id,
+        topic_id,
+        partition_id,
+        source,
+    })?;
+
+    let mut partition = IggyPartition::new(stats.clone(), consensus);
+    hydrate_partition_log(
+        &mut partition,
+        config,
+        stream_id,
+        topic_id,
+        partition_id,
+        loaded_log,
+    )
+    .await?;
+
+    let current_offset = partition
+        .log
+        .segments()
+        .iter()
+        .filter(|segment| segment.size > IggyByteSize::default())
+        .map(|segment| segment.end_offset)
+        .max()
+        .unwrap_or(0);
+    partition.created_at = partition_metadata.created_at;
+    partition.offset.store(current_offset, Ordering::Release);
+    partition
+        .dirty_offset
+        .store(current_offset, Ordering::Relaxed);
+    partition.should_increment_offset = partition
+        .log
+        .segments()
+        .iter()
+        .any(|segment| segment.size > IggyByteSize::default());
+    partition.stats.set_current_offset(current_offset);
+    partition
+        .stats
+        .increment_segments_count(partition.log.segments().len() as u32);
+
+    configure_consumer_offsets(&mut partition, config, namespace, 
current_offset)?;
+    ensure_initial_segment(&mut partition, config, stream_id, topic_id, 
partition_id).await?;
+
+    Ok(partition)
+}
+
+async fn hydrate_partition_log(
+    partition: &mut IggyPartition<IggyMessageBus>,
+    config: &ServerConfig,
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+    loaded_log: server::streaming::partitions::log::SegmentedLog<
+        server::streaming::partitions::journal::MemoryMessageJournal,
+    >,
+) -> Result<(), ServerNgError> {
+    for (segment, storage) in loaded_log
+        .segments()
+        .iter()
+        .cloned()
+        .zip(loaded_log.storages().iter().cloned())
+    {
+        partition
+            .log
+            .add_persisted_segment(convert_segment(segment), storage, None, 
None);
+    }
+
+    if let Some(active_index) = partition.log.segments().len().checked_sub(1) {
+        let storage = &partition.log.storages()[active_index];
+        if let (Some(messages_reader), Some(index_reader)) = (
+            storage.messages_reader.as_ref(),
+            storage.index_reader.as_ref(),
+        ) {
+            let index_path = index_reader.path();
+            let index_size = std::fs::metadata(&index_path)
+                .map(|metadata| metadata.len())
+                .unwrap_or(0);
+            partition.log.messages_writers_mut()[active_index] = Some(Rc::new(
+                MessagesWriter::new(
+                    &messages_reader.path(),
+                    Rc::new(AtomicU64::new(messages_reader.file_size() as 
u64)),
+                    config.system.partition.enforce_fsync,
+                    true,
+                )
+                .await
+                .map_err(|source| ServerNgError::MessagesWriterInit {
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    source,
+                })?,
+            ));
+            partition.log.index_writers_mut()[active_index] = Some(Rc::new(
+                IggyIndexWriter::new(
+                    &index_path,
+                    Rc::new(AtomicU64::new(index_size)),
+                    config.system.partition.enforce_fsync,
+                    true,
+                )
+                .await
+                .map_err(|source| ServerNgError::IndexWriterInit {
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    source,
+                })?,
+            ));
+        }
+    } else {
+        let _ = (stream_id, topic_id, partition_id);
+    }
+
+    Ok(())
+}
+
+fn convert_segment(segment: iggy_common::Segment) -> Segment {
+    Segment {
+        sealed: segment.sealed,
+        start_timestamp: segment.start_timestamp,
+        end_timestamp: segment.end_timestamp,
+        max_timestamp: segment.end_timestamp,
+        current_position: u64::from(segment.current_position),
+        start_offset: segment.start_offset,
+        end_offset: segment.end_offset,
+        size: segment.size,
+        max_size: segment.max_size,
+    }
+}
+
+fn configure_consumer_offsets(
+    partition: &mut IggyPartition<IggyMessageBus>,
+    config: &ServerConfig,
+    namespace: IggyNamespace,
+    current_offset: u64,
+) -> Result<(), ServerNgError> {
+    let stream_id = namespace.stream_id();
+    let topic_id = namespace.topic_id();
+    let partition_id = namespace.partition_id();
+    let consumer_offsets_path =
+        config
+            .system
+            .get_consumer_offsets_path(stream_id, topic_id, partition_id);
+    let consumer_group_offsets_path =
+        config
+            .system
+            .get_consumer_group_offsets_path(stream_id, topic_id, 
partition_id);
+
+    let loaded_consumer_offsets = 
load_consumer_offsets(&consumer_offsets_path).unwrap_or_default();
+    let consumer_offsets = 
ConsumerOffsets::with_capacity(loaded_consumer_offsets.len());
+    {
+        let guard = consumer_offsets.pin();
+        for offset in loaded_consumer_offsets {
+            if offset.offset.load(Ordering::Relaxed) > current_offset {
+                offset.offset.store(current_offset, Ordering::Relaxed);
+            }
+            guard.insert(offset.consumer_id as usize, offset);
+        }
+    }
+
+    let loaded_group_offsets =
+        
load_consumer_group_offsets(&consumer_group_offsets_path).unwrap_or_default();
+    let consumer_group_offsets = 
ConsumerGroupOffsets::with_capacity(loaded_group_offsets.len());
+    {
+        let guard = consumer_group_offsets.pin();
+        for (group_id, offset) in loaded_group_offsets {
+            if offset.offset.load(Ordering::Relaxed) > current_offset {
+                offset.offset.store(current_offset, Ordering::Relaxed);
+            }
+            guard.insert(group_id, offset);
+        }
+    }
+
+    partition.configure_consumer_offset_storage(
+        consumer_offsets_path,
+        consumer_group_offsets_path,
+        consumer_offsets,
+        consumer_group_offsets,
+        config.system.partition.enforce_fsync,
+    );
+
+    Ok(())
+}
+
+async fn ensure_initial_segment(
+    partition: &mut IggyPartition<IggyMessageBus>,
+    config: &ServerConfig,
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+) -> Result<(), ServerNgError> {
+    if partition.log.has_segments() {
+        return Ok(());
+    }
+
+    let storage =
+        create_segment_storage(&config.system, stream_id, topic_id, 
partition_id, 0, 0, 0)
+            .await
+            .map_err(|source| ServerNgError::InitialSegmentStorage {
+                stream_id,
+                topic_id,
+                partition_id,
+                source,
+            })?;
+    let messages_path = config
+        .system
+        .get_messages_file_path(stream_id, topic_id, partition_id, 0);
+    let index_path = config
+        .system
+        .get_index_path(stream_id, topic_id, partition_id, 0);
+    partition.log.add_persisted_segment(
+        Segment::new(0, config.system.segment.size),
+        storage,
+        Some(Rc::new(
+            MessagesWriter::new(
+                &messages_path,
+                Rc::new(AtomicU64::new(0)),
+                config.system.partition.enforce_fsync,
+                false,
+            )
+            .await
+            .map_err(|source| ServerNgError::MessagesWriterInit {
+                stream_id,
+                topic_id,
+                partition_id,
+                source,
+            })?,
+        )),
+        Some(Rc::new(
+            IggyIndexWriter::new(
+                &index_path,
+                Rc::new(AtomicU64::new(0)),
+                config.system.partition.enforce_fsync,
+                false,
+            )
+            .await
+            .map_err(|source| ServerNgError::IndexWriterInit {
+                stream_id,
+                topic_id,
+                partition_id,
+                source,
+            })?,
+        )),
+    );
+    partition.stats.increment_segments_count(1);
+
+    Ok(())
+}
diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/lib.rs
index caae94a3a..c8c412834 100644
--- a/core/server-ng/src/lib.rs
+++ b/core/server-ng/src/lib.rs
@@ -17,5 +17,7 @@
  * under the License.
  */
 
+pub mod bootstrap;
 pub mod login_register;
+pub mod server_error;
 pub mod session_manager;
diff --git a/core/server-ng/src/main.rs b/core/server-ng/src/main.rs
index ffc8ee202..b6a7ef885 100644
--- a/core/server-ng/src/main.rs
+++ b/core/server-ng/src/main.rs
@@ -17,7 +17,22 @@
  * under the License.
  */
 
-fn main() {
-    tracing_subscriber::fmt::init();
-    tracing::info!("iggy-server-ng starting...");
+use server_ng::server_error::ServerNgError;
+
+fn main() -> Result<(), ServerNgError> {
+    let runtime = compio::runtime::Runtime::new().expect("failed to create 
compio runtime");
+    runtime.block_on(async {
+        if let Ok(env_path) = std::env::var("IGGY_ENV_PATH") {
+            let _ = dotenvy::from_path(&env_path);
+        } else {
+            let _ = dotenvy::dotenv();
+        }
+
+        use server_ng::bootstrap::RunServerNg;
+        let mut logging = server::log::logger::Logging::new();
+        logging.early_init();
+
+        let shard = server_ng::bootstrap::bootstrap(&mut logging).await?;
+        shard.run().await
+    })
 }
diff --git a/core/server-ng/src/server_error.rs 
b/core/server-ng/src/server_error.rs
new file mode 100644
index 000000000..c071f1608
--- /dev/null
+++ b/core/server-ng/src/server_error.rs
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use metadata::impls::recovery::RecoveryError;
+use server::server_error::LogError;
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum ServerNgError {
+    #[error("failed to load server-ng config")]
+    Config(#[source] configs::ConfigurationError),
+    #[error("failed to prepare server-ng directories")]
+    CreateDirectories(#[source] iggy_common::IggyError),
+    #[error("failed to initialize server-ng logging")]
+    Logging(#[source] LogError),
+    #[error("failed to recover metadata snapshot and journal")]
+    MetadataRecovery(#[source] RecoveryError),
+    #[error(
+        "failed to load partition log for stream {stream_id}, topic 
{topic_id}, partition {partition_id}"
+    )]
+    PartitionLogLoad {
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        #[source]
+        source: iggy_common::IggyError,
+    },
+    #[error(
+        "failed to initialize messages writer for stream {stream_id}, topic 
{topic_id}, partition {partition_id}"
+    )]
+    MessagesWriterInit {
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        #[source]
+        source: iggy_common::IggyError,
+    },
+    #[error(
+        "failed to initialize index writer for stream {stream_id}, topic 
{topic_id}, partition {partition_id}"
+    )]
+    IndexWriterInit {
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        #[source]
+        source: iggy_common::IggyError,
+    },
+    #[error(
+        "failed to create initial segment storage for stream {stream_id}, 
topic {topic_id}, partition {partition_id}"
+    )]
+    InitialSegmentStorage {
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        #[source]
+        source: iggy_common::IggyError,
+    },
+}


Reply via email to