This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new cca0e04e7 feat(configs): add ServerNgConfig schema with message_bus 
section (#3189)
cca0e04e7 is described below

commit cca0e04e7e2aa17b3abc32228ae8cef8cccfcca8
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Apr 28 14:02:05 2026 +0200

    feat(configs): add ServerNgConfig schema with message_bus section (#3189)
    
    Introduces a new on-disk config type ServerNgConfig parallel to the
    legacy ServerConfig, mirroring its full section surface (consumer_group,
    data_maintenance, message_saver, personal_access_token, heartbeat,
    system, quic, tcp, http, websocket, telemetry, cluster) and adding a
    new [message_bus] section that captures the runtime tunables previously
    hardcoded in core::message_bus::config::MessageBusConfig.
    
    Section types are reused verbatim from server_config::* (operator-facing
    schema is identical between server and server-ng); only the new
    MessageBusConfig and the top-level composer are net-new code.
    
    The type defines a loader following ServerConfig::load()'s shape
    (FileConfigProvider + TypedEnvProvider, IGGY_ env prefix,
    include_str!(server-ng/config.toml) embedded default) and a Validatable
    impl that asserts the runtime invariants (max_batch <= IOV_MAX_LIMIT,
    peer_queue_capacity > 0, listen-addr parse-ability) without depending
    on core/message_bus.
    
    Scaffolding only. No binary calls ServerNgConfig::load(); the wiring
    PR for core/server-ng's bootstrap and the message_bus crate's runtime
    type is intentionally a separate change.
    
    core/server-ng/config.toml gains a [message_bus] block with defaults
    matching MessageBusConfig::default() in the runtime crate.
---
 core/configs/src/lib.rs                          |   2 +
 core/configs/src/server_ng_config/defaults.rs    |  94 +++++++++
 core/configs/src/server_ng_config/displays.rs    |  72 +++++++
 core/configs/src/server_ng_config/message_bus.rs | 257 +++++++++++++++++++++++
 core/configs/src/server_ng_config/mod.rs         |  40 ++++
 core/configs/src/server_ng_config/server_ng.rs   | 200 ++++++++++++++++++
 core/configs/src/server_ng_config/validators.rs  | 130 ++++++++++++
 core/server-ng/config.toml                       |  40 ++++
 8 files changed, 835 insertions(+)

diff --git a/core/configs/src/lib.rs b/core/configs/src/lib.rs
index 419c77b8a..3b61c99fb 100644
--- a/core/configs/src/lib.rs
+++ b/core/configs/src/lib.rs
@@ -21,6 +21,7 @@ extern crate self as configs;
 
 mod configs_impl;
 mod server_config;
+mod server_ng_config;
 pub use configs_derive::ConfigEnv;
 pub use configs_impl::{
     ConfigEnvMappings, ConfigProvider, ConfigurationError, ConfigurationType, 
EnvVarMapping,
@@ -30,3 +31,4 @@ pub use server_config::{
     COMPONENT, cache_indexes, cluster, defaults, displays, http, quic, server, 
sharding, system,
     tcp, validators, websocket,
 };
+pub use server_ng_config::{COMPONENT_NG, message_bus, server_ng};
diff --git a/core/configs/src/server_ng_config/defaults.rs 
b/core/configs/src/server_ng_config/defaults.rs
new file mode 100644
index 000000000..9218d0828
--- /dev/null
+++ b/core/configs/src/server_ng_config/defaults.rs
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! `Default` impls for the server-ng config surface.
+//!
+//! Section sub-types are reused verbatim from the legacy server config;
+//! their existing `Default` impls (declared in [`crate::defaults`]) read
+//! from `core/server/config.toml` via `static_toml!`. This file
+//! delegates to those impls for every reused section, so drift between
+//! the two TOMLs is intentionally absorbed at the `server-ng` consumer
+//! level (the wiring PR will read `core/server-ng/config.toml`
+//! end-to-end through [`super::server_ng::ServerNgConfig::load`] and
+//! overrides will take effect there).
+//!
+//! Only [`MessageBusConfig`] needs its own `Default` because no legacy
+//! section maps to it. The defaults are hardcoded to match
+//! `core::message_bus::config::MessageBusConfig::default()` byte-for-byte.
+
+use super::message_bus::MessageBusConfig;
+use super::server_ng::ServerNgConfig;
+use crate::server_config::cluster::ClusterConfig;
+use crate::server_config::http::HttpConfig;
+use crate::server_config::quic::QuicConfig;
+use crate::server_config::server::{
+    ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, 
MessageSaverConfig,
+    PersonalAccessTokenConfig, TelemetryConfig,
+};
+use crate::server_config::system::SystemConfig;
+use crate::server_config::tcp::TcpConfig;
+use crate::server_config::websocket::WebSocketConfig;
+use iggy_common::IggyByteSize;
+use std::sync::Arc;
+
+impl Default for ServerNgConfig {
+    fn default() -> ServerNgConfig {
+        ServerNgConfig {
+            consumer_group: ConsumerGroupConfig::default(),
+            data_maintenance: DataMaintenanceConfig::default(),
+            heartbeat: HeartbeatConfig::default(),
+            message_saver: MessageSaverConfig::default(),
+            personal_access_token: PersonalAccessTokenConfig::default(),
+            system: Arc::new(SystemConfig::default()),
+            quic: QuicConfig::default(),
+            tcp: TcpConfig::default(),
+            websocket: WebSocketConfig::default(),
+            http: HttpConfig::default(),
+            telemetry: TelemetryConfig::default(),
+            cluster: ClusterConfig::default(),
+            message_bus: MessageBusConfig::default(),
+        }
+    }
+}
+
+impl Default for MessageBusConfig {
+    fn default() -> MessageBusConfig {
+        // Keep these literals in lock-step with
+        // `core::message_bus::config::MessageBusConfig::default()` and
+        // with the `[message_bus]` block in
+        // `core/server-ng/config.toml`. The unit test
+        // `message_bus::tests::default_validates` proves the values
+        // survive `Validatable::validate`; a future PR may wire a
+        // round-trip test against the embedded TOML once
+        // `ServerNgConfig::load` is exercised end-to-end.
+        MessageBusConfig {
+            max_batch: 256,
+            max_message_size: IggyByteSize::from(64_u64 * 1024 * 1024),
+            peer_queue_capacity: 256,
+            reconnect_period: "5 s".parse().expect("'5 s' parses as 
IggyDuration"),
+            keepalive_idle: "10 s".parse().expect("'10 s' parses as 
IggyDuration"),
+            keepalive_interval: "5 s".parse().expect("'5 s' parses as 
IggyDuration"),
+            keepalive_retries: 3,
+            close_peer_timeout: "2 s".parse().expect("'2 s' parses as 
IggyDuration"),
+            close_grace: "2 s".parse().expect("'2 s' parses as IggyDuration"),
+            tcp_tls_listen_addr: None,
+            wss_listen_addr: None,
+        }
+    }
+}
diff --git a/core/configs/src/server_ng_config/displays.rs 
b/core/configs/src/server_ng_config/displays.rs
new file mode 100644
index 000000000..1c286aac3
--- /dev/null
+++ b/core/configs/src/server_ng_config/displays.rs
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! `Display` impls for the server-ng config surface.
+//!
+//! Reused section types pick up [`Display`] from
+//! [`crate::displays`]; this module only adds the top-level
+//! [`ServerNgConfig`] formatter and the new [`MessageBusConfig`]
+//! section formatter.
+
+use super::message_bus::MessageBusConfig;
+use super::server_ng::ServerNgConfig;
+use std::fmt::{Display, Formatter};
+
+impl Display for ServerNgConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{{ consumer_group: {}, data_maintenance: {}, message_saver: {}, 
heartbeat: {}, \
+             system: {}, quic: {}, tcp: {}, http: {}, telemetry: {}, 
message_bus: {} }}",
+            self.consumer_group,
+            self.data_maintenance,
+            self.message_saver,
+            self.heartbeat,
+            self.system,
+            self.quic,
+            self.tcp,
+            self.http,
+            self.telemetry,
+            self.message_bus,
+        )
+    }
+}
+
+impl Display for MessageBusConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{{ max_batch: {}, max_message_size: {}, peer_queue_capacity: {}, \
+             reconnect_period: {}, keepalive_idle: {}, keepalive_interval: {}, 
\
+             keepalive_retries: {}, close_peer_timeout: {}, close_grace: {}, \
+             tcp_tls_listen_addr: {:?}, wss_listen_addr: {:?} }}",
+            self.max_batch,
+            self.max_message_size,
+            self.peer_queue_capacity,
+            self.reconnect_period,
+            self.keepalive_idle,
+            self.keepalive_interval,
+            self.keepalive_retries,
+            self.close_peer_timeout,
+            self.close_grace,
+            self.tcp_tls_listen_addr,
+            self.wss_listen_addr,
+        )
+    }
+}
diff --git a/core/configs/src/server_ng_config/message_bus.rs 
b/core/configs/src/server_ng_config/message_bus.rs
new file mode 100644
index 000000000..a76b87e04
--- /dev/null
+++ b/core/configs/src/server_ng_config/message_bus.rs
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! On-disk schema for the inter-shard / inter-replica message bus.
+//!
+//! Mirrors the runtime `core::message_bus::config::MessageBusConfig`
+//! field-for-field, but uses configs-crate idioms:
+//!
+//! - `Duration` -> [`IggyDuration`] (DisplayFromStr-serde, `"5 s"` syntax)
+//! - `usize` / `u32` -> kept as the underlying integer type
+//! - `Option<SocketAddr>` -> `Option<String>` parsed at validate-time
+//! - WebSocket frame-layer tunables (`compio_ws::WebSocketConfig`) are
+//!   NOT part of this section; they live under `[websocket]` (the
+//!   existing [`super::super::server_config::websocket::WebSocketConfig`]).
+//!   The future wiring PR builds the runtime `WebSocketConfig` from that
+//!   section and feeds it into the bus at construction.
+//!
+//! Construction of the runtime type from this struct happens in the
+//! follow-up PR that wires `core/server-ng` to call
+//! [`super::server_ng::ServerNgConfig::load`].
+
+use super::COMPONENT_NG;
+use crate::ConfigurationError;
+use configs::ConfigEnv;
+use iggy_common::{IggyByteSize, IggyDuration, Validatable};
+use serde::{Deserialize, Serialize};
+use serde_with::{DisplayFromStr, serde_as};
+use std::net::SocketAddr;
+
+/// Hard upper bound on [`MessageBusConfig::max_batch`], in iovecs.
+///
+/// Mirrors `core::message_bus::config::IOV_MAX_LIMIT`. Duplicated here
+/// rather than depended on, so `core/configs` does not need a build-time
+/// dependency on `core/message_bus` (the runtime crate is the eventual
+/// consumer of this config; reversing the edge would invert the workspace
+/// graph). The runtime crate re-asserts the invariant inside
+/// `IggyMessageBus::with_config`. A unit test below pins the literal so
+/// any future bump on the runtime side surfaces as a configs-build
+/// failure until both are reconciled.
+pub const IOV_MAX_LIMIT_NG: usize = 512;
+
+/// Tunables for the message bus that ships consensus traffic between
+/// replicas and SDK-client traffic between shards.
+#[serde_as]
+#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
+pub struct MessageBusConfig {
+    /// Maximum number of `BusMessage` entries the writer task coalesces
+    /// into a single `writev(2)` call. Higher values amortise syscalls
+    /// at the cost of tail latency. Capped at [`IOV_MAX_LIMIT_NG`].
+    pub max_batch: usize,
+
+    /// Wire-level cap on a single framed message. Read-side validator;
+    /// undersize or oversize frames are rejected and the connection torn
+    /// down.
+    #[config_env(leaf)]
+    pub max_message_size: IggyByteSize,
+
+    /// Bound on the per-peer mpsc queue. Writer task drains; the
+    /// `send_to_*` path enqueues. Too small drops under burst; too
+    /// large delays backpressure signalling.
+    pub peer_queue_capacity: usize,
+
+    /// Interval between outbound reconnect attempts to peers with
+    /// `peer_id > self_id`.
+    #[config_env(leaf)]
+    #[serde_as(as = "DisplayFromStr")]
+    pub reconnect_period: IggyDuration,
+
+    /// TCP keepalive idle timer. See `tcp(7)` `TCP_KEEPIDLE`.
+    #[config_env(leaf)]
+    #[serde_as(as = "DisplayFromStr")]
+    pub keepalive_idle: IggyDuration,
+
+    /// TCP keepalive probe interval. See `tcp(7)` `TCP_KEEPINTVL`.
+    #[config_env(leaf)]
+    #[serde_as(as = "DisplayFromStr")]
+    pub keepalive_interval: IggyDuration,
+
+    /// TCP keepalive retry count. See `tcp(7)` `TCP_KEEPCNT`. Combined
+    /// with idle + interval this gates the half-open connection
+    /// detection window that VSR view-change timers must accommodate.
+    pub keepalive_retries: u32,
+
+    /// Timeout for per-peer close drain (flush writer, tear down
+    /// reader) before force-cancellation.
+    #[config_env(leaf)]
+    #[serde_as(as = "DisplayFromStr")]
+    pub close_peer_timeout: IggyDuration,
+
+    /// Wall-clock bound on a single `stream.shutdown()` (or
+    /// `ws.close()`) invocation in the safe-shutdown sequence of the
+    /// TLS-family transports. Independent of [`Self::close_peer_timeout`]
+    /// (which bounds the registry-level drain over both reader and
+    /// writer joins).
+    #[config_env(leaf)]
+    #[serde_as(as = "DisplayFromStr")]
+    pub close_grace: IggyDuration,
+
+    /// Optional TCP-TLS client listener address in `host:port` form.
+    /// `None` keeps the plane unbound. Resolved to [`SocketAddr`] by
+    /// [`Validatable::validate`].
+    #[serde(default)]
+    pub tcp_tls_listen_addr: Option<String>,
+
+    /// Optional WSS client listener address in `host:port` form.
+    /// `None` keeps the plane unbound. Resolved to [`SocketAddr`] by
+    /// [`Validatable::validate`].
+    #[serde(default)]
+    pub wss_listen_addr: Option<String>,
+}
+
+impl Validatable<ConfigurationError> for MessageBusConfig {
+    fn validate(&self) -> Result<(), ConfigurationError> {
+        if self.max_batch == 0 {
+            eprintln!("{COMPONENT_NG} message_bus.max_batch must be > 0");
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+        if self.max_batch > IOV_MAX_LIMIT_NG {
+            eprintln!(
+                "{COMPONENT_NG} message_bus.max_batch ({}) exceeds 
IOV_MAX_LIMIT ({IOV_MAX_LIMIT_NG})",
+                self.max_batch
+            );
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+        if self.peer_queue_capacity == 0 {
+            eprintln!("{COMPONENT_NG} message_bus.peer_queue_capacity must be 
> 0");
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+        if self.max_message_size.as_bytes_u64() == 0 {
+            eprintln!("{COMPONENT_NG} message_bus.max_message_size must be > 
0");
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+        if self.keepalive_retries == 0 {
+            eprintln!("{COMPONENT_NG} message_bus.keepalive_retries must be > 
0");
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+        // Resolve optional listen addrs eagerly so a typo fails at boot,
+        // not at first connect.
+        if let Some(s) = &self.tcp_tls_listen_addr {
+            s.parse::<SocketAddr>().map_err(|e| {
+                eprintln!("{COMPONENT_NG} message_bus.tcp_tls_listen_addr 
'{s}' is invalid: {e}");
+                ConfigurationError::InvalidConfigurationValue
+            })?;
+        }
+        if let Some(s) = &self.wss_listen_addr {
+            s.parse::<SocketAddr>().map_err(|e| {
+                eprintln!("{COMPONENT_NG} message_bus.wss_listen_addr '{s}' is 
invalid: {e}");
+                ConfigurationError::InvalidConfigurationValue
+            })?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn baseline() -> MessageBusConfig {
+        MessageBusConfig::default()
+    }
+
+    #[test]
+    fn default_validates() {
+        baseline().validate().expect("default config validates");
+    }
+
+    #[test]
+    fn rejects_zero_max_batch() {
+        let mut c = baseline();
+        c.max_batch = 0;
+        assert!(c.validate().is_err());
+    }
+
+    #[test]
+    fn rejects_max_batch_above_iov_max() {
+        let mut c = baseline();
+        c.max_batch = IOV_MAX_LIMIT_NG + 1;
+        assert!(c.validate().is_err());
+    }
+
+    #[test]
+    fn accepts_max_batch_at_iov_max() {
+        let mut c = baseline();
+        c.max_batch = IOV_MAX_LIMIT_NG;
+        assert!(c.validate().is_ok());
+    }
+
+    #[test]
+    fn rejects_zero_peer_queue_capacity() {
+        let mut c = baseline();
+        c.peer_queue_capacity = 0;
+        assert!(c.validate().is_err());
+    }
+
+    #[test]
+    fn rejects_zero_max_message_size() {
+        let mut c = baseline();
+        c.max_message_size = IggyByteSize::from(0_u64);
+        assert!(c.validate().is_err());
+    }
+
+    #[test]
+    fn rejects_zero_keepalive_retries() {
+        let mut c = baseline();
+        c.keepalive_retries = 0;
+        assert!(c.validate().is_err());
+    }
+
+    #[test]
+    fn rejects_invalid_tcp_tls_listen_addr() {
+        let mut c = baseline();
+        c.tcp_tls_listen_addr = Some("not-a-socket-addr".to_string());
+        assert!(c.validate().is_err());
+    }
+
+    #[test]
+    fn rejects_invalid_wss_listen_addr() {
+        let mut c = baseline();
+        c.wss_listen_addr = Some("nope".to_string());
+        assert!(c.validate().is_err());
+    }
+
+    #[test]
+    fn accepts_valid_listen_addrs() {
+        let mut c = baseline();
+        c.tcp_tls_listen_addr = Some("0.0.0.0:9443".to_string());
+        c.wss_listen_addr = Some("127.0.0.1:9444".to_string());
+        c.validate().expect("valid listen addrs accepted");
+    }
+
+    /// Tripwire: pins the local copy of `IOV_MAX_LIMIT` against the
+    /// runtime crate's value. If `core/message_bus` ever bumps its
+    /// `IOV_MAX_LIMIT`, this test fails the configs build until the
+    /// duplicate here is updated. We pin the literal because
+    /// `core/configs` does not depend on `core/message_bus`.
+    #[test]
+    fn iov_max_limit_matches_runtime_crate() {
+        assert_eq!(IOV_MAX_LIMIT_NG, 512);
+    }
+}
diff --git a/core/configs/src/server_ng_config/mod.rs 
b/core/configs/src/server_ng_config/mod.rs
new file mode 100644
index 000000000..34b664fbf
--- /dev/null
+++ b/core/configs/src/server_ng_config/mod.rs
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! On-disk schema for the `server-ng` binary.
+//!
+//! Mirrors the legacy server-config section surface verbatim
+//! (operator-facing schema is unchanged) and adds a `message_bus` section
+//! for inter-shard / inter-replica bus tunables previously hardcoded in
+//! the `core/message_bus` runtime crate.
+//!
+//! Scaffolding-only at the time of introduction: the type is defined and
+//! loadable but no binary calls [`server_ng::ServerNgConfig::load`]; the
+//! wiring PR for `core/server-ng`'s bootstrap and the message_bus crate's
+//! runtime type is a separate change.
+
+pub mod defaults;
+pub mod displays;
+pub mod message_bus;
+pub mod server_ng;
+pub mod validators;
+
+/// Component tag used in error messages for the server-ng config surface.
+/// Mirrors [`crate::COMPONENT`] (`"CONFIG"`).
+pub const COMPONENT_NG: &str = "CONFIG_NG";
diff --git a/core/configs/src/server_ng_config/server_ng.rs 
b/core/configs/src/server_ng_config/server_ng.rs
new file mode 100644
index 000000000..1df243ba1
--- /dev/null
+++ b/core/configs/src/server_ng_config/server_ng.rs
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::COMPONENT_NG;
+use super::message_bus::MessageBusConfig;
+use crate::ConfigurationError;
+use crate::server_config::cluster::ClusterConfig;
+use crate::server_config::http::HttpConfig;
+use crate::server_config::quic::QuicConfig;
+use crate::server_config::server::{
+    ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, 
MessageSaverConfig,
+    PersonalAccessTokenConfig, TelemetryConfig,
+};
+use crate::server_config::system::SystemConfig;
+use crate::server_config::tcp::TcpConfig;
+use crate::server_config::websocket::WebSocketConfig;
+use configs::{ConfigEnv, ConfigEnvMappings, ConfigProvider, 
FileConfigProvider, TypedEnvProvider};
+use err_trail::ErrContext;
+use figment::providers::{Format, Toml};
+use figment::value::Dict;
+use figment::{Metadata, Profile, Provider};
+use iggy_common::Validatable;
+use serde::{Deserialize, Serialize};
+use std::env;
+use std::sync::Arc;
+
+const DEFAULT_CONFIG_PATH: &str = "core/server-ng/config.toml";
+
+/// Top-level on-disk config schema for the `server-ng` binary.
+///
+/// Mirrors the legacy [`crate::server::ServerConfig`] section surface
+/// verbatim (operator-facing schema is unchanged) and adds a
+/// [`MessageBusConfig`] section for inter-shard / inter-replica bus
+/// tunables.
+///
+/// Section types are reused directly from the legacy server-config
+/// modules; only [`MessageBusConfig`] and this composer are net-new
+/// code at the time of introduction.
+///
+/// At the time of introduction this type is NOT consumed by
+/// `core/server-ng`'s bootstrap. The wiring PR is a separate change.
+#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
+#[config_env(prefix = "IGGY_", name = "iggy-server-ng-config")]
+pub struct ServerNgConfig {
+    pub consumer_group: ConsumerGroupConfig,
+    pub data_maintenance: DataMaintenanceConfig,
+    pub message_saver: MessageSaverConfig,
+    pub personal_access_token: PersonalAccessTokenConfig,
+    pub heartbeat: HeartbeatConfig,
+    pub system: Arc<SystemConfig>,
+    pub quic: QuicConfig,
+    pub tcp: TcpConfig,
+    pub http: HttpConfig,
+    pub websocket: WebSocketConfig,
+    pub telemetry: TelemetryConfig,
+    pub cluster: ClusterConfig,
+    pub message_bus: MessageBusConfig,
+}
+
+impl ServerNgConfig {
+    /// Load server-ng configuration from file and environment variables.
+    ///
+    /// Mirrors [`crate::server::ServerConfig::load`]: the path comes
+    /// from `IGGY_CONFIG_PATH` or defaults to
+    /// `core/server-ng/config.toml`; missing on-disk paths fall through
+    /// to the embedded default TOML; env-var overrides flow through the
+    /// [`ServerNgConfigEnvProvider`]; the result is validated before
+    /// returning.
+    ///
+    /// # Errors
+    /// Returns [`ConfigurationError`] when the config cannot be parsed
+    /// from the configured source(s) or fails [`Validatable::validate`].
+    pub async fn load() -> Result<ServerNgConfig, ConfigurationError> {
+        let config_path =
+            env::var("IGGY_CONFIG_PATH").unwrap_or_else(|_| 
DEFAULT_CONFIG_PATH.to_string());
+        let provider = ServerNgConfig::config_provider(&config_path);
+        let cfg: ServerNgConfig =
+            provider
+                .load_config()
+                .await
+                .error(|e: &configs::ConfigurationError| {
+                    format!("{COMPONENT_NG} (error: {e}) - failed to load 
server-ng config")
+                })?;
+        cfg.validate().error(|e: &configs::ConfigurationError| {
+            format!("{COMPONENT_NG} (error: {e}) - failed to validate 
server-ng config")
+        })?;
+        Ok(cfg)
+    }
+
+    /// Build the file-backed config provider with the embedded default
+    /// TOML and the type-safe env-var provider attached.
+    pub fn config_provider(config_path: &str) -> 
FileConfigProvider<ServerNgConfigEnvProvider> {
+        let default_config = 
Toml::string(include_str!("../../../server-ng/config.toml"));
+        FileConfigProvider::new(
+            config_path.to_string(),
+            ServerNgConfigEnvProvider::default(),
+            true,
+            Some(default_config),
+        )
+    }
+
+    /// All recognised env var names for [`ServerNgConfig`].
+    pub fn all_env_var_names() -> Vec<&'static str> {
+        <ServerNgConfig as ConfigEnvMappings>::all_env_var_names()
+    }
+}
+
+/// Type-safe environment provider for [`ServerNgConfig`].
+///
+/// Uses the [`ConfigEnvMappings`] trait generated by `#[derive(ConfigEnv)]`
+/// to look up known env var names directly, eliminating path ambiguity.
+#[derive(Debug, Clone)]
+pub struct ServerNgConfigEnvProvider {
+    provider: TypedEnvProvider<ServerNgConfig>,
+}
+
+impl Default for ServerNgConfigEnvProvider {
+    fn default() -> Self {
+        Self {
+            provider: 
TypedEnvProvider::from_config(ServerNgConfig::ENV_PREFIX),
+        }
+    }
+}
+
+impl Provider for ServerNgConfigEnvProvider {
+    fn metadata(&self) -> Metadata {
+        Metadata::named(ServerNgConfig::ENV_PROVIDER_NAME)
+    }
+
+    fn data(&self) -> Result<figment::value::Map<Profile, Dict>, 
figment::Error> {
+        self.provider.deserialize().map_err(|e| {
+            figment::Error::from(format!(
+                "Cannot deserialize environment variables for server-ng 
config: {e}"
+            ))
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use figment::Figment;
+
+    /// The embedded default TOML deserializes into a fully populated
+    /// [`ServerNgConfig`] and passes validation. Exercises the
+    /// `include_str!` resolution and the deserialization of every
+    /// section without depending on an async runtime in `dev-deps`.
+    #[test]
+    fn embedded_default_toml_deserializes_and_validates() {
+        let toml_str = include_str!("../../../server-ng/config.toml");
+        let cfg: ServerNgConfig = Figment::new()
+            .merge(Toml::string(toml_str))
+            .extract()
+            .expect("embedded TOML deserializes");
+        cfg.validate().expect("embedded default validates");
+
+        // Spot-check: defaults match the runtime crate's invariants.
+        assert_eq!(cfg.message_bus.max_batch, 256);
+        assert_eq!(cfg.message_bus.peer_queue_capacity, 256);
+        assert_eq!(cfg.message_bus.keepalive_retries, 3);
+        assert!(cfg.message_bus.tcp_tls_listen_addr.is_none());
+        assert!(cfg.message_bus.wss_listen_addr.is_none());
+    }
+
+    #[test]
+    fn default_impl_validates() {
+        let cfg = ServerNgConfig::default();
+        cfg.validate().expect("Default impl validates");
+    }
+
+    #[test]
+    fn env_prefix_is_iggy() {
+        assert_eq!(ServerNgConfig::ENV_PREFIX, "IGGY_");
+    }
+
+    #[test]
+    fn all_env_var_names_include_message_bus_section() {
+        let names = ServerNgConfig::all_env_var_names();
+        assert!(
+            names.iter().any(|n| n.starts_with("IGGY_MESSAGE_BUS_")),
+            "expected at least one IGGY_MESSAGE_BUS_* env var, got: {names:?}"
+        );
+    }
+}
diff --git a/core/configs/src/server_ng_config/validators.rs 
b/core/configs/src/server_ng_config/validators.rs
new file mode 100644
index 000000000..d77c82e61
--- /dev/null
+++ b/core/configs/src/server_ng_config/validators.rs
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! [`Validatable`] for [`ServerNgConfig`].
+//!
+//! Mirrors the section-by-section delegation of
+//! [`crate::validators`]'s `impl Validatable for ServerConfig`, plus a
+//! call into [`super::message_bus::MessageBusConfig::validate`] for the
+//! new section. The cross-section invariants (topic vs segment sizing,
+//! JWT gating when HTTP is enabled, server-default expiry sanity) are
+//! mirrored exactly so server-ng inherits the same boot-time safety
+//! net.
+
+use super::COMPONENT_NG;
+use super::server_ng::ServerNgConfig;
+use crate::ConfigurationError;
+use err_trail::ErrContext;
+use iggy_common::{IggyExpiry, MaxTopicSize, Validatable};
+
+impl Validatable<ConfigurationError> for ServerNgConfig {
+    fn validate(&self) -> Result<(), ConfigurationError> {
+        self.system
+            .memory_pool
+            .validate()
+            .error(|e: &ConfigurationError| {
+                format!("{COMPONENT_NG} (error: {e}) - failed to validate 
memory pool config")
+            })?;
+        self.data_maintenance
+            .validate()
+            .error(|e: &ConfigurationError| {
+                format!("{COMPONENT_NG} (error: {e}) - failed to validate data 
maintenance config")
+            })?;
+        self.personal_access_token
+            .validate()
+            .error(|e: &ConfigurationError| {
+                format!(
+                    "{COMPONENT_NG} (error: {e}) - failed to validate personal 
access token config"
+                )
+            })?;
+        self.system
+            .segment
+            .validate()
+            .error(|e: &ConfigurationError| {
+                format!("{COMPONENT_NG} (error: {e}) - failed to validate 
segment config")
+            })?;
+        self.system
+            .compression
+            .validate()
+            .error(|e: &ConfigurationError| {
+                format!("{COMPONENT_NG} (error: {e}) - failed to validate 
compression config")
+            })?;
+        self.telemetry.validate().error(|e: &ConfigurationError| {
+            format!("{COMPONENT_NG} (error: {e}) - failed to validate 
telemetry config")
+        })?;
+        self.system
+            .sharding
+            .validate()
+            .error(|e: &ConfigurationError| {
+                format!("{COMPONENT_NG} (error: {e}) - failed to validate 
sharding config")
+            })?;
+        self.cluster.validate().error(|e: &ConfigurationError| {
+            format!("{COMPONENT_NG} (error: {e}) - failed to validate cluster 
config")
+        })?;
+        self.system
+            .logging
+            .validate()
+            .error(|e: &ConfigurationError| {
+                format!("{COMPONENT_NG} (error: {e}) - failed to validate 
logging config")
+            })?;
+        self.message_saver
+            .validate()
+            .error(|e: &ConfigurationError| {
+                format!("{COMPONENT_NG} (error: {e}) - failed to validate 
message saver config")
+            })?;
+
+        let topic_size = match self.system.topic.max_size {
+            MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()),
+            MaxTopicSize::Unlimited => Ok(u64::MAX),
+            MaxTopicSize::ServerDefault => {
+                eprintln!("system.topic.max_size cannot be ServerDefault in 
server-ng config");
+                Err(ConfigurationError::InvalidConfigurationValue)
+            }
+        }?;
+
+        if let IggyExpiry::ServerDefault = self.system.topic.message_expiry {
+            eprintln!("system.topic.message_expiry cannot be ServerDefault in 
server-ng config");
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+
+        if self.http.enabled
+            && let IggyExpiry::ServerDefault = 
self.http.jwt.access_token_expiry
+        {
+            eprintln!("http.jwt.access_token_expiry cannot be ServerDefault 
when HTTP is enabled");
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+
+        if topic_size < self.system.segment.size.as_bytes_u64() {
+            eprintln!(
+                "system.topic.max_size ({} B) must be >= system.segment.size 
({} B)",
+                topic_size,
+                self.system.segment.size.as_bytes_u64()
+            );
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+
+        self.message_bus
+            .validate()
+            .error(|e: &ConfigurationError| {
+                format!("{COMPONENT_NG} (error: {e}) - failed to validate 
message_bus config")
+            })?;
+
+        Ok(())
+    }
+}
diff --git a/core/server-ng/config.toml b/core/server-ng/config.toml
index e24f73fce..2754b787a 100644
--- a/core/server-ng/config.toml
+++ b/core/server-ng/config.toml
@@ -583,3 +583,43 @@ enabled = false
 self_signed = true
 cert_file = "core/certs/iggy_cert.pem"
 key_file = "core/certs/iggy_key.pem"
+
+# Message bus configuration.
+# Tunables for the inter-shard / inter-replica internal bus that ships
+# consensus traffic between replicas and SDK-client traffic between
+# shards. These knobs are consensus-liveness-critical (keepalive +
+# reconnect timers gate VSR view-change latency; max_batch gates
+# throughput under backpressure). Defaults match
+# core::message_bus::config::MessageBusConfig::default().
+[message_bus]
+# Maximum number of BusMessage entries coalesced into a single writev(2)
+# call. Hard upper bound: IOV_MAX/2 = 512 on Linux.
+max_batch = 256
+
+# Wire-level cap on a single framed message.
+max_message_size = "64 MiB"
+
+# Bound on the per-peer mpsc queue. The writer task drains; the
+# send_to_* path enqueues.
+peer_queue_capacity = 256
+
+# Interval between outbound reconnect attempts to peers with peer_id > self_id.
+reconnect_period = "5 s"
+
+# TCP keepalive tunables. See tcp(7).
+keepalive_idle = "10 s"
+keepalive_interval = "5 s"
+keepalive_retries = 3
+
+# Timeout for per-peer close drain (flush writer, tear down reader)
+# before force-cancellation.
+close_peer_timeout = "2 s"
+
+# Wall-clock bound on a single stream.shutdown() / ws.close() in the
+# safe-shutdown sequence of the TLS-family transports.
+close_grace = "2 s"
+
+# Optional listen addresses for the TLS-family transports. Comment out
+# to keep the plane unbound.
+# tcp_tls_listen_addr = "0.0.0.0:9443"
+# wss_listen_addr = "0.0.0.0:9444"


Reply via email to