This is an automated email from the ASF dual-hosted git repository. numinnex pushed a commit to branch integration_tests in repository https://gitbox.apache.org/repos/asf/iggy.git
commit ce87d2c97b5e519eefa12f6fb75a43503b270d6a Author: numinex <[email protected]> AuthorDate: Tue Apr 21 13:52:26 2026 +0200 temp --- core/configs/src/server_config/defaults.rs | 3 +- core/configs/src/server_config/displays.rs | 24 ++++- core/configs/src/server_config/server.rs | 30 ++++++- core/configs/src/server_config/validators.rs | 27 +++++- core/server-ng/PLAN.md | 127 +++++++++++++++++++++++++++ 5 files changed, 205 insertions(+), 6 deletions(-) diff --git a/core/configs/src/server_config/defaults.rs b/core/configs/src/server_config/defaults.rs index 25c6da929..98a03cbea 100644 --- a/core/configs/src/server_config/defaults.rs +++ b/core/configs/src/server_config/defaults.rs @@ -20,7 +20,7 @@ use super::cluster::{ClusterConfig, ClusterNodeConfig, TransportPorts}; use super::http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig}; use super::quic::{QuicCertificateConfig, QuicConfig, QuicSocketConfig}; use super::server::{ - ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, MemoryPoolConfig, + ConsumerGroupConfig, DataMaintenanceConfig, ExtraConfig, HeartbeatConfig, MemoryPoolConfig, MessageSaverConfig, MessagesMaintenanceConfig, PersonalAccessTokenCleanerConfig, PersonalAccessTokenConfig, ServerConfig, TelemetryConfig, TelemetryLogsConfig, TelemetryTracesConfig, @@ -49,6 +49,7 @@ impl Default for ServerConfig { ServerConfig { consumer_group: ConsumerGroupConfig::default(), data_maintenance: DataMaintenanceConfig::default(), + extra: ExtraConfig::default(), heartbeat: HeartbeatConfig::default(), message_saver: MessageSaverConfig::default(), personal_access_token: PersonalAccessTokenConfig::default(), diff --git a/core/configs/src/server_config/displays.rs b/core/configs/src/server_config/displays.rs index 959d35476..9d61db3e4 100644 --- a/core/configs/src/server_config/displays.rs +++ b/core/configs/src/server_config/displays.rs @@ -19,8 +19,9 @@ use super::quic::{QuicCertificateConfig, QuicConfig}; use super::server::{ - ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, MessagesMaintenanceConfig, - TelemetryConfig, TelemetryLogsConfig, TelemetryTracesConfig, + ConsumerGroupConfig, DataMaintenanceConfig, ExtraConfig, HeartbeatConfig, + MessagesMaintenanceConfig, NamespaceConfig, TelemetryConfig, TelemetryLogsConfig, + TelemetryTracesConfig, }; use super::system::MessageDeduplicationConfig; use super::{ @@ -156,9 +157,10 @@ impl Display for ServerConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ consumer_group: {}, data_maintenance: {}, message_saver: {}, heartbeat: {}, system: {}, quic: {}, tcp: {}, http: {}, telemetry: {} }}", + "{{ consumer_group: {}, data_maintenance: {}, extra: {}, message_saver: {}, heartbeat: {}, system: {}, quic: {}, tcp: {}, http: {}, telemetry: {} }}", self.consumer_group, self.data_maintenance, + self.extra, self.message_saver, self.heartbeat, self.system, @@ -170,6 +172,22 @@ impl Display for ServerConfig { } } +impl Display for ExtraConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{{ namespace: {} }}", self.namespace) + } +} + +impl Display for NamespaceConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ max_streams: {}, max_topics: {}, max_partitions: {} }}", + self.max_streams, self.max_topics, self.max_partitions + ) + } +} + impl Display for ConsumerGroupConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( diff --git a/core/configs/src/server_config/server.rs b/core/configs/src/server_config/server.rs index fbfb1d458..0a35351ce 100644 --- a/core/configs/src/server_config/server.rs +++ b/core/configs/src/server_config/server.rs @@ -30,7 +30,10 @@ use err_trail::ErrContext; use figment::providers::{Format, Toml}; use figment::value::Dict; use figment::{Metadata, Profile, Provider}; -use iggy_common::{IggyByteSize, IggyDuration, MemoryPoolConfigOther, Validatable}; +use iggy_common::{ + IggyByteSize, IggyDuration, MemoryPoolConfigOther, Validatable, + sharding::{MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS}, +}; use serde::{Deserialize, Serialize}; use serde_with::DisplayFromStr; use serde_with::serde_as; @@ -45,6 +48,8 @@ const DEFAULT_CONFIG_PATH: &str = "core/server/config.toml"; pub struct ServerConfig { pub consumer_group: ConsumerGroupConfig, pub data_maintenance: DataMaintenanceConfig, + #[serde(default)] + pub extra: ExtraConfig, pub message_saver: MessageSaverConfig, pub personal_access_token: PersonalAccessTokenConfig, pub heartbeat: HeartbeatConfig, @@ -57,6 +62,29 @@ pub struct ServerConfig { pub cluster: ClusterConfig, } +// TODO: Rename this to something more sensible, once we figure out all of the extra configuration we need. +#[derive(Debug, Default, Deserialize, Serialize, Clone, ConfigEnv)] +pub struct ExtraConfig { + pub namespace: NamespaceConfig, +} + +#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)] +pub struct NamespaceConfig { + pub max_streams: usize, + pub max_topics: usize, + pub max_partitions: usize, +} + +impl Default for NamespaceConfig { + fn default() -> Self { + Self { + max_streams: MAX_STREAMS, + max_topics: MAX_TOPICS, + max_partitions: MAX_PARTITIONS, + } + } +} + /// Configuration for the memory pool. #[derive(Debug, Deserialize, Serialize, ConfigEnv)] pub struct MemoryPoolConfig { diff --git a/core/configs/src/server_config/validators.rs b/core/configs/src/server_config/validators.rs index 1512961d6..2e23e48a4 100644 --- a/core/configs/src/server_config/validators.rs +++ b/core/configs/src/server_config/validators.rs @@ -20,7 +20,8 @@ use super::COMPONENT; use super::cluster::ClusterConfig; use super::server::{ - DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, TelemetryConfig, + DataMaintenanceConfig, ExtraConfig, MessageSaverConfig, MessagesMaintenanceConfig, + NamespaceConfig, TelemetryConfig, }; use super::server::{MemoryPoolConfig, PersonalAccessTokenConfig, ServerConfig}; use super::sharding::{CpuAllocation, ShardingConfig}; @@ -32,6 +33,7 @@ use iggy_common::CompressionAlgorithm; use iggy_common::IggyExpiry; use iggy_common::MaxTopicSize; use iggy_common::Validatable; +use iggy_common::sharding::IggyNamespace; use std::thread::available_parallelism; use tracing::warn; @@ -76,6 +78,9 @@ impl Validatable<ConfigurationError> for ServerConfig { "{COMPONENT} (error: {e}) - failed to validate personal access token config" ) })?; + self.extra.validate().error(|e: &ConfigurationError| { + format!("{COMPONENT} (error: {e}) - failed to validate extra config") + })?; self.system .segment .validate() @@ -164,6 +169,26 @@ impl Validatable<ConfigurationError> for ServerConfig { } } +impl Validatable<ConfigurationError> for ExtraConfig { + fn validate(&self) -> Result<(), ConfigurationError> { + self.namespace.validate().error(|e: &ConfigurationError| { + format!("{COMPONENT} (error: {e}) - failed to validate namespace config") + })?; + Ok(()) + } +} + +impl Validatable<ConfigurationError> for NamespaceConfig { + fn validate(&self) -> Result<(), ConfigurationError> { + IggyNamespace::validate_capacity(self.max_streams, self.max_topics, self.max_partitions) + .map_err(|error| { + eprintln!("extra.namespace is invalid: {error}"); + ConfigurationError::InvalidConfigurationValue + })?; + Ok(()) + } +} + impl Validatable<ConfigurationError> for CompressionConfig { fn validate(&self) -> Result<(), ConfigurationError> { let compression_alg = &self.default_algorithm; diff --git a/core/server-ng/PLAN.md b/core/server-ng/PLAN.md new file mode 100644 index 000000000..006b0a5bf --- /dev/null +++ b/core/server-ng/PLAN.md @@ -0,0 +1,127 @@ +# Server-NG Bootstrap Plan + +## Goal + +Implement startup for the new `server-ng` binary using: + +- `partitions` +- `consensus` +- `message_bus` +- `metadata` +- `journal` + +and support: + +1. new `config.toml` handling in `server-ng` +2. config loading/parsing +3. metadata journal + snapshot loading +4. `IggyMetadata` restoration +5. partition + log loading (`IGGY-131`) +6. single-shard bootstrap only, without transport/runtime infra for now + +## Plan + +### 1. Config surface + +- Reuse the existing server config surface instead of introducing a separate top-level `server-ng` config module. +- Add an extra config section or extension on top of the current config for `server-ng`-specific additions. +- The main new config requirement is broker capacity limits for: + - maximum streams + - maximum topics + - maximum partitions +- Add validation that these three limits can be packed into a single `u64` namespace layout. +- The validation must be tied directly to `IggyNamespace`, since it packs `stream_id/topic_id/partition_id` into `u64`. +- Keep config loading through the existing config infrastructure, with the extra section layered on top. + +### 2. Server-NG config file + +- Create a dedicated `core/server-ng/config.toml`. +- It should be exactly the same as the existing server config file, plus the extra `server-ng` additions. +- Do not create a reduced or bootstrap-only config file. +- Keep it structurally aligned with the current server config so existing parsing behavior can be reused as much as possible. + +### 3. Bootstrap entrypoint + +- Replace the placeholder `server-ng` main startup with a bootstrap flow. +- Keep startup split into clear phases: + - initialize tracing/logging + - load config + - prepare filesystem paths + - restore metadata + - restore partitions + - build shard +- Leave listeners/timers out for now. +- Add an explicit `TODO` at the bootstrap boundary where transport/runtime services will be started once infrastructure exists. + +### 4. Metadata recovery + +- Reuse the existing recovery code from `core/metadata`. +- Load: + - snapshot + - metadata WAL / journal +- Restore the metadata state machine from snapshot. +- Replay journal entries after the snapshot point. +- Construct `IggyMetadata` from the recovered pieces. + +### 5. Consensus state restoration + +- Initialize it with recovered progress so sequence/commit state matches restored metadata. +- Keep this scoped to single-replica / single-shard bootstrap first. + +### 6. Partition discovery + +- Read restored metadata to determine which partitions exist. +- For each partition, resolve: + - stream id + - topic id + - partition id + - consensus group / namespace info + - persisted storage paths + +### 7. Partition + log loading + +- Reuse the existing partition and log loading code from the current server binary because the on-disk format and directory layout have not changed. +- Use the current `server` binary startup path as the reference implementation. +- In particular, trace the existing logic from the current server `main`/bootstrap flow and adapt that code path into `server-ng`. +- Avoid inventing a parallel restore path unless extraction is required for reuse. +- Scope this part of the work to wiring existing logic into the new bootstrap, not redesigning the storage format. + +### 8. Shard construction + +- Build one shard using: + - restored `IggyMetadata` + - restored `IggyPartitions` + - `message_bus` + - `consensus` +- Keep the initial version fixed to one shard. +- Ensure routing tables and namespace ownership are initialized consistently for that single shard. + +### 9. Runtime services + +- Skip runtime services for now. +- Do not plan TCP listeners, timers, HTTP, QUIC, or related infra in this phase. +- Leave a clear `TODO` in the bootstrap where these services will be attached later. + +### 10. Verification + +- Verify config parsing against `core/server-ng/config.toml`. +- Run `cargo check -p server-ng`. +- Smoke-test startup with: + - empty data directory + - existing metadata snapshot/journal + - existing partition logs + +## Suggested implementation order + +1. `core/server-ng/config.toml` aligned with existing server config plus extras +2. metadata restore path +3. partition restore path by reusing current server bootstrap logic +4. shard construction +5. compile + smoke verification + +## Notes from Feedback + +- Do not introduce a brand new top-level `server-ng` config model if the current config can be reused. +- The only clearly identified new config need right now is namespace-capacity validation for packed `u64` `IggyNamespace`. +- Do not scope this task around listeners or timer startup yet. +- Prefer extracting or reusing existing server bootstrap code over reimplementing storage restore logic from scratch.
