This is an automated email from the ASF dual-hosted git repository. numinnex pushed a commit to branch replica_bootstrap in repository https://gitbox.apache.org/repos/asf/iggy.git
commit fbafdee10dcbde194a7249c9345adf77cf1020fe Author: numinex <[email protected]> AuthorDate: Tue Apr 21 13:52:26 2026 +0200 temp --- Cargo.lock | 7 +- Cargo.toml | 1 + core/common/src/sharding/mod.rs | 5 +- core/common/src/sharding/namespace.rs | 118 +++++++ core/configs/src/server_config/defaults.rs | 3 +- core/configs/src/server_config/displays.rs | 24 +- core/configs/src/server_config/server.rs | 56 ++- core/configs/src/server_config/validators.rs | 27 +- core/metadata/src/stm/stream.rs | 8 + core/partitions/src/lib.rs | 3 + core/server-ng/Cargo.toml | 7 +- core/server-ng/PLAN.md | 127 +++++++ core/server-ng/config.toml | 11 + core/server-ng/src/bootstrap.rs | 504 +++++++++++++++++++++++++++ core/server-ng/src/lib.rs | 2 + core/server-ng/src/main.rs | 21 +- core/server-ng/src/server_error.rs | 74 ++++ 17 files changed, 982 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cb0d18d90..ec90a34c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10165,7 +10165,6 @@ name = "server-ng" version = "0.8.0" dependencies = [ "ahash 0.8.12", - "anyhow", "argon2", "async-channel", "async_zip", @@ -10176,6 +10175,7 @@ dependencies = [ "clap", "compio", "configs", + "consensus", "ctrlc", "cyper", "cyper-axum", @@ -10192,8 +10192,10 @@ dependencies = [ "hwlocality", "iggy_binary_protocol", "iggy_common", + "journal", "jsonwebtoken", "left-right", + "message_bus", "metadata", "mimalloc", "mime_guess", @@ -10204,6 +10206,7 @@ dependencies = [ "opentelemetry-semantic-conventions", "opentelemetry_sdk", "papaya", + "partitions", "prometheus-client", "rand 0.10.1", "ringbuffer", @@ -10215,6 +10218,8 @@ dependencies = [ "secrecy", "send_wrapper", "serde", + "server", + "shard", "slab", "socket2 0.6.3", "strum 0.28.0", diff --git a/Cargo.toml b/Cargo.toml index 250c81acc..ba1f59ff7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -254,6 +254,7 @@ serde_yaml_ng = "0.10.0" serial_test = "3.4.0" server = { path = "core/server" } server-ng = { path = "core/server-ng" } 
+shard = { path = "core/shard" } simd-json = { version = "0.17.0", features = ["serde_impl"] } slab = "0.4.12" smallvec = "1.15" diff --git a/core/common/src/sharding/mod.rs b/core/common/src/sharding/mod.rs index 460c04ec3..051802ecf 100644 --- a/core/common/src/sharding/mod.rs +++ b/core/common/src/sharding/mod.rs @@ -22,8 +22,9 @@ mod shard_id; pub use local_idx::LocalIdx; pub use namespace::{ - IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, PARTITION_BITS, PARTITION_MASK, - PARTITION_SHIFT, STREAM_BITS, STREAM_MASK, STREAM_SHIFT, TOPIC_BITS, TOPIC_MASK, TOPIC_SHIFT, + IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, NamespaceCapacityError, PARTITION_BITS, + PARTITION_MASK, PARTITION_SHIFT, STREAM_BITS, STREAM_MASK, STREAM_SHIFT, TOPIC_BITS, + TOPIC_MASK, TOPIC_SHIFT, }; pub use partition_location::PartitionLocation; pub use shard_id::ShardId; diff --git a/core/common/src/sharding/namespace.rs b/core/common/src/sharding/namespace.rs index a0a7c21f3..ceb8d918e 100644 --- a/core/common/src/sharding/namespace.rs +++ b/core/common/src/sharding/namespace.rs @@ -49,6 +49,47 @@ pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 1; pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1; pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NamespaceCapacityError { + ZeroStreams, + ZeroTopics, + ZeroPartitions, + ExceedsU64 { + required_bits: u32, + }, + ExceedsLayout { + stream_bits: u32, + topic_bits: u32, + partition_bits: u32, + }, +} + +impl std::fmt::Display for NamespaceCapacityError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::ZeroStreams => write!(f, "max_streams must be greater than 0"), + Self::ZeroTopics => write!(f, "max_topics must be greater than 0"), + Self::ZeroPartitions => write!(f, "max_partitions must be greater than 0"), + Self::ExceedsU64 { required_bits } => write!( + f, + "namespace capacity requires {required_bits} 
bits, which does not fit in u64" + ), + Self::ExceedsLayout { + stream_bits, + topic_bits, + partition_bits, + } => write!( + f, + "namespace capacity requires stream/topic/partition bits of \ + {stream_bits}/{topic_bits}/{partition_bits}, but IggyNamespace supports \ + {STREAM_BITS}/{TOPIC_BITS}/{PARTITION_BITS}" + ), + } + } +} + +impl std::error::Error for NamespaceCapacityError {} + /// Packed namespace identifier for shard assignment. /// /// Encodes stream_id (12 bits), topic_id (12 bits), and partition_id (20 bits) @@ -89,4 +130,81 @@ impl IggyNamespace { | ((partition as u64) & PARTITION_MASK) << PARTITION_SHIFT; Self(value) } + + pub fn validate_capacity( + max_streams: usize, + max_topics: usize, + max_partitions: usize, + ) -> Result<(), NamespaceCapacityError> { + let stream_bits = if max_streams == 0 { + return Err(NamespaceCapacityError::ZeroStreams); + } else { + bits_required((max_streams - 1) as u64) + }; + let topic_bits = if max_topics == 0 { + return Err(NamespaceCapacityError::ZeroTopics); + } else { + bits_required((max_topics - 1) as u64) + }; + let partition_bits = if max_partitions == 0 { + return Err(NamespaceCapacityError::ZeroPartitions); + } else { + bits_required((max_partitions - 1) as u64) + }; + + let required_bits = stream_bits + .checked_add(topic_bits) + .and_then(|bits| bits.checked_add(partition_bits)) + .ok_or(NamespaceCapacityError::ExceedsU64 { + required_bits: u32::MAX, + })?; + if required_bits > u64::BITS { + return Err(NamespaceCapacityError::ExceedsU64 { required_bits }); + } + + if stream_bits > STREAM_BITS || topic_bits > TOPIC_BITS || partition_bits > PARTITION_BITS { + return Err(NamespaceCapacityError::ExceedsLayout { + stream_bits, + topic_bits, + partition_bits, + }); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::{IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, NamespaceCapacityError}; + + #[test] + fn validates_default_namespace_capacity() { + 
assert!(IggyNamespace::validate_capacity(MAX_STREAMS, MAX_TOPICS, MAX_PARTITIONS).is_ok()); + } + + #[test] + fn rejects_zero_capacity_values() { + assert_eq!( + IggyNamespace::validate_capacity(0, MAX_TOPICS, MAX_PARTITIONS), + Err(NamespaceCapacityError::ZeroStreams) + ); + assert_eq!( + IggyNamespace::validate_capacity(MAX_STREAMS, 0, MAX_PARTITIONS), + Err(NamespaceCapacityError::ZeroTopics) + ); + assert_eq!( + IggyNamespace::validate_capacity(MAX_STREAMS, MAX_TOPICS, 0), + Err(NamespaceCapacityError::ZeroPartitions) + ); + } + + #[test] + fn rejects_capacity_that_exceeds_current_layout() { + let err = IggyNamespace::validate_capacity(MAX_STREAMS + 1, MAX_TOPICS, MAX_PARTITIONS); + assert!(matches!( + err, + Err(NamespaceCapacityError::ExceedsLayout { .. }) + )); + } } diff --git a/core/configs/src/server_config/defaults.rs b/core/configs/src/server_config/defaults.rs index 25c6da929..98a03cbea 100644 --- a/core/configs/src/server_config/defaults.rs +++ b/core/configs/src/server_config/defaults.rs @@ -20,7 +20,7 @@ use super::cluster::{ClusterConfig, ClusterNodeConfig, TransportPorts}; use super::http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig}; use super::quic::{QuicCertificateConfig, QuicConfig, QuicSocketConfig}; use super::server::{ - ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, MemoryPoolConfig, + ConsumerGroupConfig, DataMaintenanceConfig, ExtraConfig, HeartbeatConfig, MemoryPoolConfig, MessageSaverConfig, MessagesMaintenanceConfig, PersonalAccessTokenCleanerConfig, PersonalAccessTokenConfig, ServerConfig, TelemetryConfig, TelemetryLogsConfig, TelemetryTracesConfig, @@ -49,6 +49,7 @@ impl Default for ServerConfig { ServerConfig { consumer_group: ConsumerGroupConfig::default(), data_maintenance: DataMaintenanceConfig::default(), + extra: ExtraConfig::default(), heartbeat: HeartbeatConfig::default(), message_saver: MessageSaverConfig::default(), personal_access_token: PersonalAccessTokenConfig::default(), 
diff --git a/core/configs/src/server_config/displays.rs b/core/configs/src/server_config/displays.rs index 959d35476..9d61db3e4 100644 --- a/core/configs/src/server_config/displays.rs +++ b/core/configs/src/server_config/displays.rs @@ -19,8 +19,9 @@ use super::quic::{QuicCertificateConfig, QuicConfig}; use super::server::{ - ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, MessagesMaintenanceConfig, - TelemetryConfig, TelemetryLogsConfig, TelemetryTracesConfig, + ConsumerGroupConfig, DataMaintenanceConfig, ExtraConfig, HeartbeatConfig, + MessagesMaintenanceConfig, NamespaceConfig, TelemetryConfig, TelemetryLogsConfig, + TelemetryTracesConfig, }; use super::system::MessageDeduplicationConfig; use super::{ @@ -156,9 +157,10 @@ impl Display for ServerConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ consumer_group: {}, data_maintenance: {}, message_saver: {}, heartbeat: {}, system: {}, quic: {}, tcp: {}, http: {}, telemetry: {} }}", + "{{ consumer_group: {}, data_maintenance: {}, extra: {}, message_saver: {}, heartbeat: {}, system: {}, quic: {}, tcp: {}, http: {}, telemetry: {} }}", self.consumer_group, self.data_maintenance, + self.extra, self.message_saver, self.heartbeat, self.system, @@ -170,6 +172,22 @@ impl Display for ServerConfig { } } +impl Display for ExtraConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{{ namespace: {} }}", self.namespace) + } +} + +impl Display for NamespaceConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ max_streams: {}, max_topics: {}, max_partitions: {} }}", + self.max_streams, self.max_topics, self.max_partitions + ) + } +} + impl Display for ConsumerGroupConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( diff --git a/core/configs/src/server_config/server.rs b/core/configs/src/server_config/server.rs index 8c005f599..0a35351ce 100644 --- a/core/configs/src/server_config/server.rs +++ 
b/core/configs/src/server_config/server.rs @@ -30,7 +30,10 @@ use err_trail::ErrContext; use figment::providers::{Format, Toml}; use figment::value::Dict; use figment::{Metadata, Profile, Provider}; -use iggy_common::{IggyByteSize, IggyDuration, MemoryPoolConfigOther, Validatable}; +use iggy_common::{ + IggyByteSize, IggyDuration, MemoryPoolConfigOther, Validatable, + sharding::{MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS}, +}; use serde::{Deserialize, Serialize}; use serde_with::DisplayFromStr; use serde_with::serde_as; @@ -45,6 +48,8 @@ const DEFAULT_CONFIG_PATH: &str = "core/server/config.toml"; pub struct ServerConfig { pub consumer_group: ConsumerGroupConfig, pub data_maintenance: DataMaintenanceConfig, + #[serde(default)] + pub extra: ExtraConfig, pub message_saver: MessageSaverConfig, pub personal_access_token: PersonalAccessTokenConfig, pub heartbeat: HeartbeatConfig, @@ -57,6 +62,29 @@ pub struct ServerConfig { pub cluster: ClusterConfig, } +// TODO: Rename this to something more sensible, once we figure out all of the extra configuration we need. +#[derive(Debug, Default, Deserialize, Serialize, Clone, ConfigEnv)] +pub struct ExtraConfig { + pub namespace: NamespaceConfig, +} + +#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)] +pub struct NamespaceConfig { + pub max_streams: usize, + pub max_topics: usize, + pub max_partitions: usize, +} + +impl Default for NamespaceConfig { + fn default() -> Self { + Self { + max_streams: MAX_STREAMS, + max_topics: MAX_TOPICS, + max_partitions: MAX_PARTITIONS, + } + } +} + /// Configuration for the memory pool. #[derive(Debug, Deserialize, Serialize, ConfigEnv)] pub struct MemoryPoolConfig { @@ -183,9 +211,21 @@ impl ServerConfig { /// /// Uses compile-time generated env var mappings for unambiguous resolution. 
pub async fn load() -> Result<ServerConfig, ConfigurationError> { + Self::load_with_path( + DEFAULT_CONFIG_PATH, + include_str!("../../../server/config.toml"), + ) + .await + } + + pub async fn load_with_path( + default_config_path: &str, + default_config: &'static str, + ) -> Result<ServerConfig, ConfigurationError> { let config_path = - env::var("IGGY_CONFIG_PATH").unwrap_or_else(|_| DEFAULT_CONFIG_PATH.to_string()); - let config_provider = ServerConfig::config_provider(&config_path); + env::var("IGGY_CONFIG_PATH").unwrap_or_else(|_| default_config_path.to_string()); + let config_provider = + ServerConfig::config_provider_with_default(&config_path, default_config); let server_config: ServerConfig = config_provider .load_config() @@ -203,7 +243,15 @@ impl ServerConfig { /// Create a config provider using compile-time generated env var mappings. pub fn config_provider(config_path: &str) -> FileConfigProvider<ServerConfigEnvProvider> { - let default_config = Toml::string(include_str!("../../../server/config.toml")); + Self::config_provider_with_default(config_path, include_str!("../../../server/config.toml")) + } + + /// Create a config provider using compile-time generated env var mappings. 
+ pub fn config_provider_with_default( + config_path: &str, + default_config: &'static str, + ) -> FileConfigProvider<ServerConfigEnvProvider> { + let default_config = Toml::string(default_config); FileConfigProvider::new( config_path.to_string(), ServerConfigEnvProvider::default(), diff --git a/core/configs/src/server_config/validators.rs b/core/configs/src/server_config/validators.rs index 1512961d6..2e23e48a4 100644 --- a/core/configs/src/server_config/validators.rs +++ b/core/configs/src/server_config/validators.rs @@ -20,7 +20,8 @@ use super::COMPONENT; use super::cluster::ClusterConfig; use super::server::{ - DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, TelemetryConfig, + DataMaintenanceConfig, ExtraConfig, MessageSaverConfig, MessagesMaintenanceConfig, + NamespaceConfig, TelemetryConfig, }; use super::server::{MemoryPoolConfig, PersonalAccessTokenConfig, ServerConfig}; use super::sharding::{CpuAllocation, ShardingConfig}; @@ -32,6 +33,7 @@ use iggy_common::CompressionAlgorithm; use iggy_common::IggyExpiry; use iggy_common::MaxTopicSize; use iggy_common::Validatable; +use iggy_common::sharding::IggyNamespace; use std::thread::available_parallelism; use tracing::warn; @@ -76,6 +78,9 @@ impl Validatable<ConfigurationError> for ServerConfig { "{COMPONENT} (error: {e}) - failed to validate personal access token config" ) })?; + self.extra.validate().error(|e: &ConfigurationError| { + format!("{COMPONENT} (error: {e}) - failed to validate extra config") + })?; self.system .segment .validate() @@ -164,6 +169,26 @@ impl Validatable<ConfigurationError> for ServerConfig { } } +impl Validatable<ConfigurationError> for ExtraConfig { + fn validate(&self) -> Result<(), ConfigurationError> { + self.namespace.validate().error(|e: &ConfigurationError| { + format!("{COMPONENT} (error: {e}) - failed to validate namespace config") + })?; + Ok(()) + } +} + +impl Validatable<ConfigurationError> for NamespaceConfig { + fn validate(&self) -> Result<(), 
ConfigurationError> { + IggyNamespace::validate_capacity(self.max_streams, self.max_topics, self.max_partitions) + .map_err(|error| { + eprintln!("extra.namespace is invalid: {error}"); + ConfigurationError::InvalidConfigurationValue + })?; + Ok(()) + } +} + impl Validatable<ConfigurationError> for CompressionConfig { fn validate(&self) -> Result<(), ConfigurationError> { let compression_alg = &self.default_algorithm; diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index 3aea30bd7..0e2f574c2 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -270,6 +270,14 @@ impl StreamsInner { } impl Streams { + #[must_use] + pub fn read<F, R>(&self, f: F) -> R + where + F: FnOnce(&StreamsInner) -> R, + { + self.inner.read(f) + } + #[must_use] pub fn partition_count_context( &self, diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs index f55e83bb0..3dd4c3f2e 100644 --- a/core/partitions/src/lib.rs +++ b/core/partitions/src/lib.rs @@ -31,8 +31,11 @@ mod types; use iggy_binary_protocol::{Message, PrepareHeader}; use iggy_common::IggyError; pub use iggy_common::send_messages2::{IggyMessage2, IggyMessage2Header, IggyMessages2}; +pub use iggy_index_writer::IggyIndexWriter; pub use iggy_partition::IggyPartition; pub use iggy_partitions::IggyPartitions; +pub use messages_writer::MessagesWriter; +pub use segment::Segment; pub use types::{ AppendResult, Fragment, PartitionOffsets, PartitionsConfig, PollFragments, PollQueryResult, PollingArgs, PollingConsumer, SendMessagesResult, diff --git a/core/server-ng/Cargo.toml b/core/server-ng/Cargo.toml index 79211fe7c..ce06b9dbb 100644 --- a/core/server-ng/Cargo.toml +++ b/core/server-ng/Cargo.toml @@ -105,7 +105,6 @@ iggy-web = ["dep:rust-embed", "dep:mime_guess"] [dependencies] ahash = { workspace = true } -anyhow = { workspace = true } argon2 = { workspace = true } async-channel = { workspace = true } async_zip = { workspace = true } @@ -116,6 +115,7 @@ chrono 
= { workspace = true } clap = { workspace = true } compio = { workspace = true } configs = { workspace = true } +consensus = { workspace = true } ctrlc = { workspace = true } cyper = { workspace = true } cyper-axum = { workspace = true } @@ -131,9 +131,11 @@ hash32 = { workspace = true } human-repr = { workspace = true } iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } +journal = { workspace = true } jsonwebtoken = { workspace = true } left-right = { workspace = true } metadata = { workspace = true } +message_bus = { workspace = true } mimalloc = { workspace = true, optional = true } mime_guess = { workspace = true, optional = true } nix = { workspace = true } @@ -143,6 +145,7 @@ opentelemetry-otlp = { workspace = true } opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true } papaya = { workspace = true } +partitions = { workspace = true } prometheus-client = { workspace = true } rand = { workspace = true } ringbuffer = { workspace = true } @@ -162,6 +165,8 @@ tempfile = { workspace = true } thiserror = { workspace = true } toml = { workspace = true } tower-http = { workspace = true } +server = { workspace = true } +shard = { workspace = true } tracing = { workspace = true } tracing-appender = { workspace = true } tracing-opentelemetry = { workspace = true } diff --git a/core/server-ng/PLAN.md b/core/server-ng/PLAN.md new file mode 100644 index 000000000..006b0a5bf --- /dev/null +++ b/core/server-ng/PLAN.md @@ -0,0 +1,127 @@ +# Server-NG Bootstrap Plan + +## Goal + +Implement startup for the new `server-ng` binary using: + +- `partitions` +- `consensus` +- `message_bus` +- `metadata` +- `journal` + +and support: + +1. new `config.toml` handling in `server-ng` +2. config loading/parsing +3. metadata journal + snapshot loading +4. `IggyMetadata` restoration +5. partition + log loading (`IGGY-131`) +6. single-shard bootstrap only, without transport/runtime infra for now + +## Plan + +### 1. 
Config surface + +- Reuse the existing server config surface instead of introducing a separate top-level `server-ng` config module. +- Add an extra config section or extension on top of the current config for `server-ng`-specific additions. +- The main new config requirement is broker capacity limits for: + - maximum streams + - maximum topics + - maximum partitions +- Add validation that these three limits can be packed into a single `u64` namespace layout. +- The validation must be tied directly to `IggyNamespace`, since it packs `stream_id/topic_id/partition_id` into `u64`. +- Keep config loading through the existing config infrastructure, with the extra section layered on top. + +### 2. Server-NG config file + +- Create a dedicated `core/server-ng/config.toml`. +- It should be exactly the same as the existing server config file, plus the extra `server-ng` additions. +- Do not create a reduced or bootstrap-only config file. +- Keep it structurally aligned with the current server config so existing parsing behavior can be reused as much as possible. + +### 3. Bootstrap entrypoint + +- Replace the placeholder `server-ng` main startup with a bootstrap flow. +- Keep startup split into clear phases: + - initialize tracing/logging + - load config + - prepare filesystem paths + - restore metadata + - restore partitions + - build shard +- Leave listeners/timers out for now. +- Add an explicit `TODO` at the bootstrap boundary where transport/runtime services will be started once infrastructure exists. + +### 4. Metadata recovery + +- Reuse the existing recovery code from `core/metadata`. +- Load: + - snapshot + - metadata WAL / journal +- Restore the metadata state machine from snapshot. +- Replay journal entries after the snapshot point. +- Construct `IggyMetadata` from the recovered pieces. + +### 5. Consensus state restoration + +- Initialize the metadata consensus with the recovered progress so its sequence/commit state matches the restored metadata. 
+- Keep this scoped to single-replica / single-shard bootstrap first. + +### 6. Partition discovery + +- Read restored metadata to determine which partitions exist. +- For each partition, resolve: + - stream id + - topic id + - partition id + - consensus group / namespace info + - persisted storage paths + +### 7. Partition + log loading + +- Reuse the existing partition and log loading code from the current server binary because the on-disk format and directory layout have not changed. +- Use the current `server` binary startup path as the reference implementation. +- In particular, trace the existing logic from the current server `main`/bootstrap flow and adapt that code path into `server-ng`. +- Avoid inventing a parallel restore path unless extraction is required for reuse. +- Scope this part of the work to wiring existing logic into the new bootstrap, not redesigning the storage format. + +### 8. Shard construction + +- Build one shard using: + - restored `IggyMetadata` + - restored `IggyPartitions` + - `message_bus` + - `consensus` +- Keep the initial version fixed to one shard. +- Ensure routing tables and namespace ownership are initialized consistently for that single shard. + +### 9. Runtime services + +- Skip runtime services for now. +- Do not plan TCP listeners, timers, HTTP, QUIC, or related infra in this phase. +- Leave a clear `TODO` in the bootstrap where these services will be attached later. + +### 10. Verification + +- Verify config parsing against `core/server-ng/config.toml`. +- Run `cargo check -p server-ng`. +- Smoke-test startup with: + - empty data directory + - existing metadata snapshot/journal + - existing partition logs + +## Suggested implementation order + +1. `core/server-ng/config.toml` aligned with existing server config plus extras +2. metadata restore path +3. partition restore path by reusing current server bootstrap logic +4. shard construction +5. 
compile + smoke verification + +## Notes from Feedback + +- Do not introduce a brand new top-level `server-ng` config model if the current config can be reused. +- The only clearly identified new config need right now is namespace-capacity validation for packed `u64` `IggyNamespace`. +- Do not scope this task around listeners or timer startup yet. +- Prefer extracting or reusing existing server bootstrap code over reimplementing storage restore logic from scratch. diff --git a/core/server-ng/config.toml b/core/server-ng/config.toml index e24f73fce..3b8eebf22 100644 --- a/core/server-ng/config.toml +++ b/core/server-ng/config.toml @@ -126,6 +126,12 @@ decoding_secret = "" # `false` means the secret is in plain text. use_base64_secret = false +# Trusted issuers for A2A (Application-to-Application) authentication +[[http.jwt.trusted_issuers]] +issuer = "test-issuer" +jwks_url = "http://127.0.0.1:8081/.well-known/jwks.json" +audience = "iggy.apache.org" + # Metrics configuration for HTTP. [http.metrics] # Enable or disable the metrics endpoint. @@ -583,3 +589,8 @@ enabled = false self_signed = true cert_file = "core/certs/iggy_cert.pem" key_file = "core/certs/iggy_key.pem" + +[extra.namespace] +max_streams = 4096 +max_topics = 4096 +max_partitions = 1_000_000 diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs new file mode 100644 index 000000000..1ddefecfb --- /dev/null +++ b/core/server-ng/src/bootstrap.rs @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::server_error::ServerNgError; +use configs::server::ServerConfig; +use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus}; +use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; +use iggy_common::{ + ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, sharding::LocalIdx, + variadic, +}; +use journal::Journal; +use journal::prepare_journal::PrepareJournal; +use message_bus::IggyMessageBus; +use metadata::IggyMetadata; +use metadata::MuxStateMachine; +use metadata::impls::metadata::{IggySnapshot, StreamsFrontend}; +use metadata::impls::recovery::recover; +use metadata::stm::consumer_group::ConsumerGroups; +use metadata::stm::snapshot::Snapshot; +use metadata::stm::stream::{Partition, Streams}; +use metadata::stm::user::Users; +use partitions::{ + IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, PartitionsConfig, Segment, +}; +use server::bootstrap::create_directories; +use server::log::logger::Logging; +use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets}; +use server::streaming::segments::storage::create_segment_storage; +use shard::shards_table::PapayaShardsTable; +use shard::{IggyShard, PartitionConsensusConfig, ShardIdentity}; +use std::future::pending; +use std::path::{Path, PathBuf}; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use tracing::{info, warn}; + +const CLUSTER_ID: u128 = 1; +const SHARD_ID: u16 = 0; +const SHARD_NAME: &str = "server-ng-shard-0"; +const 
DEFAULT_CONFIG_PATH: &str = "core/server-ng/config.toml"; + +type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>; +type ServerNgMetadata = IggyMetadata< + VsrConsensus<IggyMessageBus>, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, +>; +type ServerNgShard = IggyShard< + IggyMessageBus, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, + PapayaShardsTable, +>; + +pub trait RunServerNg { + fn run(&self) -> impl Future<Output = Result<(), ServerNgError>>; +} + +impl RunServerNg for Rc<ServerNgShard> { + async fn run(&self) -> Result<(), ServerNgError> { + info!( + shard = self.id, + partitions = self.plane.partitions().len(), + "server-ng shard initialized" + ); + warn!("TODO: start listeners, timers, and runtime services once the new infra exists"); + pending::<()>().await; + #[allow(unreachable_code)] + Ok(()) + } +} + +pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, ServerNgError> { + let config = ServerConfig::load_with_path(DEFAULT_CONFIG_PATH, include_str!("../config.toml")) + .await + .map_err(ServerNgError::Config)?; + create_directories(&config.system) + .await + .map_err(ServerNgError::CreateDirectories)?; + logging + .late_init( + config.system.get_system_path(), + &config.system.logging, + &config.telemetry, + ) + .map_err(ServerNgError::Logging)?; + + iggy_common::MemoryPool::init_pool(&config.system.memory_pool.into_other()); + + let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path)) + .await + .map_err(ServerNgError::MetadataRecovery)?; + let restored_op = recovered + .last_applied_op + .unwrap_or_else(|| recovered.snapshot.sequence_number()); + + let metadata = ServerNgMetadata::new( + Some(restore_metadata_consensus(&recovered.journal, restored_op)), + Some(recovered.journal), + Some(recovered.snapshot), + recovered.mux_stm, + Some(PathBuf::from(&config.system.path)), + ); + let shard = Rc::new(build_single_shard(&config, 
metadata).await?); + info!(shard = shard.id, "server-ng bootstrap complete"); + + Ok(shard) +} + +fn restore_metadata_consensus( + journal: &PrepareJournal, + restored_op: u64, +) -> VsrConsensus<IggyMessageBus> { + let mut consensus = VsrConsensus::new( + CLUSTER_ID, + SHARD_ID as u8, + 1, + 0, + IggyMessageBus::new(1, SHARD_ID, 0), + LocalPipeline::new(), + ); + + let last_header = journal + .last_op() + .and_then(|op| journal.header(op as usize).map(|header| *header)); + if let Some(header) = last_header { + consensus.set_view(header.view); + } + + consensus.init(); + consensus.sequencer().set_sequence(restored_op); + consensus.advance_commit_max(restored_op); + for op in 1..=restored_op { + consensus.advance_commit_min(op); + } + if let Some(header) = last_header { + consensus.set_last_prepare_checksum(header.checksum); + consensus.set_log_view(header.view); + } + + consensus +} + +async fn build_single_shard( + config: &ServerConfig, + metadata: ServerNgMetadata, +) -> Result<ServerNgShard, ServerNgError> { + let shard_id = ShardId::new(SHARD_ID); + let partition_count = metadata.mux_stm.streams().read(|inner| { + inner + .items + .iter() + .map(|(_, stream)| { + stream + .topics + .iter() + .map(|(_, topic)| topic.partitions.len()) + .sum::<usize>() + }) + .sum() + }); + let mut partitions = IggyPartitions::with_capacity( + shard_id, + PartitionsConfig { + messages_required_to_save: config.system.partition.messages_required_to_save, + size_of_messages_required_to_save: config + .system + .partition + .size_of_messages_required_to_save, + enforce_fsync: config.system.partition.enforce_fsync, + segment_size: config.system.segment.size, + }, + partition_count, + ); + let shards_table = PapayaShardsTable::with_capacity(partition_count); + + let mut namespaces = Vec::with_capacity(partition_count); + let _ = metadata.mux_stm.streams().read(|inner| { + for (_, stream) in inner.items.iter() { + for (topic_id, topic) in stream.topics.iter() { + for partition in 
&topic.partitions { + namespaces.push((stream.id, topic_id, partition.clone())); + } + } + } + }); + + for (stream_id, topic_id, partition_metadata) in namespaces { + let namespace = IggyNamespace::new(stream_id, topic_id, partition_metadata.id); + let partition = load_partition(config, namespace, &partition_metadata).await?; + let local_idx = partitions.insert(namespace, partition); + shards_table.insert( + namespace, + PartitionLocation::new(shard_id, LocalIdx::new(*local_idx)), + ); + } + + Ok(IggyShard::without_inbox( + ShardIdentity::new(SHARD_ID, SHARD_NAME.to_string()), + metadata, + partitions, + shards_table, + PartitionConsensusConfig::new(CLUSTER_ID, 1, IggyMessageBus::new(1, SHARD_ID, 0)), + )) +} + +async fn load_partition( + config: &ServerConfig, + namespace: IggyNamespace, + partition_metadata: &Partition, +) -> Result<IggyPartition<IggyMessageBus>, ServerNgError> { + let stream_id = namespace.stream_id(); + let topic_id = namespace.topic_id(); + let partition_id = namespace.partition_id(); + let stats = Arc::new(PartitionStats::default()); + let consensus = VsrConsensus::new( + CLUSTER_ID, + SHARD_ID as u8, + 1, + namespace.inner(), + IggyMessageBus::new(1, SHARD_ID, namespace.inner()), + LocalPipeline::new(), + ); + consensus.init(); + + let loaded_log = server::bootstrap::load_segments( + &config.system, + stream_id, + topic_id, + partition_id, + config + .system + .get_partition_path(stream_id, topic_id, partition_id), + stats.clone(), + ) + .await + .map_err(|source| ServerNgError::PartitionLogLoad { + stream_id, + topic_id, + partition_id, + source, + })?; + + let mut partition = IggyPartition::new(stats.clone(), consensus); + hydrate_partition_log( + &mut partition, + config, + stream_id, + topic_id, + partition_id, + loaded_log, + ) + .await?; + + let current_offset = partition + .log + .segments() + .iter() + .filter(|segment| segment.size > IggyByteSize::default()) + .map(|segment| segment.end_offset) + .max() + .unwrap_or(0); + 
partition.created_at = partition_metadata.created_at; + partition.offset.store(current_offset, Ordering::Release); + partition + .dirty_offset + .store(current_offset, Ordering::Relaxed); + partition.should_increment_offset = partition + .log + .segments() + .iter() + .any(|segment| segment.size > IggyByteSize::default()); + partition.stats.set_current_offset(current_offset); + partition + .stats + .increment_segments_count(partition.log.segments().len() as u32); + + configure_consumer_offsets(&mut partition, config, namespace, current_offset)?; + ensure_initial_segment(&mut partition, config, stream_id, topic_id, partition_id).await?; + + Ok(partition) +} + +async fn hydrate_partition_log( + partition: &mut IggyPartition<IggyMessageBus>, + config: &ServerConfig, + stream_id: usize, + topic_id: usize, + partition_id: usize, + loaded_log: server::streaming::partitions::log::SegmentedLog< + server::streaming::partitions::journal::MemoryMessageJournal, + >, +) -> Result<(), ServerNgError> { + for (segment, storage) in loaded_log + .segments() + .iter() + .cloned() + .zip(loaded_log.storages().iter().cloned()) + { + partition + .log + .add_persisted_segment(convert_segment(segment), storage, None, None); + } + + if let Some(active_index) = partition.log.segments().len().checked_sub(1) { + let storage = &partition.log.storages()[active_index]; + if let (Some(messages_reader), Some(index_reader)) = ( + storage.messages_reader.as_ref(), + storage.index_reader.as_ref(), + ) { + let index_path = index_reader.path(); + let index_size = std::fs::metadata(&index_path) + .map(|metadata| metadata.len()) + .unwrap_or(0); + partition.log.messages_writers_mut()[active_index] = Some(Rc::new( + MessagesWriter::new( + &messages_reader.path(), + Rc::new(AtomicU64::new(messages_reader.file_size() as u64)), + config.system.partition.enforce_fsync, + true, + ) + .await + .map_err(|source| ServerNgError::MessagesWriterInit { + stream_id, + topic_id, + partition_id, + source, + })?, + )); + 
partition.log.index_writers_mut()[active_index] = Some(Rc::new( + IggyIndexWriter::new( + &index_path, + Rc::new(AtomicU64::new(index_size)), + config.system.partition.enforce_fsync, + true, + ) + .await + .map_err(|source| ServerNgError::IndexWriterInit { + stream_id, + topic_id, + partition_id, + source, + })?, + )); + } + } else { + let _ = (stream_id, topic_id, partition_id); + } + + Ok(()) +} + +fn convert_segment(segment: iggy_common::Segment) -> Segment { + Segment { + sealed: segment.sealed, + start_timestamp: segment.start_timestamp, + end_timestamp: segment.end_timestamp, + max_timestamp: segment.end_timestamp, + current_position: u64::from(segment.current_position), + start_offset: segment.start_offset, + end_offset: segment.end_offset, + size: segment.size, + max_size: segment.max_size, + } +} + +fn configure_consumer_offsets( + partition: &mut IggyPartition<IggyMessageBus>, + config: &ServerConfig, + namespace: IggyNamespace, + current_offset: u64, +) -> Result<(), ServerNgError> { + let stream_id = namespace.stream_id(); + let topic_id = namespace.topic_id(); + let partition_id = namespace.partition_id(); + let consumer_offsets_path = + config + .system + .get_consumer_offsets_path(stream_id, topic_id, partition_id); + let consumer_group_offsets_path = + config + .system + .get_consumer_group_offsets_path(stream_id, topic_id, partition_id); + + let loaded_consumer_offsets = load_consumer_offsets(&consumer_offsets_path).unwrap_or_default(); + let consumer_offsets = ConsumerOffsets::with_capacity(loaded_consumer_offsets.len()); + { + let guard = consumer_offsets.pin(); + for offset in loaded_consumer_offsets { + if offset.offset.load(Ordering::Relaxed) > current_offset { + offset.offset.store(current_offset, Ordering::Relaxed); + } + guard.insert(offset.consumer_id as usize, offset); + } + } + + let loaded_group_offsets = + load_consumer_group_offsets(&consumer_group_offsets_path).unwrap_or_default(); + let consumer_group_offsets = 
ConsumerGroupOffsets::with_capacity(loaded_group_offsets.len()); + { + let guard = consumer_group_offsets.pin(); + for (group_id, offset) in loaded_group_offsets { + if offset.offset.load(Ordering::Relaxed) > current_offset { + offset.offset.store(current_offset, Ordering::Relaxed); + } + guard.insert(group_id, offset); + } + } + + partition.configure_consumer_offset_storage( + consumer_offsets_path, + consumer_group_offsets_path, + consumer_offsets, + consumer_group_offsets, + config.system.partition.enforce_fsync, + ); + + Ok(()) +} + +async fn ensure_initial_segment( + partition: &mut IggyPartition<IggyMessageBus>, + config: &ServerConfig, + stream_id: usize, + topic_id: usize, + partition_id: usize, +) -> Result<(), ServerNgError> { + if partition.log.has_segments() { + return Ok(()); + } + + let storage = + create_segment_storage(&config.system, stream_id, topic_id, partition_id, 0, 0, 0) + .await + .map_err(|source| ServerNgError::InitialSegmentStorage { + stream_id, + topic_id, + partition_id, + source, + })?; + let messages_path = config + .system + .get_messages_file_path(stream_id, topic_id, partition_id, 0); + let index_path = config + .system + .get_index_path(stream_id, topic_id, partition_id, 0); + partition.log.add_persisted_segment( + Segment::new(0, config.system.segment.size), + storage, + Some(Rc::new( + MessagesWriter::new( + &messages_path, + Rc::new(AtomicU64::new(0)), + config.system.partition.enforce_fsync, + false, + ) + .await + .map_err(|source| ServerNgError::MessagesWriterInit { + stream_id, + topic_id, + partition_id, + source, + })?, + )), + Some(Rc::new( + IggyIndexWriter::new( + &index_path, + Rc::new(AtomicU64::new(0)), + config.system.partition.enforce_fsync, + false, + ) + .await + .map_err(|source| ServerNgError::IndexWriterInit { + stream_id, + topic_id, + partition_id, + source, + })?, + )), + ); + partition.stats.increment_segments_count(1); + + Ok(()) +} diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/lib.rs 
index caae94a3a..c8c412834 100644 --- a/core/server-ng/src/lib.rs +++ b/core/server-ng/src/lib.rs @@ -17,5 +17,7 @@ * under the License. */ +pub mod bootstrap; pub mod login_register; +pub mod server_error; pub mod session_manager; diff --git a/core/server-ng/src/main.rs b/core/server-ng/src/main.rs index ffc8ee202..b6a7ef885 100644 --- a/core/server-ng/src/main.rs +++ b/core/server-ng/src/main.rs @@ -17,7 +17,22 @@ * under the License. */ -fn main() { - tracing_subscriber::fmt::init(); - tracing::info!("iggy-server-ng starting..."); +use server_ng::server_error::ServerNgError; + +fn main() -> Result<(), ServerNgError> { + let runtime = compio::runtime::Runtime::new().expect("failed to create compio runtime"); + runtime.block_on(async { + if let Ok(env_path) = std::env::var("IGGY_ENV_PATH") { + let _ = dotenvy::from_path(&env_path); + } else { + let _ = dotenvy::dotenv(); + } + + use server_ng::bootstrap::RunServerNg; + let mut logging = server::log::logger::Logging::new(); + logging.early_init(); + + let shard = server_ng::bootstrap::bootstrap(&mut logging).await?; + shard.run().await + }) } diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs new file mode 100644 index 000000000..c071f1608 --- /dev/null +++ b/core/server-ng/src/server_error.rs @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use metadata::impls::recovery::RecoveryError; +use server::server_error::LogError; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ServerNgError { + #[error("failed to load server-ng config")] + Config(#[source] configs::ConfigurationError), + #[error("failed to prepare server-ng directories")] + CreateDirectories(#[source] iggy_common::IggyError), + #[error("failed to initialize server-ng logging")] + Logging(#[source] LogError), + #[error("failed to recover metadata snapshot and journal")] + MetadataRecovery(#[source] RecoveryError), + #[error( + "failed to load partition log for stream {stream_id}, topic {topic_id}, partition {partition_id}" + )] + PartitionLogLoad { + stream_id: usize, + topic_id: usize, + partition_id: usize, + #[source] + source: iggy_common::IggyError, + }, + #[error( + "failed to initialize messages writer for stream {stream_id}, topic {topic_id}, partition {partition_id}" + )] + MessagesWriterInit { + stream_id: usize, + topic_id: usize, + partition_id: usize, + #[source] + source: iggy_common::IggyError, + }, + #[error( + "failed to initialize index writer for stream {stream_id}, topic {topic_id}, partition {partition_id}" + )] + IndexWriterInit { + stream_id: usize, + topic_id: usize, + partition_id: usize, + #[source] + source: iggy_common::IggyError, + }, + #[error( + "failed to create initial segment storage for stream {stream_id}, topic {topic_id}, partition {partition_id}" + )] + InitialSegmentStorage { + stream_id: usize, + topic_id: usize, + partition_id: usize, + #[source] + source: 
iggy_common::IggyError, + }, +}
