hubcio commented on code in PR #3163:
URL: https://github.com/apache/iggy/pull/3163#discussion_r3194622918


##########
core/server-ng/src/bootstrap.rs:
##########
@@ -0,0 +1,1474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config_writer::write_current_config;
+use crate::server_error::ServerNgError;
+use configs::server_ng::ServerNgConfig;
+use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus};
+use iggy_binary_protocol::RequestHeader;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId};
+use iggy_common::{
+    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, 
TopicStats,
+    sharding::LocalIdx, variadic,
+};
+use journal::Journal;
+use journal::prepare_journal::PrepareJournal;
+use message_bus::client_listener::{self, RequestHandler};
+use message_bus::fd_transfer;
+use message_bus::installer;
+use message_bus::installer::ConnectionInstaller;
+use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind};
+use message_bus::replica::io as replica_io;
+use message_bus::replica::listener::{self as replica_listener, MessageHandler};
+use message_bus::transports::quic::server_config_with_cert;
+use message_bus::transports::tls::{
+    TlsServerCredentials, install_default_crypto_provider, load_pem, 
self_signed_for_loopback,
+};
+use message_bus::{
+    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, 
AcceptedTlsClientFn,
+    AcceptedWsClientFn, IggyMessageBus, connector,
+};
+use metadata::IggyMetadata;
+use metadata::MuxStateMachine;
+use metadata::impls::metadata::{IggySnapshot, StreamsFrontend};
+use metadata::impls::recovery::recover;
+use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::snapshot::Snapshot;
+use metadata::stm::stream::{Partition, Streams};
+use metadata::stm::user::Users;
+use partitions::{
+    IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, 
PartitionsConfig, Segment,
+};
+// TODO: decouple bootstrap/storage helpers and logging from the `server` 
crate.
+use server::bootstrap::create_directories;
+use server::log::logger::Logging;
+use server::streaming::partitions::storage::{load_consumer_group_offsets, 
load_consumer_offsets};
+use server::streaming::segments::storage::create_segment_storage;
+use shard::builder::IggyShardBuilder;
+use shard::shards_table::PapayaShardsTable;
+use shard::{
+    CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, 
channel, shard_channel,
+};
+use std::cell::{Cell, RefCell};
+use std::future::Future;
+use std::net::{IpAddr, SocketAddr};
+use std::path::{Path, PathBuf};
+use std::rc::{Rc, Weak};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use tracing::{info, warn};
+
+const CLUSTER_ID: u128 = 1;
+const SHARD_ID: u16 = 0;
+const SHARD_REPLICA_ID: u8 = 0;
+const SHARD_NAME: &str = "server-ng-shard-0";
+const SHARD_INBOX_CAPACITY: usize = 1024;
+
+type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, 
ConsumerGroups)>;
+type ServerNgMetadata = IggyMetadata<
+    VsrConsensus<Rc<IggyMessageBus>>,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+>;
+type ServerNgShard = IggyShard<
+    Rc<IggyMessageBus>,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+    PapayaShardsTable,
+>;
+
+type ServerNgShardHandle = Rc<RefCell<Option<Weak<ServerNgShard>>>>;
+
+struct TcpTopology {
+    self_replica_id: u8,
+    replica_count: u8,
+    client_listen_addr: SocketAddr,
+    replica_listen_addr: Option<SocketAddr>,
+    ws_listen_addr: Option<SocketAddr>,
+    quic_listen_addr: Option<SocketAddr>,
+    tcp_tls_listen_addr: Option<SocketAddr>,
+    peers: Vec<(u8, SocketAddr)>,
+}
+
+struct LocalClientAcceptFns {
+    tcp: AcceptedClientFn,
+    ws: AcceptedWsClientFn,
+    quic: AcceptedQuicClientFn,
+    tcp_tls: AcceptedTlsClientFn,
+}
+
+#[derive(Default)]
+struct BoundClientListeners {
+    tcp: Option<SocketAddr>,
+    tcp_tls: Option<SocketAddr>,
+    ws: Option<SocketAddr>,
+    quic: Option<SocketAddr>,
+}
+
+pub trait RunServerNg {
+    fn run(
+        &self,
+        config: &ServerNgConfig,
+        current_replica_id: Option<u8>,
+    ) -> impl Future<Output = Result<(), ServerNgError>>;
+}
+
+impl RunServerNg for Rc<ServerNgShard> {
+    /// Run the fully bootstrapped `server-ng` shard.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if TCP listener bootstrap fails or cluster TCP
+    /// addresses cannot be resolved from config.
+    async fn run(
+        &self,
+        config: &ServerNgConfig,
+        current_replica_id: Option<u8>,
+    ) -> Result<(), ServerNgError> {
+        let topology = resolve_tcp_topology(config, current_replica_id)?;
+        let (stop_tx, stop_rx) = channel(1);
+        let message_pump_shard = Self::clone(self);
+        let message_pump_handle = compio::runtime::spawn(async move {
+            message_pump_shard.run_message_pump(stop_rx).await;
+        });
+        self.bus.track_background(message_pump_handle);
+
+        let on_replica_message = make_replica_message_handler(self);
+        let on_client_request = make_client_request_handler(self);
+        let accepted_replica = make_local_replica_accept_fn(&self.bus, 
on_replica_message);
+        let accepted_client = make_local_client_accept_fns(&self.bus, 
on_client_request);
+
+        info!(
+            shard = self.id,
+            partitions = self.plane.partitions().len(),
+            "server-ng shard initialized"
+        );
+
+        if let Err(error) =
+            start_tcp_runtime(self, config, &topology, accepted_replica, 
accepted_client).await
+        {
+            let _ = stop_tx.try_send(());
+            return Err(error);
+        }
+
+        self.bus.token().wait().await;
+        let _ = stop_tx.try_send(());
+        Ok(())
+    }
+}
+
+/// Load config, prepare directories, and complete late logging init.
+///
+/// # Errors
+///
+/// Returns an error if config loading, directory preparation, or logging
+/// setup fails.
+pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, 
ServerNgError> {
+    let config = ServerNgConfig::load()
+        .await
+        .map_err(ServerNgError::Config)?;
+    // TODO: decouple directory bootstrap from the `server` crate.
+    create_directories(&config.system)
+        .await
+        .map_err(ServerNgError::CreateDirectories)?;
+    logging
+        .late_init(
+            config.system.get_system_path(),
+            &config.system.logging,
+            &config.telemetry,
+        )
+        .map_err(ServerNgError::Logging)?;
+
+    Ok(config)
+}
+
+/// Bootstraps `server-ng` from config and on-disk metadata/partition state.
+///
+/// # Errors
+///
+/// Returns an error if metadata recovery, consensus restoration, or
+/// partition hydration fails.
+pub async fn bootstrap(
+    config: &ServerNgConfig,
+    current_replica_id: Option<u8>,
+) -> Result<Rc<ServerNgShard>, ServerNgError> {
+    let topology = resolve_tcp_topology(config, current_replica_id)?;
+    let bus = Rc::new(IggyMessageBus::with_config(SHARD_ID, config));
+    let recovered = 
recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path))
+        .await
+        .map_err(ServerNgError::MetadataRecovery)?;
+    let restored_op = recovered.last_applied_op.unwrap_or_else(|| {
+        recovered
+            .snapshot
+            .as_ref()
+            .map_or(0, IggySnapshot::sequence_number)
+    });
+
+    let metadata = ServerNgMetadata::new(
+        Some(restore_metadata_consensus(
+            &recovered.journal,
+            restored_op,
+            topology.self_replica_id,
+            topology.replica_count,
+            Rc::clone(&bus),
+        )),
+        Some(recovered.journal),
+        recovered.snapshot,
+        recovered.mux_stm,
+        Some(PathBuf::from(&config.system.path)),
+    );
+    let shard = build_single_shard(config, &topology, metadata, bus).await?;
+    info!(shard = shard.id, "server-ng bootstrap complete");
+
+    Ok(shard)
+}
+
+fn restore_metadata_consensus(
+    journal: &PrepareJournal,
+    restored_op: u64,
+    self_replica_id: u8,
+    replica_count: u8,
+    bus: Rc<IggyMessageBus>,
+) -> VsrConsensus<Rc<IggyMessageBus>> {
+    let mut consensus = VsrConsensus::new(
+        CLUSTER_ID,
+        self_replica_id,
+        replica_count,
+        0,
+        bus,
+        LocalPipeline::new(),
+    );
+
+    let last_header = journal
+        .last_op()
+        .and_then(|op| usize::try_from(op).ok())
+        .and_then(|op| journal.header(op).map(|header| *header));
+    if let Some(header) = last_header {
+        consensus.set_view(header.view);
+    }
+
+    consensus.init();
+    consensus.sequencer().set_sequence(restored_op);
+    consensus.restore_commit_state(restored_op, restored_op);
+    if let Some(header) = last_header {
+        consensus.set_last_prepare_checksum(header.checksum);
+        consensus.set_log_view(header.view);
+    }
+
+    consensus
+}
+
+async fn build_single_shard(
+    config: &ServerNgConfig,
+    topology: &TcpTopology,
+    metadata: ServerNgMetadata,
+    bus: Rc<IggyMessageBus>,
+) -> Result<Rc<ServerNgShard>, ServerNgError> {
+    let shard_id = ShardId::new(SHARD_ID);
+    let partition_count = metadata.mux_stm.streams().read(|inner| {
+        inner
+            .items
+            .iter()
+            .map(|(_, stream)| {
+                stream
+                    .topics
+                    .iter()
+                    .map(|(_, topic)| topic.partitions.len())
+                    .sum::<usize>()
+            })
+            .sum()
+    });
+    let mut partitions = IggyPartitions::with_capacity(
+        shard_id,
+        PartitionsConfig {
+            messages_required_to_save: 
config.system.partition.messages_required_to_save,
+            size_of_messages_required_to_save: config
+                .system
+                .partition
+                .size_of_messages_required_to_save,
+            enforce_fsync: config.system.partition.enforce_fsync,
+            segment_size: config.system.segment.size,
+        },
+        partition_count,
+    );
+    let shards_table = PapayaShardsTable::with_capacity(partition_count);
+
+    let (topic_stats, namespaces) = metadata.mux_stm.streams().read(|inner| {
+        let mut topic_stats = Vec::new();
+        let mut namespaces = Vec::with_capacity(partition_count);
+        for (_, stream) in &inner.items {
+            for (topic_id, topic) in &stream.topics {
+                topic_stats.push(topic.stats.clone());
+                for partition in &topic.partitions {
+                    namespaces.push((stream.id, topic_id, topic.stats.clone(), 
partition.clone()));
+                }
+            }
+        }
+        (topic_stats, namespaces)
+    });
+
+    for topic_stats in topic_stats {
+        topic_stats.zero_out_all();
+    }
+
+    for (stream_id, topic_id, topic_stats, partition_metadata) in namespaces {
+        validate_recovered_namespace(config, stream_id, topic_id, 
partition_metadata.id)?;
+        let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_metadata.id);
+        let partition = load_partition(
+            config,
+            namespace,
+            topic_stats,
+            &partition_metadata,
+            topology.self_replica_id,
+            topology.replica_count,
+            Rc::clone(&bus),
+        )
+        .await?;
+        let local_idx = partitions.insert(namespace, partition);
+        shards_table.insert(
+            namespace,
+            PartitionLocation::new(shard_id, LocalIdx::new(*local_idx)),
+        );
+    }
+
+    let (sender, inbox) = shard_channel::<()>(SHARD_ID, SHARD_INBOX_CAPACITY);
+    let senders = vec![sender];
+    let shard_handle = Rc::new(RefCell::new(None));
+    let on_replica_message = 
make_deferred_replica_message_handler(&shard_handle);
+    let on_client_request = 
make_deferred_client_request_handler(&shard_handle);
+    let built = IggyShardBuilder::new(
+        ShardIdentity::new(SHARD_ID, SHARD_NAME.to_string()),
+        Rc::clone(&bus),
+        on_replica_message,
+        on_client_request,
+        metadata,
+        partitions,
+        senders,
+        inbox,
+        shards_table,
+        PartitionConsensusConfig::new(CLUSTER_ID, topology.replica_count, 
Rc::clone(&bus)),
+        CoordinatorConfig::default(),
+        bus.token(),
+    )
+    .build();
+    if let Some(refresh_task) = built.refresh_task {
+        bus.track_background(refresh_task);
+    }
+
+    let shard = Rc::new(built.shard);
+    *shard_handle.borrow_mut() = Some(Rc::downgrade(&shard));
+    Ok(shard)
+}
+
+const fn validate_recovered_namespace(
+    config: &ServerNgConfig,
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+) -> Result<(), ServerNgError> {
+    let namespace = &config.extra.namespace;
+    if stream_id < namespace.max_streams
+        && topic_id < namespace.max_topics
+        && partition_id < namespace.max_partitions
+    {
+        return Ok(());
+    }
+
+    Err(ServerNgError::RecoveredNamespaceOutOfBounds {
+        stream_id,
+        topic_id,
+        partition_id,
+        max_streams: namespace.max_streams,
+        max_topics: namespace.max_topics,
+        max_partitions: namespace.max_partitions,
+    })
+}
+
+async fn load_partition(
+    config: &ServerNgConfig,
+    namespace: IggyNamespace,
+    topic_stats: Arc<TopicStats>,
+    partition_metadata: &Partition,
+    self_replica_id: u8,
+    replica_count: u8,
+    bus: Rc<IggyMessageBus>,
+) -> Result<IggyPartition<Rc<IggyMessageBus>>, ServerNgError> {
+    let stream_id = namespace.stream_id();
+    let topic_id = namespace.topic_id();
+    let partition_id = namespace.partition_id();
+    let stats = Arc::new(PartitionStats::new(topic_stats));
+    let consensus = VsrConsensus::new(
+        CLUSTER_ID,
+        self_replica_id,
+        replica_count,
+        namespace.inner(),
+        bus,
+        LocalPipeline::new(),
+    );
+    consensus.init();
+
+    // TODO: decouple the loading logic from the `server` crate and load 
directly
+    // into the new `partitions` log/runtime types.
+    let loaded_log = server::bootstrap::load_segments(
+        &config.system,
+        stream_id,
+        topic_id,
+        partition_id,
+        config
+            .system
+            .get_partition_path(stream_id, topic_id, partition_id),
+        stats.clone(),
+    )
+    .await
+    .map_err(|source| ServerNgError::PartitionLogLoad {
+        stream_id,
+        topic_id,
+        partition_id,
+        source,
+    })?;
+
+    let mut partition = IggyPartition::new(stats.clone(), consensus);
+    hydrate_partition_log(
+        &mut partition,
+        config,
+        stream_id,
+        topic_id,
+        partition_id,
+        loaded_log,
+    )
+    .await?;
+
+    let current_offset = partition
+        .log
+        .segments()
+        .iter()
+        .filter(|segment| segment.size > IggyByteSize::default())
+        .map(|segment| segment.end_offset)
+        .max()
+        .unwrap_or(0);
+    partition.created_at = partition_metadata.created_at;
+    partition.offset.store(current_offset, Ordering::Release);
+    partition
+        .dirty_offset
+        .store(current_offset, Ordering::Relaxed);
+    partition.should_increment_offset = partition
+        .log
+        .segments()
+        .iter()
+        .any(|segment| segment.size > IggyByteSize::default());
+    partition.stats.set_current_offset(current_offset);
+
+    configure_consumer_offsets(&mut partition, config, namespace, 
current_offset)?;
+    ensure_initial_segment(&mut partition, config, stream_id, topic_id, 
partition_id).await?;
+
+    Ok(partition)
+}
+
+async fn hydrate_partition_log(
+    partition: &mut IggyPartition<Rc<IggyMessageBus>>,
+    config: &ServerNgConfig,
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+    loaded_log: server::streaming::partitions::log::SegmentedLog<
+        server::streaming::partitions::journal::MemoryMessageJournal,
+    >,
+) -> Result<(), ServerNgError> {
+    // TODO: decouple the loading logic from the `server` crate. This currently
+    // adapts the old server segmented log into the new `partitions` log.
+    for (segment_index, (segment, storage)) in loaded_log
+        .segments()
+        .iter()
+        .zip(loaded_log.storages().iter().cloned())
+        .enumerate()
+    {
+        validate_recovered_segment(
+            stream_id,
+            topic_id,
+            partition_id,
+            segment,
+            &storage,
+            loaded_log
+                .indexes()
+                .get(segment_index)
+                .and_then(|indexes| indexes.as_ref()),
+        )?;
+        let max_timestamp = match loaded_log
+            .indexes()
+            .get(segment_index)
+            .and_then(|indexes| indexes.as_ref())
+        {
+            Some(indexes) => indexes_max_timestamp(indexes),
+            None => load_segment_max_timestamp(&storage, stream_id, topic_id, 
partition_id).await?,
+        };
+        partition.log.add_persisted_segment(
+            convert_segment(segment, max_timestamp),
+            storage,
+            None,
+            None,
+        );
+    }
+
+    if let Some(active_index) = partition.log.segments().len().checked_sub(1) {
+        let storage = &partition.log.storages()[active_index];
+        if let (Some(messages_reader), Some(index_reader)) = (
+            storage.messages_reader.as_ref(),
+            storage.index_reader.as_ref(),
+        ) {
+            let index_path = index_reader.path();
+            let index_size = std::fs::metadata(&index_path).map_or(0, 
|metadata| metadata.len());
+            partition.log.messages_writers_mut()[active_index] = Some(Rc::new(
+                MessagesWriter::new(
+                    &messages_reader.path(),
+                    
Rc::new(AtomicU64::new(u64::from(messages_reader.file_size()))),
+                    config.system.partition.enforce_fsync,
+                    true,
+                )
+                .await
+                .map_err(|source| ServerNgError::MessagesWriterInit {
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    source,
+                })?,
+            ));
+            partition.log.index_writers_mut()[active_index] = Some(Rc::new(
+                IggyIndexWriter::new(
+                    &index_path,
+                    Rc::new(AtomicU64::new(index_size)),
+                    config.system.partition.enforce_fsync,
+                    true,
+                )
+                .await
+                .map_err(|source| ServerNgError::IndexWriterInit {
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    source,
+                })?,
+            ));
+        }
+    }
+
+    Ok(())
+}
+
+fn validate_recovered_segment(
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+    segment: &iggy_common::Segment,
+    storage: &iggy_common::SegmentStorage,
+    indexes: Option<&server::streaming::segments::IggyIndexesMut>,
+) -> Result<(), ServerNgError> {
+    let messages_size_bytes = storage
+        .messages_reader
+        .as_ref()
+        .map_or(0, |reader| u64::from(reader.file_size()));
+    let indexed_size_bytes = indexes.map_or(0, |indexes| 
u64::from(indexes.messages_size()));
+    if messages_size_bytes == indexed_size_bytes {

Review Comment:
   `messages_size_bytes == indexed_size_bytes` is the wrong invariant. 
`iggy_partition.rs:1228-1244` writes one sparse-index entry per 
`commit_messages` call (gated by `flush_index.is_none()`), recording only the 
start-of-batch position, while the messages file accumulates all bytes of 
every batch. After any clean shutdown of multi-batch server-ng-written data, 
`messages_size_bytes` is therefore strictly greater than `indexed_size_bytes`, 
so this validator returns `RecoveredSegmentSizeDivergence` on every restart.
   
   Note: the legacy code in `core/server/src/shard/system/messages.rs:597,665` 
writes one index entry per message, so old-format data still passes this 
check, but new server-ng-written data does not — recovery cannot validate its 
own output.
   
   Possible fixes: drop the equality check, compute the expected size from the 
index plus the length of the last batch, or record the file size in the 
segment metadata.



##########
core/server-ng/src/bootstrap.rs:
##########
@@ -0,0 +1,1474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config_writer::write_current_config;
+use crate::server_error::ServerNgError;
+use configs::server_ng::ServerNgConfig;
+use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus};
+use iggy_binary_protocol::RequestHeader;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId};
+use iggy_common::{
+    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, 
TopicStats,
+    sharding::LocalIdx, variadic,
+};
+use journal::Journal;
+use journal::prepare_journal::PrepareJournal;
+use message_bus::client_listener::{self, RequestHandler};
+use message_bus::fd_transfer;
+use message_bus::installer;
+use message_bus::installer::ConnectionInstaller;
+use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind};
+use message_bus::replica::io as replica_io;
+use message_bus::replica::listener::{self as replica_listener, MessageHandler};
+use message_bus::transports::quic::server_config_with_cert;
+use message_bus::transports::tls::{
+    TlsServerCredentials, install_default_crypto_provider, load_pem, 
self_signed_for_loopback,
+};
+use message_bus::{
+    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, 
AcceptedTlsClientFn,
+    AcceptedWsClientFn, IggyMessageBus, connector,
+};
+use metadata::IggyMetadata;
+use metadata::MuxStateMachine;
+use metadata::impls::metadata::{IggySnapshot, StreamsFrontend};
+use metadata::impls::recovery::recover;
+use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::snapshot::Snapshot;
+use metadata::stm::stream::{Partition, Streams};
+use metadata::stm::user::Users;
+use partitions::{
+    IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, 
PartitionsConfig, Segment,
+};
+// TODO: decouple bootstrap/storage helpers and logging from the `server` 
crate.
+use server::bootstrap::create_directories;
+use server::log::logger::Logging;
+use server::streaming::partitions::storage::{load_consumer_group_offsets, 
load_consumer_offsets};
+use server::streaming::segments::storage::create_segment_storage;
+use shard::builder::IggyShardBuilder;
+use shard::shards_table::PapayaShardsTable;
+use shard::{
+    CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, 
channel, shard_channel,
+};
+use std::cell::{Cell, RefCell};
+use std::future::Future;
+use std::net::{IpAddr, SocketAddr};
+use std::path::{Path, PathBuf};
+use std::rc::{Rc, Weak};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use tracing::{info, warn};
+
+const CLUSTER_ID: u128 = 1;
+const SHARD_ID: u16 = 0;
+const SHARD_REPLICA_ID: u8 = 0;
+const SHARD_NAME: &str = "server-ng-shard-0";
+const SHARD_INBOX_CAPACITY: usize = 1024;
+
+type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, 
ConsumerGroups)>;
+type ServerNgMetadata = IggyMetadata<
+    VsrConsensus<Rc<IggyMessageBus>>,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+>;
+type ServerNgShard = IggyShard<
+    Rc<IggyMessageBus>,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+    PapayaShardsTable,
+>;
+
+type ServerNgShardHandle = Rc<RefCell<Option<Weak<ServerNgShard>>>>;
+
+struct TcpTopology {
+    self_replica_id: u8,
+    replica_count: u8,
+    client_listen_addr: SocketAddr,
+    replica_listen_addr: Option<SocketAddr>,
+    ws_listen_addr: Option<SocketAddr>,
+    quic_listen_addr: Option<SocketAddr>,
+    tcp_tls_listen_addr: Option<SocketAddr>,
+    peers: Vec<(u8, SocketAddr)>,
+}
+
+struct LocalClientAcceptFns {
+    tcp: AcceptedClientFn,
+    ws: AcceptedWsClientFn,
+    quic: AcceptedQuicClientFn,
+    tcp_tls: AcceptedTlsClientFn,
+}
+
+#[derive(Default)]
+struct BoundClientListeners {
+    tcp: Option<SocketAddr>,
+    tcp_tls: Option<SocketAddr>,
+    ws: Option<SocketAddr>,
+    quic: Option<SocketAddr>,
+}
+
+pub trait RunServerNg {
+    fn run(
+        &self,
+        config: &ServerNgConfig,
+        current_replica_id: Option<u8>,
+    ) -> impl Future<Output = Result<(), ServerNgError>>;
+}
+
+impl RunServerNg for Rc<ServerNgShard> {
+    /// Run the fully bootstrapped `server-ng` shard.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if TCP listener bootstrap fails or cluster TCP
+    /// addresses cannot be resolved from config.
+    async fn run(
+        &self,
+        config: &ServerNgConfig,
+        current_replica_id: Option<u8>,
+    ) -> Result<(), ServerNgError> {
+        let topology = resolve_tcp_topology(config, current_replica_id)?;
+        let (stop_tx, stop_rx) = channel(1);
+        let message_pump_shard = Self::clone(self);
+        let message_pump_handle = compio::runtime::spawn(async move {
+            message_pump_shard.run_message_pump(stop_rx).await;
+        });
+        self.bus.track_background(message_pump_handle);
+
+        let on_replica_message = make_replica_message_handler(self);
+        let on_client_request = make_client_request_handler(self);
+        let accepted_replica = make_local_replica_accept_fn(&self.bus, 
on_replica_message);
+        let accepted_client = make_local_client_accept_fns(&self.bus, 
on_client_request);
+
+        info!(
+            shard = self.id,
+            partitions = self.plane.partitions().len(),
+            "server-ng shard initialized"
+        );
+
+        if let Err(error) =
+            start_tcp_runtime(self, config, &topology, accepted_replica, 
accepted_client).await
+        {
+            let _ = stop_tx.try_send(());
+            return Err(error);
+        }
+
+        self.bus.token().wait().await;
+        let _ = stop_tx.try_send(());
+        Ok(())
+    }
+}
+
+/// Load config, prepare directories, and complete late logging init.
+///
+/// # Errors
+///
+/// Returns an error if config loading, directory preparation, or logging
+/// setup fails.
+pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, 
ServerNgError> {
+    let config = ServerNgConfig::load()
+        .await
+        .map_err(ServerNgError::Config)?;
+    // TODO: decouple directory bootstrap from the `server` crate.
+    create_directories(&config.system)
+        .await
+        .map_err(ServerNgError::CreateDirectories)?;
+    logging
+        .late_init(
+            config.system.get_system_path(),
+            &config.system.logging,
+            &config.telemetry,
+        )
+        .map_err(ServerNgError::Logging)?;
+
+    Ok(config)
+}
+
+/// Bootstraps `server-ng` from config and on-disk metadata/partition state.
+///
+/// # Errors
+///
+/// Returns an error if metadata recovery, consensus restoration, or
+/// partition hydration fails.
+pub async fn bootstrap(
+    config: &ServerNgConfig,
+    current_replica_id: Option<u8>,
+) -> Result<Rc<ServerNgShard>, ServerNgError> {
+    let topology = resolve_tcp_topology(config, current_replica_id)?;
+    let bus = Rc::new(IggyMessageBus::with_config(SHARD_ID, config));
+    let recovered = 
recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path))
+        .await
+        .map_err(ServerNgError::MetadataRecovery)?;
+    let restored_op = recovered.last_applied_op.unwrap_or_else(|| {
+        recovered
+            .snapshot
+            .as_ref()
+            .map_or(0, IggySnapshot::sequence_number)
+    });
+
+    let metadata = ServerNgMetadata::new(
+        Some(restore_metadata_consensus(
+            &recovered.journal,
+            restored_op,
+            topology.self_replica_id,
+            topology.replica_count,
+            Rc::clone(&bus),
+        )),
+        Some(recovered.journal),
+        recovered.snapshot,
+        recovered.mux_stm,
+        Some(PathBuf::from(&config.system.path)),
+    );
+    let shard = build_single_shard(config, &topology, metadata, bus).await?;
+    info!(shard = shard.id, "server-ng bootstrap complete");
+
+    Ok(shard)
+}
+
+fn restore_metadata_consensus(
+    journal: &PrepareJournal,
+    restored_op: u64,
+    self_replica_id: u8,
+    replica_count: u8,
+    bus: Rc<IggyMessageBus>,
+) -> VsrConsensus<Rc<IggyMessageBus>> {
+    let mut consensus = VsrConsensus::new(
+        CLUSTER_ID,
+        self_replica_id,
+        replica_count,
+        0,
+        bus,
+        LocalPipeline::new(),
+    );
+
+    let last_header = journal
+        .last_op()
+        .and_then(|op| usize::try_from(op).ok())
+        .and_then(|op| journal.header(op).map(|header| *header));
+    if let Some(header) = last_header {
+        consensus.set_view(header.view);
+    }
+
+    consensus.init();
+    consensus.sequencer().set_sequence(restored_op);
+    consensus.restore_commit_state(restored_op, restored_op);
+    if let Some(header) = last_header {
+        consensus.set_last_prepare_checksum(header.checksum);
+        consensus.set_log_view(header.view);
+    }
+
+    consensus
+}
+
/// Assemble the single local shard from recovered metadata.
///
/// Counts partitions across all streams/topics, hydrates each partition from
/// disk, registers it in the shards table, and builds the shard with deferred
/// request/message handlers that are back-filled with a weak shard handle once
/// the shard exists.
///
/// # Errors
///
/// Returns an error when a recovered namespace is out of configured bounds
/// or when partition hydration fails.
async fn build_single_shard(
    config: &ServerNgConfig,
    topology: &TcpTopology,
    metadata: ServerNgMetadata,
    bus: Rc<IggyMessageBus>,
) -> Result<Rc<ServerNgShard>, ServerNgError> {
    let shard_id = ShardId::new(SHARD_ID);
    // Total partition count across every stream/topic, used to pre-size the
    // partition set and the shards table.
    let partition_count = metadata.mux_stm.streams().read(|inner| {
        inner
            .items
            .iter()
            .map(|(_, stream)| {
                stream
                    .topics
                    .iter()
                    .map(|(_, topic)| topic.partitions.len())
                    .sum::<usize>()
            })
            .sum()
    });
    let mut partitions = IggyPartitions::with_capacity(
        shard_id,
        PartitionsConfig {
            messages_required_to_save: config.system.partition.messages_required_to_save,
            size_of_messages_required_to_save: config
                .system
                .partition
                .size_of_messages_required_to_save,
            enforce_fsync: config.system.partition.enforce_fsync,
            segment_size: config.system.segment.size,
        },
        partition_count,
    );
    let shards_table = PapayaShardsTable::with_capacity(partition_count);

    // Snapshot the recovered stream/topic/partition metadata under the read
    // closure, then do the async hydration work outside of it.
    let (topic_stats, namespaces) = metadata.mux_stm.streams().read(|inner| {
        let mut topic_stats = Vec::new();
        let mut namespaces = Vec::with_capacity(partition_count);
        for (_, stream) in &inner.items {
            for (topic_id, topic) in &stream.topics {
                topic_stats.push(topic.stats.clone());
                for partition in &topic.partitions {
                    namespaces.push((stream.id, topic_id, topic.stats.clone(), partition.clone()));
                }
            }
        }
        (topic_stats, namespaces)
    });

    // Stats are recomputed from disk during partition hydration below, so
    // start every topic's counters from zero.
    for topic_stats in topic_stats {
        topic_stats.zero_out_all();
    }

    for (stream_id, topic_id, topic_stats, partition_metadata) in namespaces {
        validate_recovered_namespace(config, stream_id, topic_id, partition_metadata.id)?;
        let namespace = IggyNamespace::new(stream_id, topic_id, partition_metadata.id);
        let partition = load_partition(
            config,
            namespace,
            topic_stats,
            &partition_metadata,
            topology.self_replica_id,
            topology.replica_count,
            Rc::clone(&bus),
        )
        .await?;
        let local_idx = partitions.insert(namespace, partition);
        shards_table.insert(
            namespace,
            PartitionLocation::new(shard_id, LocalIdx::new(*local_idx)),
        );
    }

    let (sender, inbox) = shard_channel::<()>(SHARD_ID, SHARD_INBOX_CAPACITY);
    let senders = vec![sender];
    // The handlers need a reference to the shard, which does not exist yet:
    // hand them a deferred handle that is populated right after `build()`.
    let shard_handle = Rc::new(RefCell::new(None));
    let on_replica_message = make_deferred_replica_message_handler(&shard_handle);
    let on_client_request = make_deferred_client_request_handler(&shard_handle);
    let built = IggyShardBuilder::new(
        ShardIdentity::new(SHARD_ID, SHARD_NAME.to_string()),
        Rc::clone(&bus),
        on_replica_message,
        on_client_request,
        metadata,
        partitions,
        senders,
        inbox,
        shards_table,
        PartitionConsensusConfig::new(CLUSTER_ID, topology.replica_count, Rc::clone(&bus)),
        CoordinatorConfig::default(),
        bus.token(),
    )
    .build();
    if let Some(refresh_task) = built.refresh_task {
        bus.track_background(refresh_task);
    }

    let shard = Rc::new(built.shard);
    // Back-fill the deferred handle; `Weak` avoids a reference cycle between
    // the shard and its own request/message handlers.
    *shard_handle.borrow_mut() = Some(Rc::downgrade(&shard));
    Ok(shard)
}
+
+const fn validate_recovered_namespace(
+    config: &ServerNgConfig,
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+) -> Result<(), ServerNgError> {
+    let namespace = &config.extra.namespace;
+    if stream_id < namespace.max_streams
+        && topic_id < namespace.max_topics
+        && partition_id < namespace.max_partitions
+    {
+        return Ok(());
+    }
+
+    Err(ServerNgError::RecoveredNamespaceOutOfBounds {
+        stream_id,
+        topic_id,
+        partition_id,
+        max_streams: namespace.max_streams,
+        max_topics: namespace.max_topics,
+        max_partitions: namespace.max_partitions,
+    })
+}
+
/// Rebuild one partition from its on-disk segments and recovered metadata.
///
/// Creates a fresh per-partition VSR consensus instance, loads the segment
/// log through the legacy `server` crate, restores offsets/stats from the
/// loaded segments, restores consumer offsets, and guarantees an open active
/// segment so appends can resume.
///
/// # Errors
///
/// Returns an error when the segment log cannot be loaded or hydrated, or
/// when consumer-offset / initial-segment setup fails.
async fn load_partition(
    config: &ServerNgConfig,
    namespace: IggyNamespace,
    topic_stats: Arc<TopicStats>,
    partition_metadata: &Partition,
    self_replica_id: u8,
    replica_count: u8,
    bus: Rc<IggyMessageBus>,
) -> Result<IggyPartition<Rc<IggyMessageBus>>, ServerNgError> {
    let stream_id = namespace.stream_id();
    let topic_id = namespace.topic_id();
    let partition_id = namespace.partition_id();
    let stats = Arc::new(PartitionStats::new(topic_stats));
    // Per-partition consensus group, keyed by the packed namespace id.
    let consensus = VsrConsensus::new(
        CLUSTER_ID,
        self_replica_id,
        replica_count,
        namespace.inner(),
        bus,
        LocalPipeline::new(),
    );
    consensus.init();

    // TODO: decouple the loading logic from the `server` crate and load
    // directly into the new `partitions` log/runtime types.
    let loaded_log = server::bootstrap::load_segments(
        &config.system,
        stream_id,
        topic_id,
        partition_id,
        config
            .system
            .get_partition_path(stream_id, topic_id, partition_id),
        stats.clone(),
    )
    .await
    .map_err(|source| ServerNgError::PartitionLogLoad {
        stream_id,
        topic_id,
        partition_id,
        source,
    })?;

    let mut partition = IggyPartition::new(stats.clone(), consensus);
    hydrate_partition_log(
        &mut partition,
        config,
        stream_id,
        topic_id,
        partition_id,
        loaded_log,
    )
    .await?;

    // Highest end offset across non-empty segments; 0 for a brand-new
    // partition with no persisted data yet.
    let current_offset = partition
        .log
        .segments()
        .iter()
        .filter(|segment| segment.size > IggyByteSize::default())
        .map(|segment| segment.end_offset)
        .max()
        .unwrap_or(0);
    partition.created_at = partition_metadata.created_at;
    partition.offset.store(current_offset, Ordering::Release);
    partition
        .dirty_offset
        .store(current_offset, Ordering::Relaxed);
    // Only advance the offset on the next append when some data was actually
    // recovered; otherwise offset 0 is assigned to the first message as-is.
    partition.should_increment_offset = partition
        .log
        .segments()
        .iter()
        .any(|segment| segment.size > IggyByteSize::default());
    partition.stats.set_current_offset(current_offset);

    configure_consumer_offsets(&mut partition, config, namespace, current_offset)?;
    ensure_initial_segment(&mut partition, config, stream_id, topic_id, partition_id).await?;

    Ok(partition)
}
+
/// Adapt the legacy segmented log into the partition's new log type.
///
/// Validates every recovered segment against its indexes, installs each one
/// (with its storage) into `partition.log`, and re-opens messages/index
/// writers on the active (last) segment so appends can resume.
///
/// # Errors
///
/// Returns an error on segment-size divergence, index loading failure, or
/// writer initialization failure.
async fn hydrate_partition_log(
    partition: &mut IggyPartition<Rc<IggyMessageBus>>,
    config: &ServerNgConfig,
    stream_id: usize,
    topic_id: usize,
    partition_id: usize,
    loaded_log: server::streaming::partitions::log::SegmentedLog<
        server::streaming::partitions::journal::MemoryMessageJournal,
    >,
) -> Result<(), ServerNgError> {
    // TODO: decouple the loading logic from the `server` crate. This currently
    // adapts the old server segmented log into the new `partitions` log.
    for (segment_index, (segment, storage)) in loaded_log
        .segments()
        .iter()
        .zip(loaded_log.storages().iter().cloned())
        .enumerate()
    {
        validate_recovered_segment(
            stream_id,
            topic_id,
            partition_id,
            segment,
            &storage,
            loaded_log
                .indexes()
                .get(segment_index)
                .and_then(|indexes| indexes.as_ref()),
        )?;
        // Prefer the in-memory indexes; fall back to reading the index file
        // from disk when they were not recovered.
        let max_timestamp = match loaded_log
            .indexes()
            .get(segment_index)
            .and_then(|indexes| indexes.as_ref())
        {
            Some(indexes) => indexes_max_timestamp(indexes),
            None => load_segment_max_timestamp(&storage, stream_id, topic_id, partition_id).await?,
        };
        partition.log.add_persisted_segment(
            convert_segment(segment, max_timestamp),
            storage,
            None,
            None,
        );
    }

    // Re-open append writers on the last (active) segment only; earlier
    // segments stay read-only.
    if let Some(active_index) = partition.log.segments().len().checked_sub(1) {
        let storage = &partition.log.storages()[active_index];
        if let (Some(messages_reader), Some(index_reader)) = (
            storage.messages_reader.as_ref(),
            storage.index_reader.as_ref(),
        ) {
            let index_path = index_reader.path();
            // NOTE(review): a metadata() failure silently maps the index size
            // to 0 here — confirm the writer tolerates an understated size,
            // or propagate the error instead.
            let index_size = std::fs::metadata(&index_path).map_or(0, |metadata| metadata.len());
            partition.log.messages_writers_mut()[active_index] = Some(Rc::new(
                MessagesWriter::new(
                    &messages_reader.path(),
                    Rc::new(AtomicU64::new(u64::from(messages_reader.file_size()))),
                    config.system.partition.enforce_fsync,
                    true,
                )
                .await
                .map_err(|source| ServerNgError::MessagesWriterInit {
                    stream_id,
                    topic_id,
                    partition_id,
                    source,
                })?,
            ));
            partition.log.index_writers_mut()[active_index] = Some(Rc::new(
                IggyIndexWriter::new(
                    &index_path,
                    Rc::new(AtomicU64::new(index_size)),
                    config.system.partition.enforce_fsync,
                    true,
                )
                .await
                .map_err(|source| ServerNgError::IndexWriterInit {
                    stream_id,
                    topic_id,
                    partition_id,
                    source,
                })?,
            ));
        }
    }

    Ok(())
}
+
+fn validate_recovered_segment(
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+    segment: &iggy_common::Segment,
+    storage: &iggy_common::SegmentStorage,
+    indexes: Option<&server::streaming::segments::IggyIndexesMut>,
+) -> Result<(), ServerNgError> {
+    let messages_size_bytes = storage
+        .messages_reader
+        .as_ref()
+        .map_or(0, |reader| u64::from(reader.file_size()));
+    let indexed_size_bytes = indexes.map_or(0, |indexes| 
u64::from(indexes.messages_size()));
+    if messages_size_bytes == indexed_size_bytes {
+        return Ok(());
+    }
+
+    Err(ServerNgError::RecoveredSegmentSizeDivergence {
+        stream_id,
+        topic_id,
+        partition_id,
+        start_offset: segment.start_offset,
+        end_offset: segment.end_offset,
+        messages_size_bytes,
+        indexed_size_bytes,
+    })
+}
+
/// Map a legacy `iggy_common::Segment` into the new `partitions::Segment`,
/// attaching the `max_timestamp` recovered from the segment's indexes.
fn convert_segment(segment: &iggy_common::Segment, max_timestamp: u64) -> Segment {
    Segment {
        sealed: segment.sealed,
        start_timestamp: segment.start_timestamp,
        end_timestamp: segment.end_timestamp,
        max_timestamp,
        // Widen the legacy write position into the new type's u64 field.
        current_position: u64::from(segment.current_position),
        start_offset: segment.start_offset,
        end_offset: segment.end_offset,
        size: segment.size,
        max_size: segment.max_size,
    }
}
+
+fn indexes_max_timestamp(indexes: 
&server::streaming::segments::IggyIndexesMut) -> u64 {
+    let mut max_timestamp = 0;
+    for index in 0..indexes.count() {
+        if let Some(index_view) = indexes.get(index) {
+            max_timestamp = max_timestamp.max(index_view.timestamp());
+        }
+    }
+
+    max_timestamp
+}
+
/// Read a segment's indexes from disk and return the highest timestamp.
///
/// Returns 0 when the segment has no index reader attached (nothing indexed).
///
/// # Errors
///
/// Returns [`ServerNgError::SegmentIndexesLoad`] when the index file cannot
/// be read from disk.
async fn load_segment_max_timestamp(
    storage: &iggy_common::SegmentStorage,
    stream_id: usize,
    topic_id: usize,
    partition_id: usize,
) -> Result<u64, ServerNgError> {
    let Some(index_reader) = storage.index_reader.as_ref() else {
        return Ok(0);
    };

    let indexes = index_reader
        .load_all_indexes_from_disk()
        .await
        .map_err(|source| ServerNgError::SegmentIndexesLoad {
            stream_id,
            topic_id,
            partition_id,
            source,
        })?;
    Ok(indexes_max_timestamp(&indexes))
}
+
+fn configure_consumer_offsets(
+    partition: &mut IggyPartition<Rc<IggyMessageBus>>,
+    config: &ServerNgConfig,
+    namespace: IggyNamespace,
+    current_offset: u64,
+) -> Result<(), ServerNgError> {
+    let stream_id = namespace.stream_id();
+    let topic_id = namespace.topic_id();
+    let partition_id = namespace.partition_id();
+    let consumer_offsets_path =
+        config
+            .system
+            .get_consumer_offsets_path(stream_id, topic_id, partition_id);
+    let consumer_group_offsets_path =
+        config
+            .system
+            .get_consumer_group_offsets_path(stream_id, topic_id, 
partition_id);
+
+    // TODO: decouple consumer offset loading from the `server` crate.
+    let loaded_consumer_offsets = 
load_consumer_offsets(&consumer_offsets_path).unwrap_or_default();

Review Comment:
   also at line 691.
   
   `load_consumer_offsets` (and the group variant) at 
`core/server/src/streaming/partitions/storage.rs:165-168,200,208,227-230` 
returns `IggyError::CannotReadConsumerOffsets` / `CannotReadFile` for any read 
failure: ENOENT (legitimate first-boot empty), EACCES, EBUSY, EIO. all collapse 
into `Vec::default()` here. a transient FS failure during boot silently resets 
consumer offsets, so consumers re-read messages already consumed.
   
   Fix: propagate the failure via a new `ServerNgError::ConsumerOffsetsLoad` 
variant. If needed, distinguish "directory missing" (a legitimate empty state 
on first boot) from "load failed" (a real error that must not be swallowed).



##########
core/server-ng/src/bootstrap.rs:
##########
@@ -0,0 +1,1474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config_writer::write_current_config;
+use crate::server_error::ServerNgError;
+use configs::server_ng::ServerNgConfig;
+use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus};
+use iggy_binary_protocol::RequestHeader;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId};
+use iggy_common::{
+    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, 
TopicStats,
+    sharding::LocalIdx, variadic,
+};
+use journal::Journal;
+use journal::prepare_journal::PrepareJournal;
+use message_bus::client_listener::{self, RequestHandler};
+use message_bus::fd_transfer;
+use message_bus::installer;
+use message_bus::installer::ConnectionInstaller;
+use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind};
+use message_bus::replica::io as replica_io;
+use message_bus::replica::listener::{self as replica_listener, MessageHandler};
+use message_bus::transports::quic::server_config_with_cert;
+use message_bus::transports::tls::{
+    TlsServerCredentials, install_default_crypto_provider, load_pem, 
self_signed_for_loopback,
+};
+use message_bus::{
+    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, 
AcceptedTlsClientFn,
+    AcceptedWsClientFn, IggyMessageBus, connector,
+};
+use metadata::IggyMetadata;
+use metadata::MuxStateMachine;
+use metadata::impls::metadata::{IggySnapshot, StreamsFrontend};
+use metadata::impls::recovery::recover;
+use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::snapshot::Snapshot;
+use metadata::stm::stream::{Partition, Streams};
+use metadata::stm::user::Users;
+use partitions::{
+    IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, 
PartitionsConfig, Segment,
+};
+// TODO: decouple bootstrap/storage helpers and logging from the `server` 
crate.
+use server::bootstrap::create_directories;
+use server::log::logger::Logging;
+use server::streaming::partitions::storage::{load_consumer_group_offsets, 
load_consumer_offsets};
+use server::streaming::segments::storage::create_segment_storage;
+use shard::builder::IggyShardBuilder;
+use shard::shards_table::PapayaShardsTable;
+use shard::{
+    CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, 
channel, shard_channel,
+};
+use std::cell::{Cell, RefCell};
+use std::future::Future;
+use std::net::{IpAddr, SocketAddr};
+use std::path::{Path, PathBuf};
+use std::rc::{Rc, Weak};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use tracing::{info, warn};
+
/// Cluster id shared by every consensus group created in this binary.
const CLUSTER_ID: u128 = 1;
/// The only shard id in the current single-shard bootstrap.
const SHARD_ID: u16 = 0;
/// Default replica id constant for the local shard.
const SHARD_REPLICA_ID: u8 = 0;
/// Human-readable identity given to the bootstrap shard.
const SHARD_NAME: &str = "server-ng-shard-0";
/// Bounded capacity of the shard's inbox channel.
const SHARD_INBOX_CAPACITY: usize = 1024;
+
/// Multiplexed state machine combining users, streams, and consumer groups.
type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>;
/// Metadata plane: VSR consensus + prepare journal + snapshot over the mux STM.
type ServerNgMetadata = IggyMetadata<
    VsrConsensus<Rc<IggyMessageBus>>,
    PrepareJournal,
    IggySnapshot,
    ServerNgMuxStateMachine,
