This is an automated email from the ASF dual-hosted git repository. numinnex pushed a commit to branch replica_bootstrap in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 7fabdb0ca672ab184ac405a97c299edfd56ba49c Author: numinex <[email protected]> AuthorDate: Fri Apr 24 17:11:02 2026 +0200 finito --- core/server-ng/PLAN.md | 127 -------- core/server-ng/src/{lib.rs => args.rs} | 21 +- core/server-ng/src/bootstrap.rs | 577 ++++++++++++++++++++++++++++++--- core/server-ng/src/config_writer.rs | 96 ++++++ core/server-ng/src/lib.rs | 1 + core/server-ng/src/main.rs | 12 +- core/server-ng/src/server_error.rs | 35 ++ core/shard/src/router.rs | 7 +- 8 files changed, 698 insertions(+), 178 deletions(-) diff --git a/core/server-ng/PLAN.md b/core/server-ng/PLAN.md deleted file mode 100644 index 006b0a5bf..000000000 --- a/core/server-ng/PLAN.md +++ /dev/null @@ -1,127 +0,0 @@ -# Server-NG Bootstrap Plan - -## Goal - -Implement startup for the new `server-ng` binary using: - -- `partitions` -- `consensus` -- `message_bus` -- `metadata` -- `journal` - -and support: - -1. new `config.toml` handling in `server-ng` -2. config loading/parsing -3. metadata journal + snapshot loading -4. `IggyMetadata` restoration -5. partition + log loading (`IGGY-131`) -6. single-shard bootstrap only, without transport/runtime infra for now - -## Plan - -### 1. Config surface - -- Reuse the existing server config surface instead of introducing a separate top-level `server-ng` config module. -- Add an extra config section or extension on top of the current config for `server-ng`-specific additions. -- The main new config requirement is broker capacity limits for: - - maximum streams - - maximum topics - - maximum partitions -- Add validation that these three limits can be packed into a single `u64` namespace layout. -- The validation must be tied directly to `IggyNamespace`, since it packs `stream_id/topic_id/partition_id` into `u64`. -- Keep config loading through the existing config infrastructure, with the extra section layered on top. - -### 2. Server-NG config file - -- Create a dedicated `core/server-ng/config.toml`. 
-- It should be exactly the same as the existing server config file, plus the extra `server-ng` additions. -- Do not create a reduced or bootstrap-only config file. -- Keep it structurally aligned with the current server config so existing parsing behavior can be reused as much as possible. - -### 3. Bootstrap entrypoint - -- Replace the placeholder `server-ng` main startup with a bootstrap flow. -- Keep startup split into clear phases: - - initialize tracing/logging - - load config - - prepare filesystem paths - - restore metadata - - restore partitions - - build shard -- Leave listeners/timers out for now. -- Add an explicit `TODO` at the bootstrap boundary where transport/runtime services will be started once infrastructure exists. - -### 4. Metadata recovery - -- Reuse the existing recovery code from `core/metadata`. -- Load: - - snapshot - - metadata WAL / journal -- Restore the metadata state machine from snapshot. -- Replay journal entries after the snapshot point. -- Construct `IggyMetadata` from the recovered pieces. - -### 5. Consensus state restoration - -- Initialize it with recovered progress so sequence/commit state matches restored metadata. -- Keep this scoped to single-replica / single-shard bootstrap first. - -### 6. Partition discovery - -- Read restored metadata to determine which partitions exist. -- For each partition, resolve: - - stream id - - topic id - - partition id - - consensus group / namespace info - - persisted storage paths - -### 7. Partition + log loading - -- Reuse the existing partition and log loading code from the current server binary because the on-disk format and directory layout have not changed. -- Use the current `server` binary startup path as the reference implementation. -- In particular, trace the existing logic from the current server `main`/bootstrap flow and adapt that code path into `server-ng`. -- Avoid inventing a parallel restore path unless extraction is required for reuse. 
-- Scope this part of the work to wiring existing logic into the new bootstrap, not redesigning the storage format. - -### 8. Shard construction - -- Build one shard using: - - restored `IggyMetadata` - - restored `IggyPartitions` - - `message_bus` - - `consensus` -- Keep the initial version fixed to one shard. -- Ensure routing tables and namespace ownership are initialized consistently for that single shard. - -### 9. Runtime services - -- Skip runtime services for now. -- Do not plan TCP listeners, timers, HTTP, QUIC, or related infra in this phase. -- Leave a clear `TODO` in the bootstrap where these services will be attached later. - -### 10. Verification - -- Verify config parsing against `core/server-ng/config.toml`. -- Run `cargo check -p server-ng`. -- Smoke-test startup with: - - empty data directory - - existing metadata snapshot/journal - - existing partition logs - -## Suggested implementation order - -1. `core/server-ng/config.toml` aligned with existing server config plus extras -2. metadata restore path -3. partition restore path by reusing current server bootstrap logic -4. shard construction -5. compile + smoke verification - -## Notes from Feedback - -- Do not introduce a brand new top-level `server-ng` config model if the current config can be reused. -- The only clearly identified new config need right now is namespace-capacity validation for packed `u64` `IggyNamespace`. -- Do not scope this task around listeners or timer startup yet. -- Prefer extracting or reusing existing server bootstrap code over reimplementing storage restore logic from scratch. diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/args.rs similarity index 57% copy from core/server-ng/src/lib.rs copy to core/server-ng/src/args.rs index be74cdd6e..d669b0ded 100644 --- a/core/server-ng/src/lib.rs +++ b/core/server-ng/src/args.rs @@ -17,9 +17,20 @@ * under the License. 
*/ -#![allow(clippy::future_not_send)] +use clap::Parser; -pub mod bootstrap; -pub mod login_register; -pub mod server_error; -pub mod session_manager; +#[derive(Parser, Debug)] +#[command( + author = "Apache Iggy (Incubating)", + version, + about = "Apache Iggy server-ng", + long_about = "Apache Iggy server-ng\n\nUse --replica-id <N> together with a shared cluster config to run one binary per cluster node." +)] +pub struct Args { + /// Identifies this node within `cluster.nodes` by its replica ID. + /// + /// Required when `cluster.enabled = true`. The value must match exactly + /// one `cluster.nodes[*].replica_id` entry in the loaded configuration. + #[arg(long, verbatim_doc_comment)] + pub replica_id: Option<u8>, +} diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs index a17dfd064..773111008 100644 --- a/core/server-ng/src/bootstrap.rs +++ b/core/server-ng/src/bootstrap.rs @@ -17,9 +17,11 @@ * under the License. */ +use crate::config_writer::write_current_config; use crate::server_error::ServerNgError; use configs::server::ServerConfig; use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus}; +use iggy_binary_protocol::RequestHeader; use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; use iggy_common::{ ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, TopicStats, @@ -27,7 +29,13 @@ use iggy_common::{ }; use journal::Journal; use journal::prepare_journal::PrepareJournal; -use message_bus::IggyMessageBus; +use message_bus::client_listener::{self, RequestHandler}; +use message_bus::installer; +use message_bus::replica_io; +use message_bus::replica_listener::MessageHandler; +use message_bus::{ + AcceptedClientFn, AcceptedReplicaFn, IggyMessageBus, connector, replica_listener, +}; use metadata::IggyMetadata; use metadata::MuxStateMachine; use metadata::impls::metadata::{IggySnapshot, StreamsFrontend}; @@ -44,11 +52,16 @@ use server::bootstrap::create_directories; use 
server::log::logger::Logging; use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets}; use server::streaming::segments::storage::create_segment_storage; +use shard::builder::IggyShardBuilder; use shard::shards_table::PapayaShardsTable; -use shard::{IggyShard, PartitionConsensusConfig, ShardIdentity}; -use std::future::pending; +use shard::{ + CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, channel, shard_channel, +}; +use std::cell::{Cell, RefCell}; +use std::future::Future; +use std::net::{IpAddr, SocketAddr}; use std::path::{Path, PathBuf}; -use std::rc::Rc; +use std::rc::{Rc, Weak}; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use tracing::{info, warn}; @@ -58,47 +71,92 @@ const SHARD_ID: u16 = 0; const SHARD_REPLICA_ID: u8 = 0; const SHARD_NAME: &str = "server-ng-shard-0"; const DEFAULT_CONFIG_PATH: &str = "core/server-ng/config.toml"; +const SHARD_INBOX_CAPACITY: usize = 1024; type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>; type ServerNgMetadata = IggyMetadata< - VsrConsensus<IggyMessageBus>, + VsrConsensus<Rc<IggyMessageBus>>, PrepareJournal, IggySnapshot, ServerNgMuxStateMachine, >; type ServerNgShard = IggyShard< - IggyMessageBus, + Rc<IggyMessageBus>, PrepareJournal, IggySnapshot, ServerNgMuxStateMachine, PapayaShardsTable, >; +type ServerNgShardHandle = Rc<RefCell<Option<Weak<ServerNgShard>>>>; + +struct TcpTopology { + self_replica_id: u8, + replica_count: u8, + client_listen_addr: SocketAddr, + replica_listen_addr: Option<SocketAddr>, + peers: Vec<(u8, SocketAddr)>, +} + pub trait RunServerNg { - fn run(&self) -> impl Future<Output = Result<(), ServerNgError>>; + fn run( + &self, + config: &ServerConfig, + current_replica_id: Option<u8>, + ) -> impl Future<Output = Result<(), ServerNgError>>; } impl RunServerNg for Rc<ServerNgShard> { - async fn run(&self) -> Result<(), ServerNgError> { + /// Run the fully bootstrapped 
`server-ng` shard. + /// + /// # Errors + /// + /// Returns an error if TCP listener bootstrap fails or cluster TCP + /// addresses cannot be resolved from config. + async fn run( + &self, + config: &ServerConfig, + current_replica_id: Option<u8>, + ) -> Result<(), ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let (stop_tx, stop_rx) = channel(1); + let message_pump_shard = Self::clone(self); + let message_pump_handle = compio::runtime::spawn(async move { + message_pump_shard.run_message_pump(stop_rx).await; + }); + self.bus.track_background(message_pump_handle); + + let on_replica_message = make_replica_message_handler(self); + let on_client_request = make_client_request_handler(self); + let accepted_replica = make_local_replica_accept_fn(&self.bus, on_replica_message); + let accepted_client = make_local_client_accept_fn(&self.bus, on_client_request); + info!( shard = self.id, partitions = self.plane.partitions().len(), "server-ng shard initialized" ); - warn!("TODO: start listeners, timers, and runtime services once the new infra exists"); - pending::<()>().await; - #[allow(unreachable_code)] + + if let Err(error) = + start_tcp_runtime(self, config, &topology, accepted_replica, accepted_client).await + { + let _ = stop_tx.try_send(()); + return Err(error); + } + + self.bus.token().wait().await; + let _ = stop_tx.try_send(()); Ok(()) } } -/// Bootstraps `server-ng` from config and on-disk metadata/partition state. +/// Load config, prepare directories, and complete late logging init. /// /// # Errors /// -/// Returns an error if config loading, directory preparation, logging setup, -/// metadata recovery, or partition hydration fails. -pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, ServerNgError> { +/// Returns an error if config loading, directory preparation, or logging +/// setup fails. 
+pub async fn load_config(logging: &mut Logging) -> Result<ServerConfig, ServerNgError> { let config = ServerConfig::load_with_path(DEFAULT_CONFIG_PATH, include_str!("../config.toml")) .await .map_err(ServerNgError::Config)?; @@ -114,8 +172,21 @@ pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, Serve ) .map_err(ServerNgError::Logging)?; - iggy_common::MemoryPool::init_pool(&config.system.memory_pool.into_other()); + Ok(config) +} +/// Bootstraps `server-ng` from config and on-disk metadata/partition state. +/// +/// # Errors +/// +/// Returns an error if metadata recovery, consensus restoration, or +/// partition hydration fails. +pub async fn bootstrap( + config: &ServerConfig, + current_replica_id: Option<u8>, +) -> Result<Rc<ServerNgShard>, ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let bus = Rc::new(IggyMessageBus::new(SHARD_ID)); let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path)) .await .map_err(ServerNgError::MetadataRecovery)?; @@ -124,13 +195,19 @@ pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, Serve .unwrap_or_else(|| recovered.snapshot.sequence_number()); let metadata = ServerNgMetadata::new( - Some(restore_metadata_consensus(&recovered.journal, restored_op)), + Some(restore_metadata_consensus( + &recovered.journal, + restored_op, + topology.self_replica_id, + topology.replica_count, + Rc::clone(&bus), + )), Some(recovered.journal), Some(recovered.snapshot), recovered.mux_stm, Some(PathBuf::from(&config.system.path)), ); - let shard = Rc::new(build_single_shard(&config, metadata).await?); + let shard = build_single_shard(config, &topology, metadata, bus).await?; info!(shard = shard.id, "server-ng bootstrap complete"); Ok(shard) @@ -139,13 +216,16 @@ pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, Serve fn restore_metadata_consensus( journal: &PrepareJournal, restored_op: u64, -) -> 
VsrConsensus<IggyMessageBus> { + self_replica_id: u8, + replica_count: u8, + bus: Rc<IggyMessageBus>, +) -> VsrConsensus<Rc<IggyMessageBus>> { let mut consensus = VsrConsensus::new( CLUSTER_ID, - SHARD_REPLICA_ID, - 1, + self_replica_id, + replica_count, 0, - IggyMessageBus::new(1, SHARD_ID, 0), + bus, LocalPipeline::new(), ); @@ -170,8 +250,10 @@ fn restore_metadata_consensus( async fn build_single_shard( config: &ServerConfig, + topology: &TcpTopology, metadata: ServerNgMetadata, -) -> Result<ServerNgShard, ServerNgError> { + bus: Rc<IggyMessageBus>, +) -> Result<Rc<ServerNgShard>, ServerNgError> { let shard_id = ShardId::new(SHARD_ID); let partition_count = metadata.mux_stm.streams().read(|inner| { inner @@ -222,7 +304,16 @@ async fn build_single_shard( for (stream_id, topic_id, topic_stats, partition_metadata) in namespaces { validate_recovered_namespace(config, stream_id, topic_id, partition_metadata.id)?; let namespace = IggyNamespace::new(stream_id, topic_id, partition_metadata.id); - let partition = load_partition(config, namespace, topic_stats, &partition_metadata).await?; + let partition = load_partition( + config, + namespace, + topic_stats, + &partition_metadata, + topology.self_replica_id, + topology.replica_count, + Rc::clone(&bus), + ) + .await?; let local_idx = partitions.insert(namespace, partition); shards_table.insert( namespace, @@ -230,13 +321,33 @@ async fn build_single_shard( ); } - Ok(IggyShard::without_inbox( + let (sender, inbox) = shard_channel::<()>(SHARD_ID, SHARD_INBOX_CAPACITY); + let senders = vec![sender]; + let shard_handle = Rc::new(RefCell::new(None)); + let on_replica_message = make_deferred_replica_message_handler(&shard_handle); + let on_client_request = make_deferred_client_request_handler(&shard_handle); + let built = IggyShardBuilder::new( ShardIdentity::new(SHARD_ID, SHARD_NAME.to_string()), + Rc::clone(&bus), + on_replica_message, + on_client_request, metadata, partitions, + senders, + inbox, shards_table, - 
PartitionConsensusConfig::new(CLUSTER_ID, 1, IggyMessageBus::new(1, SHARD_ID, 0)), - )) + PartitionConsensusConfig::new(CLUSTER_ID, topology.replica_count, Rc::clone(&bus)), + CoordinatorConfig::default(), + bus.token(), + ) + .build(); + if let Some(refresh_task) = built.refresh_task { + bus.track_background(refresh_task); + } + + let shard = Rc::new(built.shard); + *shard_handle.borrow_mut() = Some(Rc::downgrade(&shard)); + Ok(shard) } const fn validate_recovered_namespace( @@ -268,17 +379,20 @@ async fn load_partition( namespace: IggyNamespace, topic_stats: Arc<TopicStats>, partition_metadata: &Partition, -) -> Result<IggyPartition<IggyMessageBus>, ServerNgError> { + self_replica_id: u8, + replica_count: u8, + bus: Rc<IggyMessageBus>, +) -> Result<IggyPartition<Rc<IggyMessageBus>>, ServerNgError> { let stream_id = namespace.stream_id(); let topic_id = namespace.topic_id(); let partition_id = namespace.partition_id(); let stats = Arc::new(PartitionStats::new(topic_stats)); let consensus = VsrConsensus::new( CLUSTER_ID, - SHARD_REPLICA_ID, - 1, + self_replica_id, + replica_count, namespace.inner(), - IggyMessageBus::new(1, SHARD_ID, namespace.inner()), + bus, LocalPipeline::new(), ); consensus.init(); @@ -341,7 +455,7 @@ async fn load_partition( } async fn hydrate_partition_log( - partition: &mut IggyPartition<IggyMessageBus>, + partition: &mut IggyPartition<Rc<IggyMessageBus>>, config: &ServerConfig, stream_id: usize, topic_id: usize, @@ -352,14 +466,26 @@ async fn hydrate_partition_log( ) -> Result<(), ServerNgError> { // TODO: decouple the loading logic from the `server` crate. This currently // adapts the old server segmented log into the new `partitions` log. 
- for (segment, storage) in loaded_log + for (segment_index, (segment, storage)) in loaded_log .segments() .iter() .zip(loaded_log.storages().iter().cloned()) + .enumerate() { - partition - .log - .add_persisted_segment(convert_segment(segment), storage, None, None); + let max_timestamp = match loaded_log + .indexes() + .get(segment_index) + .and_then(|indexes| indexes.as_ref()) + { + Some(indexes) => indexes_max_timestamp(indexes), + None => load_segment_max_timestamp(&storage, stream_id, topic_id, partition_id).await?, + }; + partition.log.add_persisted_segment( + convert_segment(segment, max_timestamp), + storage, + None, + None, + ); } if let Some(active_index) = partition.log.segments().len().checked_sub(1) { @@ -369,9 +495,7 @@ async fn hydrate_partition_log( storage.index_reader.as_ref(), ) { let index_path = index_reader.path(); - let index_size = std::fs::metadata(&index_path) - .map(|metadata| metadata.len()) - .unwrap_or(0); + let index_size = std::fs::metadata(&index_path).map_or(0, |metadata| metadata.len()); partition.log.messages_writers_mut()[active_index] = Some(Rc::new( MessagesWriter::new( &messages_reader.path(), @@ -408,12 +532,12 @@ async fn hydrate_partition_log( Ok(()) } -fn convert_segment(segment: &iggy_common::Segment) -> Segment { +fn convert_segment(segment: &iggy_common::Segment, max_timestamp: u64) -> Segment { Segment { sealed: segment.sealed, start_timestamp: segment.start_timestamp, end_timestamp: segment.end_timestamp, - max_timestamp: segment.end_timestamp, + max_timestamp, current_position: u64::from(segment.current_position), start_offset: segment.start_offset, end_offset: segment.end_offset, @@ -422,8 +546,41 @@ fn convert_segment(segment: &iggy_common::Segment) -> Segment { } } +fn indexes_max_timestamp(indexes: &server::streaming::segments::IggyIndexesMut) -> u64 { + let mut max_timestamp = 0; + for index in 0..indexes.count() { + if let Some(index_view) = indexes.get(index) { + max_timestamp = 
max_timestamp.max(index_view.timestamp()); + } + } + + max_timestamp +} + +async fn load_segment_max_timestamp( + storage: &iggy_common::SegmentStorage, + stream_id: usize, + topic_id: usize, + partition_id: usize, +) -> Result<u64, ServerNgError> { + let Some(index_reader) = storage.index_reader.as_ref() else { + return Ok(0); + }; + + let indexes = index_reader + .load_all_indexes_from_disk() + .await + .map_err(|source| ServerNgError::SegmentIndexesLoad { + stream_id, + topic_id, + partition_id, + source, + })?; + Ok(indexes_max_timestamp(&indexes)) +} + fn configure_consumer_offsets( - partition: &mut IggyPartition<IggyMessageBus>, + partition: &mut IggyPartition<Rc<IggyMessageBus>>, config: &ServerConfig, namespace: IggyNamespace, current_offset: u64, @@ -477,7 +634,7 @@ fn configure_consumer_offsets( } async fn ensure_initial_segment( - partition: &mut IggyPartition<IggyMessageBus>, + partition: &mut IggyPartition<Rc<IggyMessageBus>>, config: &ServerConfig, stream_id: usize, topic_id: usize, @@ -541,3 +698,337 @@ async fn ensure_initial_segment( Ok(()) } + +fn resolve_tcp_topology( + config: &ServerConfig, + current_replica_id: Option<u8>, +) -> Result<TcpTopology, ServerNgError> { + let default_client_addr = parse_socket_addr("tcp.address", &config.tcp.address)?; + if !config.cluster.enabled { + return Ok(TcpTopology { + // Keep parity with the current server binary and the integration + // harness: `--replica-id` may be passed unconditionally, but in + // single-node mode there is only replica 0. 
+ self_replica_id: SHARD_REPLICA_ID, + replica_count: 1, + client_listen_addr: default_client_addr, + replica_listen_addr: None, + peers: Vec::new(), + }); + } + + let self_replica_id = current_replica_id.ok_or(ServerNgError::MissingReplicaId)?; + + let self_node = config + .cluster + .nodes + .iter() + .find(|node| node.replica_id == self_replica_id) + .ok_or(ServerNgError::ClusterNodeNotFound { + replica_id: self_replica_id, + })?; + let replica_count = u8::try_from(config.cluster.nodes.len()).map_err(|_| { + ServerNgError::ClusterReplicaCountTooLarge { + count: config.cluster.nodes.len(), + } + })?; + let client_port = self_node + .ports + .tcp + .unwrap_or_else(|| default_client_addr.port()); + let client_listen_addr = + socket_addr_from_parts("cluster.nodes[*].ports.tcp", &self_node.ip, client_port)?; + let replica_port = + self_node + .ports + .tcp_replica + .ok_or(ServerNgError::ClusterReplicaPortMissing { + replica_id: self_node.replica_id, + })?; + let replica_listen_addr = Some(socket_addr_from_parts( + "cluster.nodes[*].ports.tcp_replica", + &self_node.ip, + replica_port, + )?); + let mut peers = Vec::with_capacity(config.cluster.nodes.len().saturating_sub(1)); + for node in &config.cluster.nodes { + if node.replica_id == self_replica_id { + continue; + } + let replica_port = + node.ports + .tcp_replica + .ok_or(ServerNgError::ClusterReplicaPortMissing { + replica_id: node.replica_id, + })?; + peers.push(( + node.replica_id, + socket_addr_from_parts("cluster.nodes[*].ports.tcp_replica", &node.ip, replica_port)?, + )); + } + + Ok(TcpTopology { + self_replica_id, + replica_count, + client_listen_addr, + replica_listen_addr, + peers, + }) +} + +async fn start_tcp_runtime( + shard: &Rc<ServerNgShard>, + config: &ServerConfig, + topology: &TcpTopology, + accepted_replica: AcceptedReplicaFn, + accepted_client: AcceptedClientFn, +) -> Result<(), ServerNgError> { + if config.cluster.enabled { + return start_cluster_tcp_runtime( + shard, + config, + topology, + 
accepted_replica, + accepted_client, + ) + .await; + } + + start_single_node_tcp_runtime(shard, config, topology, accepted_client).await +} + +async fn start_cluster_tcp_runtime( + shard: &Rc<ServerNgShard>, + config: &ServerConfig, + topology: &TcpTopology, + accepted_replica: AcceptedReplicaFn, + accepted_client: AcceptedClientFn, +) -> Result<(), ServerNgError> { + let replica_addr = topology + .replica_listen_addr + .expect("cluster-enabled topology must include replica listener address"); + if config.tcp.enabled { + let bound = replica_io::start_on_shard_zero_default( + &shard.bus, + replica_addr, + topology.client_listen_addr, + CLUSTER_ID, + topology.self_replica_id, + topology.replica_count, + topology.peers.clone(), + accepted_replica, + accepted_client, + ) + .await + .map_err(ServerNgError::StartTcpListeners)?; + if let Some(bound) = bound { + write_current_config( + config, + Some(topology.self_replica_id), + Some(bound.client), + Some(bound.replica), + ) + .await?; + info!( + shard = shard.id, + client = %bound.client, + replica = %bound.replica, + "server-ng TCP listeners started" + ); + } + return Ok(()); + } + + let (replica_listener, bound_addr) = replica_listener::bind(replica_addr) + .await + .map_err(ServerNgError::StartTcpListeners)?; + let token = shard.bus.token(); + let max_message_size = shard.bus.config().max_message_size; + let self_replica_id = topology.self_replica_id; + let replica_count = topology.replica_count; + let accepted_replica_for_listener = accepted_replica.clone(); + let replica_handle = compio::runtime::spawn(async move { + replica_listener::run( + replica_listener, + token, + CLUSTER_ID, + self_replica_id, + replica_count, + accepted_replica_for_listener, + max_message_size, + ) + .await; + }); + shard.bus.track_background(replica_handle); + connector::start( + &shard.bus, + CLUSTER_ID, + topology.self_replica_id, + topology.peers.clone(), + accepted_replica, + shard.bus.config().reconnect_period, + ) + .await; + 
write_current_config( + config, + Some(topology.self_replica_id), + None, + Some(bound_addr), + ) + .await?; + info!( + shard = shard.id, + replica = %bound_addr, + "server-ng replica TCP listener started" + ); + warn!("TCP client listener is disabled by config; only replica TCP is running"); + + Ok(()) +} + +async fn start_single_node_tcp_runtime( + shard: &Rc<ServerNgShard>, + config: &ServerConfig, + topology: &TcpTopology, + accepted_client: AcceptedClientFn, +) -> Result<(), ServerNgError> { + if !config.tcp.enabled { + warn!("TCP listener is disabled by config"); + return Ok(()); + } + + let (listener, bound_addr) = client_listener::bind(topology.client_listen_addr) + .await + .map_err(ServerNgError::StartTcpListeners)?; + let token = shard.bus.token(); + let client_handle = compio::runtime::spawn(async move { + client_listener::run(listener, token, accepted_client).await; + }); + shard.bus.track_background(client_handle); + write_current_config( + config, + Some(topology.self_replica_id), + Some(bound_addr), + None, + ) + .await?; + info!( + shard = shard.id, + client = %bound_addr, + "server-ng TCP client listener started" + ); + + Ok(()) +} + +fn make_replica_message_handler(shard: &Rc<ServerNgShard>) -> MessageHandler { + let shard = Rc::clone(shard); + Rc::new(move |_replica_id, message| { + shard.dispatch(message); + }) +} + +fn make_client_request_handler(shard: &Rc<ServerNgShard>) -> RequestHandler { + let shard = Rc::clone(shard); + Rc::new(move |client_id, message| { + let request = match message.try_into_typed::<RequestHeader>() { + Ok(request) => request, + Err(error) => { + warn!(client_id, error = %error, "dropping client request with invalid header"); + return; + } + }; + let request = request.transmute_header(|header, new_header: &mut RequestHeader| { + *new_header = header; + new_header.client = client_id; + }); + shard.dispatch(request.into_generic()); + }) +} + +fn make_deferred_replica_message_handler(shard_handle: &ServerNgShardHandle) -> 
MessageHandler { + let shard_handle = Rc::clone(shard_handle); + Rc::new(move |_replica_id, message| { + if let Some(shard) = upgrade_shard_handle(&shard_handle) { + shard.dispatch(message); + } + }) +} + +fn make_deferred_client_request_handler(shard_handle: &ServerNgShardHandle) -> RequestHandler { + let shard_handle = Rc::clone(shard_handle); + Rc::new(move |client_id, message| { + let Some(shard) = upgrade_shard_handle(&shard_handle) else { + return; + }; + let request = match message.try_into_typed::<RequestHeader>() { + Ok(request) => request, + Err(error) => { + warn!(client_id, error = %error, "dropping client request with invalid header"); + return; + } + }; + let request = request.transmute_header(|header, new_header: &mut RequestHeader| { + *new_header = header; + new_header.client = client_id; + }); + shard.dispatch(request.into_generic()); + }) +} + +fn upgrade_shard_handle(shard_handle: &ServerNgShardHandle) -> Option<Rc<ServerNgShard>> { + shard_handle + .borrow() + .as_ref() + .and_then(std::rc::Weak::upgrade) +} + +fn make_local_replica_accept_fn( + bus: &Rc<IggyMessageBus>, + on_message: MessageHandler, +) -> AcceptedReplicaFn { + let bus = Rc::clone(bus); + Rc::new(move |stream, peer_id| { + installer::install_replica_stream(&bus, peer_id, stream, on_message.clone()); + }) +} + +fn make_local_client_accept_fn( + bus: &Rc<IggyMessageBus>, + on_request: RequestHandler, +) -> AcceptedClientFn { + let bus = Rc::clone(bus); + let counter = Rc::new(Cell::new(1_u128)); + let shard_id = u128::from(bus.shard_id()); + Rc::new(move |stream| { + let seq = counter.get(); + counter.set(seq.wrapping_add(1)); + let client_id = (shard_id << 112) | seq; + installer::install_client_stream(&bus, client_id, stream, on_request.clone()); + }) +} + +fn parse_socket_addr(context: &'static str, address: &str) -> Result<SocketAddr, ServerNgError> { + address + .parse() + .map_err(|source| ServerNgError::SocketAddressParse { + context, + address: address.to_string(), + 
source, + }) +} + +fn socket_addr_from_parts( + context: &'static str, + host: &str, + port: u16, +) -> Result<SocketAddr, ServerNgError> { + let ip = host + .parse::<IpAddr>() + .map_err(|source| ServerNgError::SocketAddressParse { + context, + address: format!("{host}:{port}"), + source, + })?; + Ok(SocketAddr::new(ip, port)) +} diff --git a/core/server-ng/src/config_writer.rs b/core/server-ng/src/config_writer.rs new file mode 100644 index 000000000..9c05b9976 --- /dev/null +++ b/core/server-ng/src/config_writer.rs @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::server_error::ServerNgError; +use compio::fs::OpenOptions; +use compio::io::AsyncWriteAtExt; +use configs::server::ServerConfig; +use std::net::SocketAddr; + +/// Write the runtime `current_config.toml` file with the effective bound ports. +/// +/// # Errors +/// +/// Returns an error if the config cannot be serialized or if the runtime +/// config file cannot be written and synced. 
+pub async fn write_current_config( + config: &ServerConfig, + current_replica_id: Option<u8>, + bound_tcp: Option<SocketAddr>, + bound_replica: Option<SocketAddr>, +) -> Result<(), ServerNgError> { + let mut current_config = config.clone(); + + if let Some(bound_tcp) = bound_tcp { + // Keep parity with the current server binary: integration harnesses + // read `tcp.address` from `runtime/current_config.toml` to discover + // the actual port chosen by the OS when binding to port 0. + current_config.tcp.address = bound_tcp.to_string(); + } + + if current_config.cluster.enabled + && let Some(replica_id) = current_replica_id + { + let node = current_config + .cluster + .nodes + .iter_mut() + .find(|node| node.replica_id == replica_id) + .ok_or(ServerNgError::ClusterNodeNotFound { replica_id })?; + if let Some(bound_tcp) = bound_tcp { + node.ports.tcp = Some(bound_tcp.port()); + } + if let Some(bound_replica) = bound_replica { + node.ports.tcp_replica = Some(bound_replica.port()); + } + } + + let runtime_path = current_config.system.get_runtime_path(); + let config_path = format!("{runtime_path}/current_config.toml"); + let content = + toml::to_string(&current_config).map_err(ServerNgError::CurrentConfigSerialize)?; + + let mut file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&config_path) + .await + .map_err(|source| ServerNgError::CurrentConfigWrite { + path: config_path.clone(), + source, + })?; + + file.write_all_at(content.into_bytes(), 0) + .await + .0 + .map_err(|source| ServerNgError::CurrentConfigWrite { + path: config_path.clone(), + source, + })?; + + file.sync_all() + .await + .map_err(|source| ServerNgError::CurrentConfigWrite { + path: config_path, + source, + })?; + + Ok(()) +} diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/lib.rs index be74cdd6e..745ca8414 100644 --- a/core/server-ng/src/lib.rs +++ b/core/server-ng/src/lib.rs @@ -20,6 +20,7 @@ #![allow(clippy::future_not_send)] pub mod bootstrap; +pub mod
config_writer; pub mod login_register; pub mod server_error; pub mod session_manager; diff --git a/core/server-ng/src/main.rs b/core/server-ng/src/main.rs index e7223ff72..169f32f65 100644 --- a/core/server-ng/src/main.rs +++ b/core/server-ng/src/main.rs @@ -19,6 +19,10 @@ #![allow(clippy::future_not_send)] +mod args; + +use args::Args; +use clap::Parser; use server_ng::bootstrap::RunServerNg; use server_ng::server_error::ServerNgError; @@ -47,6 +51,7 @@ fn main() -> Result<(), ServerNgError> { } }; runtime.block_on(async { + let args = Args::parse(); if let Ok(env_path) = std::env::var("IGGY_ENV_PATH") { let _ = dotenvy::from_path(&env_path); } else { @@ -58,7 +63,10 @@ fn main() -> Result<(), ServerNgError> { let mut logging = server::log::logger::Logging::new(); logging.early_init(); - let shard = server_ng::bootstrap::bootstrap(&mut logging).await?; - shard.run().await + let config = server_ng::bootstrap::load_config(&mut logging).await?; + iggy_common::MemoryPool::init_pool(&config.system.memory_pool.into_other()); + + let shard = server_ng::bootstrap::bootstrap(&config, args.replica_id).await?; + shard.run(&config, args.replica_id).await }) } diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs index 1a667fc4b..49dd7cc7e 100644 --- a/core/server-ng/src/server_error.rs +++ b/core/server-ng/src/server_error.rs @@ -28,10 +28,33 @@ pub enum ServerNgError { Config(#[source] configs::ConfigurationError), #[error("failed to prepare server-ng directories")] CreateDirectories(#[source] iggy_common::IggyError), + #[error("failed to serialize current server-ng config")] + CurrentConfigSerialize(#[source] toml::ser::Error), + #[error("failed to write current server-ng config at {path}")] + CurrentConfigWrite { + path: String, + #[source] + source: std::io::Error, + }, #[error("failed to initialize server-ng logging")] Logging(#[source] LogError), #[error("failed to recover metadata snapshot and journal")] MetadataRecovery(#[source] 
RecoveryError), + #[error("failed to parse {context} socket address '{address}'")] + SocketAddressParse { + context: &'static str, + address: String, + #[source] + source: std::net::AddrParseError, + }, + #[error("cluster enabled but no node is configured for replica {replica_id}")] + ClusterNodeNotFound { replica_id: u8 }, + #[error("cluster node count {count} exceeds supported u8 replica count")] + ClusterReplicaCountTooLarge { count: usize }, + #[error("cluster mode requires --replica-id to identify the current node")] + MissingReplicaId, + #[error("cluster node for replica {replica_id} is missing tcp_replica port")] + ClusterReplicaPortMissing { replica_id: u8 }, #[error( "recovered namespace stream {stream_id}, topic {topic_id}, partition {partition_id} exceeds configured limits (max_streams={max_streams}, max_topics={max_topics}, max_partitions={max_partitions})" )] @@ -53,6 +76,8 @@ pub enum ServerNgError { #[source] source: iggy_common::IggyError, }, + #[error("failed to start server-ng TCP listeners")] + StartTcpListeners(#[source] iggy_common::IggyError), #[error( "failed to initialize messages writer for stream {stream_id}, topic {topic_id}, partition {partition_id}" )] @@ -73,6 +98,16 @@ pub enum ServerNgError { #[source] source: iggy_common::IggyError, }, + #[error( + "failed to load segment indexes for stream {stream_id}, topic {topic_id}, partition {partition_id}" + )] + SegmentIndexesLoad { + stream_id: usize, + topic_id: usize, + partition_id: usize, + #[source] + source: iggy_common::IggyError, + }, #[error( "failed to create initial segment storage for stream {stream_id}, topic {topic_id}, partition {partition_id}" )] diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs index 95c1be665..78e28c465 100644 --- a/core/shard/src/router.rs +++ b/core/shard/src/router.rs @@ -259,12 +259,16 @@ where Error = iggy_common::IggyError, > + StreamsFrontend, { + let mut loopback_buf = Vec::new(); loop { futures::select! 
{ _ = stop.recv().fuse() => break, frame = self.inbox.recv().fuse() => { match frame { - Ok(frame) => self.process_frame(frame).await, + Ok(frame) => { + self.process_frame(frame).await; + self.process_loopback(&mut loopback_buf).await; + } Err(_) => break, } } @@ -274,6 +278,7 @@ where // Drain remaining frames so in-flight requests get a response. while let Ok(frame) = self.inbox.try_recv() { self.process_frame(frame).await; + self.process_loopback(&mut loopback_buf).await; } }
