This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch feat/message-bus-transports
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit aebf77d0392c36ade03e52e7a16b6cacc873c728
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Apr 24 12:35:37 2026 +0200

    feat(message_bus): IGGY_CLUSTER_SECRET env loader (IGGY-112)
    
    Add auth_config module that resolves the replica-plane 32-byte cluster
    secret from either IGGY_CLUSTER_SECRET (64-char hex literal) or
    IGGY_CLUSTER_SECRET_FILE (path to hex-only file), mutually exclusive.
    
    - SecretLoadError distinguishes Missing / BothSet / WrongLength /
      InvalidHex(offset) / FileRead so boot-time errors point operators at
      the exact failure mode.
    - Pure resolve() is factored out so unit tests exercise every branch
      without touching process env state (which races under cargo test).
    - cluster_token_source() returns Rc<dyn TokenSource> directly so call
      sites such as replica_io::start_on_shard_zero avoid the unsize
      coercion dance.
    - Inline 0-dep hex nibble decoder; no new workspace crate. Whitespace
      is trimmed so file contents with a trailing newline decode cleanly.
    
    The secret intentionally does not live on MessageBusConfig: it has a
    different lifecycle (boot-resolved, no Clone-into-debug-traces) and
    Default is not meaningful. MessageBusConfig and StaticSharedSecret
    docstrings now cross-reference auth_config. Config section in the
    local CLAUDE.md is updated; zeroize-on-drop is a follow-up.
    
    Tests: 10 new unit tests covering happy path (both literal and file),
    whitespace trim, uppercase hex, each error variant, plus a signing
    round trip through the Rc<dyn TokenSource> shape that
    cluster_token_source() returns (the function itself is not called,
    since it reads process env). Adds tempfile as a dev-dep for the
    filesystem round-trip test. 110 pass total on
    cargo nextest run -p message_bus -p server-ng.
---
 Cargo.lock                          |   1 +
 core/message_bus/Cargo.toml         |   3 +
 core/message_bus/src/auth.rs        |   8 +-
 core/message_bus/src/auth_config.rs | 268 ++++++++++++++++++++++++++++++++++++
 core/message_bus/src/config.rs      |   6 +
 core/message_bus/src/lib.rs         |   1 +
 6 files changed, 284 insertions(+), 3 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2c66fb79c..af920f2d3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7017,6 +7017,7 @@ dependencies = [
  "libc",
  "rand 0.10.1",
  "socket2 0.6.3",
+ "tempfile",
  "thiserror 2.0.18",
  "tracing",
 ]
diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml
index d0487db0f..6fa1ae1f4 100644
--- a/core/message_bus/Cargo.toml
+++ b/core/message_bus/Cargo.toml
@@ -41,6 +41,9 @@ socket2 = { workspace = true, features = ["all"] }
 thiserror = { workspace = true }
 tracing = { workspace = true }
 
+[dev-dependencies]
+tempfile = { workspace = true }
+
 [lints.clippy]
 enum_glob_use = "deny"
 pedantic = "deny"
diff --git a/core/message_bus/src/auth.rs b/core/message_bus/src/auth.rs
index 6c2353a6b..1205a4578 100644
--- a/core/message_bus/src/auth.rs
+++ b/core/message_bus/src/auth.rs
@@ -156,9 +156,11 @@ pub trait TokenSource {
     fn verify(&self, challenge: &AuthChallenge, tag: &AuthTag) -> Result<(), 
AuthError>;
 }
 
-/// `TokenSource` that holds a single 32-byte cluster-wide secret. All
-/// replicas share one secret on the replica plane; the integration-test
-/// harness instantiates this with a fixed zero-byte secret.
+/// `TokenSource` that holds a single 32-byte cluster-wide secret.
+///
+/// All replicas share one secret on the replica plane; the integration-test
+/// harness instantiates this with a fixed zero-byte secret, and production
+/// bootstraps resolve one via [`crate::auth_config`].
 pub struct StaticSharedSecret {
     secret: [u8; SECRET_SIZE],
 }
diff --git a/core/message_bus/src/auth_config.rs b/core/message_bus/src/auth_config.rs
new file mode 100644
index 000000000..af715c6d3
--- /dev/null
+++ b/core/message_bus/src/auth_config.rs
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Boot-time loader for the replica-plane cluster secret.
+//!
+//! Two environment variables drive the loader; exactly one must be set:
+//!
+//! - `IGGY_CLUSTER_SECRET` — 64-character hex-encoded 32-byte secret,
+//!   passed inline. Convenient for `docker-compose` / `systemd`
+//!   `Environment=` but visible in `/proc/$pid/environ`.
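+//! - `IGGY_CLUSTER_SECRET_FILE` — path to a file whose contents are the
+//!   64-character hex secret.
+//!
+//! In both forms leading / trailing whitespace is trimmed before decoding,
+//! so a trailing newline is harmless, and hex digits may be upper- or
+//! lower-case.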
+//!
+//! Both variants decode to the same `[u8; 32]` used by
+//! [`crate::auth::StaticSharedSecret`]. `StaticSharedSecret::zero()`
+//! remains the integration-test default; production deployments must
+//! choose one of the two env vars above.
+//!
+//! The loader is a one-shot startup event; synchronous `std::fs` is
+//! intentional. Do not call it on the data plane.
+
+use std::env;
+use std::fs;
+use std::rc::Rc;
+
+use thiserror::Error;
+
+use crate::auth::{SECRET_SIZE, StaticSharedSecret, TokenSource};
+
+/// Name of the environment variable that holds the secret inline, as hex.
+pub const CLUSTER_SECRET_ENV: &str = "IGGY_CLUSTER_SECRET";
+
+/// Name of the environment variable that holds a filesystem path; the file
+/// at that path contains the hex-encoded secret.
+pub const CLUSTER_SECRET_FILE_ENV: &str = "IGGY_CLUSTER_SECRET_FILE";
+
+/// Number of hex digits needed to spell out a [`SECRET_SIZE`]-byte secret
+/// (two digits per byte).
+pub const HEX_SECRET_LEN: usize = 2 * SECRET_SIZE;
+
+/// Everything that can go wrong while resolving the cluster secret.
+///
+/// Each failure mode gets its own variant so a boot failure names the
+/// precise misconfiguration instead of a generic "invalid config".
+#[derive(Debug, Error)]
+pub enum SecretLoadError {
+    /// Neither environment variable is present.
+    #[error(
+        "neither {CLUSTER_SECRET_ENV} nor {CLUSTER_SECRET_FILE_ENV} is set; \
+         one is required to boot the replica plane"
+    )]
+    Missing,
+    /// Both environment variables are present; they are mutually exclusive.
+    #[error(
+        "both {CLUSTER_SECRET_ENV} and {CLUSTER_SECRET_FILE_ENV} are set; \
+         exactly one is required"
+    )]
+    BothSet,
+    /// The secret file could not be read.
+    #[error("failed to read secret file {path}")]
+    FileRead {
+        /// Path that failed to read, exactly as supplied.
+        path: String,
+        /// Underlying I/O error; a field named `source` is what thiserror
+        /// reports through `std::error::Error::source`.
+        source: std::io::Error,
+    },
+    /// The trimmed secret does not have the expected [`HEX_SECRET_LEN`]
+    /// length; `got` is the observed length.
+    #[error("secret has {got} hex characters; expected {HEX_SECRET_LEN}")]
+    WrongLength { got: usize },
+    /// The trimmed secret contains a non-hex character at this byte offset.
+    #[error("secret contains non-hex character at byte offset {0}")]
+    InvalidHex(usize),
+}
+
+/// Resolve the 32-byte secret from the process environment.
+///
+/// Presence is decided with [`env::var_os`], so a variable that is set but
+/// holds non-UTF-8 bytes still counts as set. It then fails decoding (or,
+/// for a path, is converted lossily and will generally fail to open),
+/// rather than being silently treated as absent.
+///
+/// # Errors
+///
+/// See [`SecretLoadError`].
+pub fn load_cluster_secret() -> Result<[u8; SECRET_SIZE], SecretLoadError> {
+    // `env::var(..).ok()` would map `VarError::NotUnicode` to `None`, hiding
+    // a misconfigured variable and defeating the `BothSet` check.
+    let read = |name: &str| env::var_os(name).map(|v| v.to_string_lossy().into_owned());
+    let literal = read(CLUSTER_SECRET_ENV);
+    let file = read(CLUSTER_SECRET_FILE_ENV);
+    resolve(literal.as_deref(), file.as_deref())
+}
+
+/// Resolve the secret via [`load_cluster_secret`] and wrap it in a
+/// [`StaticSharedSecret`] behind an `Rc<dyn TokenSource>`.
+///
+/// The return type is already the trait object, so call sites can use the
+/// result directly, with no `as` cast or unsize coercion:
+///
+/// ```ignore
+/// let ts = auth_config::cluster_token_source()?;
+/// ```
+///
+/// # Errors
+///
+/// See [`SecretLoadError`].
+pub fn cluster_token_source() -> Result<Rc<dyn TokenSource>, SecretLoadError> {
+    let secret = load_cluster_secret()?;
+    Ok(Rc::new(StaticSharedSecret::new(secret)))
+}
+
+/// Pure resolution rule behind [`load_cluster_secret`].
+///
+/// The two candidate env-var values are passed in as arguments rather than
+/// read here, so tests can cover every branch without mutating the shared
+/// process environment (env-var tests race under `cargo test`).
+///
+/// # Errors
+///
+/// See [`SecretLoadError`].
+pub fn resolve(
+    literal: Option<&str>,
+    file_path: Option<&str>,
+) -> Result<[u8; SECRET_SIZE], SecretLoadError> {
+    match (literal, file_path) {
+        (None, None) => Err(SecretLoadError::Missing),
+        (Some(_), Some(_)) => Err(SecretLoadError::BothSet),
+        (Some(text), None) => decode_hex_secret(text.trim()),
+        (None, Some(path)) => read_hex_file(path),
+    }
+}
+
+/// Read the secret file at `path` and decode its whitespace-trimmed contents.
+fn read_hex_file(path: &str) -> Result<[u8; SECRET_SIZE], SecretLoadError> {
+    let contents = fs::read_to_string(path).map_err(|source| SecretLoadError::FileRead {
+        path: path.to_owned(),
+        source,
+    })?;
+    decode_hex_secret(contents.trim())
+}
+
+/// Decode exactly [`HEX_SECRET_LEN`] hex digits (either case) into a
+/// [`SECRET_SIZE`]-byte array.
+///
+/// # Errors
+///
+/// - [`SecretLoadError::WrongLength`] if `input` is not exactly
+///   [`HEX_SECRET_LEN`] bytes long. `got` is reported as a character count,
+///   so it matches that variant's "hex characters" wording even when the
+///   input contains multi-byte characters.
+/// - [`SecretLoadError::InvalidHex`] with the byte offset of the first
+///   non-hex byte.
+fn decode_hex_secret(input: &str) -> Result<[u8; SECRET_SIZE], SecretLoadError> {
+    let bytes = input.as_bytes();
+    if bytes.len() != HEX_SECRET_LEN {
+        return Err(SecretLoadError::WrongLength {
+            got: input.chars().count(),
+        });
+    }
+    let mut out = [0u8; SECRET_SIZE];
+    // The length check guarantees `chunks_exact(2)` yields exactly
+    // SECRET_SIZE pairs, one per output byte.
+    for (i, (slot, pair)) in out.iter_mut().zip(bytes.chunks_exact(2)).enumerate() {
+        let hi = decode_hex_nibble(pair[0]).ok_or(SecretLoadError::InvalidHex(2 * i))?;
+        let lo = decode_hex_nibble(pair[1]).ok_or(SecretLoadError::InvalidHex(2 * i + 1))?;
+        *slot = (hi << 4) | lo;
+    }
+    Ok(out)
+}
+
+/// Map one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its 4-bit value, or
+/// `None` for any other byte.
+const fn decode_hex_nibble(c: u8) -> Option<u8> {
+    let value = match c {
+        b'0'..=b'9' => c - b'0',
+        b'a'..=b'f' => c - b'a' + 10,
+        b'A'..=b'F' => c - b'A' + 10,
+        _ => return None,
+    };
+    Some(value)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const SAMPLE_HEX: &str = "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f";
+    const SAMPLE_BYTES: [u8; SECRET_SIZE] = [
+        0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e,
+        0x0f, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d,
+        0x1e, 0x1f,
+    ];
+
+    #[test]
+    fn resolve_literal_happy_path() {
+        let got = resolve(Some(SAMPLE_HEX), None).expect("resolve literal");
+        assert_eq!(got, SAMPLE_BYTES);
+    }
+
+    #[test]
+    fn resolve_literal_trims_whitespace() {
+        let padded = format!("  {SAMPLE_HEX}\n");
+        let got = resolve(Some(&padded), None).expect("resolve literal with whitespace");
+        assert_eq!(got, SAMPLE_BYTES);
+    }
+
+    #[test]
+    fn resolve_literal_uppercase_ok() {
+        let upper = SAMPLE_HEX.to_uppercase();
+        let got = resolve(Some(&upper), None).expect("resolve uppercase literal");
+        assert_eq!(got, SAMPLE_BYTES);
+    }
+
+    #[test]
+    fn neither_set_is_missing() {
+        let err = resolve(None, None).unwrap_err();
+        assert!(matches!(err, SecretLoadError::Missing), "got {err:?}");
+    }
+
+    #[test]
+    fn both_set_is_both_set() {
+        let err = resolve(Some(SAMPLE_HEX), Some("/dev/null")).unwrap_err();
+        assert!(matches!(err, SecretLoadError::BothSet), "got {err:?}");
+    }
+
+    #[test]
+    fn wrong_length_is_wrong_length() {
+        let short = &SAMPLE_HEX[..SAMPLE_HEX.len() - 2];
+        let err = resolve(Some(short), None).unwrap_err();
+        match err {
+            SecretLoadError::WrongLength { got } => assert_eq!(got, HEX_SECRET_LEN - 2),
+            other => panic!("unexpected error: {other:?}"),
+        }
+    }
+
+    #[test]
+    fn non_hex_is_invalid_hex() {
+        let mut bad = SAMPLE_HEX.to_owned();
+        // Replace the 5th hex char with a non-hex rune.
+        bad.replace_range(4..5, "Z");
+        let err = resolve(Some(&bad), None).unwrap_err();
+        match err {
+            SecretLoadError::InvalidHex(offset) => assert_eq!(offset, 4),
+            other => panic!("unexpected error: {other:?}"),
+        }
+    }
+
+    #[test]
+    fn missing_file_is_file_read() {
+        // A name inside a fresh, empty temp dir is guaranteed not to exist on
+        // every platform; no reliance on Linux-only `/proc`.
+        let dir = tempfile::tempdir().expect("tempdir");
+        let missing_path = dir.path().join("does-not-exist.hex");
+        let missing = missing_path.to_str().expect("utf-8 path");
+        let err = resolve(None, Some(missing)).unwrap_err();
+        match err {
+            SecretLoadError::FileRead { path, source } => {
+                assert_eq!(path, missing);
+                assert_eq!(source.kind(), std::io::ErrorKind::NotFound);
+            }
+            other => panic!("unexpected error: {other:?}"),
+        }
+    }
+
+    #[test]
+    fn file_round_trip_with_tempfile() {
+        let dir = tempfile::tempdir().expect("tempdir");
+        let path = dir.path().join("secret.hex");
+        fs::write(&path, format!("  {SAMPLE_HEX}  \n")).expect("write");
+
+        let path_str = path.to_str().expect("utf-8 path");
+        let got = resolve(None, Some(path_str)).expect("resolve from tempfile");
+        assert_eq!(got, SAMPLE_BYTES);
+    }
+
+    #[test]
+    fn resolved_secret_signs_like_reference() {
+        // Push a resolved secret through the `Rc<dyn TokenSource>` shape that
+        // `cluster_token_source` returns, and check it signs exactly like a
+        // `StaticSharedSecret` built directly from the expected bytes.
+        // `cluster_token_source` itself is not called: it reads the process
+        // environment, which races under `cargo test`.
+        use crate::auth::{AuthChallenge, TokenSource};
+
+        let source: Rc<dyn TokenSource> = Rc::new(StaticSharedSecret::new(
+            resolve(Some(SAMPLE_HEX), None).expect("resolve"),
+        ));
+        let reference = StaticSharedSecret::new(SAMPLE_BYTES);
+        let challenge = AuthChallenge {
+            cluster: 1,
+            peer_id: 2,
+            timestamp_ns: 3,
+            release: 4,
+            nonce: 5,
+        };
+        let from_loader = source.sign(&challenge);
+        let from_reference = reference.sign(&challenge);
+        assert_eq!(from_loader, from_reference);
+    }
+}
diff --git a/core/message_bus/src/config.rs b/core/message_bus/src/config.rs
index d99b3fb49..f82114461 100644
--- a/core/message_bus/src/config.rs
+++ b/core/message_bus/src/config.rs
@@ -26,6 +26,12 @@
 //! constructs [`crate::IggyMessageBus`] from `ServerConfig` lands.
 //! Keeping the type in-crate for now avoids churning the configs crate
 //! ahead of that wiring.
+//!
+//! The replica-plane auth secret intentionally does NOT live on this
+//! struct: it has a different lifecycle (resolved once at boot, and kept
+//! out of cloned config values and debug traces). It is constructed via
+//! [`crate::auth_config`] at process start and threaded as an
+//! `Rc<dyn TokenSource>` into [`crate::replica_io::start_on_shard_zero`].
 
 use std::time::Duration;
 
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 9d95cc63e..96d87d961 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -71,6 +71,7 @@
 //! under `Documents/silverhand/iggy/message_bus/transport-plan/`.
 
 pub mod auth;
+pub mod auth_config;
 pub mod cache;
 pub mod client_listener;
 pub mod config;

Reply via email to