>;
/// Concrete shard type instantiated by the bootstrap path.
type ServerNgShard = IggyShard<
    Rc<IggyMessageBus>,
    PrepareJournal,
    IggySnapshot,
    ServerNgMuxStateMachine,
    PapayaShardsTable,
>;

/// Deferred, weakly-held shard reference; filled in after `build()` so the
/// shard's own handlers can reach it without a strong reference cycle.
type ServerNgShardHandle = Rc<RefCell<Option<Weak<ServerNgShard>>>>;
+
/// Resolved network topology for this replica, derived from config.
struct TcpTopology {
    // This replica's id within the cluster.
    self_replica_id: u8,
    // Total number of replicas in the cluster.
    replica_count: u8,
    // Address clients connect to over plain TCP.
    client_listen_addr: SocketAddr,
    // Replica-to-replica listener; `None` when no replica listener is configured.
    replica_listen_addr: Option<SocketAddr>,
    // Optional WebSocket client listener address.
    ws_listen_addr: Option<SocketAddr>,
    // Optional QUIC client listener address.
    quic_listen_addr: Option<SocketAddr>,
    // Optional TLS-wrapped TCP client listener address.
    tcp_tls_listen_addr: Option<SocketAddr>,
    // Other replicas as `(replica_id, address)` pairs.
    peers: Vec<(u8, SocketAddr)>,
}
+
/// Accept callbacks, one per client-facing transport.
struct LocalClientAcceptFns {
    // Plain TCP accept callback.
    tcp: AcceptedClientFn,
    // WebSocket accept callback.
    ws: AcceptedWsClientFn,
    // QUIC accept callback.
    quic: AcceptedQuicClientFn,
    // TLS-over-TCP accept callback.
    tcp_tls: AcceptedTlsClientFn,
}
+
/// Locally bound client listener addresses; a field is `None` when the
/// corresponding transport was not started.
#[derive(Default)]
struct BoundClientListeners {
    tcp: Option<SocketAddr>,
    tcp_tls: Option<SocketAddr>,
    ws: Option<SocketAddr>,
    quic: Option<SocketAddr>,
}
+
/// Entry point for driving a fully bootstrapped `server-ng` shard.
pub trait RunServerNg {
    /// Run the shard until shutdown is requested or startup fails.
    fn run(
        &self,
        config: &ServerNgConfig,
        current_replica_id: Option<u8>,
    ) -> impl Future<Output = Result<(), ServerNgError>>;
}
+
impl RunServerNg for Rc<ServerNgShard> {
    /// Run the fully bootstrapped `server-ng` shard.
    ///
    /// Spawns the message pump as a tracked background task, wires the
    /// replica/client accept callbacks, starts the TCP runtime, then parks
    /// until the bus token signals shutdown.
    ///
    /// # Errors
    ///
    /// Returns an error if TCP listener bootstrap fails or cluster TCP
    /// addresses cannot be resolved from config.
    async fn run(
        &self,
        config: &ServerNgConfig,
        current_replica_id: Option<u8>,
    ) -> Result<(), ServerNgError> {
        let topology = resolve_tcp_topology(config, current_replica_id)?;
        // One-slot channel used only to tell the message pump to stop.
        let (stop_tx, stop_rx) = channel(1);
        let message_pump_shard = Self::clone(self);
        let message_pump_handle = compio::runtime::spawn(async move {
            message_pump_shard.run_message_pump(stop_rx).await;
        });
        self.bus.track_background(message_pump_handle);

        let on_replica_message = make_replica_message_handler(self);
        let on_client_request = make_client_request_handler(self);
        let accepted_replica = make_local_replica_accept_fn(&self.bus, on_replica_message);
        let accepted_client = make_local_client_accept_fns(&self.bus, on_client_request);

        info!(
            shard = self.id,
            partitions = self.plane.partitions().len(),
            "server-ng shard initialized"
        );

        // On startup failure, stop the message pump before surfacing the
        // error (best-effort: the send may fail if the pump already exited).
        if let Err(error) =
            start_tcp_runtime(self, config, &topology, accepted_replica, accepted_client).await
        {
            let _ = stop_tx.try_send(());
            return Err(error);
        }

        // Park until shutdown is signalled through the bus token, then stop
        // the message pump.
        self.bus.token().wait().await;
        let _ = stop_tx.try_send(());
        Ok(())
    }
}
+
/// Load config, prepare directories, and complete late logging init.
///
/// # Errors
///
/// Returns an error if config loading, directory preparation, or logging
/// setup fails.
pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, ServerNgError> {
    let config = ServerNgConfig::load()
        .await
        .map_err(ServerNgError::Config)?;
    // TODO: decouple directory bootstrap from the `server` crate.
    create_directories(&config.system)
        .await
        .map_err(ServerNgError::CreateDirectories)?;
    // Logging is initialized in two phases: the file/telemetry sinks can only
    // be attached once the system path is known from the loaded config.
    logging
        .late_init(
            config.system.get_system_path(),
            &config.system.logging,
            &config.telemetry,
        )
        .map_err(ServerNgError::Logging)?;

    Ok(config)
}
+
/// Bootstraps `server-ng` from config and on-disk metadata/partition state.
///
/// Recovers the metadata WAL/snapshot, restores the metadata consensus to
/// the last replayed op, and hydrates the single local shard.
///
/// # Errors
///
/// Returns an error if metadata recovery, consensus restoration, or
/// partition hydration fails.
pub async fn bootstrap(
    config: &ServerNgConfig,
    current_replica_id: Option<u8>,
) -> Result<Rc<ServerNgShard>, ServerNgError> {
    let topology = resolve_tcp_topology(config, current_replica_id)?;
    let bus = Rc::new(IggyMessageBus::with_config(SHARD_ID, config));
    let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path))
        .await
        .map_err(ServerNgError::MetadataRecovery)?;
    // Resume from the last op applied during WAL replay, falling back to the
    // snapshot's sequence number when no WAL entries were replayed.
    // NOTE(review): replayed ops are later declared committed via
    // `restore_commit_state` — safe single-node, but a multi-replica cluster
    // needs a durably persisted commit watermark; confirm before clustering.
    let restored_op = recovered.last_applied_op.unwrap_or_else(|| {
        recovered
            .snapshot
            .as_ref()
            .map_or(0, IggySnapshot::sequence_number)
    });

    let metadata = ServerNgMetadata::new(
        Some(restore_metadata_consensus(
            &recovered.journal,
            restored_op,
            topology.self_replica_id,
            topology.replica_count,
            Rc::clone(&bus),
        )),
        Some(recovered.journal),
        recovered.snapshot,
        recovered.mux_stm,
        Some(PathBuf::from(&config.system.path)),
    );
    let shard = build_single_shard(config, &topology, metadata, bus).await?;
    info!(shard = shard.id, "server-ng bootstrap complete");

    Ok(shard)
}
+
+fn restore_metadata_consensus(
+    journal: &PrepareJournal,
+    restored_op: u64,
+    self_replica_id: u8,
+    replica_count: u8,
+    bus: Rc<IggyMessageBus>,
+) -> VsrConsensus<Rc<IggyMessageBus>> {
+    let mut consensus = VsrConsensus::new(
+        CLUSTER_ID,
+        self_replica_id,
+        replica_count,
+        0,
+        bus,
+        LocalPipeline::new(),
+    );
+
+    let last_header = journal
+        .last_op()
+        .and_then(|op| usize::try_from(op).ok())
+        .and_then(|op| journal.header(op).map(|header| *header));
+    if let Some(header) = last_header {

Review Comment:
   `set_log_view(header.view)` uses the view in which the primary appended this 
entry locally. VSR invariant per the doc-comment at `impls.rs:506-513`: 
`log_view` is the view in which this replica's log head was quorum-confirmed 
(won DVC or processed SV from new primary). setting it to a write-time view 
over-inflates.
   
   `view_change_quorum.rs:76-84` `dvc_select_winner` picks max `(log_view, 
op)`. a backup with locally-appended-but-uncommitted ops at view V will beat 
replicas with the genuinely-quorum-confirmed lower log_view, then 
`complete_view_change_as_primary` at `impls.rs:1546-1601` forces peers to 
install the contaminated head via SendStartView.
   
   single-node never runs winner selection. multi-replica latent landmine.
   
   fix: persist `log_view_durable` in a superblock; advance only on quorum 
SV/DVC win.



##########
core/common/src/sharding/namespace.rs:
##########
@@ -49,6 +49,47 @@ pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 1;
 pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1;
 pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1;
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum NamespaceCapacityError {

Review Comment:
   I think we should use derive `Error` from `thiserror` crate.



##########
core/server-ng/src/bootstrap.rs:
##########
@@ -0,0 +1,1474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config_writer::write_current_config;
+use crate::server_error::ServerNgError;
+use configs::server_ng::ServerNgConfig;
+use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus};
+use iggy_binary_protocol::RequestHeader;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId};
+use iggy_common::{
+    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, 
TopicStats,
+    sharding::LocalIdx, variadic,
+};
+use journal::Journal;
+use journal::prepare_journal::PrepareJournal;
+use message_bus::client_listener::{self, RequestHandler};
+use message_bus::fd_transfer;
+use message_bus::installer;
+use message_bus::installer::ConnectionInstaller;
+use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind};
+use message_bus::replica::io as replica_io;
+use message_bus::replica::listener::{self as replica_listener, MessageHandler};
+use message_bus::transports::quic::server_config_with_cert;
+use message_bus::transports::tls::{
+    TlsServerCredentials, install_default_crypto_provider, load_pem, 
self_signed_for_loopback,
+};
+use message_bus::{
+    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, 
AcceptedTlsClientFn,
+    AcceptedWsClientFn, IggyMessageBus, connector,
+};
+use metadata::IggyMetadata;
+use metadata::MuxStateMachine;
+use metadata::impls::metadata::{IggySnapshot, StreamsFrontend};
+use metadata::impls::recovery::recover;
+use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::snapshot::Snapshot;
+use metadata::stm::stream::{Partition, Streams};
+use metadata::stm::user::Users;
+use partitions::{
+    IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, 
PartitionsConfig, Segment,
+};
+// TODO: decouple bootstrap/storage helpers and logging from the `server` 
crate.
+use server::bootstrap::create_directories;
+use server::log::logger::Logging;
+use server::streaming::partitions::storage::{load_consumer_group_offsets, 
load_consumer_offsets};
+use server::streaming::segments::storage::create_segment_storage;
+use shard::builder::IggyShardBuilder;
+use shard::shards_table::PapayaShardsTable;
+use shard::{
+    CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, 
channel, shard_channel,
+};
+use std::cell::{Cell, RefCell};
+use std::future::Future;
+use std::net::{IpAddr, SocketAddr};
+use std::path::{Path, PathBuf};
+use std::rc::{Rc, Weak};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use tracing::{info, warn};
+
+const CLUSTER_ID: u128 = 1;
+const SHARD_ID: u16 = 0;
+const SHARD_REPLICA_ID: u8 = 0;
+const SHARD_NAME: &str = "server-ng-shard-0";
+const SHARD_INBOX_CAPACITY: usize = 1024;
+
+type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, 
ConsumerGroups)>;
+type ServerNgMetadata = IggyMetadata<
+    VsrConsensus<Rc<IggyMessageBus>>,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+>;
+type ServerNgShard = IggyShard<
+    Rc<IggyMessageBus>,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+    PapayaShardsTable,
+>;
+
+type ServerNgShardHandle = Rc<RefCell<Option<Weak<ServerNgShard>>>>;
+
+struct TcpTopology {
+    self_replica_id: u8,
+    replica_count: u8,
+    client_listen_addr: SocketAddr,
+    replica_listen_addr: Option<SocketAddr>,
+    ws_listen_addr: Option<SocketAddr>,
+    quic_listen_addr: Option<SocketAddr>,
+    tcp_tls_listen_addr: Option<SocketAddr>,
+    peers: Vec<(u8, SocketAddr)>,
+}
+
+struct LocalClientAcceptFns {
+    tcp: AcceptedClientFn,
+    ws: AcceptedWsClientFn,
+    quic: AcceptedQuicClientFn,
+    tcp_tls: AcceptedTlsClientFn,
+}
+
+#[derive(Default)]
+struct BoundClientListeners {
+    tcp: Option<SocketAddr>,
+    tcp_tls: Option<SocketAddr>,
+    ws: Option<SocketAddr>,
+    quic: Option<SocketAddr>,
+}
+
+pub trait RunServerNg {
+    fn run(
+        &self,
+        config: &ServerNgConfig,
+        current_replica_id: Option<u8>,
+    ) -> impl Future<Output = Result<(), ServerNgError>>;
+}
+
+impl RunServerNg for Rc<ServerNgShard> {
+    /// Run the fully bootstrapped `server-ng` shard.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if TCP listener bootstrap fails or cluster TCP
+    /// addresses cannot be resolved from config.
+    async fn run(
+        &self,
+        config: &ServerNgConfig,
+        current_replica_id: Option<u8>,
+    ) -> Result<(), ServerNgError> {
+        let topology = resolve_tcp_topology(config, current_replica_id)?;
+        let (stop_tx, stop_rx) = channel(1);
+        let message_pump_shard = Self::clone(self);
+        let message_pump_handle = compio::runtime::spawn(async move {
+            message_pump_shard.run_message_pump(stop_rx).await;
+        });
+        self.bus.track_background(message_pump_handle);
+
+        let on_replica_message = make_replica_message_handler(self);
+        let on_client_request = make_client_request_handler(self);
+        let accepted_replica = make_local_replica_accept_fn(&self.bus, 
on_replica_message);
+        let accepted_client = make_local_client_accept_fns(&self.bus, 
on_client_request);
+
+        info!(
+            shard = self.id,
+            partitions = self.plane.partitions().len(),
+            "server-ng shard initialized"
+        );
+
+        if let Err(error) =
+            start_tcp_runtime(self, config, &topology, accepted_replica, 
accepted_client).await
+        {
+            let _ = stop_tx.try_send(());
+            return Err(error);
+        }
+
+        self.bus.token().wait().await;
+        let _ = stop_tx.try_send(());
+        Ok(())
+    }
+}
+
+/// Load config, prepare directories, and complete late logging init.
+///
+/// # Errors
+///
+/// Returns an error if config loading, directory preparation, or logging
+/// setup fails.
+pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, 
ServerNgError> {
+    let config = ServerNgConfig::load()
+        .await
+        .map_err(ServerNgError::Config)?;
+    // TODO: decouple directory bootstrap from the `server` crate.
+    create_directories(&config.system)
+        .await
+        .map_err(ServerNgError::CreateDirectories)?;
+    logging
+        .late_init(
+            config.system.get_system_path(),
+            &config.system.logging,
+            &config.telemetry,
+        )
+        .map_err(ServerNgError::Logging)?;
+
+    Ok(config)
+}
+
+/// Bootstraps `server-ng` from config and on-disk metadata/partition state.
+///
+/// # Errors
+///
+/// Returns an error if metadata recovery, consensus restoration, or
+/// partition hydration fails.
+pub async fn bootstrap(
+    config: &ServerNgConfig,
+    current_replica_id: Option<u8>,
+) -> Result<Rc<ServerNgShard>, ServerNgError> {
+    let topology = resolve_tcp_topology(config, current_replica_id)?;
+    let bus = Rc::new(IggyMessageBus::with_config(SHARD_ID, config));
+    let recovered = 
recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path))
+        .await
+        .map_err(ServerNgError::MetadataRecovery)?;
+    let restored_op = recovered.last_applied_op.unwrap_or_else(|| {

Review Comment:
   `recovery.rs:139-158` walks the WAL and applies every replayed entry, 
setting `last_applied_op` to journal tail M. bootstrap then calls 
`restore_commit_state(M, M)` declaring `commit_min == commit_max == M`. 
`impls.rs:1110-1130, 1218, 1230` propagate `commit_min` via primary heartbeat 
and DVC; `view_change_quorum.rs:88-95` `dvc_max_commit` propagates further into 
peer state via `complete_view_change_as_primary`.
   
   WAL entries past the last quorum-confirmed op are NOT committed; bootstrap 
declares them so. multi-replica safety violation: a replica with 
uncommitted-but-replayed ops claims they're committed.
   
   single-node trivially correct (quorum=1). latent landmine the moment cluster 
ships.
   
   fix: persist a separate commit watermark (superblock) and only restore 
commit_min up to that watermark. replay-as-applied does not mean 
replay-as-committed.



##########
core/server-ng/src/bootstrap.rs:
##########
@@ -0,0 +1,1474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config_writer::write_current_config;
+use crate::server_error::ServerNgError;
+use configs::server_ng::ServerNgConfig;
+use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus};
+use iggy_binary_protocol::RequestHeader;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId};
+use iggy_common::{
+    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, 
TopicStats,
+    sharding::LocalIdx, variadic,
+};
+use journal::Journal;
+use journal::prepare_journal::PrepareJournal;
+use message_bus::client_listener::{self, RequestHandler};
+use message_bus::fd_transfer;
+use message_bus::installer;
+use message_bus::installer::ConnectionInstaller;
+use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind};
+use message_bus::replica::io as replica_io;
+use message_bus::replica::listener::{self as replica_listener, MessageHandler};
+use message_bus::transports::quic::server_config_with_cert;
+use message_bus::transports::tls::{
+    TlsServerCredentials, install_default_crypto_provider, load_pem, 
self_signed_for_loopback,
+};
+use message_bus::{
+    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, 
AcceptedTlsClientFn,
+    AcceptedWsClientFn, IggyMessageBus, connector,
+};
+use metadata::IggyMetadata;
+use metadata::MuxStateMachine;
+use metadata::impls::metadata::{IggySnapshot, StreamsFrontend};
+use metadata::impls::recovery::recover;
+use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::snapshot::Snapshot;
+use metadata::stm::stream::{Partition, Streams};
+use metadata::stm::user::Users;
+use partitions::{
+    IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, 
PartitionsConfig, Segment,
+};
+// TODO: decouple bootstrap/storage helpers and logging from the `server` 
crate.
+use server::bootstrap::create_directories;
+use server::log::logger::Logging;
+use server::streaming::partitions::storage::{load_consumer_group_offsets, 
load_consumer_offsets};
+use server::streaming::segments::storage::create_segment_storage;
+use shard::builder::IggyShardBuilder;
+use shard::shards_table::PapayaShardsTable;
+use shard::{
+    CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, 
channel, shard_channel,
+};
+use std::cell::{Cell, RefCell};
+use std::future::Future;
+use std::net::{IpAddr, SocketAddr};
+use std::path::{Path, PathBuf};
+use std::rc::{Rc, Weak};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use tracing::{info, warn};
+
+// Hard-coded single-node identity. Presumably these become configurable once
+// multi-shard / multi-replica deployment ships — TODO confirm.
+const CLUSTER_ID: u128 = 1;
+const SHARD_ID: u16 = 0;
+const SHARD_REPLICA_ID: u8 = 0;
+const SHARD_NAME: &str = "server-ng-shard-0";
+const SHARD_INBOX_CAPACITY: usize = 1024;
+
+// Mux state machine combining the metadata sub-state-machines
+// (users, streams, consumer groups).
+type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, 
ConsumerGroups)>;
+// Concrete metadata plane: VSR consensus over the message bus, backed by the
+// prepare journal, snapshot store, and the mux state machine above.
+type ServerNgMetadata = IggyMetadata<
+    VsrConsensus<Rc<IggyMessageBus>>,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+>;
+// Concrete shard type used throughout bootstrap and the run loop.
+type ServerNgShard = IggyShard<
+    Rc<IggyMessageBus>,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+    PapayaShardsTable,
+>;
+
+// Shared, late-bound weak handle to the shard — presumably populated after
+// construction so callbacks can reach the shard without a strong cycle.
+// TODO confirm against the code that fills it in.
+type ServerNgShardHandle = Rc<RefCell<Option<Weak<ServerNgShard>>>>;
+
+/// Cluster TCP topology for this process, produced by
+/// `resolve_tcp_topology(config, current_replica_id)`.
+struct TcpTopology {
+    self_replica_id: u8,
+    replica_count: u8,
+    // Address the plain-TCP client listener binds to.
+    client_listen_addr: SocketAddr,
+    // Optional listeners — `None` presumably means the transport is not
+    // configured; TODO confirm in `resolve_tcp_topology`.
+    replica_listen_addr: Option<SocketAddr>,
+    ws_listen_addr: Option<SocketAddr>,
+    quic_listen_addr: Option<SocketAddr>,
+    tcp_tls_listen_addr: Option<SocketAddr>,
+    // (replica_id, address) pairs for the other replicas in the cluster.
+    peers: Vec<(u8, SocketAddr)>,
+}
+
+/// Per-transport accept callbacks for client connections — presumably built
+/// by `make_local_client_accept_fns` and handed to the TCP runtime.
+struct LocalClientAcceptFns {
+    tcp: AcceptedClientFn,
+    ws: AcceptedWsClientFn,
+    quic: AcceptedQuicClientFn,
+    tcp_tls: AcceptedTlsClientFn,
+}
+
+/// Actual socket addresses the client listeners bound to; `None` for a
+/// transport presumably means its listener was not started — TODO confirm.
+#[derive(Default)]
+struct BoundClientListeners {
+    tcp: Option<SocketAddr>,
+    tcp_tls: Option<SocketAddr>,
+    ws: Option<SocketAddr>,
+    quic: Option<SocketAddr>,
+}
+
+/// Entry point for driving a bootstrapped `server-ng` instance.
+pub trait RunServerNg {
+    /// Run the server until shutdown; `current_replica_id` overrides the
+    /// replica identity resolved from config (see `resolve_tcp_topology`).
+    fn run(
+        &self,
+        config: &ServerNgConfig,
+        current_replica_id: Option<u8>,
+    ) -> impl Future<Output = Result<(), ServerNgError>>;
+}
+
+impl RunServerNg for Rc<ServerNgShard> {
+    /// Run the fully bootstrapped `server-ng` shard.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if TCP listener bootstrap fails or cluster TCP
+    /// addresses cannot be resolved from config.
+    async fn run(
+        &self,
+        config: &ServerNgConfig,
+        current_replica_id: Option<u8>,
+    ) -> Result<(), ServerNgError> {
+        let topology = resolve_tcp_topology(config, current_replica_id)?;
+        // Capacity-1 channel used purely as a stop signal for the message pump.
+        let (stop_tx, stop_rx) = channel(1);
+        let message_pump_shard = Self::clone(self);
+        let message_pump_handle = compio::runtime::spawn(async move {
+            message_pump_shard.run_message_pump(stop_rx).await;
+        });
+        // Register the pump task with the bus so it is tracked as a
+        // background task across shutdown.
+        self.bus.track_background(message_pump_handle);
+
+        // Wire the replica- and client-facing handlers into per-transport
+        // accept callbacks before starting any listeners.
+        let on_replica_message = make_replica_message_handler(self);
+        let on_client_request = make_client_request_handler(self);
+        let accepted_replica = make_local_replica_accept_fn(&self.bus, 
on_replica_message);
+        let accepted_client = make_local_client_accept_fns(&self.bus, 
on_client_request);
+
+        info!(
+            shard = self.id,
+            partitions = self.plane.partitions().len(),
+            "server-ng shard initialized"
+        );
+
+        // If listener bootstrap fails, signal the pump to stop before
+        // surfacing the error; `try_send` failure is deliberately ignored
+        // (the pump may already have exited).
+        if let Err(error) =
+            start_tcp_runtime(self, config, &topology, accepted_replica, 
accepted_client).await
+        {
+            let _ = stop_tx.try_send(());
+            return Err(error);
+        }
+
+        // Park until the bus' shutdown token fires, then stop the pump.
+        self.bus.token().wait().await;
+        let _ = stop_tx.try_send(());
+        Ok(())
+    }
+}
+
+/// Load config, prepare directories, and complete late logging init.
+///
+/// # Errors
+///
+/// Returns an error if config loading, directory preparation, or logging
+/// setup fails.
+pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, 
ServerNgError> {
+    let config = ServerNgConfig::load()
+        .await
+        .map_err(ServerNgError::Config)?;
+    // TODO: decouple directory bootstrap from the `server` crate.
+    create_directories(&config.system)
+        .await
+        .map_err(ServerNgError::CreateDirectories)?;
+    // Finish logger setup now that the system path and logging/telemetry
+    // config are known (directories must exist first for file sinks).
+    logging
+        .late_init(
+            config.system.get_system_path(),
+            &config.system.logging,
+            &config.telemetry,
+        )
+        .map_err(ServerNgError::Logging)?;
+
+    Ok(config)
+}
+
+/// Bootstraps `server-ng` from config and on-disk metadata/partition state.
+///
+/// # Errors
+///
+/// Returns an error if metadata recovery, consensus restoration, or
+/// partition hydration fails.
+pub async fn bootstrap(
+    config: &ServerNgConfig,
+    current_replica_id: Option<u8>,
+) -> Result<Rc<ServerNgShard>, ServerNgError> {
+    let topology = resolve_tcp_topology(config, current_replica_id)?;
+    let bus = Rc::new(IggyMessageBus::with_config(SHARD_ID, config));
+    // Replay the on-disk WAL/snapshot into the mux state machine.
+    let recovered = 
recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path))
+        .await
+        .map_err(ServerNgError::MetadataRecovery)?;
+    // Last op the replay applied; falls back to the snapshot's sequence
+    // number when no WAL entries were replayed.
+    // NOTE(review): this is replay-as-APPLIED, not quorum-COMMITTED — using
+    // it to restore commit state overclaims committed ops in a multi-replica
+    // cluster. A persisted commit watermark should bound commit_min instead;
+    // TODO confirm once clustering ships.
+    let restored_op = recovered.last_applied_op.unwrap_or_else(|| {
+        recovered
+            .snapshot
+            .as_ref()
+            .map_or(0, IggySnapshot::sequence_number)
+    });
+
+    // Rebuild the metadata plane: restored VSR consensus plus the recovered
+    // journal, snapshot, and state machine.
+    let metadata = ServerNgMetadata::new(
+        Some(restore_metadata_consensus(
+            &recovered.journal,
+            restored_op,
+            topology.self_replica_id,
+            topology.replica_count,
+            Rc::clone(&bus),
+        )),
+        Some(recovered.journal),
+        recovered.snapshot,
+        recovered.mux_stm,
+        Some(PathBuf::from(&config.system.path)),
+    );
+    let shard = build_single_shard(config, &topology, metadata, bus).await?;
+    info!(shard = shard.id, "server-ng bootstrap complete");
+
+    Ok(shard)
+}
+
+fn restore_metadata_consensus(
+    journal: &PrepareJournal,
+    restored_op: u64,
+    self_replica_id: u8,
+    replica_count: u8,
+    bus: Rc<IggyMessageBus>,
+) -> VsrConsensus<Rc<IggyMessageBus>> {
+    let mut consensus = VsrConsensus::new(
+        CLUSTER_ID,
+        self_replica_id,
+        replica_count,
+        0,
+        bus,
+        LocalPipeline::new(),
+    );
+
+    let last_header = journal

Review Comment:
   `last_header` is `Some` only if both `journal.last_op()` AND 
`journal.header(op)` are `Some`. After `journal.drain(0..=N)` (used during 
snapshot+truncate at `prepare_journal.rs:399-405`), the slots are cleared but 
the `last_op` cell is NOT updated by `drain`. As a result, `journal.last_op() 
== Some(N)` while `journal.header(N) == None`, so `last_header` is `None` 
here, `last_prepare_checksum = 0`, and `log_view = 0`. The sequencer is 
correctly set to N, but the next prepare's `parent` is 0.
   
   On a single node this is silent — there is no peer to detect it — but it 
breaks the on-disk invariant that the WAL chains continuously across the 
snapshot boundary, which matters for offline tooling and any future joining 
peer.
   
   Suggested fix: persist `last_prepare_checksum` and `log_view` in the 
snapshot itself and plumb them through `restore_metadata_consensus`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to