This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch replica_bootstrap
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 7fabdb0ca672ab184ac405a97c299edfd56ba49c
Author: numinex <[email protected]>
AuthorDate: Fri Apr 24 17:11:02 2026 +0200

    finito
---
 core/server-ng/PLAN.md                 | 127 --------
 core/server-ng/src/{lib.rs => args.rs} |  21 +-
 core/server-ng/src/bootstrap.rs        | 577 ++++++++++++++++++++++++++++++---
 core/server-ng/src/config_writer.rs    |  96 ++++++
 core/server-ng/src/lib.rs              |   1 +
 core/server-ng/src/main.rs             |  12 +-
 core/server-ng/src/server_error.rs     |  35 ++
 core/shard/src/router.rs               |   7 +-
 8 files changed, 698 insertions(+), 178 deletions(-)

diff --git a/core/server-ng/PLAN.md b/core/server-ng/PLAN.md
deleted file mode 100644
index 006b0a5bf..000000000
--- a/core/server-ng/PLAN.md
+++ /dev/null
@@ -1,127 +0,0 @@
-# Server-NG Bootstrap Plan
-
-## Goal
-
-Implement startup for the new `server-ng` binary using:
-
-- `partitions`
-- `consensus`
-- `message_bus`
-- `metadata`
-- `journal`
-
-and support:
-
-1. new `config.toml` handling in `server-ng`
-2. config loading/parsing
-3. metadata journal + snapshot loading
-4. `IggyMetadata` restoration
-5. partition + log loading (`IGGY-131`)
-6. single-shard bootstrap only, without transport/runtime infra for now
-
-## Plan
-
-### 1. Config surface
-
-- Reuse the existing server config surface instead of introducing a separate top-level `server-ng` config module.
-- Add an extra config section or extension on top of the current config for `server-ng`-specific additions.
-- The main new config requirement is broker capacity limits for:
-  - maximum streams
-  - maximum topics
-  - maximum partitions
-- Add validation that these three limits can be packed into a single `u64` namespace layout.
-- The validation must be tied directly to `IggyNamespace`, since it packs `stream_id/topic_id/partition_id` into `u64`.
-- Keep config loading through the existing config infrastructure, with the extra section layered on top.
-
-### 2. Server-NG config file
-
-- Create a dedicated `core/server-ng/config.toml`.
-- It should be exactly the same as the existing server config file, plus the extra `server-ng` additions.
-- Do not create a reduced or bootstrap-only config file.
-- Keep it structurally aligned with the current server config so existing parsing behavior can be reused as much as possible.
-
-### 3. Bootstrap entrypoint
-
-- Replace the placeholder `server-ng` main startup with a bootstrap flow.
-- Keep startup split into clear phases:
-  - initialize tracing/logging
-  - load config
-  - prepare filesystem paths
-  - restore metadata
-  - restore partitions
-  - build shard
-- Leave listeners/timers out for now.
-- Add an explicit `TODO` at the bootstrap boundary where transport/runtime services will be started once infrastructure exists.
-
-### 4. Metadata recovery
-
-- Reuse the existing recovery code from `core/metadata`.
-- Load:
-  - snapshot
-  - metadata WAL / journal
-- Restore the metadata state machine from snapshot.
-- Replay journal entries after the snapshot point.
-- Construct `IggyMetadata` from the recovered pieces.
-
-### 5. Consensus state restoration
-
-- Initialize it with recovered progress so sequence/commit state matches restored metadata.
-- Keep this scoped to single-replica / single-shard bootstrap first.
-
-### 6. Partition discovery
-
-- Read restored metadata to determine which partitions exist.
-- For each partition, resolve:
-  - stream id
-  - topic id
-  - partition id
-  - consensus group / namespace info
-  - persisted storage paths
-
-### 7. Partition + log loading
-
-- Reuse the existing partition and log loading code from the current server binary because the on-disk format and directory layout have not changed.
-- Use the current `server` binary startup path as the reference implementation.
-- In particular, trace the existing logic from the current server `main`/bootstrap flow and adapt that code path into `server-ng`.
-- Avoid inventing a parallel restore path unless extraction is required for reuse.
-- Scope this part of the work to wiring existing logic into the new bootstrap, not redesigning the storage format.
-
-### 8. Shard construction
-
-- Build one shard using:
-  - restored `IggyMetadata`
-  - restored `IggyPartitions`
-  - `message_bus`
-  - `consensus`
-- Keep the initial version fixed to one shard.
-- Ensure routing tables and namespace ownership are initialized consistently for that single shard.
-
-### 9. Runtime services
-
-- Skip runtime services for now.
-- Do not plan TCP listeners, timers, HTTP, QUIC, or related infra in this phase.
-- Leave a clear `TODO` in the bootstrap where these services will be attached later.
-
-### 10. Verification
-
-- Verify config parsing against `core/server-ng/config.toml`.
-- Run `cargo check -p server-ng`.
-- Smoke-test startup with:
-  - empty data directory
-  - existing metadata snapshot/journal
-  - existing partition logs
-
-## Suggested implementation order
-
-1. `core/server-ng/config.toml` aligned with existing server config plus extras
-2. metadata restore path
-3. partition restore path by reusing current server bootstrap logic
-4. shard construction
-5. compile + smoke verification
-
-## Notes from Feedback
-
-- Do not introduce a brand new top-level `server-ng` config model if the current config can be reused.
-- The only clearly identified new config need right now is namespace-capacity validation for packed `u64` `IggyNamespace`.
-- Do not scope this task around listeners or timer startup yet.
-- Prefer extracting or reusing existing server bootstrap code over reimplementing storage restore logic from scratch.
diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/args.rs
similarity index 57%
copy from core/server-ng/src/lib.rs
copy to core/server-ng/src/args.rs
index be74cdd6e..d669b0ded 100644
--- a/core/server-ng/src/lib.rs
+++ b/core/server-ng/src/args.rs
@@ -17,9 +17,20 @@
  * under the License.
  */
 
-#![allow(clippy::future_not_send)]
+use clap::Parser;
 
-pub mod bootstrap;
-pub mod login_register;
-pub mod server_error;
-pub mod session_manager;
+#[derive(Parser, Debug)]
+#[command(
+    author = "Apache Iggy (Incubating)",
+    version,
+    about = "Apache Iggy server-ng",
+    long_about = "Apache Iggy server-ng\n\nUse --replica-id <N> together with a shared cluster config to run one binary per cluster node."
+)]
+pub struct Args {
+    /// Identifies this node within `cluster.nodes` by its replica ID.
+    ///
+    /// Required when `cluster.enabled = true`. The value must match exactly
+    /// one `cluster.nodes[*].replica_id` entry in the loaded configuration.
+    #[arg(long, verbatim_doc_comment)]
+    pub replica_id: Option<u8>,
+}
diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs
index a17dfd064..773111008 100644
--- a/core/server-ng/src/bootstrap.rs
+++ b/core/server-ng/src/bootstrap.rs
@@ -17,9 +17,11 @@
  * under the License.
  */
 
+use crate::config_writer::write_current_config;
 use crate::server_error::ServerNgError;
 use configs::server::ServerConfig;
 use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus};
+use iggy_binary_protocol::RequestHeader;
 use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId};
 use iggy_common::{
     ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, TopicStats,
@@ -27,7 +29,13 @@ use iggy_common::{
 };
 use journal::Journal;
 use journal::prepare_journal::PrepareJournal;
-use message_bus::IggyMessageBus;
+use message_bus::client_listener::{self, RequestHandler};
+use message_bus::installer;
+use message_bus::replica_io;
+use message_bus::replica_listener::MessageHandler;
+use message_bus::{
+    AcceptedClientFn, AcceptedReplicaFn, IggyMessageBus, connector, replica_listener,
+};
 use metadata::IggyMetadata;
 use metadata::MuxStateMachine;
 use metadata::impls::metadata::{IggySnapshot, StreamsFrontend};
@@ -44,11 +52,16 @@ use server::bootstrap::create_directories;
 use server::log::logger::Logging;
 use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets};
 use server::streaming::segments::storage::create_segment_storage;
+use shard::builder::IggyShardBuilder;
 use shard::shards_table::PapayaShardsTable;
-use shard::{IggyShard, PartitionConsensusConfig, ShardIdentity};
-use std::future::pending;
+use shard::{
+    CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, channel, shard_channel,
+};
+use std::cell::{Cell, RefCell};
+use std::future::Future;
+use std::net::{IpAddr, SocketAddr};
 use std::path::{Path, PathBuf};
-use std::rc::Rc;
+use std::rc::{Rc, Weak};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU64, Ordering};
 use tracing::{info, warn};
@@ -58,47 +71,92 @@ const SHARD_ID: u16 = 0;
 const SHARD_REPLICA_ID: u8 = 0;
 const SHARD_NAME: &str = "server-ng-shard-0";
 const DEFAULT_CONFIG_PATH: &str = "core/server-ng/config.toml";
+const SHARD_INBOX_CAPACITY: usize = 1024;
 
 type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>;
 type ServerNgMetadata = IggyMetadata<
-    VsrConsensus<IggyMessageBus>,
+    VsrConsensus<Rc<IggyMessageBus>>,
     PrepareJournal,
     IggySnapshot,
     ServerNgMuxStateMachine,
 >;
 type ServerNgShard = IggyShard<
-    IggyMessageBus,
+    Rc<IggyMessageBus>,
     PrepareJournal,
     IggySnapshot,
     ServerNgMuxStateMachine,
     PapayaShardsTable,
 >;
 
+type ServerNgShardHandle = Rc<RefCell<Option<Weak<ServerNgShard>>>>;
+
+struct TcpTopology {
+    self_replica_id: u8,
+    replica_count: u8,
+    client_listen_addr: SocketAddr,
+    replica_listen_addr: Option<SocketAddr>,
+    peers: Vec<(u8, SocketAddr)>,
+}
+
 pub trait RunServerNg {
-    fn run(&self) -> impl Future<Output = Result<(), ServerNgError>>;
+    fn run(
+        &self,
+        config: &ServerConfig,
+        current_replica_id: Option<u8>,
+    ) -> impl Future<Output = Result<(), ServerNgError>>;
 }
 
 impl RunServerNg for Rc<ServerNgShard> {
-    async fn run(&self) -> Result<(), ServerNgError> {
+    /// Run the fully bootstrapped `server-ng` shard.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if TCP listener bootstrap fails or cluster TCP
+    /// addresses cannot be resolved from config.
+    async fn run(
+        &self,
+        config: &ServerConfig,
+        current_replica_id: Option<u8>,
+    ) -> Result<(), ServerNgError> {
+        let topology = resolve_tcp_topology(config, current_replica_id)?;
+        let (stop_tx, stop_rx) = channel(1);
+        let message_pump_shard = Self::clone(self);
+        let message_pump_handle = compio::runtime::spawn(async move {
+            message_pump_shard.run_message_pump(stop_rx).await;
+        });
+        self.bus.track_background(message_pump_handle);
+
+        let on_replica_message = make_replica_message_handler(self);
+        let on_client_request = make_client_request_handler(self);
+        let accepted_replica = make_local_replica_accept_fn(&self.bus, on_replica_message);
+        let accepted_client = make_local_client_accept_fn(&self.bus, on_client_request);
+
         info!(
             shard = self.id,
             partitions = self.plane.partitions().len(),
             "server-ng shard initialized"
         );
-        warn!("TODO: start listeners, timers, and runtime services once the new infra exists");
-        pending::<()>().await;
-        #[allow(unreachable_code)]
+
+        if let Err(error) =
+            start_tcp_runtime(self, config, &topology, accepted_replica, accepted_client).await
+        {
+            let _ = stop_tx.try_send(());
+            return Err(error);
+        }
+
+        self.bus.token().wait().await;
+        let _ = stop_tx.try_send(());
         Ok(())
     }
 }
 
-/// Bootstraps `server-ng` from config and on-disk metadata/partition state.
+/// Load config, prepare directories, and complete late logging init.
 ///
 /// # Errors
 ///
-/// Returns an error if config loading, directory preparation, logging setup,
-/// metadata recovery, or partition hydration fails.
-pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, ServerNgError> {
+/// Returns an error if config loading, directory preparation, or logging
+/// setup fails.
+pub async fn load_config(logging: &mut Logging) -> Result<ServerConfig, ServerNgError> {
     let config = ServerConfig::load_with_path(DEFAULT_CONFIG_PATH, include_str!("../config.toml"))
         .await
         .map_err(ServerNgError::Config)?;
@@ -114,8 +172,21 @@ pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, Serve
         )
         .map_err(ServerNgError::Logging)?;
 
-    iggy_common::MemoryPool::init_pool(&config.system.memory_pool.into_other());
+    Ok(config)
+}
 
+/// Bootstraps `server-ng` from config and on-disk metadata/partition state.
+///
+/// # Errors
+///
+/// Returns an error if metadata recovery, consensus restoration, or
+/// partition hydration fails.
+pub async fn bootstrap(
+    config: &ServerConfig,
+    current_replica_id: Option<u8>,
+) -> Result<Rc<ServerNgShard>, ServerNgError> {
+    let topology = resolve_tcp_topology(config, current_replica_id)?;
+    let bus = Rc::new(IggyMessageBus::new(SHARD_ID));
     let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path))
         .await
         .map_err(ServerNgError::MetadataRecovery)?;
@@ -124,13 +195,19 @@ pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, Serve
         .unwrap_or_else(|| recovered.snapshot.sequence_number());
 
     let metadata = ServerNgMetadata::new(
-        Some(restore_metadata_consensus(&recovered.journal, restored_op)),
+        Some(restore_metadata_consensus(
+            &recovered.journal,
+            restored_op,
+            topology.self_replica_id,
+            topology.replica_count,
+            Rc::clone(&bus),
+        )),
         Some(recovered.journal),
         Some(recovered.snapshot),
         recovered.mux_stm,
         Some(PathBuf::from(&config.system.path)),
     );
-    let shard = Rc::new(build_single_shard(&config, metadata).await?);
+    let shard = build_single_shard(config, &topology, metadata, bus).await?;
     info!(shard = shard.id, "server-ng bootstrap complete");
 
     Ok(shard)
@@ -139,13 +216,16 @@ pub async fn bootstrap(logging: &mut Logging) -> Result<Rc<ServerNgShard>, Serve
 fn restore_metadata_consensus(
     journal: &PrepareJournal,
     restored_op: u64,
-) -> VsrConsensus<IggyMessageBus> {
+    self_replica_id: u8,
+    replica_count: u8,
+    bus: Rc<IggyMessageBus>,
+) -> VsrConsensus<Rc<IggyMessageBus>> {
     let mut consensus = VsrConsensus::new(
         CLUSTER_ID,
-        SHARD_REPLICA_ID,
-        1,
+        self_replica_id,
+        replica_count,
         0,
-        IggyMessageBus::new(1, SHARD_ID, 0),
+        bus,
         LocalPipeline::new(),
     );
 
@@ -170,8 +250,10 @@ fn restore_metadata_consensus(
 
 async fn build_single_shard(
     config: &ServerConfig,
+    topology: &TcpTopology,
     metadata: ServerNgMetadata,
-) -> Result<ServerNgShard, ServerNgError> {
+    bus: Rc<IggyMessageBus>,
+) -> Result<Rc<ServerNgShard>, ServerNgError> {
     let shard_id = ShardId::new(SHARD_ID);
     let partition_count = metadata.mux_stm.streams().read(|inner| {
         inner
@@ -222,7 +304,16 @@ async fn build_single_shard(
     for (stream_id, topic_id, topic_stats, partition_metadata) in namespaces {
         validate_recovered_namespace(config, stream_id, topic_id, partition_metadata.id)?;
         let namespace = IggyNamespace::new(stream_id, topic_id, partition_metadata.id);
-        let partition = load_partition(config, namespace, topic_stats, &partition_metadata).await?;
+        let partition = load_partition(
+            config,
+            namespace,
+            topic_stats,
+            &partition_metadata,
+            topology.self_replica_id,
+            topology.replica_count,
+            Rc::clone(&bus),
+        )
+        .await?;
         let local_idx = partitions.insert(namespace, partition);
         shards_table.insert(
             namespace,
@@ -230,13 +321,33 @@ async fn build_single_shard(
         );
     }
 
-    Ok(IggyShard::without_inbox(
+    let (sender, inbox) = shard_channel::<()>(SHARD_ID, SHARD_INBOX_CAPACITY);
+    let senders = vec![sender];
+    let shard_handle = Rc::new(RefCell::new(None));
+    let on_replica_message = make_deferred_replica_message_handler(&shard_handle);
+    let on_client_request = make_deferred_client_request_handler(&shard_handle);
+    let built = IggyShardBuilder::new(
         ShardIdentity::new(SHARD_ID, SHARD_NAME.to_string()),
+        Rc::clone(&bus),
+        on_replica_message,
+        on_client_request,
         metadata,
         partitions,
+        senders,
+        inbox,
         shards_table,
-        PartitionConsensusConfig::new(CLUSTER_ID, 1, IggyMessageBus::new(1, SHARD_ID, 0)),
-    ))
+        PartitionConsensusConfig::new(CLUSTER_ID, topology.replica_count, Rc::clone(&bus)),
+        CoordinatorConfig::default(),
+        bus.token(),
+    )
+    .build();
+    if let Some(refresh_task) = built.refresh_task {
+        bus.track_background(refresh_task);
+    }
+
+    let shard = Rc::new(built.shard);
+    *shard_handle.borrow_mut() = Some(Rc::downgrade(&shard));
+    Ok(shard)
 }
 
 const fn validate_recovered_namespace(
@@ -268,17 +379,20 @@ async fn load_partition(
     namespace: IggyNamespace,
     topic_stats: Arc<TopicStats>,
     partition_metadata: &Partition,
-) -> Result<IggyPartition<IggyMessageBus>, ServerNgError> {
+    self_replica_id: u8,
+    replica_count: u8,
+    bus: Rc<IggyMessageBus>,
+) -> Result<IggyPartition<Rc<IggyMessageBus>>, ServerNgError> {
     let stream_id = namespace.stream_id();
     let topic_id = namespace.topic_id();
     let partition_id = namespace.partition_id();
     let stats = Arc::new(PartitionStats::new(topic_stats));
     let consensus = VsrConsensus::new(
         CLUSTER_ID,
-        SHARD_REPLICA_ID,
-        1,
+        self_replica_id,
+        replica_count,
         namespace.inner(),
-        IggyMessageBus::new(1, SHARD_ID, namespace.inner()),
+        bus,
         LocalPipeline::new(),
     );
     consensus.init();
@@ -341,7 +455,7 @@ async fn load_partition(
 }
 
 async fn hydrate_partition_log(
-    partition: &mut IggyPartition<IggyMessageBus>,
+    partition: &mut IggyPartition<Rc<IggyMessageBus>>,
     config: &ServerConfig,
     stream_id: usize,
     topic_id: usize,
@@ -352,14 +466,26 @@ async fn hydrate_partition_log(
 ) -> Result<(), ServerNgError> {
     // TODO: decouple the loading logic from the `server` crate. This currently
     // adapts the old server segmented log into the new `partitions` log.
-    for (segment, storage) in loaded_log
+    for (segment_index, (segment, storage)) in loaded_log
         .segments()
         .iter()
         .zip(loaded_log.storages().iter().cloned())
+        .enumerate()
     {
-        partition
-            .log
-            .add_persisted_segment(convert_segment(segment), storage, None, None);
+        let max_timestamp = match loaded_log
+            .indexes()
+            .get(segment_index)
+            .and_then(|indexes| indexes.as_ref())
+        {
+            Some(indexes) => indexes_max_timestamp(indexes),
+            None => load_segment_max_timestamp(&storage, stream_id, topic_id, partition_id).await?,
+        };
+        partition.log.add_persisted_segment(
+            convert_segment(segment, max_timestamp),
+            storage,
+            None,
+            None,
+        );
     }
 
     if let Some(active_index) = partition.log.segments().len().checked_sub(1) {
@@ -369,9 +495,7 @@ async fn hydrate_partition_log(
             storage.index_reader.as_ref(),
         ) {
             let index_path = index_reader.path();
-            let index_size = std::fs::metadata(&index_path)
-                .map(|metadata| metadata.len())
-                .unwrap_or(0);
+            let index_size = std::fs::metadata(&index_path).map_or(0, |metadata| metadata.len());
             partition.log.messages_writers_mut()[active_index] = Some(Rc::new(
                 MessagesWriter::new(
                     &messages_reader.path(),
@@ -408,12 +532,12 @@ async fn hydrate_partition_log(
     Ok(())
 }
 
-fn convert_segment(segment: &iggy_common::Segment) -> Segment {
+fn convert_segment(segment: &iggy_common::Segment, max_timestamp: u64) -> Segment {
     Segment {
         sealed: segment.sealed,
         start_timestamp: segment.start_timestamp,
         end_timestamp: segment.end_timestamp,
-        max_timestamp: segment.end_timestamp,
+        max_timestamp,
         current_position: u64::from(segment.current_position),
         start_offset: segment.start_offset,
         end_offset: segment.end_offset,
@@ -422,8 +546,41 @@ fn convert_segment(segment: &iggy_common::Segment) -> Segment {
     }
 }
 
+fn indexes_max_timestamp(indexes: &server::streaming::segments::IggyIndexesMut) -> u64 {
+    let mut max_timestamp = 0;
+    for index in 0..indexes.count() {
+        if let Some(index_view) = indexes.get(index) {
+            max_timestamp = max_timestamp.max(index_view.timestamp());
+        }
+    }
+
+    max_timestamp
+}
+
+async fn load_segment_max_timestamp(
+    storage: &iggy_common::SegmentStorage,
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+) -> Result<u64, ServerNgError> {
+    let Some(index_reader) = storage.index_reader.as_ref() else {
+        return Ok(0);
+    };
+
+    let indexes = index_reader
+        .load_all_indexes_from_disk()
+        .await
+        .map_err(|source| ServerNgError::SegmentIndexesLoad {
+            stream_id,
+            topic_id,
+            partition_id,
+            source,
+        })?;
+    Ok(indexes_max_timestamp(&indexes))
+}
+
 fn configure_consumer_offsets(
-    partition: &mut IggyPartition<IggyMessageBus>,
+    partition: &mut IggyPartition<Rc<IggyMessageBus>>,
     config: &ServerConfig,
     namespace: IggyNamespace,
     current_offset: u64,
@@ -477,7 +634,7 @@ fn configure_consumer_offsets(
 }
 
 async fn ensure_initial_segment(
-    partition: &mut IggyPartition<IggyMessageBus>,
+    partition: &mut IggyPartition<Rc<IggyMessageBus>>,
     config: &ServerConfig,
     stream_id: usize,
     topic_id: usize,
@@ -541,3 +698,337 @@ async fn ensure_initial_segment(
 
     Ok(())
 }
+
+fn resolve_tcp_topology(
+    config: &ServerConfig,
+    current_replica_id: Option<u8>,
+) -> Result<TcpTopology, ServerNgError> {
+    let default_client_addr = parse_socket_addr("tcp.address", &config.tcp.address)?;
+    if !config.cluster.enabled {
+        return Ok(TcpTopology {
+            // Keep parity with the current server binary and the integration
+            // harness: `--replica-id` may be passed unconditionally, but in
+            // single-node mode there is only replica 0.
+            self_replica_id: SHARD_REPLICA_ID,
+            replica_count: 1,
+            client_listen_addr: default_client_addr,
+            replica_listen_addr: None,
+            peers: Vec::new(),
+        });
+    }
+
+    let self_replica_id = current_replica_id.ok_or(ServerNgError::MissingReplicaId)?;
+
+    let self_node = config
+        .cluster
+        .nodes
+        .iter()
+        .find(|node| node.replica_id == self_replica_id)
+        .ok_or(ServerNgError::ClusterNodeNotFound {
+            replica_id: self_replica_id,
+        })?;
+    let replica_count = u8::try_from(config.cluster.nodes.len()).map_err(|_| {
+        ServerNgError::ClusterReplicaCountTooLarge {
+            count: config.cluster.nodes.len(),
+        }
+    })?;
+    let client_port = self_node
+        .ports
+        .tcp
+        .unwrap_or_else(|| default_client_addr.port());
+    let client_listen_addr =
+        socket_addr_from_parts("cluster.nodes[*].ports.tcp", &self_node.ip, client_port)?;
+    let replica_port =
+        self_node
+            .ports
+            .tcp_replica
+            .ok_or(ServerNgError::ClusterReplicaPortMissing {
+                replica_id: self_node.replica_id,
+            })?;
+    let replica_listen_addr = Some(socket_addr_from_parts(
+        "cluster.nodes[*].ports.tcp_replica",
+        &self_node.ip,
+        replica_port,
+    )?);
+    let mut peers = Vec::with_capacity(config.cluster.nodes.len().saturating_sub(1));
+    for node in &config.cluster.nodes {
+        if node.replica_id == self_replica_id {
+            continue;
+        }
+        let replica_port =
+            node.ports
+                .tcp_replica
+                .ok_or(ServerNgError::ClusterReplicaPortMissing {
+                    replica_id: node.replica_id,
+                })?;
+        peers.push((
+            node.replica_id,
+            socket_addr_from_parts("cluster.nodes[*].ports.tcp_replica", &node.ip, replica_port)?,
+        ));
+    }
+
+    Ok(TcpTopology {
+        self_replica_id,
+        replica_count,
+        client_listen_addr,
+        replica_listen_addr,
+        peers,
+    })
+}
+
+async fn start_tcp_runtime(
+    shard: &Rc<ServerNgShard>,
+    config: &ServerConfig,
+    topology: &TcpTopology,
+    accepted_replica: AcceptedReplicaFn,
+    accepted_client: AcceptedClientFn,
+) -> Result<(), ServerNgError> {
+    if config.cluster.enabled {
+        return start_cluster_tcp_runtime(
+            shard,
+            config,
+            topology,
+            accepted_replica,
+            accepted_client,
+        )
+        .await;
+    }
+
+    start_single_node_tcp_runtime(shard, config, topology, accepted_client).await
+}
+
+async fn start_cluster_tcp_runtime(
+    shard: &Rc<ServerNgShard>,
+    config: &ServerConfig,
+    topology: &TcpTopology,
+    accepted_replica: AcceptedReplicaFn,
+    accepted_client: AcceptedClientFn,
+) -> Result<(), ServerNgError> {
+    let replica_addr = topology
+        .replica_listen_addr
+        .expect("cluster-enabled topology must include replica listener address");
+    if config.tcp.enabled {
+        let bound = replica_io::start_on_shard_zero_default(
+            &shard.bus,
+            replica_addr,
+            topology.client_listen_addr,
+            CLUSTER_ID,
+            topology.self_replica_id,
+            topology.replica_count,
+            topology.peers.clone(),
+            accepted_replica,
+            accepted_client,
+        )
+        .await
+        .map_err(ServerNgError::StartTcpListeners)?;
+        if let Some(bound) = bound {
+            write_current_config(
+                config,
+                Some(topology.self_replica_id),
+                Some(bound.client),
+                Some(bound.replica),
+            )
+            .await?;
+            info!(
+                shard = shard.id,
+                client = %bound.client,
+                replica = %bound.replica,
+                "server-ng TCP listeners started"
+            );
+        }
+        return Ok(());
+    }
+
+    let (replica_listener, bound_addr) = replica_listener::bind(replica_addr)
+        .await
+        .map_err(ServerNgError::StartTcpListeners)?;
+    let token = shard.bus.token();
+    let max_message_size = shard.bus.config().max_message_size;
+    let self_replica_id = topology.self_replica_id;
+    let replica_count = topology.replica_count;
+    let accepted_replica_for_listener = accepted_replica.clone();
+    let replica_handle = compio::runtime::spawn(async move {
+        replica_listener::run(
+            replica_listener,
+            token,
+            CLUSTER_ID,
+            self_replica_id,
+            replica_count,
+            accepted_replica_for_listener,
+            max_message_size,
+        )
+        .await;
+    });
+    shard.bus.track_background(replica_handle);
+    connector::start(
+        &shard.bus,
+        CLUSTER_ID,
+        topology.self_replica_id,
+        topology.peers.clone(),
+        accepted_replica,
+        shard.bus.config().reconnect_period,
+    )
+    .await;
+    write_current_config(
+        config,
+        Some(topology.self_replica_id),
+        None,
+        Some(bound_addr),
+    )
+    .await?;
+    info!(
+        shard = shard.id,
+        replica = %bound_addr,
+        "server-ng replica TCP listener started"
+    );
+    warn!("TCP client listener is disabled by config; only replica TCP is running");
+
+    Ok(())
+}
+
+async fn start_single_node_tcp_runtime(
+    shard: &Rc<ServerNgShard>,
+    config: &ServerConfig,
+    topology: &TcpTopology,
+    accepted_client: AcceptedClientFn,
+) -> Result<(), ServerNgError> {
+    if !config.tcp.enabled {
+        warn!("TCP listener is disabled by config");
+        return Ok(());
+    }
+
+    let (listener, bound_addr) = client_listener::bind(topology.client_listen_addr)
+        .await
+        .map_err(ServerNgError::StartTcpListeners)?;
+    let token = shard.bus.token();
+    let client_handle = compio::runtime::spawn(async move {
+        client_listener::run(listener, token, accepted_client).await;
+    });
+    shard.bus.track_background(client_handle);
+    write_current_config(
+        config,
+        Some(topology.self_replica_id),
+        Some(bound_addr),
+        None,
+    )
+    .await?;
+    info!(
+        shard = shard.id,
+        client = %bound_addr,
+        "server-ng TCP client listener started"
+    );
+
+    Ok(())
+}
+
+fn make_replica_message_handler(shard: &Rc<ServerNgShard>) -> MessageHandler {
+    let shard = Rc::clone(shard);
+    Rc::new(move |_replica_id, message| {
+        shard.dispatch(message);
+    })
+}
+
+fn make_client_request_handler(shard: &Rc<ServerNgShard>) -> RequestHandler {
+    let shard = Rc::clone(shard);
+    Rc::new(move |client_id, message| {
+        let request = match message.try_into_typed::<RequestHeader>() {
+            Ok(request) => request,
+            Err(error) => {
+                warn!(client_id, error = %error, "dropping client request with invalid header");
+                return;
+            }
+        };
+        let request = request.transmute_header(|header, new_header: &mut RequestHeader| {
+            *new_header = header;
+            new_header.client = client_id;
+        });
+        shard.dispatch(request.into_generic());
+    })
+}
+
+fn make_deferred_replica_message_handler(shard_handle: &ServerNgShardHandle) -> MessageHandler {
+    let shard_handle = Rc::clone(shard_handle);
+    Rc::new(move |_replica_id, message| {
+        if let Some(shard) = upgrade_shard_handle(&shard_handle) {
+            shard.dispatch(message);
+        }
+    })
+}
+
+fn make_deferred_client_request_handler(shard_handle: &ServerNgShardHandle) -> RequestHandler {
+    let shard_handle = Rc::clone(shard_handle);
+    Rc::new(move |client_id, message| {
+        let Some(shard) = upgrade_shard_handle(&shard_handle) else {
+            return;
+        };
+        let request = match message.try_into_typed::<RequestHeader>() {
+            Ok(request) => request,
+            Err(error) => {
+                warn!(client_id, error = %error, "dropping client request with invalid header");
+                return;
+            }
+        };
+        let request = request.transmute_header(|header, new_header: &mut RequestHeader| {
+            *new_header = header;
+            new_header.client = client_id;
+        });
+        shard.dispatch(request.into_generic());
+    })
+}
+
+fn upgrade_shard_handle(shard_handle: &ServerNgShardHandle) -> Option<Rc<ServerNgShard>> {
+    shard_handle
+        .borrow()
+        .as_ref()
+        .and_then(std::rc::Weak::upgrade)
+}
+
+fn make_local_replica_accept_fn(
+    bus: &Rc<IggyMessageBus>,
+    on_message: MessageHandler,
+) -> AcceptedReplicaFn {
+    let bus = Rc::clone(bus);
+    Rc::new(move |stream, peer_id| {
+        installer::install_replica_stream(&bus, peer_id, stream, on_message.clone());
+    })
+}
+
+fn make_local_client_accept_fn(
+    bus: &Rc<IggyMessageBus>,
+    on_request: RequestHandler,
+) -> AcceptedClientFn {
+    let bus = Rc::clone(bus);
+    let counter = Rc::new(Cell::new(1_u128));
+    let shard_id = u128::from(bus.shard_id());
+    Rc::new(move |stream| {
+        let seq = counter.get();
+        counter.set(seq.wrapping_add(1));
+        let client_id = (shard_id << 112) | seq;
+        installer::install_client_stream(&bus, client_id, stream, on_request.clone());
+    })
+}
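
[Note: the accept function above derives a `u128` client id by shifting the shard id into the top bits (`shard_id << 112`) and OR-ing in a per-shard wrapping sequence. A minimal standalone sketch of that layout, with illustrative names and an assumed 16-bit shard id (not the crate's actual API):]

```rust
// Sketch of the client-id layout used in make_local_client_accept_fn:
// top 16 bits hold the shard id, low 112 bits hold a wrapping sequence.
fn pack_client_id(shard_id: u16, seq: u128) -> u128 {
    (u128::from(shard_id) << 112) | (seq & ((1u128 << 112) - 1))
}

fn shard_of(client_id: u128) -> u16 {
    (client_id >> 112) as u16
}

fn main() {
    let id = pack_client_id(7, 42);
    assert_eq!(shard_of(id), 7);
    assert_eq!(id & ((1u128 << 112) - 1), 42);
    println!("ok");
}
```

[Because the sequence wraps with `wrapping_add`, uniqueness per shard holds only until 2^112 accepts, which is effectively unbounded in practice.]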
+
+fn parse_socket_addr(context: &'static str, address: &str) -> Result<SocketAddr, ServerNgError> {
+    address
+        .parse()
+        .map_err(|source| ServerNgError::SocketAddressParse {
+            context,
+            address: address.to_string(),
+            source,
+        })
+}
+
+fn socket_addr_from_parts(
+    context: &'static str,
+    host: &str,
+    port: u16,
+) -> Result<SocketAddr, ServerNgError> {
+    let ip = host
+        .parse::<IpAddr>()
+        .map_err(|source| ServerNgError::SocketAddressParse {
+            context,
+            address: format!("{host}:{port}"),
+            source,
+        })?;
+    Ok(SocketAddr::new(ip, port))
+}
diff --git a/core/server-ng/src/config_writer.rs b/core/server-ng/src/config_writer.rs
new file mode 100644
index 000000000..9c05b9976
--- /dev/null
+++ b/core/server-ng/src/config_writer.rs
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server_error::ServerNgError;
+use compio::fs::OpenOptions;
+use compio::io::AsyncWriteAtExt;
+use configs::server::ServerConfig;
+use std::net::SocketAddr;
+
+/// Write the runtime `current_config.toml` file with the effective bound ports.
+///
+/// # Errors
+///
+/// Returns an error if the config cannot be serialized or if the runtime
+/// config file cannot be written and synced.
+pub async fn write_current_config(
+    config: &ServerConfig,
+    current_replica_id: Option<u8>,
+    bound_tcp: Option<SocketAddr>,
+    bound_replica: Option<SocketAddr>,
+) -> Result<(), ServerNgError> {
+    let mut current_config = config.clone();
+
+    if let Some(bound_tcp) = bound_tcp {
+        // Keep parity with the current server binary: integration harnesses
+        // read `tcp.address` from `runtime/current_config.toml` to discover
+        // the actual port chosen by the OS when binding to port 0.
+        current_config.tcp.address = bound_tcp.to_string();
+    }
+
+    if current_config.cluster.enabled
+        && let Some(replica_id) = current_replica_id
+    {
+        let node = current_config
+            .cluster
+            .nodes
+            .iter_mut()
+            .find(|node| node.replica_id == replica_id)
+            .ok_or(ServerNgError::ClusterNodeNotFound { replica_id })?;
+        if let Some(bound_tcp) = bound_tcp {
+            node.ports.tcp = Some(bound_tcp.port());
+        }
+        if let Some(bound_replica) = bound_replica {
+            node.ports.tcp_replica = Some(bound_replica.port());
+        }
+    }
+
+    let runtime_path = current_config.system.get_runtime_path();
+    let config_path = format!("{runtime_path}/current_config.toml");
+    let content =
+        toml::to_string(&current_config).map_err(ServerNgError::CurrentConfigSerialize)?;
+
+    let mut file = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(&config_path)
+        .await
+        .map_err(|source| ServerNgError::CurrentConfigWrite {
+            path: config_path.clone(),
+            source,
+        })?;
+
+    file.write_all_at(content.into_bytes(), 0)
+        .await
+        .0
+        .map_err(|source| ServerNgError::CurrentConfigWrite {
+            path: config_path.clone(),
+            source,
+        })?;
+
+    file.sync_all()
+        .await
+        .map_err(|source| ServerNgError::CurrentConfigWrite {
+            path: config_path,
+            source,
+        })?;
+
+    Ok(())
+}
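
[Note: the write path above follows the standard create-truncate, write-all, then `sync_all` durability sequence, just expressed through compio's positional async API. The same pattern with the blocking std API, as a hedged standalone sketch (helper name is illustrative, not part of the crate):]

```rust
use std::fs::File;
use std::io::Write;

// Create (or truncate) the file, write the full serialized config, then
// fsync so the bytes are durable on disk before reporting success.
fn write_config_durably(path: &str, content: &str) -> std::io::Result<()> {
    let mut file = File::create(path)?;
    file.write_all(content.as_bytes())?;
    file.sync_all()?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("current_config_demo.toml");
    write_config_durably(path.to_str().unwrap(), "[tcp]\naddress = \"127.0.0.1:0\"\n")?;
    let back = std::fs::read_to_string(&path)?;
    assert!(back.contains("127.0.0.1:0"));
    println!("ok");
    Ok(())
}
```

[Skipping the `sync_all` would let the harness read a config the kernel has buffered but never persisted, which matters if the server crashes right after startup.]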
diff --git a/core/server-ng/src/lib.rs b/core/server-ng/src/lib.rs
index be74cdd6e..745ca8414 100644
--- a/core/server-ng/src/lib.rs
+++ b/core/server-ng/src/lib.rs
@@ -20,6 +20,7 @@
 #![allow(clippy::future_not_send)]
 
 pub mod bootstrap;
+pub mod config_writer;
 pub mod login_register;
 pub mod server_error;
 pub mod session_manager;
diff --git a/core/server-ng/src/main.rs b/core/server-ng/src/main.rs
index e7223ff72..169f32f65 100644
--- a/core/server-ng/src/main.rs
+++ b/core/server-ng/src/main.rs
@@ -19,6 +19,10 @@
 
 #![allow(clippy::future_not_send)]
 
+mod args;
+
+use args::Args;
+use clap::Parser;
 use server_ng::bootstrap::RunServerNg;
 use server_ng::server_error::ServerNgError;
 
@@ -47,6 +51,7 @@ fn main() -> Result<(), ServerNgError> {
         }
     };
     runtime.block_on(async {
+        let args = Args::parse();
         if let Ok(env_path) = std::env::var("IGGY_ENV_PATH") {
             let _ = dotenvy::from_path(&env_path);
         } else {
@@ -58,7 +63,10 @@ fn main() -> Result<(), ServerNgError> {
         let mut logging = server::log::logger::Logging::new();
         logging.early_init();
 
-        let shard = server_ng::bootstrap::bootstrap(&mut logging).await?;
-        shard.run().await
+        let config = server_ng::bootstrap::load_config(&mut logging).await?;
+        iggy_common::MemoryPool::init_pool(&config.system.memory_pool.into_other());
+
+        let shard = server_ng::bootstrap::bootstrap(&config, args.replica_id).await?;
+        shard.run(&config, args.replica_id).await
     })
 }
diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs
index 1a667fc4b..49dd7cc7e 100644
--- a/core/server-ng/src/server_error.rs
+++ b/core/server-ng/src/server_error.rs
@@ -28,10 +28,33 @@ pub enum ServerNgError {
     Config(#[source] configs::ConfigurationError),
     #[error("failed to prepare server-ng directories")]
     CreateDirectories(#[source] iggy_common::IggyError),
+    #[error("failed to serialize current server-ng config")]
+    CurrentConfigSerialize(#[source] toml::ser::Error),
+    #[error("failed to write current server-ng config at {path}")]
+    CurrentConfigWrite {
+        path: String,
+        #[source]
+        source: std::io::Error,
+    },
     #[error("failed to initialize server-ng logging")]
     Logging(#[source] LogError),
     #[error("failed to recover metadata snapshot and journal")]
     MetadataRecovery(#[source] RecoveryError),
+    #[error("failed to parse {context} socket address '{address}'")]
+    SocketAddressParse {
+        context: &'static str,
+        address: String,
+        #[source]
+        source: std::net::AddrParseError,
+    },
+    #[error("cluster enabled but no node is configured for replica {replica_id}")]
+    ClusterNodeNotFound { replica_id: u8 },
+    #[error("cluster node count {count} exceeds supported u8 replica count")]
+    ClusterReplicaCountTooLarge { count: usize },
+    #[error("cluster mode requires --replica-id to identify the current node")]
+    MissingReplicaId,
+    #[error("cluster node for replica {replica_id} is missing tcp_replica port")]
+    ClusterReplicaPortMissing { replica_id: u8 },
     #[error(
         "recovered namespace stream {stream_id}, topic {topic_id}, partition {partition_id} exceeds configured limits (max_streams={max_streams}, max_topics={max_topics}, max_partitions={max_partitions})"
     )]
@@ -53,6 +76,8 @@ pub enum ServerNgError {
         #[source]
         source: iggy_common::IggyError,
     },
+    #[error("failed to start server-ng TCP listeners")]
+    StartTcpListeners(#[source] iggy_common::IggyError),
     #[error(
         "failed to initialize messages writer for stream {stream_id}, topic {topic_id}, partition {partition_id}"
     )]
@@ -73,6 +98,16 @@ pub enum ServerNgError {
         #[source]
         source: iggy_common::IggyError,
     },
+    #[error(
+        "failed to load segment indexes for stream {stream_id}, topic {topic_id}, partition {partition_id}"
+    )]
+    SegmentIndexesLoad {
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        #[source]
+        source: iggy_common::IggyError,
+    },
     #[error(
         "failed to create initial segment storage for stream {stream_id}, topic {topic_id}, partition {partition_id}"
     )]
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 95c1be665..78e28c465 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -259,12 +259,16 @@ where
                 Error = iggy_common::IggyError,
             > + StreamsFrontend,
     {
+        let mut loopback_buf = Vec::new();
         loop {
             futures::select! {
                 _ = stop.recv().fuse() => break,
                 frame = self.inbox.recv().fuse() => {
                     match frame {
-                        Ok(frame) => self.process_frame(frame).await,
+                        Ok(frame) => {
+                            self.process_frame(frame).await;
+                            self.process_loopback(&mut loopback_buf).await;
+                        }
                         Err(_) => break,
                     }
                 }
@@ -274,6 +278,7 @@ where
         // Drain remaining frames so in-flight requests get a response.
         while let Ok(frame) = self.inbox.try_recv() {
             self.process_frame(frame).await;
+            self.process_loopback(&mut loopback_buf).await;
         }
     }
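
[Note: the router change above runs a loopback pass after every processed frame and, once `stop` fires, drains the inbox non-blockingly so in-flight requests still get a response instead of being dropped. A std-channel sketch of that drain pattern (the frame processing is stood in by a counter; names are illustrative):]

```rust
use std::sync::mpsc;

// After the select loop exits, keep pulling from the inbox with try_recv
// until it is empty; every remaining frame is processed, none is dropped.
fn drain_inbox(rx: &mpsc::Receiver<u32>) -> u32 {
    let mut processed = 0;
    while let Ok(_frame) = rx.try_recv() {
        processed += 1; // stand-in for process_frame + process_loopback
    }
    processed
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for frame in 0..3 {
        tx.send(frame).unwrap();
    }
    assert_eq!(drain_inbox(&rx), 3);
    println!("ok");
}
```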
 

