This is an automated email from the ASF dual-hosted git repository.
numinnex pushed a commit to branch replica_bootstrap
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/replica_bootstrap by this push:
new 59a9231cb slimmer error
59a9231cb is described below
commit 59a9231cb0361311ad863a1a2bf2a5e768eebe12
Author: numinex <[email protected]>
AuthorDate: Thu May 7 15:30:23 2026 +0200
slimmer error
---
core/server-ng/src/bootstrap.rs | 177 ++++++++++++++++++++++++++-----------
core/server-ng/src/server_error.rs | 62 ++-----------
2 files changed, 133 insertions(+), 106 deletions(-)
diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs
index 699108ce8..816a464f5 100644
--- a/core/server-ng/src/bootstrap.rs
+++ b/core/server-ng/src/bootstrap.rs
@@ -72,7 +72,7 @@ use std::path::{Path, PathBuf};
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
-use tracing::{info, warn};
+use tracing::{error, info, warn};
const CLUSTER_ID: u128 = 1;
const SHARD_ID: u16 = 0;
@@ -186,9 +186,14 @@ pub async fn load_config(logging: &mut Logging) ->
Result<ServerNgConfig, Server
.await
.map_err(ServerNgError::Config)?;
// TODO: decouple directory bootstrap from the `server` crate.
- create_directories(&config.system)
- .await
- .map_err(ServerNgError::CreateDirectories)?;
+ create_directories(&config.system).await.map_err(|source| {
+ error!(
+ system_path = %config.system.get_system_path(),
+ error = %source,
+ "failed to prepare server-ng directories"
+ );
+ source
+ })?;
logging
.late_init(
config.system.get_system_path(),
@@ -440,11 +445,15 @@ async fn load_partition(
stats.clone(),
)
.await
- .map_err(|source| ServerNgError::PartitionLogLoad {
- stream_id,
- topic_id,
- partition_id,
- source,
+ .map_err(|source| {
+ error!(
+ stream_id,
+ topic_id,
+ partition_id,
+ error = %source,
+ "failed to load partition log during server-ng bootstrap"
+ );
+ source
})?;
let mut partition = IggyPartition::new(stats.clone(), consensus);
@@ -534,11 +543,16 @@ async fn hydrate_partition_log(
true,
)
.await
- .map_err(|source| ServerNgError::MessagesWriterInit {
- stream_id,
- topic_id,
- partition_id,
- source,
+ .map_err(|source| {
+ error!(
+ stream_id,
+ topic_id,
+ partition_id,
+ path = %messages_reader.path(),
+ error = %source,
+ "failed to initialize persisted messages writer"
+ );
+ source
})?,
));
partition.log.index_writers_mut()[active_index] = Some(Rc::new(
@@ -549,11 +563,16 @@ async fn hydrate_partition_log(
true,
)
.await
- .map_err(|source| ServerNgError::IndexWriterInit {
- stream_id,
- topic_id,
- partition_id,
- source,
+ .map_err(|source| {
+ error!(
+ stream_id,
+ topic_id,
+ partition_id,
+ path = %index_path,
+ error = %source,
+ "failed to initialize persisted sparse index writer"
+ );
+ source
})?,
));
}
@@ -600,11 +619,15 @@ async fn load_segment_max_timestamp(
let indexes = index_reader
.load_all_indexes_from_disk()
.await
- .map_err(|source| ServerNgError::SegmentIndexesLoad {
- stream_id,
- topic_id,
- partition_id,
- source,
+ .map_err(|source| {
+ error!(
+ stream_id,
+ topic_id,
+ partition_id,
+ error = %source,
+ "failed to load segment indexes while recovering max timestamp"
+ );
+ source
})?;
Ok(indexes_max_timestamp(&indexes))
}
@@ -760,11 +783,15 @@ async fn ensure_initial_segment(
let storage =
create_segment_storage(&config.system, stream_id, topic_id,
partition_id, 0, 0, 0)
.await
- .map_err(|source| ServerNgError::InitialSegmentStorage {
- stream_id,
- topic_id,
- partition_id,
- source,
+ .map_err(|source| {
+ error!(
+ stream_id,
+ topic_id,
+ partition_id,
+ error = %source,
+ "failed to create initial segment storage"
+ );
+ source
})?;
let messages_path = config
.system
@@ -783,11 +810,16 @@ async fn ensure_initial_segment(
false,
)
.await
- .map_err(|source| ServerNgError::MessagesWriterInit {
- stream_id,
- topic_id,
- partition_id,
- source,
+ .map_err(|source| {
+ error!(
+ stream_id,
+ topic_id,
+ partition_id,
+ path = %messages_path,
+ error = %source,
+ "failed to initialize initial messages writer"
+ );
+ source
})?,
)),
Some(Rc::new(
@@ -798,11 +830,16 @@ async fn ensure_initial_segment(
false,
)
.await
- .map_err(|source| ServerNgError::IndexWriterInit {
- stream_id,
- topic_id,
- partition_id,
- source,
+ .map_err(|source| {
+ error!(
+ stream_id,
+ topic_id,
+ partition_id,
+ path = %index_path,
+ error = %source,
+ "failed to initialize initial sparse index writer"
+ );
+ source
})?,
)),
);
@@ -1033,7 +1070,15 @@ async fn start_via_replica_io(
shard.bus.config().reconnect_period,
)
.await
- .map_err(ServerNgError::StartListeners)?;
+ .map_err(|source| {
+ error!(
+ replica_addr = %replica_addr,
+ client_addr = %topology.client_listen_addr,
+ error = %source,
+ "failed to start server-ng listeners via replica_io"
+ );
+ source
+ })?;
let Some(bound) = bound else {
return Ok(());
};
@@ -1083,9 +1128,17 @@ async fn start_manual_runtime(
let replica_addr = topology
.replica_listen_addr
.expect("cluster-enabled topology must include replica listener
address");
- let (replica_listener, bound_addr) =
replica_listener::bind(replica_addr)
- .await
- .map_err(ServerNgError::StartListeners)?;
+ let (replica_listener, bound_addr) =
+ replica_listener::bind(replica_addr)
+ .await
+ .map_err(|source| {
+ error!(
+ replica_addr = %replica_addr,
+ error = %source,
+ "failed to bind replica listener"
+ );
+ source
+ })?;
let token = shard.bus.token();
let max_message_size = shard.bus.config().max_message_size;
let handshake_grace = shard.bus.config().handshake_grace;
@@ -1361,7 +1414,14 @@ async fn start_client_listeners(
if config.tcp.enabled && !config.tcp.tls.enabled {
let (listener, bound_addr) =
client_listener::tcp::bind(topology.client_listen_addr)
.await
- .map_err(ServerNgError::StartListeners)?;
+ .map_err(|source| {
+ error!(
+ addr = %topology.client_listen_addr,
+ error = %source,
+ "failed to bind TCP client listener"
+ );
+ source
+ })?;
let token = shard.bus.token();
let accepted_client = accepted_clients.tcp.clone();
let client_handle = compio::runtime::spawn(async move {
@@ -1372,9 +1432,11 @@ async fn start_client_listeners(
}
if let Some(ws_addr) = topology.ws_listen_addr {
- let (listener, bound_addr) = client_listener::ws::bind(ws_addr)
- .await
- .map_err(ServerNgError::StartListeners)?;
+ let (listener, bound_addr) =
+ client_listener::ws::bind(ws_addr).await.map_err(|source| {
+ error!(addr = %ws_addr, error = %source, "failed to bind
websocket listener");
+ source
+ })?;
let token = shard.bus.token();
let accepted_ws = accepted_clients.ws.clone();
let ws_handle = compio::runtime::spawn(async move {
@@ -1393,13 +1455,17 @@ async fn start_client_listeners(
&shard.bus.config().quic,
)
.map_err(|e| {
-
ServerNgError::StartListeners(iggy_common::IggyError::IoError(format!(
- "QUIC server config build failed: {e}"
- )))
+ let source =
+ iggy_common::IggyError::IoError(format!("QUIC server config
build failed: {e}"));
+ error!(addr = %quic_addr, error = %source, "failed to build QUIC
server config");
+ source
})?;
let (endpoint, bound_addr) = client_listener::quic::bind(quic_addr,
server_config)
.await
- .map_err(ServerNgError::StartListeners)?;
+ .map_err(|source| {
+ error!(addr = %quic_addr, error = %source, "failed to bind
QUIC listener");
+ source
+ })?;
let token = shard.bus.token();
let handshake_grace = shard.bus.config().handshake_grace;
let accepted_quic = accepted_clients.quic.clone();
@@ -1415,7 +1481,14 @@ async fn start_client_listeners(
let (listener, tls_config, bound_addr) =
client_listener::tcp_tls::bind(topology.client_listen_addr,
credentials)
.await
- .map_err(ServerNgError::StartListeners)?;
+ .map_err(|source| {
+ error!(
+ addr = %topology.client_listen_addr,
+ error = %source,
+ "failed to bind TCP TLS listener"
+ );
+ source
+ })?;
let token = shard.bus.token();
let accepted_tls = accepted_clients.tcp_tls.clone();
let tls_handle = compio::runtime::spawn(async move {
diff --git a/core/server-ng/src/server_error.rs
b/core/server-ng/src/server_error.rs
index 954ce26e9..77e17ee1c 100644
--- a/core/server-ng/src/server_error.rs
+++ b/core/server-ng/src/server_error.rs
@@ -24,10 +24,10 @@ use thiserror::Error;
#[derive(Debug, Error)]
pub enum ServerNgError {
+ #[error(transparent)]
+ Iggy(Box<iggy_common::IggyError>),
#[error("failed to load server-ng config")]
Config(#[source] configs::ConfigurationError),
- #[error("failed to prepare server-ng directories")]
- CreateDirectories(#[source] iggy_common::IggyError),
#[error("failed to serialize current server-ng config")]
CurrentConfigSerialize(#[source] toml::ser::Error),
#[error("failed to write current server-ng config at {path}")]
@@ -90,62 +90,16 @@ pub enum ServerNgError {
max_topics: usize,
max_partitions: usize,
},
- #[error(
- "failed to load partition log for stream {stream_id}, topic
{topic_id}, partition {partition_id}"
- )]
- PartitionLogLoad {
- stream_id: usize,
- topic_id: usize,
- partition_id: usize,
- #[source]
- source: iggy_common::IggyError,
- },
#[error("failed to load {transport} listener credentials")]
ListenerCredentials {
transport: &'static str,
#[source]
source: std::io::Error,
},
- #[error("failed to start server-ng listeners")]
- StartListeners(#[source] iggy_common::IggyError),
- #[error(
- "failed to initialize messages writer for stream {stream_id}, topic
{topic_id}, partition {partition_id}"
- )]
- MessagesWriterInit {
- stream_id: usize,
- topic_id: usize,
- partition_id: usize,
- #[source]
- source: iggy_common::IggyError,
- },
- #[error(
- "failed to initialize index writer for stream {stream_id}, topic
{topic_id}, partition {partition_id}"
- )]
- IndexWriterInit {
- stream_id: usize,
- topic_id: usize,
- partition_id: usize,
- #[source]
- source: iggy_common::IggyError,
- },
- #[error(
- "failed to load segment indexes for stream {stream_id}, topic
{topic_id}, partition {partition_id}"
- )]
- SegmentIndexesLoad {
- stream_id: usize,
- topic_id: usize,
- partition_id: usize,
- #[source]
- source: iggy_common::IggyError,
- },
- #[error(
- "failed to create initial segment storage for stream {stream_id},
topic {topic_id}, partition {partition_id}"
- )]
- InitialSegmentStorage {
- stream_id: usize,
- topic_id: usize,
- partition_id: usize,
- #[source]
- source: iggy_common::IggyError,
- },
+}
+
+impl From<iggy_common::IggyError> for ServerNgError {
+ fn from(source: iggy_common::IggyError) -> Self {
+ Self::Iggy(Box::new(source))
+ }
}