This is an automated email from the ASF dual-hosted git repository. numinnex pushed a commit to branch replica_bootstrap in repository https://gitbox.apache.org/repos/asf/iggy.git
commit f1279e46c00ae1111dd06eaeed170ddc745a84c8 Author: numinex <[email protected]> AuthorDate: Thu Apr 23 14:29:06 2026 +0200 tmp v2 --- core/common/src/types/streaming_stats.rs | 28 +++ core/consensus/src/impls.rs | 27 +++ core/metadata/src/lib.rs | 2 - core/metadata/src/stats/mod.rs | 381 ------------------------------- core/metadata/src/stm/stream.rs | 5 +- core/partitions/src/iggy_index_writer.rs | 15 ++ core/partitions/src/messages_writer.rs | 16 ++ core/server-ng/src/bootstrap.rs | 97 +++++--- core/server-ng/src/lib.rs | 2 + core/server-ng/src/main.rs | 30 ++- core/server-ng/src/server_error.rs | 12 + 11 files changed, 199 insertions(+), 416 deletions(-) diff --git a/core/common/src/types/streaming_stats.rs b/core/common/src/types/streaming_stats.rs index e1dbbc061..2c5829d13 100644 --- a/core/common/src/types/streaming_stats.rs +++ b/core/common/src/types/streaming_stats.rs @@ -85,6 +85,20 @@ impl StreamStats { self.zero_out_messages_count(); self.zero_out_segments_count(); } + + pub fn load_for_snapshot(&self) -> (u64, u64, u32) { + ( + self.size_bytes.load(Ordering::Relaxed), + self.messages_count.load(Ordering::Relaxed), + self.segments_count.load(Ordering::Relaxed), + ) + } + + pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, segments_count: u32) { + self.size_bytes.store(size_bytes, Ordering::Relaxed); + self.messages_count.store(messages_count, Ordering::Relaxed); + self.segments_count.store(segments_count, Ordering::Relaxed); + } } #[derive(Default, Debug)] @@ -199,6 +213,20 @@ impl TopicStats { self.zero_out_messages_count(); self.zero_out_segments_count(); } + + pub fn load_for_snapshot(&self) -> (u64, u64, u32) { + ( + self.size_bytes.load(Ordering::Relaxed), + self.messages_count.load(Ordering::Relaxed), + self.segments_count.load(Ordering::Relaxed), + ) + } + + pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, segments_count: u32) { + self.size_bytes.store(size_bytes, Ordering::Relaxed); + self.messages_count.store(messages_count, Ordering::Relaxed); + self.segments_count.store(segments_count, Ordering::Relaxed); + } } #[derive(Default, Debug)] diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs index 8cc8d1c2a..8b45286cb 100644 --- a/core/consensus/src/impls.rs +++ b/core/consensus/src/impls.rs @@ -662,6 +662,33 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { assert!(self.commit_max.get() >= self.commit_min.get()); } + /// Restore local commit progress from already-applied state during bootstrap. + /// + /// Unlike `advance_commit_min`, this is intended for recovery paths where the + /// state machine has already been restored up to the supplied commit point. + /// + /// # Panics + /// - If `commit_min > commit_max`. + /// - If commit progress has already been initialized on this consensus instance. + pub fn restore_commit_state(&self, commit_min: u64, commit_max: u64) { + assert!( + commit_min <= commit_max, + "commit_min ({commit_min}) must be <= commit_max ({commit_max})" + ); + assert_eq!( + self.commit_min.get(), + 0, + "restore_commit_state must only be used on a fresh consensus instance" + ); + assert_eq!( + self.commit_max.get(), + 0, + "restore_commit_state must only be used on a fresh consensus instance" + ); + self.commit_max.set(commit_max); + self.commit_min.set(commit_min); + } + /// Maximum number of faulty replicas that can be tolerated. /// For a cluster of 2f+1 replicas, this returns f. #[must_use] diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs index 7ae61f217..a50cf4f27 100644 --- a/core/metadata/src/lib.rs +++ b/core/metadata/src/lib.rs @@ -21,8 +21,6 @@ pub mod impls; pub mod permissioner; pub mod stm; -mod stats; - // Re-export IggyMetadata for use in other modules pub use impls::metadata::IggyMetadata; diff --git a/core/metadata/src/stats/mod.rs b/core/metadata/src/stats/mod.rs deleted file mode 100644 index 28dd5c5a4..000000000 --- a/core/metadata/src/stats/mod.rs +++ /dev/null @@ -1,381 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#![expect( - unused, - reason = "Methods are part of the state API and will be used once the implementation is complete" -)] -use std::sync::{ - Arc, - atomic::{AtomicU32, AtomicU64, Ordering}, -}; - -#[derive(Default, Debug)] -pub struct StreamStats { - size_bytes: AtomicU64, - messages_count: AtomicU64, - segments_count: AtomicU32, -} - -impl StreamStats { - pub fn increment_size_bytes(&self, size_bytes: u64) { - self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel); - } - - pub fn increment_messages_count(&self, messages_count: u64) { - self.messages_count - .fetch_add(messages_count, Ordering::AcqRel); - } - - pub fn increment_segments_count(&self, segments_count: u32) { - self.segments_count - .fetch_add(segments_count, Ordering::AcqRel); - } - - pub fn decrement_size_bytes(&self, size_bytes: u64) { - self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel); - } - - pub fn decrement_messages_count(&self, messages_count: u64) { - self.messages_count - .fetch_sub(messages_count, Ordering::AcqRel); - } - - pub fn decrement_segments_count(&self, segments_count: u32) { - self.segments_count - .fetch_sub(segments_count, Ordering::AcqRel); - } - - pub fn size_bytes_inconsistent(&self) -> u64 { - self.size_bytes.load(Ordering::Relaxed) - } - - pub fn messages_count_inconsistent(&self) -> u64 { - self.messages_count.load(Ordering::Relaxed) - } - - pub fn segments_count_inconsistent(&self) -> u32 { - self.segments_count.load(Ordering::Relaxed) - } - - pub fn zero_out_size_bytes(&self) { - self.size_bytes.store(0, Ordering::Relaxed); - } - - pub fn zero_out_messages_count(&self) { - self.messages_count.store(0, Ordering::Relaxed); - } - - pub fn zero_out_segments_count(&self) { - self.segments_count.store(0, Ordering::Relaxed); - } - - pub fn zero_out_all(&self) { - self.zero_out_size_bytes(); - self.zero_out_messages_count(); - self.zero_out_segments_count(); - } - - pub fn load_for_snapshot(&self) -> (u64, u64, u32) { - ( - self.size_bytes.load(Ordering::Relaxed), - self.messages_count.load(Ordering::Relaxed), - self.segments_count.load(Ordering::Relaxed), - ) - } - - pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, segments_count: u32) { - self.size_bytes.store(size_bytes, Ordering::Relaxed); - self.messages_count.store(messages_count, Ordering::Relaxed); - self.segments_count.store(segments_count, Ordering::Relaxed); - } -} - -#[derive(Default, Debug)] -pub struct TopicStats { - parent: Arc<StreamStats>, - size_bytes: AtomicU64, - messages_count: AtomicU64, - segments_count: AtomicU32, -} - -impl TopicStats { - pub const fn new(parent: Arc<StreamStats>) -> Self { - Self { - parent, - size_bytes: AtomicU64::new(0), - messages_count: AtomicU64::new(0), - segments_count: AtomicU32::new(0), - } - } - - pub fn parent(&self) -> Arc<StreamStats> { - self.parent.clone() - } - - pub fn increment_parent_size_bytes(&self, size_bytes: u64) { - self.parent.increment_size_bytes(size_bytes); - } - - pub fn increment_parent_messages_count(&self, messages_count: u64) { - self.parent.increment_messages_count(messages_count); - } - - pub fn increment_parent_segments_count(&self, segments_count: u32) { - self.parent.increment_segments_count(segments_count); - } - - pub fn increment_size_bytes(&self, size_bytes: u64) { - self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel); - self.increment_parent_size_bytes(size_bytes); - } - - pub fn increment_messages_count(&self, messages_count: u64) { - self.messages_count - .fetch_add(messages_count, Ordering::AcqRel); - self.increment_parent_messages_count(messages_count); - } - - pub fn increment_segments_count(&self, segments_count: u32) { - self.segments_count - .fetch_add(segments_count, Ordering::AcqRel); - self.increment_parent_segments_count(segments_count); - } - - pub fn decrement_parent_size_bytes(&self, size_bytes: u64) { - self.parent.decrement_size_bytes(size_bytes); - } - - pub fn decrement_parent_messages_count(&self, messages_count: u64) { - self.parent.decrement_messages_count(messages_count); - } - - pub fn decrement_parent_segments_count(&self, segments_count: u32) { - self.parent.decrement_segments_count(segments_count); - } - - pub fn decrement_size_bytes(&self, size_bytes: u64) { - self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel); - self.decrement_parent_size_bytes(size_bytes); - } - - pub fn decrement_messages_count(&self, messages_count: u64) { - self.messages_count - .fetch_sub(messages_count, Ordering::AcqRel); - self.decrement_parent_messages_count(messages_count); - } - - pub fn decrement_segments_count(&self, segments_count: u32) { - self.segments_count - .fetch_sub(segments_count, Ordering::AcqRel); - self.decrement_parent_segments_count(segments_count); - } - - pub fn size_bytes_inconsistent(&self) -> u64 { - self.size_bytes.load(Ordering::Relaxed) - } - - pub fn messages_count_inconsistent(&self) -> u64 { - self.messages_count.load(Ordering::Relaxed) - } - - pub fn segments_count_inconsistent(&self) -> u32 { - self.segments_count.load(Ordering::Relaxed) - } - - pub fn zero_out_parent_size_bytes(&self) { - self.parent.zero_out_size_bytes(); - } - - pub fn zero_out_parent_messages_count(&self) { - self.parent.zero_out_messages_count(); - } - - pub fn zero_out_parent_segments_count(&self) { - self.parent.zero_out_segments_count(); - } - - pub fn zero_out_parent_all(&self) { - self.parent.zero_out_all(); - } - - pub fn zero_out_size_bytes(&self) { - self.size_bytes.store(0, Ordering::Relaxed); - self.zero_out_parent_size_bytes(); - } - - pub fn zero_out_messages_count(&self) { - self.messages_count.store(0, Ordering::Relaxed); - self.zero_out_parent_messages_count(); - } - - pub fn zero_out_segments_count(&self) { - self.segments_count.store(0, Ordering::Relaxed); - self.zero_out_parent_segments_count(); - } - - pub fn zero_out_all(&self) { - self.zero_out_size_bytes(); - self.zero_out_messages_count(); - self.zero_out_segments_count(); - } - - pub fn load_for_snapshot(&self) -> (u64, u64, u32) { - ( - self.size_bytes.load(Ordering::Relaxed), - self.messages_count.load(Ordering::Relaxed), - self.segments_count.load(Ordering::Relaxed), - ) - } - - pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, segments_count: u32) { - self.size_bytes.store(size_bytes, Ordering::Relaxed); - self.messages_count.store(messages_count, Ordering::Relaxed); - self.segments_count.store(segments_count, Ordering::Relaxed); - } -} - -#[derive(Default, Debug)] -pub struct PartitionStats { - parent: Arc<TopicStats>, - messages_count: AtomicU64, - size_bytes: AtomicU64, - segments_count: AtomicU32, -} - -impl PartitionStats { - pub const fn new(parent_stats: Arc<TopicStats>) -> Self { - Self { - parent: parent_stats, - messages_count: AtomicU64::new(0), - size_bytes: AtomicU64::new(0), - segments_count: AtomicU32::new(0), - } - } - - pub fn parent(&self) -> Arc<TopicStats> { - self.parent.clone() - } - - pub fn increment_size_bytes(&self, size_bytes: u64) { - self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel); - self.increment_parent_size_bytes(size_bytes); - } - - pub fn increment_messages_count(&self, messages_count: u64) { - self.messages_count - .fetch_add(messages_count, Ordering::AcqRel); - self.increment_parent_messages_count(messages_count); - } - - pub fn increment_segments_count(&self, segments_count: u32) { - self.segments_count - .fetch_add(segments_count, Ordering::AcqRel); - self.increment_parent_segments_count(segments_count); - } - - pub fn increment_parent_size_bytes(&self, size_bytes: u64) { - self.parent.increment_size_bytes(size_bytes); - } - - pub fn increment_parent_messages_count(&self, messages_count: u64) { - self.parent.increment_messages_count(messages_count); - } - - pub fn increment_parent_segments_count(&self, segments_count: u32) { - self.parent.increment_segments_count(segments_count); - } - - pub fn decrement_size_bytes(&self, size_bytes: u64) { - self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel); - self.decrement_parent_size_bytes(size_bytes); - } - - pub fn decrement_messages_count(&self, messages_count: u64) { - self.messages_count - .fetch_sub(messages_count, Ordering::AcqRel); - self.decrement_parent_messages_count(messages_count); - } - - pub fn decrement_segments_count(&self, segments_count: u32) { - self.segments_count - .fetch_sub(segments_count, Ordering::AcqRel); - self.decrement_parent_segments_count(segments_count); - } - - pub fn decrement_parent_size_bytes(&self, size_bytes: u64) { - self.parent.decrement_size_bytes(size_bytes); - } - - pub fn decrement_parent_messages_count(&self, messages_count: u64) { - self.parent.decrement_messages_count(messages_count); - } - - pub fn decrement_parent_segments_count(&self, segments_count: u32) { - self.parent.decrement_segments_count(segments_count); - } - - pub fn size_bytes_inconsistent(&self) -> u64 { - self.size_bytes.load(Ordering::Relaxed) - } - - pub fn messages_count_inconsistent(&self) -> u64 { - self.messages_count.load(Ordering::Relaxed) - } - - pub fn segments_count_inconsistent(&self) -> u32 { - self.segments_count.load(Ordering::Relaxed) - } - - pub fn zero_out_parent_size_bytes(&self) { - self.parent.zero_out_size_bytes(); - } - - pub fn zero_out_parent_messages_count(&self) { - self.parent.zero_out_messages_count(); - } - - pub fn zero_out_parent_segments_count(&self) { - self.parent.zero_out_segments_count(); - } - - pub fn zero_out_parent_all(&self) { - self.parent.zero_out_all(); - } - - pub fn zero_out_size_bytes(&self) { - self.size_bytes.store(0, Ordering::Relaxed); - self.zero_out_parent_size_bytes(); - } - - pub fn zero_out_messages_count(&self) { - self.messages_count.store(0, Ordering::Relaxed); - self.zero_out_parent_messages_count(); - } - - pub fn zero_out_segments_count(&self) { - self.segments_count.store(0, Ordering::Relaxed); - self.zero_out_parent_segments_count(); - } - - pub fn zero_out_all(&self) { - self.zero_out_size_bytes(); - self.zero_out_messages_count(); - self.zero_out_segments_count(); - self.zero_out_parent_all(); - } -} diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index 0e2f574c2..d91f67df6 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use crate::stats::{StreamStats, TopicStats}; use crate::stm::StateHandler; use crate::stm::snapshot::Snapshotable; use crate::{collect_handlers, define_state, impl_fill_restore}; @@ -31,7 +30,9 @@ use iggy_binary_protocol::requests::streams::{ use iggy_binary_protocol::requests::topics::{ CreateTopicWithAssignmentsRequest, DeleteTopicRequest, PurgeTopicRequest, UpdateTopicRequest, }; -use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize}; +use iggy_common::{ + CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize, StreamStats, TopicStats, +}; use serde::{Deserialize, Serialize}; use slab::Slab; use std::sync::Arc; diff --git a/core/partitions/src/iggy_index_writer.rs b/core/partitions/src/iggy_index_writer.rs index 3aa05ac5b..8d2eb9c1b 100644 --- a/core/partitions/src/iggy_index_writer.rs +++ b/core/partitions/src/iggy_index_writer.rs @@ -31,6 +31,11 @@ pub struct IggyIndexWriter { } impl IggyIndexWriter { + /// Creates an index writer backed by the sparse index file at `file_path`. + /// + /// # Errors + /// + /// Returns an error if the file cannot be opened, synchronized, or queried for metadata. pub async fn new( file_path: &str, index_size_bytes: Rc<AtomicU64>, @@ -72,6 +77,11 @@ impl IggyIndexWriter { }) } + /// Appends encoded sparse index bytes to the backing file. + /// + /// # Errors + /// + /// Returns an error if the index bytes cannot be written or synced to disk. pub async fn save_indexes(&self, indexes: Vec<u8>) -> Result<(), IggyError> { if indexes.is_empty() { return Ok(()); @@ -103,6 +113,11 @@ impl IggyIndexWriter { Ok(()) } + /// Flushes buffered index file contents to disk. + /// + /// # Errors + /// + /// Returns an error if the file cannot be synchronized. pub async fn fsync(&self) -> Result<(), IggyError> { self.file .sync_all() diff --git a/core/partitions/src/messages_writer.rs b/core/partitions/src/messages_writer.rs index 4af82d48a..517a036c9 100644 --- a/core/partitions/src/messages_writer.rs +++ b/core/partitions/src/messages_writer.rs @@ -38,6 +38,11 @@ pub struct MessagesWriter { } impl MessagesWriter { + /// Creates a messages writer backed by the segment file at `file_path`. + /// + /// # Errors + /// + /// Returns an error if the file cannot be opened, synchronized, or queried for metadata. pub async fn new( file_path: &str, messages_size_bytes: Rc<AtomicU64>, @@ -74,6 +79,11 @@ impl MessagesWriter { }) } + /// Appends a batch of frozen message buffers to the segment file. + /// + /// # Errors + /// + /// Returns an error if any chunk cannot be written or synced to disk. pub async fn save_frozen_batches<const ALIGN: usize>( &self, buffers: &[Frozen<ALIGN>], @@ -97,10 +107,16 @@ impl MessagesWriter { Ok(IggyByteSize::from(messages_size)) } + #[must_use] pub fn path(&self) -> String { self.file_path.clone() } + /// Flushes buffered segment file contents to disk. + /// + /// # Errors + /// + /// Returns an error if the file cannot be synchronized. pub async fn fsync(&self) -> Result<(), IggyError> { self.file .sync_all() diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs index 1ddefecfb..a17dfd064 100644 --- a/core/server-ng/src/bootstrap.rs +++ b/core/server-ng/src/bootstrap.rs @@ -22,8 +22,8 @@ use configs::server::ServerConfig; use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus}; use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; use iggy_common::{ - ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, sharding::LocalIdx, - variadic, + ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, TopicStats, + sharding::LocalIdx, variadic, }; use journal::Journal; use journal::prepare_journal::PrepareJournal; @@ -39,6 +39,7 @@ use metadata::stm::user::Users; use partitions::{ IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, PartitionsConfig, Segment, }; +// TODO: decouple bootstrap/storage helpers and logging from the `server` crate. use server::bootstrap::create_directories; use server::log::logger::Logging; use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets}; @@ -54,6 +55,7 @@ use tracing::{info, warn}; const CLUSTER_ID: u128 = 1; const SHARD_ID: u16 = 0; +const SHARD_REPLICA_ID: u8 = 0; const SHARD_NAME: &str = "server-ng-shard-0"; const DEFAULT_CONFIG_PATH: &str = "core/server-ng/config.toml"; @@ -90,10 +92,17 @@ impl RunServerNg for Rc<ServerNgShard> { } } +/// Bootstraps `server-ng` from config and on-disk metadata/partition state. +/// +/// # Errors +/// +/// Returns an error if config loading, directory preparation, logging setup, +/// metadata recovery, or partition hydration fails. pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, ServerNgError> { let config = ServerConfig::load_with_path(DEFAULT_CONFIG_PATH, include_str!("../config.toml")) .await .map_err(ServerNgError::Config)?; + // TODO: decouple directory bootstrap from the `server` crate. create_directories(&config.system) .await .map_err(ServerNgError::CreateDirectories)?; @@ -133,7 +142,7 @@ fn restore_metadata_consensus( ) -> VsrConsensus<IggyMessageBus> { let mut consensus = VsrConsensus::new( CLUSTER_ID, - SHARD_ID as u8, + SHARD_REPLICA_ID, 1, 0, IggyMessageBus::new(1, SHARD_ID, 0), @@ -142,17 +151,15 @@ fn restore_metadata_consensus( let last_header = journal .last_op() - .and_then(|op| journal.header(op as usize).map(|header| *header)); + .and_then(|op| usize::try_from(op).ok()) + .and_then(|op| journal.header(op).map(|header| *header)); if let Some(header) = last_header { consensus.set_view(header.view); } consensus.init(); consensus.sequencer().set_sequence(restored_op); - consensus.advance_commit_max(restored_op); - for op in 1..=restored_op { - consensus.advance_commit_min(op); - } + consensus.restore_commit_state(restored_op, restored_op); if let Some(header) = last_header { consensus.set_last_prepare_checksum(header.checksum); consensus.set_log_view(header.view); @@ -194,20 +201,28 @@ async fn build_single_shard( ); let shards_table = PapayaShardsTable::with_capacity(partition_count); - let mut namespaces = Vec::with_capacity(partition_count); - let _ = metadata.mux_stm.streams().read(|inner| { - for (_, stream) in inner.items.iter() { - for (topic_id, topic) in stream.topics.iter() { + let (topic_stats, namespaces) = metadata.mux_stm.streams().read(|inner| { + let mut topic_stats = Vec::new(); + let mut namespaces = Vec::with_capacity(partition_count); + for (_, stream) in &inner.items { + for (topic_id, topic) in &stream.topics { + topic_stats.push(topic.stats.clone()); for partition in &topic.partitions { - namespaces.push((stream.id, topic_id, partition.clone())); + namespaces.push((stream.id, topic_id, topic.stats.clone(), partition.clone())); } } } + (topic_stats, namespaces) }); - for (stream_id, topic_id, partition_metadata) in namespaces { + for topic_stats in topic_stats { + topic_stats.zero_out_all(); + } + + for (stream_id, topic_id, topic_stats, partition_metadata) in namespaces { + validate_recovered_namespace(config, stream_id, topic_id, partition_metadata.id)?; let namespace = IggyNamespace::new(stream_id, topic_id, partition_metadata.id); - let partition = load_partition(config, namespace, &partition_metadata).await?; + let partition = load_partition(config, namespace, topic_stats, &partition_metadata).await?; let local_idx = partitions.insert(namespace, partition); shards_table.insert( namespace, @@ -224,18 +239,43 @@ async fn build_single_shard( )) } +const fn validate_recovered_namespace( + config: &ServerConfig, + stream_id: usize, + topic_id: usize, + partition_id: usize, +) -> Result<(), ServerNgError> { + let namespace = &config.extra.namespace; + if stream_id < namespace.max_streams + && topic_id < namespace.max_topics + && partition_id < namespace.max_partitions + { + return Ok(()); + } + + Err(ServerNgError::RecoveredNamespaceOutOfBounds { + stream_id, + topic_id, + partition_id, + max_streams: namespace.max_streams, + max_topics: namespace.max_topics, + max_partitions: namespace.max_partitions, + }) +} + async fn load_partition( config: &ServerConfig, namespace: IggyNamespace, + topic_stats: Arc<TopicStats>, partition_metadata: &Partition, ) -> Result<IggyPartition<IggyMessageBus>, ServerNgError> { let stream_id = namespace.stream_id(); let topic_id = namespace.topic_id(); let partition_id = namespace.partition_id(); - let stats = Arc::new(PartitionStats::default()); + let stats = Arc::new(PartitionStats::new(topic_stats)); let consensus = VsrConsensus::new( CLUSTER_ID, - SHARD_ID as u8, + SHARD_REPLICA_ID, 1, namespace.inner(), IggyMessageBus::new(1, SHARD_ID, namespace.inner()), @@ -243,6 +283,8 @@ async fn load_partition( ); consensus.init(); + // TODO: decouple the loading logic from the `server` crate and load directly + // into the new `partitions` log/runtime types. let loaded_log = server::bootstrap::load_segments( &config.system, stream_id, @@ -291,11 +333,8 @@ async fn load_partition( .iter() .any(|segment| segment.size > IggyByteSize::default()); partition.stats.set_current_offset(current_offset); - partition - .stats - .increment_segments_count(partition.log.segments().len() as u32); - configure_consumer_offsets(&mut partition, config, namespace, current_offset)?; + configure_consumer_offsets(&mut partition, config, namespace, current_offset); ensure_initial_segment(&mut partition, config, stream_id, topic_id, partition_id).await?; Ok(partition) @@ -311,10 +350,11 @@ async fn hydrate_partition_log( server::streaming::partitions::journal::MemoryMessageJournal, >, ) -> Result<(), ServerNgError> { + // TODO: decouple the loading logic from the `server` crate. This currently + // adapts the old server segmented log into the new `partitions` log. for (segment, storage) in loaded_log .segments() .iter() - .cloned() .zip(loaded_log.storages().iter().cloned()) { partition @@ -335,7 +375,7 @@ async fn hydrate_partition_log( partition.log.messages_writers_mut()[active_index] = Some(Rc::new( MessagesWriter::new( &messages_reader.path(), - Rc::new(AtomicU64::new(messages_reader.file_size() as u64)), + Rc::new(AtomicU64::new(u64::from(messages_reader.file_size()))), config.system.partition.enforce_fsync, true, ) @@ -363,14 +403,12 @@ async fn hydrate_partition_log( })?, )); } - } else { - let _ = (stream_id, topic_id, partition_id); } Ok(()) } -fn convert_segment(segment: iggy_common::Segment) -> Segment { +fn convert_segment(segment: &iggy_common::Segment) -> Segment { Segment { sealed: segment.sealed, start_timestamp: segment.start_timestamp, @@ -389,7 +427,7 @@ fn configure_consumer_offsets( config: &ServerConfig, namespace: IggyNamespace, current_offset: u64, -) -> Result<(), ServerNgError> { +) { let stream_id = namespace.stream_id(); let topic_id = namespace.topic_id(); let partition_id = namespace.partition_id(); @@ -402,6 +440,7 @@ fn configure_consumer_offsets( .system .get_consumer_group_offsets_path(stream_id, topic_id, partition_id); + // TODO: decouple consumer offset loading from the `server` crate. let loaded_consumer_offsets = load_consumer_offsets(&consumer_offsets_path).unwrap_or_default(); let consumer_offsets = ConsumerOffsets::with_capacity(loaded_consumer_offsets.len()); { @@ -414,6 +453,7 @@ fn configure_consumer_offsets( } } + // TODO: decouple consumer group offset loading from the `server` crate. let loaded_group_offsets = load_consumer_group_offsets(&consumer_group_offsets_path).unwrap_or_default(); let consumer_group_offsets = ConsumerGroupOffsets::with_capacity(loaded_group_offsets.len()); @@ -434,8 +474,6 @@ fn configure_consumer_offsets( consumer_group_offsets, config.system.partition.enforce_fsync, ); - - Ok(()) } async fn ensure_initial_segment( @@ -449,6 +487,7 @@ async fn ensure_initial_segment( return Ok(()); } + // TODO: decouple segment storage creation from the `server` crate. let storage = create_segment_storage(&config.system, stream_id, topic_id, partition_id, 0, 0, 0) .await diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/lib.rs index c8c412834..be74cdd6e 100644 --- a/core/server-ng/src/lib.rs +++ b/core/server-ng/src/lib.rs @@ -17,6 +17,8 @@ * under the License. */ +#![allow(clippy::future_not_send)] + pub mod bootstrap; pub mod login_register; pub mod server_error; diff --git a/core/server-ng/src/main.rs b/core/server-ng/src/main.rs index b6a7ef885..e7223ff72 100644 --- a/core/server-ng/src/main.rs +++ b/core/server-ng/src/main.rs @@ -17,10 +17,35 @@ * under the License. */ +#![allow(clippy::future_not_send)] + +use server_ng::bootstrap::RunServerNg; use server_ng::server_error::ServerNgError; fn main() -> Result<(), ServerNgError> { - let runtime = compio::runtime::Runtime::new().expect("failed to create compio runtime"); + // TODO: decouple runtime creation from the `server` crate and move the shared + // compio executor setup into a lower-level crate/module used by both binaries. + let runtime = match server::bootstrap::create_shard_executor() { + Ok(rt) => rt, + Err(e) => { + match e.kind() { + std::io::ErrorKind::InvalidInput => { + // TODO: decouple io_uring diagnostics from the `server` crate. + server::diagnostics::print_invalid_io_uring_args_info(); + } + std::io::ErrorKind::OutOfMemory => { + // TODO: decouple io_uring diagnostics from the `server` crate. + server::diagnostics::print_locked_memory_limit_info(); + } + std::io::ErrorKind::PermissionDenied => { + // TODO: decouple io_uring diagnostics from the `server` crate. + server::diagnostics::print_io_uring_permission_info(); + } + _ => {} + } + panic!("Cannot create server-ng executor: {e}"); + } + }; runtime.block_on(async { if let Ok(env_path) = std::env::var("IGGY_ENV_PATH") { let _ = dotenvy::from_path(&env_path); @@ -28,7 +53,8 @@ fn main() -> Result<(), ServerNgError> { let _ = dotenvy::dotenv(); } - use server_ng::bootstrap::RunServerNg; + // TODO: decouple logging from the `server` crate and move the shared + // logging bootstrap into a lower-level crate/module. let mut logging = server::log::logger::Logging::new(); logging.early_init(); diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs index c071f1608..1a667fc4b 100644 --- a/core/server-ng/src/server_error.rs +++ b/core/server-ng/src/server_error.rs @@ -18,6 +18,7 @@ */ use metadata::impls::recovery::RecoveryError; +// TODO: decouple logging errors from the `server` crate. use server::server_error::LogError; use thiserror::Error; @@ -31,6 +32,17 @@ pub enum ServerNgError { Logging(#[source] LogError), #[error("failed to recover metadata snapshot and journal")] MetadataRecovery(#[source] RecoveryError), + #[error( + "recovered namespace stream {stream_id}, topic {topic_id}, partition {partition_id} exceeds configured limits (max_streams={max_streams}, max_topics={max_topics}, max_partitions={max_partitions})" + )] + RecoveredNamespaceOutOfBounds { + stream_id: usize, + topic_id: usize, + partition_id: usize, + max_streams: usize, + max_topics: usize, + max_partitions: usize, + }, #[error( "failed to load partition log for stream {stream_id}, topic {topic_id}, partition {partition_id}" )]
