Thanks also for this patch and for working through / including most of
my previous v1 suggestions.
Please see my comments inline.
On 2/13/26 10:47 AM, Kefu Chai wrote:
Add RRD (Round-Robin Database) file persistence system:
- RrdWriter: Main API for RRD operations
- Schema definitions for CPU, memory, network metrics
- Format migration support (v1/v2/v3)
- rrdcached integration for batched writes
- Data transformation for legacy formats
This is an independent crate with no internal dependencies,
only requiring external RRD libraries (rrd, rrdcached-client)
and tokio for async operations. It handles time-series data
storage compatible with the C implementation.
Includes comprehensive unit tests for data transformation,
schema generation, and multi-source data processing.
Signed-off-by: Kefu Chai <[email protected]>
---
src/pmxcfs-rs/Cargo.toml | 12 +
src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml | 23 +
src/pmxcfs-rs/pmxcfs-rrd/README.md | 119 ++++
src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs | 62 ++
.../pmxcfs-rrd/src/backend/backend_daemon.rs | 184 ++++++
.../pmxcfs-rrd/src/backend/backend_direct.rs | 586 ++++++++++++++++++
.../src/backend/backend_fallback.rs | 212 +++++++
src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs | 140 +++++
This file doesn't seem to be included (there is no mod definition).
Please remove if not needed.
src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs | 408 ++++++++++++
src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs | 23 +
src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs | 124 ++++
.../pmxcfs-rrd/src/rrdcached/LICENSE | 21 +
.../pmxcfs-rrd/src/rrdcached/client.rs | 208 +++++++
.../src/rrdcached/consolidation_function.rs | 30 +
.../pmxcfs-rrd/src/rrdcached/create.rs | 410 ++++++++++++
.../pmxcfs-rrd/src/rrdcached/errors.rs | 29 +
src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs | 45 ++
src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs | 18 +
.../pmxcfs-rrd/src/rrdcached/parsers.rs | 65 ++
.../pmxcfs-rrd/src/rrdcached/sanitisation.rs | 100 +++
src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs | 577 +++++++++++++++++
src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs | 582 +++++++++++++++++
22 files changed, 3978 insertions(+)
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/README.md
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs
create mode 100644
src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index d26fac04c..2457fe368 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -4,6 +4,7 @@ members = [
"pmxcfs-api-types", # Shared types and error definitions
"pmxcfs-config", # Configuration management
"pmxcfs-logger", # Cluster log with ring buffer and deduplication
+ "pmxcfs-rrd", # RRD (Round-Robin Database) persistence
]
resolver = "2"
@@ -20,16 +21,27 @@ rust-version = "1.85"
pmxcfs-api-types = { path = "pmxcfs-api-types" }
pmxcfs-config = { path = "pmxcfs-config" }
pmxcfs-logger = { path = "pmxcfs-logger" }
+pmxcfs-rrd = { path = "pmxcfs-rrd" }
+
+# Core async runtime
+tokio = { version = "1.35", features = ["full"] }
# Error handling
+anyhow = "1.0"
thiserror = "1.0"
+# Logging and tracing
+tracing = "0.1"
+
# Concurrency primitives
parking_lot = "0.12"
# System integration
libc = "0.2"
+# Development dependencies
+tempfile = "3.8"
+
[workspace.lints.clippy]
uninlined_format_args = "warn"
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
new file mode 100644
index 000000000..33c87ec91
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "pmxcfs-rrd"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+
+[features]
+default = ["rrdcached"]
+rrdcached = []
+
+[dependencies]
+anyhow.workspace = true
+async-trait = "0.1"
+chrono = { version = "0.4", default-features = false, features = ["clock"] }
+nom = "8.0"
Do we actually need this extra dependency?
It seems like we only use it for basic string operations.
+rrd = "0.2"
+thiserror = "2.0"
In the workspace there is already thiserror = "1.0".
Please align accordingly.
+tokio.workspace = true
+tracing.workspace = true
+
+[dev-dependencies]
+tempfile.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/README.md
b/src/pmxcfs-rs/pmxcfs-rrd/README.md
new file mode 100644
index 000000000..d6f6ad9b1
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/README.md
@@ -0,0 +1,119 @@
+# pmxcfs-rrd
+
+RRD (Round-Robin Database) persistence for pmxcfs performance metrics.
+
+## Overview
+
+This crate provides RRD file management for storing time-series performance
data from Proxmox nodes and VMs. It handles file creation, updates, and
integration with rrdcached daemon for efficient writes.
+
+### Key Features
+
+- RRD file creation with schema-based initialization
+- RRD updates (write metrics to disk)
+- rrdcached integration for batched writes
+- Support for both legacy and current schema versions (v1/v2/v3)
+- Type-safe key parsing and validation
+- Compatible with existing C-created RRD files
+
+## Usage Flow
+
+The typical data flow through this crate:
+
+1. **Metrics Collection**: pmxcfs-status collects performance metrics (CPU,
memory, network, etc.)
+2. **Key Generation**: Metrics are organized by key type (node, VM, storage)
+3. **Schema Selection**: Appropriate RRD schema is selected based on key type
and version
+4. **Data Transformation**: Legacy data (v1/v2) is transformed to current
format (v3) if needed
+5. **Backend Selection**:
+ - **Daemon backend**: Preferred for performance, batches writes via
rrdcached
+ - **Direct backend**: Fallback using librrd directly when daemon unavailable
+ - **Fallback backend**: Tries daemon first, falls back to direct on failure
+6. **File Operations**: Create RRD files if needed, update with new data points
+
+### Data Transformation
+
+The crate handles migration between schema versions:
+- **v1 → v2**: Adds additional data sources for extended metrics
+- **v2 → v3**: Consolidates and optimizes data sources
+- **Transform logic**: `schema.rs:transform_data()` handles conversion,
skipping incompatible entries
+
+### Backend Differences
+
+- **Daemon Backend** (`backend_daemon.rs`):
+ - Uses vendored rrdcached client for async communication
+ - Batches multiple updates for efficiency
+ - Requires rrdcached daemon running
+ - Best for high-frequency updates
And:
The C code tries rrdc_update() on every call, only falling back to
rrd_update_r() for that individual call if it fails, it doesn't
permanently disable the daemon path. So this is a difference too and
should be documented, or fixed.
+
+- **Direct Backend** (`backend_direct.rs`):
+ - Uses rrd crate (librrd FFI bindings) directly
+ - Synchronous file operations
+ - No external daemon required
+ - Reliable fallback option
+
+- **Fallback Backend** (`backend_fallback.rs`):
+ - Composite pattern: tries daemon, falls back to direct
+ - Matches C implementation behavior
+ - Provides best of both worlds
+
+## Module Structure
+
+| Module | Purpose |
+|--------|---------|
+| `writer.rs` | Main RrdWriter API - high-level interface for RRD operations |
+| `schema.rs` | RRD schema definitions (DS, RRA) and data transformation logic
|
+| `key_type.rs` | RRD key parsing, validation, and path sanitization |
+| `daemon.rs` | rrdcached daemon client wrapper |
+| `backend.rs` | Backend trait and implementations (daemon/direct/fallback) |
+| `rrdcached/` | Vendored rrdcached client implementation (adapted from
rrdcached-client v0.1.5) |
+
+## Usage Example
+
+```rust
+use pmxcfs_rrd::{RrdWriter, RrdFallbackBackend};
RrdFallbackBackend is not exported from lib.rs.
Also the signature below doesn't match the current code.
Please verify.
+
+// Create writer with fallback backend
+let backend = RrdFallbackBackend::new("/var/run/rrdcached.sock").await?;
+let writer = RrdWriter::new(backend);
+
+// Update node CPU metrics
+writer.update(
+ "pve/nodes/node1/cpu",
+ &[0.45, 0.52, 0.38, 0.61], // CPU usage values
+ None, // Use current timestamp
+).await?;
+
+// Create new RRD file for VM
+writer.create(
+ "pve/qemu/100/cpu",
+ 1704067200, // Start timestamp
+).await?;
+```
+
+## External Dependencies
+
+- **rrd crate**: Provides Rust bindings to librrd (RRDtool C library)
+- **rrdcached client**: Vendored and adapted from rrdcached-client v0.1.5
(Apache-2.0 license)
+ - Original source: https://github.com/SINTEF/rrdcached-client
+ - Vendored to gain full control and adapt to our specific needs
+ - Can be disabled via the `rrdcached` feature flag
+
+## Testing
+
+Unit tests verify:
+- Schema generation and validation
+- Key parsing for different RRD types (node, VM, storage)
+- RRD file creation and update operations
+- rrdcached client connection and fallback behavior
+
+Run tests with:
+```bash
+cargo test -p pmxcfs-rrd
+```
+
+## References
+
+- **C Implementation**: `src/pmxcfs/status.c` (RRD code embedded)
+- **Related Crates**:
+ - `pmxcfs-status` - Uses RrdWriter for metrics persistence
+ - `pmxcfs` - FUSE `.rrd` plugin reads RRD files
+- **RRDtool Documentation**: https://oss.oetiker.ch/rrdtool/
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
new file mode 100644
index 000000000..2fa4fa39d
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
@@ -0,0 +1,62 @@
+/// RRD Backend Trait and Implementations
+///
+/// This module provides an abstraction over different RRD writing mechanisms:
+/// - Daemon-based (via rrdcached) for performance and batching
+/// - Direct file writing for reliability and fallback scenarios
+/// - Fallback composite that tries daemon first, then falls back to direct
+///
+/// This design matches the C implementation's behavior in status.c where
+/// it attempts daemon update first, then falls back to direct file writes.
+use super::schema::RrdSchema;
+use anyhow::Result;
+use async_trait::async_trait;
+use std::path::Path;
+
+/// Constants for RRD configuration
+pub const DEFAULT_SOCKET_PATH: &str = "/var/run/rrdcached.sock";
+pub const RRD_STEP_SECONDS: u64 = 60;
+
+/// Trait for RRD backend implementations
+///
+/// Provides abstraction over different RRD writing mechanisms.
+/// All methods are async to support both async (daemon) and sync (direct
file) operations.
+#[async_trait]
+pub trait RrdBackend: Send + Sync {
+ /// Update RRD file with new data
+ ///
+ /// # Arguments
+ /// * `file_path` - Full path to the RRD file
+ /// * `data` - Update data in format "timestamp:value1:value2:..."
+ async fn update(&mut self, file_path: &Path, data: &str) -> Result<()>;
+
+ /// Create new RRD file with schema
+ ///
+ /// # Arguments
+ /// * `file_path` - Full path where RRD file should be created
+ /// * `schema` - RRD schema defining data sources and archives
+ /// * `start_timestamp` - Start time for the RRD file (Unix timestamp)
+ async fn create(
+ &mut self,
+ file_path: &Path,
+ schema: &RrdSchema,
+ start_timestamp: i64,
+ ) -> Result<()>;
+
+ /// Flush pending updates to disk
+ ///
+ /// For daemon backends, this sends a FLUSH command.
+ /// For direct backends, this is a no-op (writes are immediate).
+ async fn flush(&mut self) -> Result<()>;
+
+ /// Get a human-readable name for this backend
+ fn name(&self) -> &str;
+}
+
+// Backend implementations
+mod backend_daemon;
The rrdcached module is conditional, but the daemon backend is always
included. Please feature gate this too.
+mod backend_direct;
+mod backend_fallback;
+
+pub use backend_daemon::RrdCachedBackend;
Also this should be gated, no?
And similarly please gate the daemon usage in backend_fallback.rs and
writer.rs where the fallback backend tries to connect to the daemon.
+pub use backend_direct::RrdDirectBackend;
+pub use backend_fallback::RrdFallbackBackend;
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
[..]
+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
new file mode 100644
index 000000000..fabe7e669
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
@@ -0,0 +1,408 @@
+/// RRD Key Type Parsing and Path Resolution
+///
+/// This module handles parsing RRD status update keys and mapping them
+/// to the appropriate file paths and schemas.
+use super::schema::{RrdFormat, RrdSchema};
+use anyhow::{Context, Result};
+use std::path::{Path, PathBuf};
+
+/// Metric type for determining column skipping rules
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum MetricType {
+ Node,
+ Vm,
+ Storage,
+}
+
+impl MetricType {
+ /// Number of non-archivable columns to skip from the start of the data
string
+ ///
+ /// The data from pvestatd has non-archivable fields at the beginning:
+ /// - Node: skip 2 (uptime, sublevel) - then ctime:loadavg:maxcpu:...
+ /// - VM: skip 4 (uptime, name, status, template) - then
ctime:maxcpu:cpu:...
+ /// - Storage: skip 0 - data starts with ctime:total:used
+ ///
+ /// C implementation: status.c:1300 (node skip=2), status.c:1335 (VM
skip=4)
+ pub fn skip_columns(self) -> usize {
+ match self {
+ MetricType::Node => 2,
+ MetricType::Vm => 4,
+ MetricType::Storage => 0,
+ }
+ }
+
+ /// Get column count for a specific RRD format
+ #[allow(dead_code)]
+ pub fn column_count(self, format: RrdFormat) -> usize {
+ match (format, self) {
+ (RrdFormat::Pve2, MetricType::Node) => 12,
+ (RrdFormat::Pve9_0, MetricType::Node) => 19,
+ (RrdFormat::Pve2, MetricType::Vm) => 10,
+ (RrdFormat::Pve9_0, MetricType::Vm) => 17,
+ (_, MetricType::Storage) => 2, // Same for both formats
+ }
+ }
+}
+
+/// RRD key types for routing to correct schema and path
+///
+/// This enum represents the different types of RRD metrics that pmxcfs tracks:
+/// - Node metrics (CPU, memory, network for a node)
+/// - VM metrics (CPU, memory, disk, network for a VM/CT)
+/// - Storage metrics (total/used space for a storage)
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(crate) enum RrdKeyType {
+ /// Node metrics: pve2-node/{nodename} or pve-node-9.0/{nodename}
+ Node { nodename: String, format: RrdFormat },
+ /// VM metrics: pve2.3-vm/{vmid} or pve-vm-9.0/{vmid}
+ Vm { vmid: String, format: RrdFormat },
+ /// Storage metrics: pve2-storage/{node}/{storage} or
pve-storage-9.0/{node}/{storage}
+ Storage {
+ nodename: String,
+ storage: String,
+ format: RrdFormat,
+ },
+}
+
+impl RrdKeyType {
+ /// Parse RRD key from status update key
+ ///
+ /// Supported formats:
+ /// - "pve2-node/node1" → Node { nodename: "node1", format: Pve2 }
+ /// - "pve-node-9.0/node1" → Node { nodename: "node1", format: Pve9_0 }
+ /// - "pve2.3-vm/100" → Vm { vmid: "100", format: Pve2 }
+ /// - "pve-storage-9.0/node1/local" → Storage { nodename: "node1", storage:
"local", format: Pve9_0 }
+ ///
+ /// # Security
+ ///
+ /// Path components are validated to prevent directory traversal attacks:
+ /// - Rejects paths containing ".."
+ /// - Rejects absolute paths
+ /// - Rejects paths with special characters that could be exploited
+ pub(crate) fn parse(key: &str) -> Result<Self> {
+ let parts: Vec<&str> = key.split('/').collect();
+
+ if parts.is_empty() {
+ anyhow::bail!("Empty RRD key");
+ }
+
+ // Validate all path components for security
+ for part in &parts[1..] {
+ Self::validate_path_component(part)?;
+ }
+
+ match parts[0] {
+ "pve2-node" => {
+ let nodename = parts.get(1).context("Missing
nodename")?.to_string();
+ Ok(RrdKeyType::Node {
+ nodename,
+ format: RrdFormat::Pve2,
+ })
+ }
+ prefix if prefix.starts_with("pve-node-") => {
+ let nodename = parts.get(1).context("Missing
nodename")?.to_string();
+ Ok(RrdKeyType::Node {
+ nodename,
+ format: RrdFormat::Pve9_0,
"pve-node-9.0" matches, but so does "pve-node-9.1", "pve-node-10.0" all
treated as Pve9_0
I think we maybe parse the suffix and match exactly?
+ })
+ }
+ "pve2.3-vm" => {
+ let vmid = parts.get(1).context("Missing vmid")?.to_string();
+ Ok(RrdKeyType::Vm {
+ vmid,
+ format: RrdFormat::Pve2,
+ })
+ }
+ prefix if prefix.starts_with("pve-vm-") => {
+ let vmid = parts.get(1).context("Missing vmid")?.to_string();
+ Ok(RrdKeyType::Vm {
+ vmid,
+ format: RrdFormat::Pve9_0,
+ })
+ }
+ "pve2-storage" => {
+ let nodename = parts.get(1).context("Missing
nodename")?.to_string();
+ let storage = parts.get(2).context("Missing
storage")?.to_string();
+ Ok(RrdKeyType::Storage {
+ nodename,
+ storage,
+ format: RrdFormat::Pve2,
+ })
+ }
+ prefix if prefix.starts_with("pve-storage-") => {
+ let nodename = parts.get(1).context("Missing
nodename")?.to_string();
+ let storage = parts.get(2).context("Missing
storage")?.to_string();
+ Ok(RrdKeyType::Storage {
+ nodename,
+ storage,
+ format: RrdFormat::Pve9_0,
+ })
+ }
+ _ => anyhow::bail!("Unknown RRD key format: {key}"),
+ }
+ }
+
+ /// Validate a path component for security
+ ///
+ /// Prevents directory traversal attacks by rejecting:
+ /// - ".." (parent directory)
+ /// - Absolute paths (starting with "/")
+ /// - Empty components
+ /// - Components with null bytes or other dangerous characters
+ fn validate_path_component(component: &str) -> Result<()> {
+ if component.is_empty() {
+ anyhow::bail!("Empty path component");
+ }
+
+ if component == ".." {
+ anyhow::bail!("Path traversal attempt: '..' not allowed");
+ }
+
+ if component.starts_with('/') {
+ anyhow::bail!("Absolute paths not allowed");
+ }
+
+ if component.contains('\0') {
+ anyhow::bail!("Null byte in path component");
+ }
+
+ // Reject other potentially dangerous characters
+ if component.contains(['\\', '\n', '\r']) {
+ anyhow::bail!("Invalid characters in path component");
+ }
+
+ Ok(())
+ }
+
+ /// Get the RRD file path for this key type
+ ///
+ /// Always returns paths using the current format (9.0), regardless of the
input format.
+ /// This enables transparent format migration: old PVE8 nodes can send
`pve2-node/` keys,
+ /// and they'll be written to `pve-node-9.0/` files automatically.
+ ///
+ /// # Format Migration Strategy
+ ///
+ /// Returns the file path for this RRD key (without .rrd extension)
+ ///
+ /// The C implementation always creates files in the current format
directory
+ /// (see status.c:1287). This Rust implementation follows the same
approach:
+ /// - Input: `pve2-node/node1` → Output:
`/var/lib/rrdcached/db/pve-node-9.0/node1`
+ /// - Input: `pve-node-9.0/node1` → Output:
`/var/lib/rrdcached/db/pve-node-9.0/node1`
+ ///
+ /// This allows rolling upgrades where old and new nodes coexist in the
same cluster.
+ ///
+ /// Note: The path does NOT include .rrd extension, matching C
implementation.
+ /// The librrd functions (rrd_create_r, rrdc_update) add .rrd internally.
+ pub(crate) fn file_path(&self, base_dir: &Path) -> PathBuf {
+ match self {
+ RrdKeyType::Node { nodename, .. } => {
+ // Always use current format path
+ base_dir.join("pve-node-9.0").join(nodename)
+ }
+ RrdKeyType::Vm { vmid, .. } => {
+ // Always use current format path
+ base_dir.join("pve-vm-9.0").join(vmid)
+ }
+ RrdKeyType::Storage {
+ nodename, storage, ..
+ } => {
+ // Always use current format path
+ base_dir
+ .join("pve-storage-9.0")
+ .join(nodename)
+ .join(storage)
+ }
+ }
+ }
+
+ /// Get the source format from the input key
+ ///
+ /// This is used for data transformation (padding/truncation).
+ pub(crate) fn source_format(&self) -> RrdFormat {
+ match self {
+ RrdKeyType::Node { format, .. }
+ | RrdKeyType::Vm { format, .. }
+ | RrdKeyType::Storage { format, .. } => *format,
+ }
+ }
+
+ /// Get the target RRD schema (always current format)
+ ///
+ /// Files are always created using the current format (Pve9_0),
+ /// regardless of the source format in the key.
+ pub(crate) fn schema(&self) -> RrdSchema {
+ match self {
+ RrdKeyType::Node { .. } => RrdSchema::node(RrdFormat::Pve9_0),
+ RrdKeyType::Vm { .. } => RrdSchema::vm(RrdFormat::Pve9_0),
+ RrdKeyType::Storage { .. } =>
RrdSchema::storage(RrdFormat::Pve9_0),
+ }
+ }
+
+ /// Get the metric type for this key
+ pub(crate) fn metric_type(&self) -> MetricType {
+ match self {
+ RrdKeyType::Node { .. } => MetricType::Node,
+ RrdKeyType::Vm { .. } => MetricType::Vm,
+ RrdKeyType::Storage { .. } => MetricType::Storage,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
[..]
+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
new file mode 100644
index 000000000..8da6b633d
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
@@ -0,0 +1,100 @@
+use super::errors::RRDCachedClientError;
+
+pub fn check_data_source_name(name: &str) -> Result<(), RRDCachedClientError> {
+ if name.is_empty() || name.len() > 64 {
+ return Err(RRDCachedClientError::InvalidDataSourceName(
+ "name must be between 1 and 64 characters".to_string(),
+ ));
+ }
+ if !name
+ .chars()
+ .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
+ {
+ return Err(RRDCachedClientError::InvalidDataSourceName(
+ "name must only contain alphanumeric characters and
underscores".to_string(),
+ ));
+ }
+ Ok(())
+}
+
+pub fn check_rrd_path(name: &str) -> Result<(), RRDCachedClientError> {
+ if name.is_empty() || name.len() > 64 {
+ return Err(RRDCachedClientError::InvalidCreateDataSerie(
+ "name must be between 1 and 64 characters".to_string(),
+ ));
+ }
+ if !name
+ .chars()
+ .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
This rejects "/" and ".", but we pass full system paths to it. Also
please check the path length limitation above.
+ {
+ return Err(RRDCachedClientError::InvalidCreateDataSerie(
+ "name must only contain alphanumeric characters and
underscores".to_string(),
+ ));
+ }
+ Ok(())
+}
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
[..]
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
new file mode 100644
index 000000000..6c48940be
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
@@ -0,0 +1,582 @@
+/// RRD File Writer
+///
+/// Handles creating and updating RRD files via pluggable backends.
+/// Supports daemon-based (rrdcached) and direct file writing modes.
+use super::backend::{DEFAULT_SOCKET_PATH, RrdFallbackBackend};
+use super::key_type::{MetricType, RrdKeyType};
+use super::schema::{RrdFormat, RrdSchema};
+use anyhow::{Context, Result};
+use chrono::Local;
+use std::fs;
+use std::path::{Path, PathBuf};
+
+
+/// RRD writer for persistent metric storage
+///
+/// Uses pluggable backends (daemon, direct, or fallback) for RRD operations.
+pub struct RrdWriter {
+ /// Base directory for RRD files (default: /var/lib/rrdcached/db)
+ base_dir: PathBuf,
+ /// Backend for RRD operations (daemon, direct, or fallback)
+ backend: Box<dyn super::backend::RrdBackend>,
+}
+
+impl RrdWriter {
+ /// Create new RRD writer with default fallback backend
+ ///
+ /// Uses the fallback backend that tries daemon first, then falls back to
direct file writes.
+ /// This matches the C implementation's behavior.
+ ///
+ /// # Arguments
+ /// * `base_dir` - Base directory for RRD files
+ pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
+ let backend = Self::default_backend().await?;
+ Self::with_backend(base_dir, backend).await
+ }
+
+ /// Create new RRD writer with specific backend
+ ///
+ /// # Arguments
+ /// * `base_dir` - Base directory for RRD files
+ /// * `backend` - RRD backend to use (daemon, direct, or fallback)
+ pub(crate) async fn with_backend<P: AsRef<Path>>(
+ base_dir: P,
+ backend: Box<dyn super::backend::RrdBackend>,
+ ) -> Result<Self> {
+ let base_dir = base_dir.as_ref().to_path_buf();
+
+ // Create base directory if it doesn't exist
+ fs::create_dir_all(&base_dir)
+ .with_context(|| format!("Failed to create RRD base directory:
{base_dir:?}"))?;
+
+ tracing::info!("RRD writer using backend: {}", backend.name());
+
+ Ok(Self { base_dir, backend })
+ }
+
+ /// Create default backend (fallback: daemon + direct)
+ ///
+ /// This matches the C implementation's behavior:
+ /// - Tries rrdcached daemon first for performance
+ /// - Falls back to direct file writes if daemon fails
+ async fn default_backend() -> Result<Box<dyn super::backend::RrdBackend>> {
+ let backend = RrdFallbackBackend::new(DEFAULT_SOCKET_PATH).await;
+ Ok(Box::new(backend))
+ }
+
+ /// Update RRD file with metric data
+ ///
+ /// This will:
+ /// 1. Transform data from source format to target format
(padding/truncation/column skipping)
+ /// 2. Create the RRD file if it doesn't exist
+ /// 3. Update via rrdcached daemon
+ ///
+ /// # Arguments
+ /// * `key` - RRD key (e.g., "pve2-node/node1", "pve-vm-9.0/100")
+ /// * `data` - Raw metric data string from pvestatd (format:
"skipped_fields...:ctime:val1:val2:...")
+ pub async fn update(&mut self, key: &str, data: &str) -> Result<()> {
+ // Parse the key to determine file path and schema
+ let key_type = RrdKeyType::parse(key).with_context(|| format!("Invalid RRD
key: {key}"))?;
+
+ // Get source format and target schema
+ let source_format = key_type.source_format();
+ let target_schema = key_type.schema();
+ let metric_type = key_type.metric_type();
+
+ // Transform data from source to target format
+ let transformed_data =
+ Self::transform_data(data, source_format, &target_schema,
metric_type)
+ .with_context(|| format!("Failed to transform RRD data for key:
{key}"))?;
+
+ // Get the file path (always uses current format)
+ let file_path = key_type.file_path(&self.base_dir);
+
+ // Ensure the RRD file exists
+ // Always check file existence directly - handles file
deletion/rotation
+ if !file_path.exists() {
+ self.create_rrd_file(&key_type, &file_path).await?;
The on-disk naming convention for .rrd is inconsistent across the crate
and I think this can break the logic here.
file_path() in key_type.rs is documented as returning paths without
.rrd, and that's what this existence check runs against. But the vendored
rrdcached client in rrdcached/client.rs and rrdcached/create.rs
appends .rrd when building the update and create commands, so the
daemon backend creates files at path.rrd. Meanwhile the direct backend
tests in backend_direct.rs also construct paths with .rrd explicitly.
Can we please pin down whether the rrd crate's create() / update_all()
auto append .rrd or not then make one consistent decision and
align file_path(), the existence check, the vendored client and the
direct backend tests to the same convention?
+ }
+
+ // Update the RRD file via backend
+ self.backend.update(&file_path, &transformed_data).await?;
+
+ Ok(())
+ }
+
+ /// Create RRD file with appropriate schema via backend
+ async fn create_rrd_file(&mut self, key_type: &RrdKeyType, file_path: &Path) ->
Result<()> {
+ // Ensure parent directory exists
+ if let Some(parent) = file_path.parent() {
+ fs::create_dir_all(parent)
+ .with_context(|| format!("Failed to create directory:
{parent:?}"))?;
+ }
+
+ // Get schema for this RRD type
+ let schema = key_type.schema();
+
+ // Calculate start time (at day boundary, matching C implementation)
+ // C uses localtime() (status.c:1206-1219), not UTC
+ let now = Local::now();
+ let start = now
+ .date_naive()
+ .and_hms_opt(0, 0, 0)
+ .expect("00:00:00 is always a valid time")
+ .and_local_timezone(Local)
+ .single()
This might return None and would panic in that case.
Maybe earliest() would help here?
+ .expect("Local midnight should have single timezone mapping");
+ let start_timestamp = start.timestamp();
+
+ tracing::debug!(
+ "Creating RRD file: {:?} with {} data sources via {}",
+ file_path,
+ schema.column_count(),
+ self.backend.name()
+ );
+
+ // Delegate to backend for creation
+ self.backend
+ .create(file_path, &schema, start_timestamp)
+ .await?;
+
+ tracing::info!("Created RRD file: {:?} ({})", file_path, schema);
+
+ Ok(())
+ }
++#[cfg(test)]
+mod tests {
+ use super::super::schema::{RrdFormat, RrdSchema};
+ use super::*;
+
+ #[test]
+ fn test_rrd_file_path_generation() {
+ let temp_dir = std::path::PathBuf::from("/tmp/test");
+
+ let key_node = RrdKeyType::Node {
+ nodename: "testnode".to_string(),
+ format: RrdFormat::Pve9_0,
+ };
+ let path = key_node.file_path(&temp_dir);
+ assert_eq!(path, temp_dir.join("pve-node-9.0").join("testnode"));
+ }
+
+ // ===== Format Adaptation Tests =====
+
+ #[test]
+ fn test_transform_data_node_pve2_to_pve9() {
+ // Test padding old format (12 archivable cols) to new format
(19 archivable cols)
+ // pvestatd data format for node:
"uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:root_t:root_u:netin:netout"
+ // = 2 non-archivable + 1 timestamp + 12 archivable = 15 fields
+ let data =
"1000:0:1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema,
MetricType::Node).unwrap();
+
+ // After skip(2): "1234567890:1.5:4:2.0:0.5:...:500000" = 13 fields
+ // Pad to 20 total (timestamp + 19 values): 13 + 7 "U" = 20
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1234567890", "Timestamp should be
preserved");
+ assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+ assert_eq!(parts[1], "1.5", "First value after skip should be
loadavg");
+ assert_eq!(parts[2], "4", "Second value should be maxcpu");
+ assert_eq!(parts[12], "500000", "Last data value should be
netout");
+
+ // Check padding (7 columns: 19 - 12 = 7)
+ for (i, item) in parts.iter().enumerate().take(20).skip(13) {
+ assert_eq!(item, &"U", "Column {} should be padded with U", i);
+ }
+ }
+
+ #[test]
+ fn test_transform_data_vm_pve2_to_pve9() {
+ // Test VM transformation with 4 columns skipped
+ // pvestatd data format for VM:
"uptime:name:status:template:ctime:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite"
+ // = 4 non-archivable + 1 timestamp + 10 archivable = 15 fields
+ let data =
"1000:myvm:1:0:1234567890:4:2:4096:2048:100000:50000:1000:500:100:50";
+
+ let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema,
MetricType::Vm).unwrap();
+
+ // After skip(4): "1234567890:4:2:4096:...:50" = 11 fields
+ // Pad to 18 total (timestamp + 17 values): 11 + 7 "U" = 18
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1234567890");
+ assert_eq!(parts.len(), 18, "Should have timestamp + 17 values");
+ assert_eq!(parts[1], "4", "First value after skip should be
maxcpu");
+ assert_eq!(parts[10], "50", "Last data value should be diskwrite");
+
+ // Check padding (7 columns: 17 - 10 = 7)
+ for (i, item) in parts.iter().enumerate().take(18).skip(11) {
+ assert_eq!(item, &"U", "Column {} should be padded", i);
+ }
+ }
+
+ #[test]
+ fn test_transform_data_no_padding_needed() {
+ // Test when source and target have same column count (Pve9_0
node: 19 archivable cols)
+ // pvestatd format:
"uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:root_t:root_u:netin:netout:memavail:arcsize:cpu_some:io_some:io_full:mem_some:mem_full"
+ // = 2 non-archivable + 1 timestamp + 19 archivable = 22 fields
+ let data =
"1000:0:1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000:7000000000:0:0.12:0.05:0.02:0.08:0.03";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema,
MetricType::Node).unwrap();
+
+ // After skip(2): 20 fields = timestamp + 19 values (exact
match, no padding)
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+ assert_eq!(parts[0], "1234567890", "Timestamp should be ctime");
+ assert_eq!(parts[1], "1.5", "First value after skip should be
loadavg");
+ assert_eq!(parts[19], "0.03", "Last value should be mem_full
(no padding)");
+ }
+
+ #[test]
+ fn test_transform_data_future_format_truncation() {
+ // Test truncation when a future format sends more columns than
current pve9.0
+ // Simulating: uptime:sublevel:ctime:1:2:3:...:25 (2 skipped +
timestamp + 25 archivable = 28 fields)
+ let data =
+
"999:0:1234567890:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17:18:19:20:21:22:23:24:25";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema,
MetricType::Node).unwrap();
+
+ // After skip(2): "1234567890:1:2:...:25" = 26 fields
+ // take(20): truncate to timestamp + 19 values
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts.len(), 20, "Should truncate to timestamp + 19
values");
+ assert_eq!(parts[0], "1234567890", "Timestamp should be ctime");
+ assert_eq!(parts[1], "1", "First archivable value");
+ assert_eq!(parts[19], "19", "Last value should be column 19
(truncated)");
+ }
+
+ #[test]
+ fn test_transform_data_storage_no_change() {
+ // Storage format is same for Pve2 and Pve9_0 (2 columns, no
skipping)
+ let data = "1234567890:1000000000000:500000000000";
+
+ let schema = RrdSchema::storage(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema,
MetricType::Storage).unwrap();
+
+ assert_eq!(result, data, "Storage data should not be transformed");
+ }
+
+ #[test]
+ fn test_metric_type_methods() {
+ assert_eq!(MetricType::Node.skip_columns(), 2);
+ assert_eq!(MetricType::Vm.skip_columns(), 4);
+ assert_eq!(MetricType::Storage.skip_columns(), 0);
+ }
+
+ #[test]
+ fn test_format_column_counts() {
+ assert_eq!(MetricType::Node.column_count(RrdFormat::Pve2), 12);
+ assert_eq!(MetricType::Node.column_count(RrdFormat::Pve9_0), 19);
+ assert_eq!(MetricType::Vm.column_count(RrdFormat::Pve2), 10);
+ assert_eq!(MetricType::Vm.column_count(RrdFormat::Pve9_0), 17);
+ assert_eq!(MetricType::Storage.column_count(RrdFormat::Pve2), 2);
+ assert_eq!(MetricType::Storage.column_count(RrdFormat::Pve9_0), 2);
+ }
+
+ // ===== Real Payload Fixtures from Production Systems =====
+ //
+ // These tests use actual RRD data captured from running PVE systems
+ // to validate transform_data() correctness against real-world
payloads.
+
+ #[test]
+ fn test_real_payload_node_pve2() {
+ // Real pve2-node payload captured from PVE 6.x system
+ // Format:
uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swaptotal:swapused:roottotal:rootused:netin:netout
+ let data =
"432156:0:1709123456:0.15:8:3.2:0.8:33554432000:12884901888:8589934592:0:107374182400:53687091200:1234567890:987654321";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema,
MetricType::Node).unwrap();
+
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+ assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+
+ // Verify key metrics are preserved
+ assert_eq!(parts[1], "0.15", "Load average preserved");
+ assert_eq!(parts[2], "8", "Max CPU preserved");
+ assert_eq!(parts[3], "3.2", "CPU usage preserved");
+ assert_eq!(parts[4], "0.8", "IO wait preserved");
+
+ // Verify padding for new columns (7 new columns in Pve9_0)
+ for i in 13..20 {
+ assert_eq!(parts[i], "U", "New column {} should be padded", i);
+ }
+ }
+
+ #[test]
+ fn test_real_payload_vm_pve2() {
+ // Real pve2.3-vm payload captured from PVE 6.x system
+ // Format:
uptime:name:status:template:ctime:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite
+ let data =
"86400:vm-100-disk-0:running:0:1709123456:4:45.3:8589934592:4294967296:107374182400:32212254720:123456789:98765432:1048576:2097152";
+
+ let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema,
MetricType::Vm).unwrap();
+
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+ assert_eq!(parts.len(), 18, "Should have timestamp + 17 values");
+
+ // Verify key metrics are preserved
+ assert_eq!(parts[1], "4", "Max CPU preserved");
+ assert_eq!(parts[2], "45.3", "CPU usage preserved");
+ assert_eq!(parts[3], "8589934592", "Max memory preserved");
+ assert_eq!(parts[4], "4294967296", "Memory usage preserved");
+
+ // Verify padding for new columns (7 new columns in Pve9_0)
+ for i in 11..18 {
+ assert_eq!(parts[i], "U", "New column {} should be padded", i);
+ }
+ }
+
+ #[test]
+ fn test_real_payload_storage_pve2() {
+ // Real pve2-storage payload captured from PVE 6.x system
+ // Format: ctime:total:used
+ let data = "1709123456:1099511627776:549755813888";
+
+ let schema = RrdSchema::storage(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema,
MetricType::Storage)
+ .unwrap();
+
+ // Storage format unchanged between Pve2 and Pve9_0
+ assert_eq!(result, data, "Storage data should not be transformed");
+
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+ assert_eq!(parts[1], "1099511627776", "Total storage preserved");
+ assert_eq!(parts[2], "549755813888", "Used storage preserved");
+ }
+
+ #[test]
+ fn test_real_payload_node_pve9_0() {
+ // Real pve-node-9.0 payload from PVE 8.x system (already in
target format)
Can we please add real binary fixtures instead?
We would catch more issues using that.
+ // Input has 19 fields, after skip(2) = 17 archivable columns
+ // Schema expects 19 archivable columns, so 2 "U" padding added
+ let data =
"864321:0:1709123456:0.25:16:8.5:1.2:67108864000:25769803776:17179869184:0:214748364800:107374182400:2345678901:1876543210:x86_64:6.5.11:0.3:250";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema,
MetricType::Node)
+ .unwrap();
+
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+ assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+
+ // Verify all columns preserved
+ assert_eq!(parts[1], "0.25", "Load average preserved");
+ assert_eq!(parts[13], "x86_64", "CPU info preserved");
+ assert_eq!(parts[14], "6.5.11", "Kernel version preserved");
+ assert_eq!(parts[15], "0.3", "Wait time preserved");
+ assert_eq!(parts[16], "250", "Process count preserved");
+
+ // Last 3 columns are padding (input had 17 archivable, schema
expects 19)
+ assert_eq!(parts[17], "U", "Padding column 1");
+ assert_eq!(parts[18], "U", "Padding column 2");
+ assert_eq!(parts[19], "U", "Padding column 3");
+ }
+
+ #[test]
+ fn test_real_payload_with_missing_values() {
+ // Real payload with some missing values (represented as "U")
+ // This can happen when metrics are temporarily unavailable
+ let data =
"432156:0:1709123456:0.15:8:U:0.8:33554432000:12884901888:U:0:107374182400:53687091200:1234567890:987654321";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema,
MetricType::Node).unwrap();
+
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+
+ // Verify "U" values are preserved (after skip(2), positions shift)
+ assert_eq!(parts[3], "U", "Missing CPU value preserved as U");
+ assert_eq!(parts[7], "U", "Missing swap total preserved as U");
+ }
[..]