numinnex commented on code in PR #3163:
URL: https://github.com/apache/iggy/pull/3163#discussion_r3201679816


##########
core/server-ng/src/bootstrap.rs:
##########
@@ -0,0 +1,1474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config_writer::write_current_config;
+use crate::server_error::ServerNgError;
+use configs::server_ng::ServerNgConfig;
+use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus};
+use iggy_binary_protocol::RequestHeader;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId};
+use iggy_common::{
+    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, 
TopicStats,
+    sharding::LocalIdx, variadic,
+};
+use journal::Journal;
+use journal::prepare_journal::PrepareJournal;
+use message_bus::client_listener::{self, RequestHandler};
+use message_bus::fd_transfer;
+use message_bus::installer;
+use message_bus::installer::ConnectionInstaller;
+use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind};
+use message_bus::replica::io as replica_io;
+use message_bus::replica::listener::{self as replica_listener, MessageHandler};
+use message_bus::transports::quic::server_config_with_cert;
+use message_bus::transports::tls::{
+    TlsServerCredentials, install_default_crypto_provider, load_pem, 
self_signed_for_loopback,
+};
+use message_bus::{
+    AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, 
AcceptedTlsClientFn,
+    AcceptedWsClientFn, IggyMessageBus, connector,
+};
+use metadata::IggyMetadata;
+use metadata::MuxStateMachine;
+use metadata::impls::metadata::{IggySnapshot, StreamsFrontend};
+use metadata::impls::recovery::recover;
+use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::snapshot::Snapshot;
+use metadata::stm::stream::{Partition, Streams};
+use metadata::stm::user::Users;
+use partitions::{
+    IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, 
PartitionsConfig, Segment,
+};
+// TODO: decouple bootstrap/storage helpers and logging from the `server` 
crate.
+use server::bootstrap::create_directories;
+use server::log::logger::Logging;
+use server::streaming::partitions::storage::{load_consumer_group_offsets, 
load_consumer_offsets};
+use server::streaming::segments::storage::create_segment_storage;
+use shard::builder::IggyShardBuilder;
+use shard::shards_table::PapayaShardsTable;
+use shard::{
+    CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, 
channel, shard_channel,
+};
+use std::cell::{Cell, RefCell};
+use std::future::Future;
+use std::net::{IpAddr, SocketAddr};
+use std::path::{Path, PathBuf};
+use std::rc::{Rc, Weak};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use tracing::{info, warn};
+
+const CLUSTER_ID: u128 = 1;
+const SHARD_ID: u16 = 0;
+const SHARD_REPLICA_ID: u8 = 0;
+const SHARD_NAME: &str = "server-ng-shard-0";
+const SHARD_INBOX_CAPACITY: usize = 1024;
+
+type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, 
ConsumerGroups)>;
+type ServerNgMetadata = IggyMetadata<
+    VsrConsensus<Rc<IggyMessageBus>>,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+>;
+type ServerNgShard = IggyShard<
+    Rc<IggyMessageBus>,
+    PrepareJournal,
+    IggySnapshot,
+    ServerNgMuxStateMachine,
+    PapayaShardsTable,
+>;
+
+type ServerNgShardHandle = Rc<RefCell<Option<Weak<ServerNgShard>>>>;
+
+struct TcpTopology {
+    self_replica_id: u8,
+    replica_count: u8,
+    client_listen_addr: SocketAddr,
+    replica_listen_addr: Option<SocketAddr>,
+    ws_listen_addr: Option<SocketAddr>,
+    quic_listen_addr: Option<SocketAddr>,
+    tcp_tls_listen_addr: Option<SocketAddr>,
+    peers: Vec<(u8, SocketAddr)>,
+}
+
+struct LocalClientAcceptFns {
+    tcp: AcceptedClientFn,
+    ws: AcceptedWsClientFn,
+    quic: AcceptedQuicClientFn,
+    tcp_tls: AcceptedTlsClientFn,
+}
+
+#[derive(Default)]
+struct BoundClientListeners {
+    tcp: Option<SocketAddr>,
+    tcp_tls: Option<SocketAddr>,
+    ws: Option<SocketAddr>,
+    quic: Option<SocketAddr>,
+}
+
+pub trait RunServerNg {
+    fn run(
+        &self,
+        config: &ServerNgConfig,
+        current_replica_id: Option<u8>,
+    ) -> impl Future<Output = Result<(), ServerNgError>>;
+}
+
+impl RunServerNg for Rc<ServerNgShard> {
+    /// Run the fully bootstrapped `server-ng` shard.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if TCP listener bootstrap fails or cluster TCP
+    /// addresses cannot be resolved from config.
+    async fn run(
+        &self,
+        config: &ServerNgConfig,
+        current_replica_id: Option<u8>,
+    ) -> Result<(), ServerNgError> {
+        let topology = resolve_tcp_topology(config, current_replica_id)?;
+        let (stop_tx, stop_rx) = channel(1);
+        let message_pump_shard = Self::clone(self);
+        let message_pump_handle = compio::runtime::spawn(async move {
+            message_pump_shard.run_message_pump(stop_rx).await;
+        });
+        self.bus.track_background(message_pump_handle);
+
+        let on_replica_message = make_replica_message_handler(self);
+        let on_client_request = make_client_request_handler(self);
+        let accepted_replica = make_local_replica_accept_fn(&self.bus, 
on_replica_message);
+        let accepted_client = make_local_client_accept_fns(&self.bus, 
on_client_request);
+
+        info!(
+            shard = self.id,
+            partitions = self.plane.partitions().len(),
+            "server-ng shard initialized"
+        );
+
+        if let Err(error) =
+            start_tcp_runtime(self, config, &topology, accepted_replica, 
accepted_client).await
+        {
+            let _ = stop_tx.try_send(());
+            return Err(error);
+        }
+
+        self.bus.token().wait().await;
+        let _ = stop_tx.try_send(());
+        Ok(())
+    }
+}
+
+/// Load config, prepare directories, and complete late logging init.
+///
+/// # Errors
+///
+/// Returns an error if config loading, directory preparation, or logging
+/// setup fails.
+pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, 
ServerNgError> {
+    let config = ServerNgConfig::load()
+        .await
+        .map_err(ServerNgError::Config)?;
+    // TODO: decouple directory bootstrap from the `server` crate.
+    create_directories(&config.system)
+        .await
+        .map_err(ServerNgError::CreateDirectories)?;
+    logging
+        .late_init(
+            config.system.get_system_path(),
+            &config.system.logging,
+            &config.telemetry,
+        )
+        .map_err(ServerNgError::Logging)?;
+
+    Ok(config)
+}
+
+/// Bootstraps `server-ng` from config and on-disk metadata/partition state.
+///
+/// # Errors
+///
+/// Returns an error if metadata recovery, consensus restoration, or
+/// partition hydration fails.
+pub async fn bootstrap(
+    config: &ServerNgConfig,
+    current_replica_id: Option<u8>,
+) -> Result<Rc<ServerNgShard>, ServerNgError> {
+    let topology = resolve_tcp_topology(config, current_replica_id)?;
+    let bus = Rc::new(IggyMessageBus::with_config(SHARD_ID, config));
+    let recovered = 
recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path))
+        .await
+        .map_err(ServerNgError::MetadataRecovery)?;
+    let restored_op = recovered.last_applied_op.unwrap_or_else(|| {

Review Comment:
   We don't persist the `commit` ranges yet. I did this on the assumption that,
   for now, the cluster at bootstrap starts either with empty state or with all
   replicas in exactly the same state at the moment of shutdown/reboot (there is
   no state transfer support yet).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to