This is an automated email from the ASF dual-hosted git repository.
hubcio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new a4b6a8db5 feat(server,mcp): add systemd watchdog integration (#3233)
a4b6a8db5 is described below
commit a4b6a8db58a9e2c0054c3d88150d34035ab91fed
Author: Jonathan Davies <[email protected]>
AuthorDate: Wed May 13 10:20:12 2026 +0100
feat(server,mcp): add systemd watchdog integration (#3233)
---
Cargo.lock | 12 +++++
Cargo.toml | 1 +
core/ai/mcp/Cargo.toml | 5 ++
core/ai/mcp/README.md | 11 +++++
core/ai/mcp/src/main.rs | 21 ++++++++
core/ai/mcp/src/systemd.rs | 57 ++++++++++++++++++++++
core/server/Cargo.toml | 2 +
core/server/README.md | 11 +++++
core/server/src/shard/mod.rs | 21 +++++++-
core/server/src/shard/systemd.rs | 47 ++++++++++++++++++
.../src/shard/tasks/oneshot/config_writer.rs | 3 ++
core/server/src/shard/tasks/periodic/mod.rs | 4 ++
.../src/shard/tasks/periodic/systemd_watchdog.rs | 47 ++++++++++++++++++
13 files changed, 241 insertions(+), 1 deletion(-)
diff --git a/Cargo.lock b/Cargo.lock
index 81b5eb856..ae6952824 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6480,6 +6480,7 @@ dependencies = [
"opentelemetry-semantic-conventions",
"opentelemetry_sdk",
"rmcp",
+ "sd-notify",
"serde",
"serde_json",
"socket2 0.6.3",
@@ -6487,6 +6488,7 @@ dependencies = [
"tempfile",
"thiserror 2.0.18",
"tokio",
+ "tokio-util",
"tower-http",
"tracing",
"tracing-opentelemetry",
@@ -10905,6 +10907,15 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+[[package]]
+name = "sd-notify"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e4ef7359e694bfaf1dd27a30f9d760b54c00dfae9f19bd0c05a39bc9128fe76"
+dependencies = [
+ "libc",
+]
+
[[package]]
name = "sdd"
version = "3.0.10"
@@ -11284,6 +11295,7 @@ dependencies = [
"rust-embed",
"rustls",
"rustls-pemfile",
+ "sd-notify",
"secrecy",
"send_wrapper",
"serde",
diff --git a/Cargo.toml b/Cargo.toml
index 88e92574f..5181d0b1a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -252,6 +252,7 @@ rust-s3 = { version = "0.37.2", default-features = false,
features = ["tokio-rus
rustls = { version = "0.23.40", features = ["ring"] }
rustls-pemfile = "2.2.0"
scopeguard = "1.2.0"
+sd-notify = "0.5"
secrecy = { version = "0.10", features = ["serde"] }
send_wrapper = "0.6.0"
serde = { version = "1.0.228", features = ["derive", "rc"] }
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index 9fdad8eb8..e5b2e6cd8 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -27,6 +27,9 @@ keywords = ["iggy", "messaging", "streaming", "mcp"]
readme = "README.md"
publish = false
+[features]
+systemd = ["dep:sd-notify", "dep:tokio-util"]
+
[dependencies]
axum = { workspace = true }
axum-server = { workspace = true }
@@ -47,12 +50,14 @@ rmcp = { workspace = true, features = [
"transport-io",
"transport-streamable-http-server",
] }
+sd-notify = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
socket2 = { workspace = true }
strum = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
+tokio-util = { workspace = true, optional = true }
tower-http = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
diff --git a/core/ai/mcp/README.md b/core/ai/mcp/README.md
index 6e45d2f69..97d580a58 100644
--- a/core/ai/mcp/README.md
+++ b/core/ai/mcp/README.md
@@ -70,6 +70,17 @@ Here's the example configuration to be used with Claude
Desktop:

+## Systemd integration
+
+Build with the `systemd` feature to enable systemd readiness and watchdog notifications:
+
+```sh
+cargo build --bin iggy-mcp --release --features iggy-mcp/systemd
+```
+
+The MCP server behaves the same way the Iggy server does under systemd. See
+[Systemd integration](../../server/README.md#systemd-integration) for details.
+
## Telemetry
The MCP server supports OpenTelemetry for logs and traces. To enable
telemetry, add the following configuration:
diff --git a/core/ai/mcp/src/main.rs b/core/ai/mcp/src/main.rs
index bc2af46e4..802476585 100644
--- a/core/ai/mcp/src/main.rs
+++ b/core/ai/mcp/src/main.rs
@@ -34,6 +34,8 @@ mod error;
mod log;
mod service;
mod stream;
+#[cfg(feature = "systemd")]
+mod systemd;
const VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -89,6 +91,9 @@ async fn main() -> Result<(), McpRuntimeError> {
delete: config.permissions.delete,
};
+ #[cfg(feature = "systemd")]
+ let watchdog_cancel = tokio_util::sync::CancellationToken::new();
+
if transport == McpTransport::Stdio {
let Ok(service) = IggyService::new(iggy_client, iggy_consumer,
permissions)
.serve(stdio())
@@ -101,11 +106,21 @@ async fn main() -> Result<(), McpRuntimeError> {
return Err(McpRuntimeError::FailedToCreateService);
};
+ #[cfg(feature = "systemd")]
+ systemd::notify_ready();
+ #[cfg(feature = "systemd")]
+ systemd::spawn_watchdog(watchdog_cancel.clone());
+
if let Err(error) = service.waiting().await {
error!("Waiting for service error. {error}");
}
} else {
api::init(config.http, iggy_client, iggy_consumer, permissions).await?;
+
+ #[cfg(feature = "systemd")]
+ systemd::notify_ready();
+ #[cfg(feature = "systemd")]
+ systemd::spawn_watchdog(watchdog_cancel.clone());
}
#[cfg(unix)]
@@ -127,6 +142,12 @@ async fn main() -> Result<(), McpRuntimeError> {
}
}
+ #[cfg(feature = "systemd")]
+ {
+ watchdog_cancel.cancel();
+ systemd::notify_stopping();
+ }
+
client_to_shutdown.shutdown().await?;
info!("Iggy MCP Server stopped successfully");
Ok(())
diff --git a/core/ai/mcp/src/systemd.rs b/core/ai/mcp/src/systemd.rs
new file mode 100644
index 000000000..3f6575818
--- /dev/null
+++ b/core/ai/mcp/src/systemd.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use tokio_util::sync::CancellationToken;
+use tracing::{info, warn};
+
+/// Tell systemd the service has finished start-up (`READY=1`).
+///
+/// A failed notification is logged as a warning; it is never propagated to
+/// the caller.
+pub fn notify_ready() {
+    let sent = sd_notify::notify(&[sd_notify::NotifyState::Ready]);
+    if let Err(e) = sent {
+        warn!("Failed to send systemd READY=1 notification: {e}");
+    }
+}
+
+/// Tell systemd the service has begun shutting down (`STOPPING=1`).
+///
+/// Best effort: the result of the notification is deliberately discarded.
+pub fn notify_stopping() {
+    let stopping = [sd_notify::NotifyState::Stopping];
+    let _ = sd_notify::notify(&stopping);
+}
+
+/// Spawn the watchdog keep-alive task.
+///
+/// Does nothing when `sd_notify::watchdog_enabled()` reports no watchdog
+/// timeout. Otherwise a Tokio task sends `WATCHDOG=1` at half the timeout
+/// and stops cooperatively when `cancel` fires, which `main` triggers just
+/// before it shuts the client down.
+pub fn spawn_watchdog(cancel: CancellationToken) {
+    let Some(timeout) = sd_notify::watchdog_enabled() else {
+        return;
+    };
+
+    let interval = timeout / 2;
+    // `Duration`'s `Debug` output keeps sub-second precision (`500ms`, `1.5s`).
+    // Formatting with `as_secs()` truncated it, so a 1s timeout was logged as
+    // "pinging every 0s".
+    info!("Systemd watchdog enabled, pinging every {interval:?} (timeout: {timeout:?}).");
+
+    tokio::spawn(async move {
+        loop {
+            tokio::select! {
+                _ = cancel.cancelled() => break,
+                _ = tokio::time::sleep(interval) => {
+                    if let Err(e) = sd_notify::notify(&[sd_notify::NotifyState::Watchdog]) {
+                        warn!("Failed to send systemd watchdog ping: {e}");
+                    }
+                }
+            }
+        }
+    });
+}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 656b5480f..0615a7963 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -37,6 +37,7 @@ default = ["mimalloc", "iggy-web"]
disable-mimalloc = []
mimalloc = ["dep:mimalloc"]
iggy-web = ["dep:rust-embed", "dep:mime_guess"]
+systemd = ["dep:sd-notify"]
[dependencies]
ahash = { workspace = true }
@@ -86,6 +87,7 @@ rolling-file = { workspace = true }
rust-embed = { workspace = true, optional = true }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
+sd-notify = { workspace = true, optional = true }
secrecy = { workspace = true }
send_wrapper = { workspace = true }
serde = { workspace = true }
diff --git a/core/server/README.md b/core/server/README.md
index a57c62401..fd4f8cea8 100644
--- a/core/server/README.md
+++ b/core/server/README.md
@@ -4,6 +4,17 @@ This is the core server component of Apache Iggy. You can run
it directly with `
The configuration file is located at
[core/server/config.toml](https://github.com/apache/iggy/blob/master/core/server/config.toml).
You can customize the server settings by modifying this file or by using
environment variables e.g. `IGGY_TCP_ADDRESS=0.0.0.0:8090`.
+## Systemd integration
+
+Build with the `systemd` feature to enable systemd readiness and watchdog notifications:
+
+```sh
+cargo build --bin iggy-server --release --features systemd
+```
+
+The server will notify systemd when it is ready and then periodically send
+watchdog messages at half the configured `WatchdogSec` interval for the unit.
+


diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 794473c61..95f766a57 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -59,6 +59,9 @@ pub mod task_registry;
pub mod tasks;
pub mod transmission;
+#[cfg(feature = "systemd")]
+pub mod systemd;
+
mod communication;
pub use communication::calculate_shard_assignment;
@@ -175,6 +178,11 @@ impl IggyShard {
if !self.config.system.logging.sysinfo_print_interval.is_zero() &&
self.id == 0 {
periodic::spawn_sysinfo_printer(self.clone());
}
+
+ #[cfg(feature = "systemd")]
+ if self.id == 0 {
+ periodic::spawn_systemd_watchdog(self.clone());
+ }
}
pub async fn run(self: &Rc<Self>) -> Result<(), IggyError> {
@@ -194,7 +202,18 @@ impl IggyShard {
// Spawn shutdown handler
compio::runtime::spawn(async move {
let _ = stop_receiver.recv().await;
- shard_for_shutdown.trigger_shutdown().await;
+ #[cfg(feature = "systemd")]
+ if shard_for_shutdown.id == 0 {
+ systemd::notify_stopping();
+ }
+ let drained = shard_for_shutdown.trigger_shutdown().await;
+ #[cfg(feature = "systemd")]
+ if shard_for_shutdown.id == 0 && !drained {
+ warn!("Graceful shutdown timed out; some tasks did not drain
in time");
+ systemd::notify_status("graceful shutdown timed out");
+ }
+ #[cfg(not(feature = "systemd"))]
+ let _ = drained;
let _ = shutdown_complete_tx.send(()).await;
})
.detach();
diff --git a/core/server/src/shard/systemd.rs b/core/server/src/shard/systemd.rs
new file mode 100644
index 000000000..2f4b7aab9
--- /dev/null
+++ b/core/server/src/shard/systemd.rs
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Thin wrappers around `sd_notify` so every systemd interaction on the server
+//! side lives in one place (mirrors `core/ai/mcp/src/systemd.rs`).
+
+use tracing::warn;
+
+/// Report start-up completion to systemd (`READY=1`).
+///
+/// A failed notification is logged as a warning; it is never propagated to
+/// the caller.
+pub fn notify_ready() {
+    let sent = sd_notify::notify(&[sd_notify::NotifyState::Ready]);
+    if let Err(e) = sent {
+        warn!("Failed to send systemd READY=1 notification: {e}");
+    }
+}
+
+/// Report the start of shutdown to systemd (`STOPPING=1`).
+///
+/// Best effort: the result of the notification is deliberately discarded.
+pub fn notify_stopping() {
+    let stopping = [sd_notify::NotifyState::Stopping];
+    let _ = sd_notify::notify(&stopping);
+}
+
+/// Publish a free-form status line to systemd (`STATUS=...`), e.g. to surface
+/// a non-fatal shutdown problem in `systemctl status`.
+///
+/// Best effort: the result of the notification is deliberately discarded.
+pub fn notify_status(status: &str) {
+    let update = [sd_notify::NotifyState::Status(status)];
+    let _ = sd_notify::notify(&update);
+}
+
+/// Emit one watchdog keep-alive ping (`WATCHDOG=1`).
+///
+/// A failed ping is logged as a warning; it is never propagated to the caller.
+pub fn ping_watchdog() {
+    let pinged = sd_notify::notify(&[sd_notify::NotifyState::Watchdog]);
+    if let Err(e) = pinged {
+        warn!("Failed to send systemd watchdog ping: {e}");
+    }
+}
diff --git a/core/server/src/shard/tasks/oneshot/config_writer.rs
b/core/server/src/shard/tasks/oneshot/config_writer.rs
index 3bee6cc6d..c48a7c8ef 100644
--- a/core/server/src/shard/tasks/oneshot/config_writer.rs
+++ b/core/server/src/shard/tasks/oneshot/config_writer.rs
@@ -73,6 +73,9 @@ async fn write_config(
}
}
+ #[cfg(feature = "systemd")]
+ crate::shard::systemd::notify_ready();
+
let mut current_config = shard_clone.config.clone();
let tcp_addr = shard_clone.tcp_bound_address.get();
diff --git a/core/server/src/shard/tasks/periodic/mod.rs
b/core/server/src/shard/tasks/periodic/mod.rs
index 92e2f14a7..e1010ed42 100644
--- a/core/server/src/shard/tasks/periodic/mod.rs
+++ b/core/server/src/shard/tasks/periodic/mod.rs
@@ -23,6 +23,8 @@ mod message_saver;
mod personal_access_token_cleaner;
mod revocation_timeout;
mod sysinfo_printer;
+#[cfg(feature = "systemd")]
+mod systemd_watchdog;
pub use heartbeat_verifier::spawn_heartbeat_verifier;
pub use jwt_token_cleaner::spawn_jwt_token_cleaner;
@@ -31,3 +33,5 @@ pub use message_saver::spawn_message_saver;
pub use personal_access_token_cleaner::spawn_personal_access_token_cleaner;
pub use revocation_timeout::spawn_revocation_timeout_checker;
pub use sysinfo_printer::spawn_sysinfo_printer;
+#[cfg(feature = "systemd")]
+pub use systemd_watchdog::spawn_systemd_watchdog;
diff --git a/core/server/src/shard/tasks/periodic/systemd_watchdog.rs
b/core/server/src/shard/tasks/periodic/systemd_watchdog.rs
new file mode 100644
index 000000000..29aea68da
--- /dev/null
+++ b/core/server/src/shard/tasks/periodic/systemd_watchdog.rs
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::shard::IggyShard;
+use crate::shard::systemd;
+use iggy_common::IggyError;
+use std::rc::Rc;
+use tracing::info;
+
+/// Register the periodic systemd watchdog keep-alive on `shard`'s task registry.
+///
+/// Returns without registering anything when `sd_notify::watchdog_enabled()`
+/// reports no watchdog timeout. Otherwise a periodic task named
+/// `systemd_watchdog` pings systemd at half the timeout.
+pub fn spawn_systemd_watchdog(shard: Rc<IggyShard>) {
+    let Some(timeout) = sd_notify::watchdog_enabled() else {
+        return;
+    };
+
+    let interval = timeout / 2;
+    // `Duration`'s `Debug` output keeps sub-second precision (`500ms`, `1.5s`).
+    // Formatting with `as_secs()` truncated it, so a 1s timeout was logged as
+    // "pinging every 0s".
+    info!("Systemd watchdog enabled, pinging every {interval:?} (timeout: {timeout:?}).");
+
+    shard
+        .task_registry
+        .periodic("systemd_watchdog")
+        .every(interval)
+        .tick(move |_shutdown| ping_watchdog())
+        .spawn();
+}
+
+/// Tick body of the periodic watchdog task: sends one keep-alive ping through
+/// `systemd::ping_watchdog`.
+///
+/// Always returns `Ok(())`; this function has no error path of its own.
+async fn ping_watchdog() -> Result<(), IggyError> {
+ systemd::ping_watchdog();
+ Ok(())
+}