This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch replica_bootstrap
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/replica_bootstrap by this push:
     new 59a9231cb slimmer error
59a9231cb is described below

commit 59a9231cb0361311ad863a1a2bf2a5e768eebe12
Author: numinex <[email protected]>
AuthorDate: Thu May 7 15:30:23 2026 +0200

    slimmer error
---
 core/server-ng/src/bootstrap.rs    | 177 ++++++++++++++++++++++++++-----------
 core/server-ng/src/server_error.rs |  62 ++-----------
 2 files changed, 133 insertions(+), 106 deletions(-)

diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs
index 699108ce8..816a464f5 100644
--- a/core/server-ng/src/bootstrap.rs
+++ b/core/server-ng/src/bootstrap.rs
@@ -72,7 +72,7 @@ use std::path::{Path, PathBuf};
 use std::rc::{Rc, Weak};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU64, Ordering};
-use tracing::{info, warn};
+use tracing::{error, info, warn};
 
 const CLUSTER_ID: u128 = 1;
 const SHARD_ID: u16 = 0;
@@ -186,9 +186,14 @@ pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, Server
         .await
         .map_err(ServerNgError::Config)?;
     // TODO: decouple directory bootstrap from the `server` crate.
-    create_directories(&config.system)
-        .await
-        .map_err(ServerNgError::CreateDirectories)?;
+    create_directories(&config.system).await.map_err(|source| {
+        error!(
+            system_path = %config.system.get_system_path(),
+            error = %source,
+            "failed to prepare server-ng directories"
+        );
+        source
+    })?;
     logging
         .late_init(
             config.system.get_system_path(),
@@ -440,11 +445,15 @@ async fn load_partition(
         stats.clone(),
     )
     .await
-    .map_err(|source| ServerNgError::PartitionLogLoad {
-        stream_id,
-        topic_id,
-        partition_id,
-        source,
+    .map_err(|source| {
+        error!(
+            stream_id,
+            topic_id,
+            partition_id,
+            error = %source,
+            "failed to load partition log during server-ng bootstrap"
+        );
+        source
     })?;
 
     let mut partition = IggyPartition::new(stats.clone(), consensus);
@@ -534,11 +543,16 @@ async fn hydrate_partition_log(
                     true,
                 )
                 .await
-                .map_err(|source| ServerNgError::MessagesWriterInit {
-                    stream_id,
-                    topic_id,
-                    partition_id,
-                    source,
+                .map_err(|source| {
+                    error!(
+                        stream_id,
+                        topic_id,
+                        partition_id,
+                        path = %messages_reader.path(),
+                        error = %source,
+                        "failed to initialize persisted messages writer"
+                    );
+                    source
                 })?,
             ));
             partition.log.index_writers_mut()[active_index] = Some(Rc::new(
@@ -549,11 +563,16 @@ async fn hydrate_partition_log(
                     true,
                 )
                 .await
-                .map_err(|source| ServerNgError::IndexWriterInit {
-                    stream_id,
-                    topic_id,
-                    partition_id,
-                    source,
+                .map_err(|source| {
+                    error!(
+                        stream_id,
+                        topic_id,
+                        partition_id,
+                        path = %index_path,
+                        error = %source,
+                        "failed to initialize persisted sparse index writer"
+                    );
+                    source
                 })?,
             ));
         }
@@ -600,11 +619,15 @@ async fn load_segment_max_timestamp(
     let indexes = index_reader
         .load_all_indexes_from_disk()
         .await
-        .map_err(|source| ServerNgError::SegmentIndexesLoad {
-            stream_id,
-            topic_id,
-            partition_id,
-            source,
+        .map_err(|source| {
+            error!(
+                stream_id,
+                topic_id,
+                partition_id,
+                error = %source,
+                "failed to load segment indexes while recovering max timestamp"
+            );
+            source
         })?;
     Ok(indexes_max_timestamp(&indexes))
 }
@@ -760,11 +783,15 @@ async fn ensure_initial_segment(
     let storage =
         create_segment_storage(&config.system, stream_id, topic_id, partition_id, 0, 0, 0)
             .await
-            .map_err(|source| ServerNgError::InitialSegmentStorage {
-                stream_id,
-                topic_id,
-                partition_id,
-                source,
+            .map_err(|source| {
+                error!(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    error = %source,
+                    "failed to create initial segment storage"
+                );
+                source
             })?;
     let messages_path = config
         .system
@@ -783,11 +810,16 @@ async fn ensure_initial_segment(
                 false,
             )
             .await
-            .map_err(|source| ServerNgError::MessagesWriterInit {
-                stream_id,
-                topic_id,
-                partition_id,
-                source,
+            .map_err(|source| {
+                error!(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    path = %messages_path,
+                    error = %source,
+                    "failed to initialize initial messages writer"
+                );
+                source
             })?,
         )),
         Some(Rc::new(
@@ -798,11 +830,16 @@ async fn ensure_initial_segment(
                 false,
             )
             .await
-            .map_err(|source| ServerNgError::IndexWriterInit {
-                stream_id,
-                topic_id,
-                partition_id,
-                source,
+            .map_err(|source| {
+                error!(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    path = %index_path,
+                    error = %source,
+                    "failed to initialize initial sparse index writer"
+                );
+                source
             })?,
         )),
     );
@@ -1033,7 +1070,15 @@ async fn start_via_replica_io(
         shard.bus.config().reconnect_period,
     )
     .await
-    .map_err(ServerNgError::StartListeners)?;
+    .map_err(|source| {
+        error!(
+            replica_addr = %replica_addr,
+            client_addr = %topology.client_listen_addr,
+            error = %source,
+            "failed to start server-ng listeners via replica_io"
+        );
+        source
+    })?;
     let Some(bound) = bound else {
         return Ok(());
     };
@@ -1083,9 +1128,17 @@ async fn start_manual_runtime(
         let replica_addr = topology
             .replica_listen_addr
             .expect("cluster-enabled topology must include replica listener address");
-        let (replica_listener, bound_addr) = replica_listener::bind(replica_addr)
-            .await
-            .map_err(ServerNgError::StartListeners)?;
+        let (replica_listener, bound_addr) =
+            replica_listener::bind(replica_addr)
+                .await
+                .map_err(|source| {
+                    error!(
+                        replica_addr = %replica_addr,
+                        error = %source,
+                        "failed to bind replica listener"
+                    );
+                    source
+                })?;
         let token = shard.bus.token();
         let max_message_size = shard.bus.config().max_message_size;
         let handshake_grace = shard.bus.config().handshake_grace;
@@ -1361,7 +1414,14 @@ async fn start_client_listeners(
     if config.tcp.enabled && !config.tcp.tls.enabled {
         let (listener, bound_addr) = client_listener::tcp::bind(topology.client_listen_addr)
             .await
-            .map_err(ServerNgError::StartListeners)?;
+            .map_err(|source| {
+                error!(
+                    addr = %topology.client_listen_addr,
+                    error = %source,
+                    "failed to bind TCP client listener"
+                );
+                source
+            })?;
         let token = shard.bus.token();
         let accepted_client = accepted_clients.tcp.clone();
         let client_handle = compio::runtime::spawn(async move {
@@ -1372,9 +1432,11 @@ async fn start_client_listeners(
     }
 
     if let Some(ws_addr) = topology.ws_listen_addr {
-        let (listener, bound_addr) = client_listener::ws::bind(ws_addr)
-            .await
-            .map_err(ServerNgError::StartListeners)?;
+        let (listener, bound_addr) =
+            client_listener::ws::bind(ws_addr).await.map_err(|source| {
+                error!(addr = %ws_addr, error = %source, "failed to bind websocket listener");
+                source
+            })?;
         let token = shard.bus.token();
         let accepted_ws = accepted_clients.ws.clone();
         let ws_handle = compio::runtime::spawn(async move {
@@ -1393,13 +1455,17 @@ async fn start_client_listeners(
             &shard.bus.config().quic,
         )
         .map_err(|e| {
-            ServerNgError::StartListeners(iggy_common::IggyError::IoError(format!(
-                "QUIC server config build failed: {e}"
-            )))
+            let source =
+                iggy_common::IggyError::IoError(format!("QUIC server config build failed: {e}"));
+            error!(addr = %quic_addr, error = %source, "failed to build QUIC server config");
+            source
         })?;
         let (endpoint, bound_addr) = client_listener::quic::bind(quic_addr, server_config)
             .await
-            .map_err(ServerNgError::StartListeners)?;
+            .map_err(|source| {
+                error!(addr = %quic_addr, error = %source, "failed to bind QUIC listener");
+                source
+            })?;
         let token = shard.bus.token();
         let handshake_grace = shard.bus.config().handshake_grace;
         let accepted_quic = accepted_clients.quic.clone();
@@ -1415,7 +1481,14 @@ async fn start_client_listeners(
         let (listener, tls_config, bound_addr) =
             client_listener::tcp_tls::bind(topology.client_listen_addr, credentials)
                 .await
-                .map_err(ServerNgError::StartListeners)?;
+                .map_err(|source| {
+                    error!(
+                        addr = %topology.client_listen_addr,
+                        error = %source,
+                        "failed to bind TCP TLS listener"
+                    );
+                    source
+                })?;
         let token = shard.bus.token();
         let accepted_tls = accepted_clients.tcp_tls.clone();
         let tls_handle = compio::runtime::spawn(async move {
diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs
index 954ce26e9..77e17ee1c 100644
--- a/core/server-ng/src/server_error.rs
+++ b/core/server-ng/src/server_error.rs
@@ -24,10 +24,10 @@ use thiserror::Error;
 
 #[derive(Debug, Error)]
 pub enum ServerNgError {
+    #[error(transparent)]
+    Iggy(Box<iggy_common::IggyError>),
     #[error("failed to load server-ng config")]
     Config(#[source] configs::ConfigurationError),
-    #[error("failed to prepare server-ng directories")]
-    CreateDirectories(#[source] iggy_common::IggyError),
     #[error("failed to serialize current server-ng config")]
     CurrentConfigSerialize(#[source] toml::ser::Error),
     #[error("failed to write current server-ng config at {path}")]
@@ -90,62 +90,16 @@ pub enum ServerNgError {
         max_topics: usize,
         max_partitions: usize,
     },
-    #[error(
-        "failed to load partition log for stream {stream_id}, topic {topic_id}, partition {partition_id}"
-    )]
-    PartitionLogLoad {
-        stream_id: usize,
-        topic_id: usize,
-        partition_id: usize,
-        #[source]
-        source: iggy_common::IggyError,
-    },
     #[error("failed to load {transport} listener credentials")]
     ListenerCredentials {
         transport: &'static str,
         #[source]
         source: std::io::Error,
     },
-    #[error("failed to start server-ng listeners")]
-    StartListeners(#[source] iggy_common::IggyError),
-    #[error(
-        "failed to initialize messages writer for stream {stream_id}, topic {topic_id}, partition {partition_id}"
-    )]
-    MessagesWriterInit {
-        stream_id: usize,
-        topic_id: usize,
-        partition_id: usize,
-        #[source]
-        source: iggy_common::IggyError,
-    },
-    #[error(
-        "failed to initialize index writer for stream {stream_id}, topic {topic_id}, partition {partition_id}"
-    )]
-    IndexWriterInit {
-        stream_id: usize,
-        topic_id: usize,
-        partition_id: usize,
-        #[source]
-        source: iggy_common::IggyError,
-    },
-    #[error(
-        "failed to load segment indexes for stream {stream_id}, topic {topic_id}, partition {partition_id}"
-    )]
-    SegmentIndexesLoad {
-        stream_id: usize,
-        topic_id: usize,
-        partition_id: usize,
-        #[source]
-        source: iggy_common::IggyError,
-    },
-    #[error(
-        "failed to create initial segment storage for stream {stream_id}, topic {topic_id}, partition {partition_id}"
-    )]
-    InitialSegmentStorage {
-        stream_id: usize,
-        topic_id: usize,
-        partition_id: usize,
-        #[source]
-        source: iggy_common::IggyError,
-    },
+}
+
+impl From<iggy_common::IggyError> for ServerNgError {
+    fn from(source: iggy_common::IggyError) -> Self {
+        Self::Iggy(Box::new(source))
+    }
 }

Reply via email to