This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new a4b6a8db5 feat(server,mcp): add systemd watchdog integration (#3233)
a4b6a8db5 is described below

commit a4b6a8db58a9e2c0054c3d88150d34035ab91fed
Author: Jonathan Davies <[email protected]>
AuthorDate: Wed May 13 10:20:12 2026 +0100

    feat(server,mcp): add systemd watchdog integration (#3233)
---
 Cargo.lock                                         | 12 +++++
 Cargo.toml                                         |  1 +
 core/ai/mcp/Cargo.toml                             |  5 ++
 core/ai/mcp/README.md                              | 11 +++++
 core/ai/mcp/src/main.rs                            | 21 ++++++++
 core/ai/mcp/src/systemd.rs                         | 57 ++++++++++++++++++++++
 core/server/Cargo.toml                             |  2 +
 core/server/README.md                              | 11 +++++
 core/server/src/shard/mod.rs                       | 21 +++++++-
 core/server/src/shard/systemd.rs                   | 47 ++++++++++++++++++
 .../src/shard/tasks/oneshot/config_writer.rs       |  3 ++
 core/server/src/shard/tasks/periodic/mod.rs        |  4 ++
 .../src/shard/tasks/periodic/systemd_watchdog.rs   | 47 ++++++++++++++++++
 13 files changed, 241 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 81b5eb856..ae6952824 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6480,6 +6480,7 @@ dependencies = [
  "opentelemetry-semantic-conventions",
  "opentelemetry_sdk",
  "rmcp",
+ "sd-notify",
  "serde",
  "serde_json",
  "socket2 0.6.3",
@@ -6487,6 +6488,7 @@ dependencies = [
  "tempfile",
  "thiserror 2.0.18",
  "tokio",
+ "tokio-util",
  "tower-http",
  "tracing",
  "tracing-opentelemetry",
@@ -10905,6 +10907,15 @@ version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
 
+[[package]]
+name = "sd-notify"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e4ef7359e694bfaf1dd27a30f9d760b54c00dfae9f19bd0c05a39bc9128fe76"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "sdd"
 version = "3.0.10"
@@ -11284,6 +11295,7 @@ dependencies = [
  "rust-embed",
  "rustls",
  "rustls-pemfile",
+ "sd-notify",
  "secrecy",
  "send_wrapper",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 88e92574f..5181d0b1a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -252,6 +252,7 @@ rust-s3 = { version = "0.37.2", default-features = false, features = ["tokio-rus
 rustls = { version = "0.23.40", features = ["ring"] }
 rustls-pemfile = "2.2.0"
 scopeguard = "1.2.0"
+sd-notify = "0.5"
 secrecy = { version = "0.10", features = ["serde"] }
 send_wrapper = "0.6.0"
 serde = { version = "1.0.228", features = ["derive", "rc"] }
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index 9fdad8eb8..e5b2e6cd8 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -27,6 +27,9 @@ keywords = ["iggy", "messaging", "streaming", "mcp"]
 readme = "README.md"
 publish = false
 
+[features]
+systemd = ["dep:sd-notify", "dep:tokio-util"]
+
 [dependencies]
 axum = { workspace = true }
 axum-server = { workspace = true }
@@ -47,12 +50,14 @@ rmcp = { workspace = true, features = [
     "transport-io",
     "transport-streamable-http-server",
 ] }
+sd-notify = { workspace = true, optional = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 socket2 = { workspace = true }
 strum = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
+tokio-util = { workspace = true, optional = true }
 tower-http = { workspace = true }
 tracing = { workspace = true }
 tracing-opentelemetry = { workspace = true }
diff --git a/core/ai/mcp/README.md b/core/ai/mcp/README.md
index 6e45d2f69..97d580a58 100644
--- a/core/ai/mcp/README.md
+++ b/core/ai/mcp/README.md
@@ -70,6 +70,17 @@ Here's the example configuration to be used with Claude Desktop:
 
 ![MCP](../../../assets/iggy_mcp_server.png)
 
+## Systemd integration
+
+Build with the `systemd` feature to enable systemd readiness and watchdog notifications:
+
+```sh
+cargo build --bin iggy-mcp --release --features iggy-mcp/systemd
+```
+
+The MCP server behaves the same way the Iggy server does under systemd. See
+[Systemd integration](../../server/README.md#systemd-integration) for details.
+
 ## Telemetry
 
 The MCP server supports OpenTelemetry for logs and traces. To enable telemetry, add the following configuration:
diff --git a/core/ai/mcp/src/main.rs b/core/ai/mcp/src/main.rs
index bc2af46e4..802476585 100644
--- a/core/ai/mcp/src/main.rs
+++ b/core/ai/mcp/src/main.rs
@@ -34,6 +34,8 @@ mod error;
 mod log;
 mod service;
 mod stream;
+#[cfg(feature = "systemd")]
+mod systemd;
 
 const VERSION: &str = env!("CARGO_PKG_VERSION");
 
@@ -89,6 +91,9 @@ async fn main() -> Result<(), McpRuntimeError> {
         delete: config.permissions.delete,
     };
 
+    #[cfg(feature = "systemd")]
+    let watchdog_cancel = tokio_util::sync::CancellationToken::new();
+
     if transport == McpTransport::Stdio {
         let Ok(service) = IggyService::new(iggy_client, iggy_consumer, permissions)
             .serve(stdio())
@@ -101,11 +106,21 @@ async fn main() -> Result<(), McpRuntimeError> {
             return Err(McpRuntimeError::FailedToCreateService);
         };
 
+        #[cfg(feature = "systemd")]
+        systemd::notify_ready();
+        #[cfg(feature = "systemd")]
+        systemd::spawn_watchdog(watchdog_cancel.clone());
+
         if let Err(error) = service.waiting().await {
             error!("Waiting for service error. {error}");
         }
     } else {
         api::init(config.http, iggy_client, iggy_consumer, permissions).await?;
+
+        #[cfg(feature = "systemd")]
+        systemd::notify_ready();
+        #[cfg(feature = "systemd")]
+        systemd::spawn_watchdog(watchdog_cancel.clone());
     }
 
     #[cfg(unix)]
@@ -127,6 +142,12 @@ async fn main() -> Result<(), McpRuntimeError> {
         }
     }
 
+    #[cfg(feature = "systemd")]
+    {
+        watchdog_cancel.cancel();
+        systemd::notify_stopping();
+    }
+
     client_to_shutdown.shutdown().await?;
     info!("Iggy MCP Server stopped successfully");
     Ok(())
diff --git a/core/ai/mcp/src/systemd.rs b/core/ai/mcp/src/systemd.rs
new file mode 100644
index 000000000..3f6575818
--- /dev/null
+++ b/core/ai/mcp/src/systemd.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use tokio_util::sync::CancellationToken;
+use tracing::{info, warn};
+
+pub fn notify_ready() {
+    if let Err(e) = sd_notify::notify(&[sd_notify::NotifyState::Ready]) {
+        warn!("Failed to send systemd READY=1 notification: {e}");
+    }
+}
+
+pub fn notify_stopping() {
+    let _ = sd_notify::notify(&[sd_notify::NotifyState::Stopping]);
+}
+
+/// Spawn the watchdog keep-alive task. It stops cooperatively when `cancel`
+/// fires (driven by the SIGINT/SIGTERM handler in `main`).
+pub fn spawn_watchdog(cancel: CancellationToken) {
+    let Some(timeout) = sd_notify::watchdog_enabled() else {
+        return;
+    };
+
+    let interval = timeout / 2;
+    info!(
+        "Systemd watchdog enabled, pinging every {}s (timeout: {}s).",
+        interval.as_secs(),
+        timeout.as_secs()
+    );
+
+    tokio::spawn(async move {
+        loop {
+            tokio::select! {
+                _ = cancel.cancelled() => break,
+                _ = tokio::time::sleep(interval) => {
+                    if let Err(e) = sd_notify::notify(&[sd_notify::NotifyState::Watchdog]) {
+                        warn!("Failed to send systemd watchdog ping: {e}");
+                    }
+                }
+            }
+        }
+    });
+}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 656b5480f..0615a7963 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -37,6 +37,7 @@ default = ["mimalloc", "iggy-web"]
 disable-mimalloc = []
 mimalloc = ["dep:mimalloc"]
 iggy-web = ["dep:rust-embed", "dep:mime_guess"]
+systemd = ["dep:sd-notify"]
 
 [dependencies]
 ahash = { workspace = true }
@@ -86,6 +87,7 @@ rolling-file = { workspace = true }
 rust-embed = { workspace = true, optional = true }
 rustls = { workspace = true }
 rustls-pemfile = { workspace = true }
+sd-notify = { workspace = true, optional = true }
 secrecy = { workspace = true }
 send_wrapper = { workspace = true }
 serde = { workspace = true }
diff --git a/core/server/README.md b/core/server/README.md
index a57c62401..fd4f8cea8 100644
--- a/core/server/README.md
+++ b/core/server/README.md
@@ -4,6 +4,17 @@ This is the core server component of Apache Iggy. You can run it directly with `
 
 The configuration file is located at [core/server/config.toml](https://github.com/apache/iggy/blob/master/core/server/config.toml). You can customize the server settings by modifying this file or by using environment variables e.g. `IGGY_TCP_ADDRESS=0.0.0.0:8090`.
 
+## Systemd integration
+
+Build with the `systemd` feature to enable systemd readiness and watchdog notifications:
+
+```sh
+cargo build --bin iggy-server --release --features systemd
+```
+
+The server will notify systemd when it is ready and then periodically send
+watchdog messages at half the configured `WatchdogSec` interval for the unit.
+
 ![Server](../../assets/server.png)
 
 ![Architecture](../../assets/iggy_architecture.png)
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 794473c61..95f766a57 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -59,6 +59,9 @@ pub mod task_registry;
 pub mod tasks;
 pub mod transmission;
 
+#[cfg(feature = "systemd")]
+pub mod systemd;
+
 mod communication;
 
 pub use communication::calculate_shard_assignment;
@@ -175,6 +178,11 @@ impl IggyShard {
         if !self.config.system.logging.sysinfo_print_interval.is_zero() && self.id == 0 {
             periodic::spawn_sysinfo_printer(self.clone());
         }
+
+        #[cfg(feature = "systemd")]
+        if self.id == 0 {
+            periodic::spawn_systemd_watchdog(self.clone());
+        }
     }
 
     pub async fn run(self: &Rc<Self>) -> Result<(), IggyError> {
@@ -194,7 +202,18 @@ impl IggyShard {
         // Spawn shutdown handler
         compio::runtime::spawn(async move {
             let _ = stop_receiver.recv().await;
-            shard_for_shutdown.trigger_shutdown().await;
+            #[cfg(feature = "systemd")]
+            if shard_for_shutdown.id == 0 {
+                systemd::notify_stopping();
+            }
+            let drained = shard_for_shutdown.trigger_shutdown().await;
+            #[cfg(feature = "systemd")]
+            if shard_for_shutdown.id == 0 && !drained {
+                warn!("Graceful shutdown timed out; some tasks did not drain in time");
+                systemd::notify_status("graceful shutdown timed out");
+            }
+            #[cfg(not(feature = "systemd"))]
+            let _ = drained;
             let _ = shutdown_complete_tx.send(()).await;
         })
         .detach();
diff --git a/core/server/src/shard/systemd.rs b/core/server/src/shard/systemd.rs
new file mode 100644
index 000000000..2f4b7aab9
--- /dev/null
+++ b/core/server/src/shard/systemd.rs
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Thin wrappers around `sd_notify` so every systemd interaction on the server
+//! side lives in one place (mirrors `core/ai/mcp/src/systemd.rs`).
+
+use tracing::warn;
+
+/// Tell systemd the service has finished start-up (`READY=1`).
+pub fn notify_ready() {
+    if let Err(e) = sd_notify::notify(&[sd_notify::NotifyState::Ready]) {
+        warn!("Failed to send systemd READY=1 notification: {e}");
+    }
+}
+
+/// Tell systemd the service has begun shutting down (`STOPPING=1`).
+pub fn notify_stopping() {
+    let _ = sd_notify::notify(&[sd_notify::NotifyState::Stopping]);
+}
+
+/// Surface a non-fatal shutdown problem in `systemctl status` / journald.
+pub fn notify_status(status: &str) {
+    let _ = sd_notify::notify(&[sd_notify::NotifyState::Status(status)]);
+}
+
+/// Send a single watchdog keep-alive ping (`WATCHDOG=1`).
+pub fn ping_watchdog() {
+    if let Err(e) = sd_notify::notify(&[sd_notify::NotifyState::Watchdog]) {
+        warn!("Failed to send systemd watchdog ping: {e}");
+    }
+}
diff --git a/core/server/src/shard/tasks/oneshot/config_writer.rs b/core/server/src/shard/tasks/oneshot/config_writer.rs
index 3bee6cc6d..c48a7c8ef 100644
--- a/core/server/src/shard/tasks/oneshot/config_writer.rs
+++ b/core/server/src/shard/tasks/oneshot/config_writer.rs
@@ -73,6 +73,9 @@ async fn write_config(
         }
     }
 
+    #[cfg(feature = "systemd")]
+    crate::shard::systemd::notify_ready();
+
     let mut current_config = shard_clone.config.clone();
 
     let tcp_addr = shard_clone.tcp_bound_address.get();
diff --git a/core/server/src/shard/tasks/periodic/mod.rs b/core/server/src/shard/tasks/periodic/mod.rs
index 92e2f14a7..e1010ed42 100644
--- a/core/server/src/shard/tasks/periodic/mod.rs
+++ b/core/server/src/shard/tasks/periodic/mod.rs
@@ -23,6 +23,8 @@ mod message_saver;
 mod personal_access_token_cleaner;
 mod revocation_timeout;
 mod sysinfo_printer;
+#[cfg(feature = "systemd")]
+mod systemd_watchdog;
 
 pub use heartbeat_verifier::spawn_heartbeat_verifier;
 pub use jwt_token_cleaner::spawn_jwt_token_cleaner;
@@ -31,3 +33,5 @@ pub use message_saver::spawn_message_saver;
 pub use personal_access_token_cleaner::spawn_personal_access_token_cleaner;
 pub use revocation_timeout::spawn_revocation_timeout_checker;
 pub use sysinfo_printer::spawn_sysinfo_printer;
+#[cfg(feature = "systemd")]
+pub use systemd_watchdog::spawn_systemd_watchdog;
diff --git a/core/server/src/shard/tasks/periodic/systemd_watchdog.rs b/core/server/src/shard/tasks/periodic/systemd_watchdog.rs
new file mode 100644
index 000000000..29aea68da
--- /dev/null
+++ b/core/server/src/shard/tasks/periodic/systemd_watchdog.rs
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::shard::IggyShard;
+use crate::shard::systemd;
+use iggy_common::IggyError;
+use std::rc::Rc;
+use tracing::info;
+
+pub fn spawn_systemd_watchdog(shard: Rc<IggyShard>) {
+    let Some(timeout) = sd_notify::watchdog_enabled() else {
+        return;
+    };
+
+    let interval = timeout / 2;
+    info!(
+        "Systemd watchdog enabled, pinging every {}s (timeout: {}s).",
+        interval.as_secs(),
+        timeout.as_secs()
+    );
+
+    shard
+        .task_registry
+        .periodic("systemd_watchdog")
+        .every(interval)
+        .tick(move |_shutdown| ping_watchdog())
+        .spawn();
+}
+
+async fn ping_watchdog() -> Result<(), IggyError> {
+    systemd::ping_watchdog();
+    Ok(())
+}

Reply via email to