This is an automated email from the ASF dual-hosted git repository.
numinnex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new cca0e04e7 feat(configs): add ServerNgConfig schema with message_bus
section (#3189)
cca0e04e7 is described below
commit cca0e04e7e2aa17b3abc32228ae8cef8cccfcca8
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Apr 28 14:02:05 2026 +0200
feat(configs): add ServerNgConfig schema with message_bus section (#3189)
Introduces a new on-disk config type ServerNgConfig parallel to the
legacy ServerConfig, mirroring its full section surface (consumer_group,
data_maintenance, message_saver, personal_access_token, heartbeat,
system, quic, tcp, http, websocket, telemetry, cluster) and adding a
new [message_bus] section that captures the runtime tunables previously
hardcoded in core::message_bus::config::MessageBusConfig.
Section types are reused verbatim from server_config::* (operator-facing
schema is identical between server and server-ng); only the new
MessageBusConfig and the top-level composer are net-new code.
The type defines a loader following ServerConfig::load()'s shape
(FileConfigProvider + TypedEnvProvider, IGGY_ env prefix,
include_str!(server-ng/config.toml) embedded default) and a Validatable
impl that asserts the runtime invariants (max_batch <= IOV_MAX_LIMIT,
peer_queue_capacity > 0, listen-addr parse-ability) without depending
on core/message_bus.
Scaffolding only. No binary calls ServerNgConfig::load(); the wiring
PR for core/server-ng's bootstrap and the message_bus crate's runtime
type is intentionally a separate change.
core/server-ng/config.toml gains a [message_bus] block with defaults
matching MessageBusConfig::default() in the runtime crate.
---
core/configs/src/lib.rs | 2 +
core/configs/src/server_ng_config/defaults.rs | 94 +++++++++
core/configs/src/server_ng_config/displays.rs | 72 +++++++
core/configs/src/server_ng_config/message_bus.rs | 257 +++++++++++++++++++++++
core/configs/src/server_ng_config/mod.rs | 40 ++++
core/configs/src/server_ng_config/server_ng.rs | 200 ++++++++++++++++++
core/configs/src/server_ng_config/validators.rs | 130 ++++++++++++
core/server-ng/config.toml | 40 ++++
8 files changed, 835 insertions(+)
diff --git a/core/configs/src/lib.rs b/core/configs/src/lib.rs
index 419c77b8a..3b61c99fb 100644
--- a/core/configs/src/lib.rs
+++ b/core/configs/src/lib.rs
@@ -21,6 +21,7 @@ extern crate self as configs;
mod configs_impl;
mod server_config;
+mod server_ng_config;
pub use configs_derive::ConfigEnv;
pub use configs_impl::{
ConfigEnvMappings, ConfigProvider, ConfigurationError, ConfigurationType,
EnvVarMapping,
@@ -30,3 +31,4 @@ pub use server_config::{
COMPONENT, cache_indexes, cluster, defaults, displays, http, quic, server,
sharding, system,
tcp, validators, websocket,
};
+pub use server_ng_config::{COMPONENT_NG, message_bus, server_ng};
diff --git a/core/configs/src/server_ng_config/defaults.rs
b/core/configs/src/server_ng_config/defaults.rs
new file mode 100644
index 000000000..9218d0828
--- /dev/null
+++ b/core/configs/src/server_ng_config/defaults.rs
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! `Default` impls for the server-ng config surface.
+//!
+//! Section sub-types are reused verbatim from the legacy server config;
+//! their existing `Default` impls (declared in [`crate::defaults`]) read
+//! from `core/server/config.toml` via `static_toml!`. This file
+//! delegates to those impls for every reused section, so drift between
+//! the two TOMLs is intentionally absorbed at the `server-ng` consumer
+//! level (the wiring PR will read `core/server-ng/config.toml`
+//! end-to-end through [`super::server_ng::ServerNgConfig::load`] and
+//! overrides will take effect there).
+//!
+//! Only [`MessageBusConfig`] needs its own `Default` because no legacy
+//! section maps to it. The defaults are hardcoded to match
+//! `core::message_bus::config::MessageBusConfig::default()` byte-for-byte.
+
+use super::message_bus::MessageBusConfig;
+use super::server_ng::ServerNgConfig;
+use crate::server_config::cluster::ClusterConfig;
+use crate::server_config::http::HttpConfig;
+use crate::server_config::quic::QuicConfig;
+use crate::server_config::server::{
+ ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig,
MessageSaverConfig,
+ PersonalAccessTokenConfig, TelemetryConfig,
+};
+use crate::server_config::system::SystemConfig;
+use crate::server_config::tcp::TcpConfig;
+use crate::server_config::websocket::WebSocketConfig;
+use iggy_common::IggyByteSize;
+use std::sync::Arc;
+
+impl Default for ServerNgConfig { // every section is built from its own type's `Default` impl
+ fn default() -> ServerNgConfig {
+ ServerNgConfig {
+ consumer_group: ConsumerGroupConfig::default(),
+ data_maintenance: DataMaintenanceConfig::default(),
+ heartbeat: HeartbeatConfig::default(),
+ message_saver: MessageSaverConfig::default(),
+ personal_access_token: PersonalAccessTokenConfig::default(),
+ system: Arc::new(SystemConfig::default()),
+ quic: QuicConfig::default(),
+ tcp: TcpConfig::default(),
+ websocket: WebSocketConfig::default(),
+ http: HttpConfig::default(),
+ telemetry: TelemetryConfig::default(),
+ cluster: ClusterConfig::default(),
+ message_bus: MessageBusConfig::default(),
+ }
+ }
+}
+
+impl Default for MessageBusConfig {
+ fn default() -> MessageBusConfig {
+ // Keep these literals in lock-step with
+ // `core::message_bus::config::MessageBusConfig::default()` and
+ // with the `[message_bus]` block in
+ // `core/server-ng/config.toml`; nothing enforces this at compile
+ // time. `message_bus::tests::default_validates` only proves the
+ // values pass `Validatable::validate`, and the embedded-TOML test in
+ // `server_ng::tests` spot-checks the integer fields and the unset
+ // listen addrs, so durations and `max_message_size` need a manual diff.
+ MessageBusConfig {
+ max_batch: 256,
+ max_message_size: IggyByteSize::from(64_u64 * 1024 * 1024),
+ peer_queue_capacity: 256,
+ reconnect_period: "5 s".parse().expect("'5 s' parses as
IggyDuration"),
+ keepalive_idle: "10 s".parse().expect("'10 s' parses as
IggyDuration"),
+ keepalive_interval: "5 s".parse().expect("'5 s' parses as
IggyDuration"),
+ keepalive_retries: 3,
+ close_peer_timeout: "2 s".parse().expect("'2 s' parses as
IggyDuration"),
+ close_grace: "2 s".parse().expect("'2 s' parses as IggyDuration"),
+ tcp_tls_listen_addr: None,
+ wss_listen_addr: None,
+ }
+ }
+}
diff --git a/core/configs/src/server_ng_config/displays.rs
b/core/configs/src/server_ng_config/displays.rs
new file mode 100644
index 000000000..1c286aac3
--- /dev/null
+++ b/core/configs/src/server_ng_config/displays.rs
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! `Display` impls for the server-ng config surface.
+//!
+//! Reused section types pick up [`Display`] from
+//! [`crate::displays`]; this module only adds the top-level
+//! [`ServerNgConfig`] formatter and the new [`MessageBusConfig`]
+//! section formatter.
+
+use super::message_bus::MessageBusConfig;
+use super::server_ng::ServerNgConfig;
+use std::fmt::{Display, Formatter};
+
+impl Display for ServerNgConfig { // NOTE(review): personal_access_token, websocket and cluster are fields of the struct but are not printed here - confirm this is intended
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{{ consumer_group: {}, data_maintenance: {}, message_saver: {},
heartbeat: {}, \
+ system: {}, quic: {}, tcp: {}, http: {}, telemetry: {},
message_bus: {} }}",
+ self.consumer_group,
+ self.data_maintenance,
+ self.message_saver,
+ self.heartbeat,
+ self.system,
+ self.quic,
+ self.tcp,
+ self.http,
+ self.telemetry,
+ self.message_bus,
+ )
+ }
+}
+
+impl Display for MessageBusConfig { // prints all eleven fields in declaration order
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{{ max_batch: {}, max_message_size: {}, peer_queue_capacity: {}, \
+ reconnect_period: {}, keepalive_idle: {}, keepalive_interval: {},
\
+ keepalive_retries: {}, close_peer_timeout: {}, close_grace: {}, \
+ tcp_tls_listen_addr: {:?}, wss_listen_addr: {:?} }}",
+ self.max_batch,
+ self.max_message_size,
+ self.peer_queue_capacity,
+ self.reconnect_period,
+ self.keepalive_idle,
+ self.keepalive_interval,
+ self.keepalive_retries,
+ self.close_peer_timeout,
+ self.close_grace,
+ self.tcp_tls_listen_addr, // `Option<String>`, hence the `{:?}` placeholder
+ self.wss_listen_addr, // `Option<String>`, hence the `{:?}` placeholder
+ )
+ }
+}
diff --git a/core/configs/src/server_ng_config/message_bus.rs
b/core/configs/src/server_ng_config/message_bus.rs
new file mode 100644
index 000000000..a76b87e04
--- /dev/null
+++ b/core/configs/src/server_ng_config/message_bus.rs
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! On-disk schema for the inter-shard / inter-replica message bus.
+//!
+//! Mirrors the runtime `core::message_bus::config::MessageBusConfig`
+//! field-for-field, but uses configs-crate idioms:
+//!
+//! - `Duration` -> [`IggyDuration`] (DisplayFromStr-serde, `"5 s"` syntax)
+//! - `usize` / `u32` -> kept as the underlying integer type
+//! - `Option<SocketAddr>` -> `Option<String>` parsed at validate-time
+//! - WebSocket frame-layer tunables (`compio_ws::WebSocketConfig`) are
+//! NOT part of this section; they live under `[websocket]` (the
+//! existing [`super::super::server_config::websocket::WebSocketConfig`]).
+//! The future wiring PR builds the runtime `WebSocketConfig` from that
+//! section and feeds it into the bus at construction.
+//!
+//! Construction of the runtime type from this struct happens in the
+//! follow-up PR that wires `core/server-ng` to call
+//! [`super::server_ng::ServerNgConfig::load`].
+
+use super::COMPONENT_NG;
+use crate::ConfigurationError;
+use configs::ConfigEnv;
+use iggy_common::{IggyByteSize, IggyDuration, Validatable};
+use serde::{Deserialize, Serialize};
+use serde_with::{DisplayFromStr, serde_as};
+use std::net::SocketAddr;
+
+/// Hard upper bound on [`MessageBusConfig::max_batch`], in iovecs.
+///
+/// Mirrors `core::message_bus::config::IOV_MAX_LIMIT`. Duplicated here
+/// rather than depended on, so `core/configs` does not need a build-time
+/// dependency on `core/message_bus` (the runtime crate is the eventual
+/// consumer of this config; reversing the edge would invert the workspace
+/// graph). The runtime crate re-asserts the invariant inside
+/// `IggyMessageBus::with_config`. The unit test below only pins this
+/// constant to the literal `512`; it cannot observe the runtime crate,
+/// so a bump on the runtime side must be mirrored here by hand.
+pub const IOV_MAX_LIMIT_NG: usize = 512;
+
+/// Tunables for the message bus that ships consensus traffic between
+/// replicas and SDK-client traffic between shards.
+#[serde_as]
+#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
+pub struct MessageBusConfig {
+ /// Maximum number of `BusMessage` entries the writer task coalesces
+ /// into a single `writev(2)` call. Higher values amortise syscalls
+ /// at the cost of tail latency. Capped at [`IOV_MAX_LIMIT_NG`].
+ pub max_batch: usize,
+
+ /// Wire-level cap on a single framed message. Read-side validator;
+ /// undersize or oversize frames are rejected and the connection torn
+ /// down.
+ #[config_env(leaf)]
+ pub max_message_size: IggyByteSize,
+
+ /// Bound on the per-peer mpsc queue. Writer task drains; the
+ /// `send_to_*` path enqueues. Too small drops under burst; too
+ /// large delays backpressure signalling.
+ pub peer_queue_capacity: usize,
+
+ /// Interval between outbound reconnect attempts to peers with
+ /// `peer_id > self_id`.
+ #[config_env(leaf)]
+ #[serde_as(as = "DisplayFromStr")]
+ pub reconnect_period: IggyDuration,
+
+ /// TCP keepalive idle timer. See `tcp(7)` `TCP_KEEPIDLE`.
+ #[config_env(leaf)]
+ #[serde_as(as = "DisplayFromStr")]
+ pub keepalive_idle: IggyDuration,
+
+ /// TCP keepalive probe interval. See `tcp(7)` `TCP_KEEPINTVL`.
+ #[config_env(leaf)]
+ #[serde_as(as = "DisplayFromStr")]
+ pub keepalive_interval: IggyDuration,
+
+ /// TCP keepalive retry count. See `tcp(7)` `TCP_KEEPCNT`. Combined
+ /// with idle + interval this gates the half-open connection
+ /// detection window that VSR view-change timers must accommodate.
+ pub keepalive_retries: u32,
+
+ /// Timeout for per-peer close drain (flush writer, tear down
+ /// reader) before force-cancellation.
+ #[config_env(leaf)]
+ #[serde_as(as = "DisplayFromStr")]
+ pub close_peer_timeout: IggyDuration,
+
+ /// Wall-clock bound on a single `stream.shutdown()` (or
+ /// `ws.close()`) invocation in the safe-shutdown sequence of the
+ /// TLS-family transports. Independent of [`Self::close_peer_timeout`]
+ /// (which bounds the registry-level drain over both reader and
+ /// writer joins).
+ #[config_env(leaf)]
+ #[serde_as(as = "DisplayFromStr")]
+ pub close_grace: IggyDuration,
+
+ /// Optional TCP-TLS client listener address in `ip:port` form
+ /// (hostnames are rejected). `None` keeps the plane unbound. Only
+ /// syntax-checked as a [`SocketAddr`] by [`Validatable::validate`].
+ #[serde(default)]
+ pub tcp_tls_listen_addr: Option<String>,
+
+ /// Optional WSS client listener address in `ip:port` form
+ /// (hostnames are rejected). `None` keeps the plane unbound. Only
+ /// syntax-checked as a [`SocketAddr`] by [`Validatable::validate`].
+ #[serde(default)]
+ pub wss_listen_addr: Option<String>,
+}
+
+impl Validatable<ConfigurationError> for MessageBusConfig { // NOTE(review): the duration fields are not range-checked (a zero reconnect_period passes) - confirm that is acceptable
+ fn validate(&self) -> Result<(), ConfigurationError> { // returns on the first failing check; the reason is written to stderr
+ if self.max_batch == 0 {
+ eprintln!("{COMPONENT_NG} message_bus.max_batch must be > 0");
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+ if self.max_batch > IOV_MAX_LIMIT_NG {
+ eprintln!(
+ "{COMPONENT_NG} message_bus.max_batch ({}) exceeds
IOV_MAX_LIMIT ({IOV_MAX_LIMIT_NG})",
+ self.max_batch
+ );
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+ if self.peer_queue_capacity == 0 {
+ eprintln!("{COMPONENT_NG} message_bus.peer_queue_capacity must be
> 0");
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+ if self.max_message_size.as_bytes_u64() == 0 {
+ eprintln!("{COMPONENT_NG} message_bus.max_message_size must be >
0");
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+ if self.keepalive_retries == 0 {
+ eprintln!("{COMPONENT_NG} message_bus.keepalive_retries must be >
0");
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+ // Parse optional listen addrs eagerly (the parsed value is
+ // discarded) so a typo fails at boot, not at first connect.
+ if let Some(s) = &self.tcp_tls_listen_addr {
+ s.parse::<SocketAddr>().map_err(|e| {
+ eprintln!("{COMPONENT_NG} message_bus.tcp_tls_listen_addr
'{s}' is invalid: {e}");
+ ConfigurationError::InvalidConfigurationValue
+ })?;
+ }
+ if let Some(s) = &self.wss_listen_addr {
+ s.parse::<SocketAddr>().map_err(|e| {
+ eprintln!("{COMPONENT_NG} message_bus.wss_listen_addr '{s}' is
invalid: {e}");
+ ConfigurationError::InvalidConfigurationValue
+ })?;
+ }
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn baseline() -> MessageBusConfig {
+ MessageBusConfig::default()
+ }
+
+ #[test]
+ fn default_validates() {
+ baseline().validate().expect("default config validates");
+ }
+
+ #[test]
+ fn rejects_zero_max_batch() {
+ let mut c = baseline();
+ c.max_batch = 0;
+ assert!(c.validate().is_err());
+ }
+
+ #[test]
+ fn rejects_max_batch_above_iov_max() {
+ let mut c = baseline();
+ c.max_batch = IOV_MAX_LIMIT_NG + 1;
+ assert!(c.validate().is_err());
+ }
+
+ #[test]
+ fn accepts_max_batch_at_iov_max() {
+ let mut c = baseline();
+ c.max_batch = IOV_MAX_LIMIT_NG;
+ assert!(c.validate().is_ok());
+ }
+
+ #[test]
+ fn rejects_zero_peer_queue_capacity() {
+ let mut c = baseline();
+ c.peer_queue_capacity = 0;
+ assert!(c.validate().is_err());
+ }
+
+ #[test]
+ fn rejects_zero_max_message_size() {
+ let mut c = baseline();
+ c.max_message_size = IggyByteSize::from(0_u64);
+ assert!(c.validate().is_err());
+ }
+
+ #[test]
+ fn rejects_zero_keepalive_retries() {
+ let mut c = baseline();
+ c.keepalive_retries = 0;
+ assert!(c.validate().is_err());
+ }
+
+ #[test]
+ fn rejects_invalid_tcp_tls_listen_addr() {
+ let mut c = baseline();
+ c.tcp_tls_listen_addr = Some("not-a-socket-addr".to_string());
+ assert!(c.validate().is_err());
+ }
+
+ #[test]
+ fn rejects_invalid_wss_listen_addr() {
+ let mut c = baseline();
+ c.wss_listen_addr = Some("nope".to_string());
+ assert!(c.validate().is_err());
+ }
+
+ #[test]
+ fn accepts_valid_listen_addrs() {
+ let mut c = baseline();
+ c.tcp_tls_listen_addr = Some("0.0.0.0:9443".to_string());
+ c.wss_listen_addr = Some("127.0.0.1:9444".to_string());
+ c.validate().expect("valid listen addrs accepted");
+ }
+
+ /// Pins the local `IOV_MAX_LIMIT_NG` literal at `512`. This does NOT
+ /// compare against `core/message_bus` (which `core/configs` does not
+ /// depend on), so it only catches an accidental edit of the duplicate
+ /// here; a bump of the runtime `IOV_MAX_LIMIT` has to be mirrored
+ /// manually.
+ #[test]
+ fn iov_max_limit_matches_runtime_crate() {
+ assert_eq!(IOV_MAX_LIMIT_NG, 512);
+ }
+}
diff --git a/core/configs/src/server_ng_config/mod.rs
b/core/configs/src/server_ng_config/mod.rs
new file mode 100644
index 000000000..34b664fbf
--- /dev/null
+++ b/core/configs/src/server_ng_config/mod.rs
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! On-disk schema for the `server-ng` binary.
+//!
+//! Mirrors the legacy server-config section surface verbatim
+//! (operator-facing schema is unchanged) and adds a `message_bus` section
+//! for inter-shard / inter-replica bus tunables previously hardcoded in
+//! the `core/message_bus` runtime crate.
+//!
+//! Scaffolding-only at the time of introduction: the type is defined and
+//! loadable but no binary calls [`server_ng::ServerNgConfig::load`]; the
+//! wiring PR for `core/server-ng`'s bootstrap and the message_bus crate's
+//! runtime type is a separate change.
+
+pub mod defaults;
+pub mod displays;
+pub mod message_bus;
+pub mod server_ng;
+pub mod validators;
+
+/// Component tag prefixed to error and stderr messages emitted by the
+/// server-ng config surface. Counterpart of [`crate::COMPONENT`] (`"CONFIG"`).
+pub const COMPONENT_NG: &str = "CONFIG_NG";
diff --git a/core/configs/src/server_ng_config/server_ng.rs
b/core/configs/src/server_ng_config/server_ng.rs
new file mode 100644
index 000000000..1df243ba1
--- /dev/null
+++ b/core/configs/src/server_ng_config/server_ng.rs
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::COMPONENT_NG;
+use super::message_bus::MessageBusConfig;
+use crate::ConfigurationError;
+use crate::server_config::cluster::ClusterConfig;
+use crate::server_config::http::HttpConfig;
+use crate::server_config::quic::QuicConfig;
+use crate::server_config::server::{
+ ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig,
MessageSaverConfig,
+ PersonalAccessTokenConfig, TelemetryConfig,
+};
+use crate::server_config::system::SystemConfig;
+use crate::server_config::tcp::TcpConfig;
+use crate::server_config::websocket::WebSocketConfig;
+use configs::{ConfigEnv, ConfigEnvMappings, ConfigProvider,
FileConfigProvider, TypedEnvProvider};
+use err_trail::ErrContext;
+use figment::providers::{Format, Toml};
+use figment::value::Dict;
+use figment::{Metadata, Profile, Provider};
+use iggy_common::Validatable;
+use serde::{Deserialize, Serialize};
+use std::env;
+use std::sync::Arc;
+
+const DEFAULT_CONFIG_PATH: &str = "core/server-ng/config.toml"; // fallback used by `load` when reading `IGGY_CONFIG_PATH` fails
+
+/// Top-level on-disk config schema for the `server-ng` binary.
+///
+/// Mirrors the legacy [`crate::server::ServerConfig`] section surface
+/// verbatim (operator-facing schema is unchanged) and adds a
+/// [`MessageBusConfig`] section for inter-shard / inter-replica bus
+/// tunables.
+///
+/// Section types are reused directly from the legacy server-config
+/// modules; only [`MessageBusConfig`] and this composer are net-new
+/// code at the time of introduction.
+///
+/// At the time of introduction this type is NOT consumed by
+/// `core/server-ng`'s bootstrap. The wiring PR is a separate change.
+#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
+#[config_env(prefix = "IGGY_", name = "iggy-server-ng-config")]
+pub struct ServerNgConfig {
+ pub consumer_group: ConsumerGroupConfig,
+ pub data_maintenance: DataMaintenanceConfig,
+ pub message_saver: MessageSaverConfig,
+ pub personal_access_token: PersonalAccessTokenConfig,
+ pub heartbeat: HeartbeatConfig,
+ pub system: Arc<SystemConfig>,
+ pub quic: QuicConfig,
+ pub tcp: TcpConfig,
+ pub http: HttpConfig,
+ pub websocket: WebSocketConfig,
+ pub telemetry: TelemetryConfig,
+ pub cluster: ClusterConfig,
+ pub message_bus: MessageBusConfig, // the only section type not reused from the legacy server config
+}
+
+impl ServerNgConfig {
+ /// Load server-ng configuration from file and environment variables.
+ ///
+ /// Mirrors [`crate::server::ServerConfig::load`]: the path comes
+ /// from `IGGY_CONFIG_PATH` or defaults to
+ /// `core/server-ng/config.toml`; missing on-disk paths fall through
+ /// to the embedded default TOML; env-var overrides flow through the
+ /// [`ServerNgConfigEnvProvider`]; the result is validated before
+ /// returning.
+ ///
+ /// # Errors
+ /// Returns [`ConfigurationError`] when the config cannot be parsed
+ /// from the configured source(s) or fails [`Validatable::validate`].
+ pub async fn load() -> Result<ServerNgConfig, ConfigurationError> {
+ let config_path =
+ env::var("IGGY_CONFIG_PATH").unwrap_or_else(|_|
DEFAULT_CONFIG_PATH.to_string());
+ let provider = ServerNgConfig::config_provider(&config_path);
+ let cfg: ServerNgConfig =
+ provider
+ .load_config()
+ .await
+ .error(|e: &configs::ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to load
server-ng config")
+ })?;
+ cfg.validate().error(|e: &configs::ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to validate
server-ng config")
+ })?;
+ Ok(cfg)
+ }
+
+ /// Build the file-backed config provider with the embedded default
+ /// TOML and the type-safe env-var provider attached.
+ pub fn config_provider(config_path: &str) ->
FileConfigProvider<ServerNgConfigEnvProvider> {
+ let default_config =
Toml::string(include_str!("../../../server-ng/config.toml"));
+ FileConfigProvider::new(
+ config_path.to_string(),
+ ServerNgConfigEnvProvider::default(),
+ true, // NOTE(review): meaning of this flag is defined by `FileConfigProvider::new` - confirm
+ Some(default_config), // copy of `core/server-ng/config.toml` embedded at compile time
+ )
+ }
+
+ /// All recognised env var names for [`ServerNgConfig`].
+ pub fn all_env_var_names() -> Vec<&'static str> {
+ <ServerNgConfig as ConfigEnvMappings>::all_env_var_names()
+ }
+}
+
+/// Type-safe environment provider for [`ServerNgConfig`].
+///
+/// Uses the [`ConfigEnvMappings`] trait generated by `#[derive(ConfigEnv)]`
+/// to look up known env var names directly, eliminating path ambiguity.
+#[derive(Debug, Clone)]
+pub struct ServerNgConfigEnvProvider {
+ provider: TypedEnvProvider<ServerNgConfig>, // inner provider that `Provider::data` delegates to
+}
+
+impl Default for ServerNgConfigEnvProvider {
+ fn default() -> Self { // binds the typed provider to `ServerNgConfig::ENV_PREFIX` (`"IGGY_"`)
+ Self {
+ provider:
TypedEnvProvider::from_config(ServerNgConfig::ENV_PREFIX),
+ }
+ }
+}
+
+impl Provider for ServerNgConfigEnvProvider {
+ fn metadata(&self) -> Metadata { // names this source after `ServerNgConfig::ENV_PROVIDER_NAME`
+ Metadata::named(ServerNgConfig::ENV_PROVIDER_NAME)
+ }
+
+ fn data(&self) -> Result<figment::value::Map<Profile, Dict>,
figment::Error> {
+ self.provider.deserialize().map_err(|e| { // re-wrap the inner provider's error as a `figment::Error` with context
+ figment::Error::from(format!(
+ "Cannot deserialize environment variables for server-ng
config: {e}"
+ ))
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use figment::Figment;
+
+ /// The embedded default TOML deserializes into a fully populated
+ /// [`ServerNgConfig`] and passes validation. Exercises the
+ /// `include_str!` resolution and the deserialization of every
+ /// section without depending on an async runtime in `dev-deps`.
+ #[test]
+ fn embedded_default_toml_deserializes_and_validates() {
+ let toml_str = include_str!("../../../server-ng/config.toml");
+ let cfg: ServerNgConfig = Figment::new()
+ .merge(Toml::string(toml_str))
+ .extract()
+ .expect("embedded TOML deserializes");
+ cfg.validate().expect("embedded default validates");
+
+ // Spot-check: TOML values equal the literals in `MessageBusConfig::default()`.
+ assert_eq!(cfg.message_bus.max_batch, 256);
+ assert_eq!(cfg.message_bus.peer_queue_capacity, 256);
+ assert_eq!(cfg.message_bus.keepalive_retries, 3);
+ assert!(cfg.message_bus.tcp_tls_listen_addr.is_none());
+ assert!(cfg.message_bus.wss_listen_addr.is_none());
+ }
+
+ #[test]
+ fn default_impl_validates() {
+ let cfg = ServerNgConfig::default();
+ cfg.validate().expect("Default impl validates");
+ }
+
+ #[test]
+ fn env_prefix_is_iggy() {
+ assert_eq!(ServerNgConfig::ENV_PREFIX, "IGGY_");
+ }
+
+ #[test]
+ fn all_env_var_names_include_message_bus_section() {
+ let names = ServerNgConfig::all_env_var_names();
+ assert!(
+ names.iter().any(|n| n.starts_with("IGGY_MESSAGE_BUS_")),
+ "expected at least one IGGY_MESSAGE_BUS_* env var, got: {names:?}"
+ );
+ }
+}
diff --git a/core/configs/src/server_ng_config/validators.rs
b/core/configs/src/server_ng_config/validators.rs
new file mode 100644
index 000000000..d77c82e61
--- /dev/null
+++ b/core/configs/src/server_ng_config/validators.rs
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! [`Validatable`] for [`ServerNgConfig`].
+//!
+//! Mirrors the section-by-section delegation of
+//! [`crate::validators`]'s `impl Validatable for ServerConfig`, plus a
+//! call into [`super::message_bus::MessageBusConfig::validate`] for the
+//! new section. The cross-section invariants (topic vs segment sizing,
+//! JWT gating when HTTP is enabled, server-default expiry sanity) are
+//! mirrored exactly so server-ng inherits the same boot-time safety
+//! net.
+
+use super::COMPONENT_NG;
+use super::server_ng::ServerNgConfig;
+use crate::ConfigurationError;
+use err_trail::ErrContext;
+use iggy_common::{IggyExpiry, MaxTopicSize, Validatable};
+
+impl Validatable<ConfigurationError> for ServerNgConfig { // NOTE(review): the four cross-section eprintln! messages below lack the COMPONENT_NG prefix used by the other messages - confirm
+ fn validate(&self) -> Result<(), ConfigurationError> { // per-section checks, then cross-section invariants, then message_bus; returns on the first failure
+ self.system
+ .memory_pool
+ .validate()
+ .error(|e: &ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to validate
memory pool config")
+ })?;
+ self.data_maintenance
+ .validate()
+ .error(|e: &ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to validate data
maintenance config")
+ })?;
+ self.personal_access_token
+ .validate()
+ .error(|e: &ConfigurationError| {
+ format!(
+ "{COMPONENT_NG} (error: {e}) - failed to validate personal
access token config"
+ )
+ })?;
+ self.system
+ .segment
+ .validate()
+ .error(|e: &ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to validate
segment config")
+ })?;
+ self.system
+ .compression
+ .validate()
+ .error(|e: &ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to validate
compression config")
+ })?;
+ self.telemetry.validate().error(|e: &ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to validate
telemetry config")
+ })?;
+ self.system
+ .sharding
+ .validate()
+ .error(|e: &ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to validate
sharding config")
+ })?;
+ self.cluster.validate().error(|e: &ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to validate cluster
config")
+ })?;
+ self.system
+ .logging
+ .validate()
+ .error(|e: &ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to validate
logging config")
+ })?;
+ self.message_saver
+ .validate()
+ .error(|e: &ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to validate
message saver config")
+ })?;
+
+ let topic_size = match self.system.topic.max_size {
+ MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()),
+ MaxTopicSize::Unlimited => Ok(u64::MAX),
+ MaxTopicSize::ServerDefault => {
+ eprintln!("system.topic.max_size cannot be ServerDefault in
server-ng config");
+ Err(ConfigurationError::InvalidConfigurationValue)
+ }
+ }?;
+
+ if let IggyExpiry::ServerDefault = self.system.topic.message_expiry {
+ eprintln!("system.topic.message_expiry cannot be ServerDefault in
server-ng config");
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+
+ if self.http.enabled
+ && let IggyExpiry::ServerDefault =
self.http.jwt.access_token_expiry
+ {
+ eprintln!("http.jwt.access_token_expiry cannot be ServerDefault
when HTTP is enabled");
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+
+ if topic_size < self.system.segment.size.as_bytes_u64() {
+ eprintln!(
+ "system.topic.max_size ({} B) must be >= system.segment.size
({} B)",
+ topic_size,
+ self.system.segment.size.as_bytes_u64()
+ );
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+
+ self.message_bus
+ .validate()
+ .error(|e: &ConfigurationError| {
+ format!("{COMPONENT_NG} (error: {e}) - failed to validate
message_bus config")
+ })?;
+
+ Ok(())
+ }
+}
diff --git a/core/server-ng/config.toml b/core/server-ng/config.toml
index e24f73fce..2754b787a 100644
--- a/core/server-ng/config.toml
+++ b/core/server-ng/config.toml
@@ -583,3 +583,43 @@ enabled = false
self_signed = true
cert_file = "core/certs/iggy_cert.pem"
key_file = "core/certs/iggy_key.pem"
+
+# Message bus configuration.
+# Tunables for the inter-shard / inter-replica internal bus that ships
+# consensus traffic between replicas and SDK-client traffic between
+# shards. These knobs are consensus-liveness-critical (keepalive +
+# reconnect timers gate VSR view-change latency; max_batch gates
+# throughput under backpressure). Defaults match
+# core::message_bus::config::MessageBusConfig::default().
+[message_bus]
+# Maximum number of BusMessage entries coalesced into a single writev(2)
+# call. Hard upper bound: IOV_MAX/2 = 512 on Linux.
+max_batch = 256
+
+# Wire-level cap on a single framed message.
+max_message_size = "64 MiB"
+
+# Bound on the per-peer mpsc queue. The writer task drains; the
+# send_to_* path enqueues.
+peer_queue_capacity = 256
+
+# Interval between outbound reconnect attempts to peers with peer_id > self_id.
+reconnect_period = "5 s"
+
+# TCP keepalive tunables. See tcp(7).
+keepalive_idle = "10 s"
+keepalive_interval = "5 s"
+keepalive_retries = 3
+
+# Timeout for per-peer close drain (flush writer, tear down reader)
+# before force-cancellation.
+close_peer_timeout = "2 s"
+
+# Wall-clock bound on a single stream.shutdown() / ws.close() in the
+# safe-shutdown sequence of the TLS-family transports.
+close_grace = "2 s"
+
+# Optional listen addresses (ip:port) for the TLS-family transports.
+# Leave commented out to keep the plane unbound; uncomment to bind it.
+# tcp_tls_listen_addr = "0.0.0.0:9443"
+# wss_listen_addr = "0.0.0.0:9444"