Thanks also for this patch and for working through / including most of
my previous v1 suggestions.

Please see my comments inline.

On 2/13/26 10:47 AM, Kefu Chai wrote:
Add RRD (Round-Robin Database) file persistence system:
- RrdWriter: Main API for RRD operations
- Schema definitions for CPU, memory, network metrics
- Format migration support (v1/v2/v3)
- rrdcached integration for batched writes
- Data transformation for legacy formats

This is an independent crate with no internal dependencies,
only requiring external RRD libraries (rrd, rrdcached-client)
and tokio for async operations. It handles time-series data
storage compatible with the C implementation.

Includes comprehensive unit tests for data transformation,
schema generation, and multi-source data processing.

Signed-off-by: Kefu Chai <[email protected]>
---
  src/pmxcfs-rs/Cargo.toml                      |  12 +
  src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml           |  23 +
  src/pmxcfs-rs/pmxcfs-rrd/README.md            | 119 ++++
  src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs       |  62 ++
  .../pmxcfs-rrd/src/backend/backend_daemon.rs  | 184 ++++++
  .../pmxcfs-rrd/src/backend/backend_direct.rs  | 586 ++++++++++++++++++
  .../src/backend/backend_fallback.rs           | 212 +++++++
  src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs        | 140 +++++

This file doesn't seem to be included (there is no mod definition).
Please remove if not needed.

  src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs      | 408 ++++++++++++
  src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs           |  23 +
  src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs         | 124 ++++
  .../pmxcfs-rrd/src/rrdcached/LICENSE          |  21 +
  .../pmxcfs-rrd/src/rrdcached/client.rs        | 208 +++++++
  .../src/rrdcached/consolidation_function.rs   |  30 +
  .../pmxcfs-rrd/src/rrdcached/create.rs        | 410 ++++++++++++
  .../pmxcfs-rrd/src/rrdcached/errors.rs        |  29 +
  src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs |  45 ++
  src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs |  18 +
  .../pmxcfs-rrd/src/rrdcached/parsers.rs       |  65 ++
  .../pmxcfs-rrd/src/rrdcached/sanitisation.rs  | 100 +++
  src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs        | 577 +++++++++++++++++
  src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs        | 582 +++++++++++++++++
  22 files changed, 3978 insertions(+)
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/README.md
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs
  create mode 100644 
src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
  create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs

diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index d26fac04c..2457fe368 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -4,6 +4,7 @@ members = [
      "pmxcfs-api-types",  # Shared types and error definitions
      "pmxcfs-config",     # Configuration management
      "pmxcfs-logger",     # Cluster log with ring buffer and deduplication
+    "pmxcfs-rrd",        # RRD (Round-Robin Database) persistence
  ]
  resolver = "2"
@@ -20,16 +21,27 @@ rust-version = "1.85"
  pmxcfs-api-types = { path = "pmxcfs-api-types" }
  pmxcfs-config = { path = "pmxcfs-config" }
  pmxcfs-logger = { path = "pmxcfs-logger" }
+pmxcfs-rrd = { path = "pmxcfs-rrd" }
+
+# Core async runtime
+tokio = { version = "1.35", features = ["full"] }
# Error handling
+anyhow = "1.0"
  thiserror = "1.0"
+# Logging and tracing
+tracing = "0.1"
+
  # Concurrency primitives
  parking_lot = "0.12"
# System integration
  libc = "0.2"
+# Development dependencies
+tempfile = "3.8"
+
  [workspace.lints.clippy]
  uninlined_format_args = "warn"
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
new file mode 100644
index 000000000..33c87ec91
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "pmxcfs-rrd"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+
+[features]
+default = ["rrdcached"]
+rrdcached = []
+
+[dependencies]
+anyhow.workspace = true
+async-trait = "0.1"
+chrono = { version = "0.4", default-features = false, features = ["clock"] }
+nom = "8.0"

Do we actually need this extra dependency?
It seems like we only use it for basic string operations.

+rrd = "0.2"
+thiserror = "2.0"

In the workspace there is already thiserror = "1.0".
Please align accordingly.

+tokio.workspace = true
+tracing.workspace = true
+
+[dev-dependencies]
+tempfile.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/README.md 
b/src/pmxcfs-rs/pmxcfs-rrd/README.md
new file mode 100644
index 000000000..d6f6ad9b1
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/README.md
@@ -0,0 +1,119 @@
+# pmxcfs-rrd
+
+RRD (Round-Robin Database) persistence for pmxcfs performance metrics.
+
+## Overview
+
+This crate provides RRD file management for storing time-series performance 
data from Proxmox nodes and VMs. It handles file creation, updates, and 
integration with rrdcached daemon for efficient writes.
+
+### Key Features
+
+- RRD file creation with schema-based initialization
+- RRD updates (write metrics to disk)
+- rrdcached integration for batched writes
+- Support for both legacy and current schema versions (v1/v2/v3)
+- Type-safe key parsing and validation
+- Compatible with existing C-created RRD files
+
+## Usage Flow
+
+The typical data flow through this crate:
+
+1. **Metrics Collection**: pmxcfs-status collects performance metrics (CPU, 
memory, network, etc.)
+2. **Key Generation**: Metrics are organized by key type (node, VM, storage)
+3. **Schema Selection**: Appropriate RRD schema is selected based on key type 
and version
+4. **Data Transformation**: Legacy data (v1/v2) is transformed to current 
format (v3) if needed
+5. **Backend Selection**:
+   - **Daemon backend**: Preferred for performance, batches writes via 
rrdcached
+   - **Direct backend**: Fallback using librrd directly when daemon unavailable
+   - **Fallback backend**: Tries daemon first, falls back to direct on failure
+6. **File Operations**: Create RRD files if needed, update with new data points
+
+### Data Transformation
+
+The crate handles migration between schema versions:
+- **v1 → v2**: Adds additional data sources for extended metrics
+- **v2 → v3**: Consolidates and optimizes data sources
+- **Transform logic**: `schema.rs:transform_data()` handles conversion, 
skipping incompatible entries
+
+### Backend Differences
+
+- **Daemon Backend** (`backend_daemon.rs`):
+  - Uses vendored rrdcached client for async communication
+  - Batches multiple updates for efficiency
+  - Requires rrdcached daemon running
+  - Best for high-frequency updates

And:

The C code tries rrdc_update() on every call, only falling back to
rrd_update_r() for that individual call if it fails, it doesn't
permanently disable the daemon path. So this is a difference too and
should be documented, or fixed.

+
+- **Direct Backend** (`backend_direct.rs`):
+  - Uses rrd crate (librrd FFI bindings) directly
+  - Synchronous file operations
+  - No external daemon required
+  - Reliable fallback option
+
+- **Fallback Backend** (`backend_fallback.rs`):
+  - Composite pattern: tries daemon, falls back to direct
+  - Matches C implementation behavior
+  - Provides best of both worlds
+
+## Module Structure
+
+| Module | Purpose |
+|--------|---------|
+| `writer.rs` | Main RrdWriter API - high-level interface for RRD operations |
+| `schema.rs` | RRD schema definitions (DS, RRA) and data transformation logic 
|
+| `key_type.rs` | RRD key parsing, validation, and path sanitization |
+| `daemon.rs` | rrdcached daemon client wrapper |
+| `backend.rs` | Backend trait and implementations (daemon/direct/fallback) |
+| `rrdcached/` | Vendored rrdcached client implementation (adapted from 
rrdcached-client v0.1.5) |
+
+## Usage Example
+
+```rust
+use pmxcfs_rrd::{RrdWriter, RrdFallbackBackend};

RrdFallbackBackend is not exported from lib.rs.
Also the signature below doesn't match the current code.
Please verify.

+
+// Create writer with fallback backend
+let backend = RrdFallbackBackend::new("/var/run/rrdcached.sock").await?;
+let writer = RrdWriter::new(backend);
+
+// Update node CPU metrics
+writer.update(
+    "pve/nodes/node1/cpu",
+    &[0.45, 0.52, 0.38, 0.61], // CPU usage values
+    None, // Use current timestamp
+).await?;
+
+// Create new RRD file for VM
+writer.create(
+    "pve/qemu/100/cpu",
+    1704067200, // Start timestamp
+).await?;
+```
+
+## External Dependencies
+
+- **rrd crate**: Provides Rust bindings to librrd (RRDtool C library)
+- **rrdcached client**: Vendored and adapted from rrdcached-client v0.1.5 
(Apache-2.0 license)
+  - Original source: https://github.com/SINTEF/rrdcached-client
+  - Vendored to gain full control and adapt to our specific needs
+  - Can be disabled via the `rrdcached` feature flag
+
+## Testing
+
+Unit tests verify:
+- Schema generation and validation
+- Key parsing for different RRD types (node, VM, storage)
+- RRD file creation and update operations
+- rrdcached client connection and fallback behavior
+
+Run tests with:
+```bash
+cargo test -p pmxcfs-rrd
+```
+
+## References
+
+- **C Implementation**: `src/pmxcfs/status.c` (RRD code embedded)
+- **Related Crates**:
+  - `pmxcfs-status` - Uses RrdWriter for metrics persistence
+  - `pmxcfs` - FUSE `.rrd` plugin reads RRD files
+- **RRDtool Documentation**: https://oss.oetiker.ch/rrdtool/
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
new file mode 100644
index 000000000..2fa4fa39d
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
@@ -0,0 +1,62 @@
+/// RRD Backend Trait and Implementations
+///
+/// This module provides an abstraction over different RRD writing mechanisms:
+/// - Daemon-based (via rrdcached) for performance and batching
+/// - Direct file writing for reliability and fallback scenarios
+/// - Fallback composite that tries daemon first, then falls back to direct
+///
+/// This design matches the C implementation's behavior in status.c where
+/// it attempts daemon update first, then falls back to direct file writes.
+use super::schema::RrdSchema;
+use anyhow::Result;
+use async_trait::async_trait;
+use std::path::Path;
+
+/// Constants for RRD configuration
+pub const DEFAULT_SOCKET_PATH: &str = "/var/run/rrdcached.sock";
+pub const RRD_STEP_SECONDS: u64 = 60;
+
+/// Trait for RRD backend implementations
+///
+/// Provides abstraction over different RRD writing mechanisms.
+/// All methods are async to support both async (daemon) and sync (direct 
file) operations.
+#[async_trait]
+pub trait RrdBackend: Send + Sync {
+    /// Update RRD file with new data
+    ///
+    /// # Arguments
+    /// * `file_path` - Full path to the RRD file
+    /// * `data` - Update data in format "timestamp:value1:value2:..."
+    async fn update(&mut self, file_path: &Path, data: &str) -> Result<()>;
+
+    /// Create new RRD file with schema
+    ///
+    /// # Arguments
+    /// * `file_path` - Full path where RRD file should be created
+    /// * `schema` - RRD schema defining data sources and archives
+    /// * `start_timestamp` - Start time for the RRD file (Unix timestamp)
+    async fn create(
+        &mut self,
+        file_path: &Path,
+        schema: &RrdSchema,
+        start_timestamp: i64,
+    ) -> Result<()>;
+
+    /// Flush pending updates to disk
+    ///
+    /// For daemon backends, this sends a FLUSH command.
+    /// For direct backends, this is a no-op (writes are immediate).
+    async fn flush(&mut self) -> Result<()>;
+
+    /// Get a human-readable name for this backend
+    fn name(&self) -> &str;
+}
+
+// Backend implementations
+mod backend_daemon;

The rrdcached module is conditional, but the daemon backend is always included. Please feature gate this too.

+mod backend_direct;
+mod backend_fallback;
+
+pub use backend_daemon::RrdCachedBackend;

Also this should be gated, no?

And similarly please gate the daemon usage in backend_fallback.rs and writer.rs where the fallback backend tries to connect to the daemon.

+pub use backend_direct::RrdDirectBackend;
+pub use backend_fallback::RrdFallbackBackend;
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs

[..]

+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
new file mode 100644
index 000000000..fabe7e669
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
@@ -0,0 +1,408 @@
+/// RRD Key Type Parsing and Path Resolution
+///
+/// This module handles parsing RRD status update keys and mapping them
+/// to the appropriate file paths and schemas.
+use super::schema::{RrdFormat, RrdSchema};
+use anyhow::{Context, Result};
+use std::path::{Path, PathBuf};
+
+/// Metric type for determining column skipping rules
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum MetricType {
+    Node,
+    Vm,
+    Storage,
+}
+
+impl MetricType {
+    /// Number of non-archivable columns to skip from the start of the data 
string
+    ///
+    /// The data from pvestatd has non-archivable fields at the beginning:
+    /// - Node: skip 2 (uptime, sublevel) - then ctime:loadavg:maxcpu:...
+    /// - VM: skip 4 (uptime, name, status, template) - then 
ctime:maxcpu:cpu:...
+    /// - Storage: skip 0 - data starts with ctime:total:used
+    ///
+    /// C implementation: status.c:1300 (node skip=2), status.c:1335 (VM 
skip=4)
+    pub fn skip_columns(self) -> usize {
+        match self {
+            MetricType::Node => 2,
+            MetricType::Vm => 4,
+            MetricType::Storage => 0,
+        }
+    }
+
+    /// Get column count for a specific RRD format
+    #[allow(dead_code)]
+    pub fn column_count(self, format: RrdFormat) -> usize {
+        match (format, self) {
+            (RrdFormat::Pve2, MetricType::Node) => 12,
+            (RrdFormat::Pve9_0, MetricType::Node) => 19,
+            (RrdFormat::Pve2, MetricType::Vm) => 10,
+            (RrdFormat::Pve9_0, MetricType::Vm) => 17,
+            (_, MetricType::Storage) => 2, // Same for both formats
+        }
+    }
+}
+
+/// RRD key types for routing to correct schema and path
+///
+/// This enum represents the different types of RRD metrics that pmxcfs tracks:
+/// - Node metrics (CPU, memory, network for a node)
+/// - VM metrics (CPU, memory, disk, network for a VM/CT)
+/// - Storage metrics (total/used space for a storage)
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(crate) enum RrdKeyType {
+    /// Node metrics: pve2-node/{nodename} or pve-node-9.0/{nodename}
+    Node { nodename: String, format: RrdFormat },
+    /// VM metrics: pve2.3-vm/{vmid} or pve-vm-9.0/{vmid}
+    Vm { vmid: String, format: RrdFormat },
+    /// Storage metrics: pve2-storage/{node}/{storage} or 
pve-storage-9.0/{node}/{storage}
+    Storage {
+        nodename: String,
+        storage: String,
+        format: RrdFormat,
+    },
+}
+
+impl RrdKeyType {
+    /// Parse RRD key from status update key
+    ///
+    /// Supported formats:
+    /// - "pve2-node/node1" → Node { nodename: "node1", format: Pve2 }
+    /// - "pve-node-9.0/node1" → Node { nodename: "node1", format: Pve9_0 }
+    /// - "pve2.3-vm/100" → Vm { vmid: "100", format: Pve2 }
+    /// - "pve-storage-9.0/node1/local" → Storage { nodename: "node1", storage: 
"local", format: Pve9_0 }
+    ///
+    /// # Security
+    ///
+    /// Path components are validated to prevent directory traversal attacks:
+    /// - Rejects paths containing ".."
+    /// - Rejects absolute paths
+    /// - Rejects paths with special characters that could be exploited
+    pub(crate) fn parse(key: &str) -> Result<Self> {
+        let parts: Vec<&str> = key.split('/').collect();
+
+        if parts.is_empty() {
+            anyhow::bail!("Empty RRD key");
+        }
+
+        // Validate all path components for security
+        for part in &parts[1..] {
+            Self::validate_path_component(part)?;
+        }
+
+        match parts[0] {
+            "pve2-node" => {
+                let nodename = parts.get(1).context("Missing 
nodename")?.to_string();
+                Ok(RrdKeyType::Node {
+                    nodename,
+                    format: RrdFormat::Pve2,
+                })
+            }
+            prefix if prefix.starts_with("pve-node-") => {
+                let nodename = parts.get(1).context("Missing 
nodename")?.to_string();
+                Ok(RrdKeyType::Node {
+                    nodename,
+                    format: RrdFormat::Pve9_0,

"pve-node-9.0" matches, but so does "pve-node-9.1", "pve-node-10.0" all treated as Pve9_0

I think we maybe parse the suffix and match exactly?

+                })
+            }
+            "pve2.3-vm" => {
+                let vmid = parts.get(1).context("Missing vmid")?.to_string();
+                Ok(RrdKeyType::Vm {
+                    vmid,
+                    format: RrdFormat::Pve2,
+                })
+            }
+            prefix if prefix.starts_with("pve-vm-") => {
+                let vmid = parts.get(1).context("Missing vmid")?.to_string();
+                Ok(RrdKeyType::Vm {
+                    vmid,
+                    format: RrdFormat::Pve9_0,
+                })
+            }
+            "pve2-storage" => {
+                let nodename = parts.get(1).context("Missing 
nodename")?.to_string();
+                let storage = parts.get(2).context("Missing 
storage")?.to_string();
+                Ok(RrdKeyType::Storage {
+                    nodename,
+                    storage,
+                    format: RrdFormat::Pve2,
+                })
+            }
+            prefix if prefix.starts_with("pve-storage-") => {
+                let nodename = parts.get(1).context("Missing 
nodename")?.to_string();
+                let storage = parts.get(2).context("Missing 
storage")?.to_string();
+                Ok(RrdKeyType::Storage {
+                    nodename,
+                    storage,
+                    format: RrdFormat::Pve9_0,
+                })
+            }
+            _ => anyhow::bail!("Unknown RRD key format: {key}"),
+        }
+    }
+
+    /// Validate a path component for security
+    ///
+    /// Prevents directory traversal attacks by rejecting:
+    /// - ".." (parent directory)
+    /// - Absolute paths (starting with "/")
+    /// - Empty components
+    /// - Components with null bytes or other dangerous characters
+    fn validate_path_component(component: &str) -> Result<()> {
+        if component.is_empty() {
+            anyhow::bail!("Empty path component");
+        }
+
+        if component == ".." {
+            anyhow::bail!("Path traversal attempt: '..' not allowed");
+        }
+
+        if component.starts_with('/') {
+            anyhow::bail!("Absolute paths not allowed");
+        }
+
+        if component.contains('\0') {
+            anyhow::bail!("Null byte in path component");
+        }
+
+        // Reject other potentially dangerous characters
+        if component.contains(['\\', '\n', '\r']) {
+            anyhow::bail!("Invalid characters in path component");
+        }
+
+        Ok(())
+    }
+
+    /// Get the RRD file path for this key type
+    ///
+    /// Always returns paths using the current format (9.0), regardless of the 
input format.
+    /// This enables transparent format migration: old PVE8 nodes can send 
`pve2-node/` keys,
+    /// and they'll be written to `pve-node-9.0/` files automatically.
+    ///
+    /// # Format Migration Strategy
+    ///
+    /// Returns the file path for this RRD key (without .rrd extension)
+    ///
+    /// The C implementation always creates files in the current format 
directory
+    /// (see status.c:1287). This Rust implementation follows the same 
approach:
+    /// - Input: `pve2-node/node1` → Output: 
`/var/lib/rrdcached/db/pve-node-9.0/node1`
+    /// - Input: `pve-node-9.0/node1` → Output: 
`/var/lib/rrdcached/db/pve-node-9.0/node1`
+    ///
+    /// This allows rolling upgrades where old and new nodes coexist in the 
same cluster.
+    ///
+    /// Note: The path does NOT include .rrd extension, matching C 
implementation.
+    /// The librrd functions (rrd_create_r, rrdc_update) add .rrd internally.
+    pub(crate) fn file_path(&self, base_dir: &Path) -> PathBuf {
+        match self {
+            RrdKeyType::Node { nodename, .. } => {
+                // Always use current format path
+                base_dir.join("pve-node-9.0").join(nodename)
+            }
+            RrdKeyType::Vm { vmid, .. } => {
+                // Always use current format path
+                base_dir.join("pve-vm-9.0").join(vmid)
+            }
+            RrdKeyType::Storage {
+                nodename, storage, ..
+            } => {
+                // Always use current format path
+                base_dir
+                    .join("pve-storage-9.0")
+                    .join(nodename)
+                    .join(storage)
+            }
+        }
+    }
+
+    /// Get the source format from the input key
+    ///
+    /// This is used for data transformation (padding/truncation).
+    pub(crate) fn source_format(&self) -> RrdFormat {
+        match self {
+            RrdKeyType::Node { format, .. }
+            | RrdKeyType::Vm { format, .. }
+            | RrdKeyType::Storage { format, .. } => *format,
+        }
+    }
+
+    /// Get the target RRD schema (always current format)
+    ///
+    /// Files are always created using the current format (Pve9_0),
+    /// regardless of the source format in the key.
+    pub(crate) fn schema(&self) -> RrdSchema {
+        match self {
+            RrdKeyType::Node { .. } => RrdSchema::node(RrdFormat::Pve9_0),
+            RrdKeyType::Vm { .. } => RrdSchema::vm(RrdFormat::Pve9_0),
+            RrdKeyType::Storage { .. } => 
RrdSchema::storage(RrdFormat::Pve9_0),
+        }
+    }
+
+    /// Get the metric type for this key
+    pub(crate) fn metric_type(&self) -> MetricType {
+        match self {
+            RrdKeyType::Node { .. } => MetricType::Node,
+            RrdKeyType::Vm { .. } => MetricType::Vm,
+            RrdKeyType::Storage { .. } => MetricType::Storage,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {

[..]

+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE 
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
new file mode 100644
index 000000000..8da6b633d
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
@@ -0,0 +1,100 @@
+use super::errors::RRDCachedClientError;
+
+pub fn check_data_source_name(name: &str) -> Result<(), RRDCachedClientError> {
+    if name.is_empty() || name.len() > 64 {
+        return Err(RRDCachedClientError::InvalidDataSourceName(
+            "name must be between 1 and 64 characters".to_string(),
+        ));
+    }
+    if !name
+        .chars()
+        .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
+    {
+        return Err(RRDCachedClientError::InvalidDataSourceName(
+            "name must only contain alphanumeric characters and 
underscores".to_string(),
+        ));
+    }
+    Ok(())
+}
+
+pub fn check_rrd_path(name: &str) -> Result<(), RRDCachedClientError> {
+    if name.is_empty() || name.len() > 64 {
+        return Err(RRDCachedClientError::InvalidCreateDataSerie(
+            "name must be between 1 and 64 characters".to_string(),
+        ));
+    }
+    if !name
+        .chars()
+        .all(|c| c.is_alphanumeric() || c == '_' || c == '-')

This rejects "/" and ".", but we pass full system paths to it. Also please check the path length limitation above.

+    {
+        return Err(RRDCachedClientError::InvalidCreateDataSerie(
+            "name must only contain alphanumeric characters and 
underscores".to_string(),
+        ));
+    }
+    Ok(())
+}

[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
[..]

diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs 
b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
new file mode 100644
index 000000000..6c48940be
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
@@ -0,0 +1,582 @@
+/// RRD File Writer
+///
+/// Handles creating and updating RRD files via pluggable backends.
+/// Supports daemon-based (rrdcached) and direct file writing modes.
+use super::backend::{DEFAULT_SOCKET_PATH, RrdFallbackBackend};
+use super::key_type::{MetricType, RrdKeyType};
+use super::schema::{RrdFormat, RrdSchema};
+use anyhow::{Context, Result};
+use chrono::Local;
+use std::fs;
+use std::path::{Path, PathBuf};
+
+
+/// RRD writer for persistent metric storage
+///
+/// Uses pluggable backends (daemon, direct, or fallback) for RRD operations.
+pub struct RrdWriter {
+    /// Base directory for RRD files (default: /var/lib/rrdcached/db)
+    base_dir: PathBuf,
+    /// Backend for RRD operations (daemon, direct, or fallback)
+    backend: Box<dyn super::backend::RrdBackend>,
+}
+
+impl RrdWriter {
+    /// Create new RRD writer with default fallback backend
+    ///
+    /// Uses the fallback backend that tries daemon first, then falls back to 
direct file writes.
+    /// This matches the C implementation's behavior.
+    ///
+    /// # Arguments
+    /// * `base_dir` - Base directory for RRD files
+    pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
+        let backend = Self::default_backend().await?;
+        Self::with_backend(base_dir, backend).await
+    }
+
+    /// Create new RRD writer with specific backend
+    ///
+    /// # Arguments
+    /// * `base_dir` - Base directory for RRD files
+    /// * `backend` - RRD backend to use (daemon, direct, or fallback)
+    pub(crate) async fn with_backend<P: AsRef<Path>>(
+        base_dir: P,
+        backend: Box<dyn super::backend::RrdBackend>,
+    ) -> Result<Self> {
+        let base_dir = base_dir.as_ref().to_path_buf();
+
+        // Create base directory if it doesn't exist
+        fs::create_dir_all(&base_dir)
+            .with_context(|| format!("Failed to create RRD base directory: 
{base_dir:?}"))?;
+
+        tracing::info!("RRD writer using backend: {}", backend.name());
+
+        Ok(Self { base_dir, backend })
+    }
+
+    /// Create default backend (fallback: daemon + direct)
+    ///
+    /// This matches the C implementation's behavior:
+    /// - Tries rrdcached daemon first for performance
+    /// - Falls back to direct file writes if daemon fails
+    async fn default_backend() -> Result<Box<dyn super::backend::RrdBackend>> {
+        let backend = RrdFallbackBackend::new(DEFAULT_SOCKET_PATH).await;
+        Ok(Box::new(backend))
+    }
+
+    /// Update RRD file with metric data
+    ///
+    /// This will:
+    /// 1. Transform data from source format to target format 
(padding/truncation/column skipping)
+    /// 2. Create the RRD file if it doesn't exist
+    /// 3. Update via rrdcached daemon
+    ///
+    /// # Arguments
+    /// * `key` - RRD key (e.g., "pve2-node/node1", "pve-vm-9.0/100")
+    /// * `data` - Raw metric data string from pvestatd (format: 
"skipped_fields...:ctime:val1:val2:...")
+    pub async fn update(&mut self, key: &str, data: &str) -> Result<()> {
+        // Parse the key to determine file path and schema
+        let key_type = RrdKeyType::parse(key).with_context(|| format!("Invalid RRD 
key: {key}"))?;
+
+        // Get source format and target schema
+        let source_format = key_type.source_format();
+        let target_schema = key_type.schema();
+        let metric_type = key_type.metric_type();
+
+        // Transform data from source to target format
+        let transformed_data =
+            Self::transform_data(data, source_format, &target_schema, 
metric_type)
+                .with_context(|| format!("Failed to transform RRD data for key: 
{key}"))?;
+
+        // Get the file path (always uses current format)
+        let file_path = key_type.file_path(&self.base_dir);
+
+        // Ensure the RRD file exists
+        // Always check file existence directly - handles file 
deletion/rotation
+        if !file_path.exists() {
+            self.create_rrd_file(&key_type, &file_path).await?;

The on-disk naming convention for .rrd is inconsistent across the crate
and I think this can break the logic here.
file_path() in key_type.rs is documented as returning paths without
.rrd, and that's what this existence check runs against. But the vendored
rrdcached client in rrdcached/client.rs and rrdcached/create.rs
appends .rrd when building the update and create commands, so the
daemon backend creates files at path.rrd. Meanwhile the direct backend
tests in backend_direct.rs also construct paths with .rrd explicitly.

Can we please pin down whether the rrd crate's create() / update_all()
auto append .rrd or not then make one consistent decision and
align file_path(), the existence check, the vendored client and the
direct backend tests to the same convention?

+        }
+
+        // Update the RRD file via backend
+        self.backend.update(&file_path, &transformed_data).await?;
+
+        Ok(())
+    }
+
+    /// Create RRD file with appropriate schema via backend
+    async fn create_rrd_file(&mut self, key_type: &RrdKeyType, file_path: &Path) -> 
Result<()> {
+        // Ensure parent directory exists
+        if let Some(parent) = file_path.parent() {
+            fs::create_dir_all(parent)
+                .with_context(|| format!("Failed to create directory: 
{parent:?}"))?;
+        }
+
+        // Get schema for this RRD type
+        let schema = key_type.schema();
+
+        // Calculate start time (at day boundary, matching C implementation)
+        // C uses localtime() (status.c:1206-1219), not UTC
+        let now = Local::now();
+        let start = now
+            .date_naive()
+            .and_hms_opt(0, 0, 0)
+            .expect("00:00:00 is always a valid time")
+            .and_local_timezone(Local)
+            .single()

This might return None and would panic in that case.
Maybe earliest() would help here?

+            .expect("Local midnight should have single timezone mapping");
+        let start_timestamp = start.timestamp();
+
+        tracing::debug!(
+            "Creating RRD file: {:?} with {} data sources via {}",
+            file_path,
+            schema.column_count(),
+            self.backend.name()
+        );
+
+        // Delegate to backend for creation
+        self.backend
+            .create(file_path, &schema, start_timestamp)
+            .await?;
+
+        tracing::info!("Created RRD file: {:?} ({})", file_path, schema);
+
+        Ok(())
+    }
++#[cfg(test)]
+mod tests {
+    use super::super::schema::{RrdFormat, RrdSchema};
+    use super::*;
+
+    #[test]
+    fn test_rrd_file_path_generation() {
+        let temp_dir = std::path::PathBuf::from("/tmp/test");
+
+        let key_node = RrdKeyType::Node {
+            nodename: "testnode".to_string(),
+            format: RrdFormat::Pve9_0,
+        };
+        let path = key_node.file_path(&temp_dir);
+        assert_eq!(path, temp_dir.join("pve-node-9.0").join("testnode"));
+    }
+
+    // ===== Format Adaptation Tests =====
+
+    #[test]
+    fn test_transform_data_node_pve2_to_pve9() {
+ // Test padding old format (12 archivable cols) to new format (19 archivable cols) + // pvestatd data format for node: "uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:root_t:root_u:netin:netout"
+        // = 2 non-archivable + 1 timestamp + 12 archivable = 15 fields
+ let data = "1000:0:1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap();
+
+        // After skip(2): "1234567890:1.5:4:2.0:0.5:...:500000" = 13 fields
+        // Pad to 20 total (timestamp + 19 values): 13 + 7 "U" = 20
+        let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1234567890", "Timestamp should be preserved");
+        assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+ assert_eq!(parts[1], "1.5", "First value after skip should be loadavg");
+        assert_eq!(parts[2], "4", "Second value should be maxcpu");
+ assert_eq!(parts[12], "500000", "Last data value should be netout");
+
+        // Check padding (7 columns: 19 - 12 = 7)
+        for (i, item) in parts.iter().enumerate().take(20).skip(13) {
+            assert_eq!(item, &"U", "Column {} should be padded with U", i);
+        }
+    }
+
+    #[test]
+    fn test_transform_data_vm_pve2_to_pve9() {
+        // Test VM transformation with 4 columns skipped
+ // pvestatd data format for VM: "uptime:name:status:template:ctime:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite"
+        // = 4 non-archivable + 1 timestamp + 10 archivable = 15 fields
+ let data = "1000:myvm:1:0:1234567890:4:2:4096:2048:100000:50000:1000:500:100:50";
+
+        let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+        let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Vm).unwrap();
+
+        // After skip(4): "1234567890:4:2:4096:...:50" = 11 fields
+        // Pad to 18 total (timestamp + 17 values): 11 + 7 "U" = 18
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1234567890");
+        assert_eq!(parts.len(), 18, "Should have timestamp + 17 values");
+ assert_eq!(parts[1], "4", "First value after skip should be maxcpu");
+        assert_eq!(parts[10], "50", "Last data value should be diskwrite");
+
+        // Check padding (7 columns: 17 - 10 = 7)
+        for (i, item) in parts.iter().enumerate().take(18).skip(11) {
+            assert_eq!(item, &"U", "Column {} should be padded", i);
+        }
+    }
+
+    #[test]
+    fn test_transform_data_no_padding_needed() {
+ // Test when source and target have same column count (Pve9_0 node: 19 archivable cols) + // pvestatd format: "uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:root_t:root_u:netin:netout:memavail:arcsize:cpu_some:io_some:io_full:mem_some:mem_full"
+        // = 2 non-archivable + 1 timestamp + 19 archivable = 22 fields
+ let data = "1000:0:1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000:7000000000:0:0.12:0.05:0.02:0.08:0.03";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap();
+
+ // After skip(2): 20 fields = timestamp + 19 values (exact match, no padding)
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+        assert_eq!(parts[0], "1234567890", "Timestamp should be ctime");
+ assert_eq!(parts[1], "1.5", "First value after skip should be loadavg"); + assert_eq!(parts[19], "0.03", "Last value should be mem_full (no padding)");
+    }
+
+    #[test]
+    fn test_transform_data_future_format_truncation() {
+ // Test truncation when a future format sends more columns than current pve9.0 + // Simulating: uptime:sublevel:ctime:1:2:3:...:25 (2 skipped + timestamp + 25 archivable = 28 fields)
+        let data =
+ "999:0:1234567890:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17:18:19:20:21:22:23:24:25";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap();
+
+        // After skip(2): "1234567890:1:2:...:25" = 26 fields
+        // take(20): truncate to timestamp + 19 values
+        let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts.len(), 20, "Should truncate to timestamp + 19 values");
+        assert_eq!(parts[0], "1234567890", "Timestamp should be ctime");
+        assert_eq!(parts[1], "1", "First archivable value");
+ assert_eq!(parts[19], "19", "Last value should be column 19 (truncated)");
+    }
+
+    #[test]
+    fn test_transform_data_storage_no_change() {
+ // Storage format is same for Pve2 and Pve9_0 (2 columns, no skipping)
+        let data = "1234567890:1000000000000:500000000000";
+
+        let schema = RrdSchema::storage(RrdFormat::Pve9_0);
+        let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Storage).unwrap();
+
+        assert_eq!(result, data, "Storage data should not be transformed");
+    }
+
+    #[test]
+    fn test_metric_type_methods() {
+        assert_eq!(MetricType::Node.skip_columns(), 2);
+        assert_eq!(MetricType::Vm.skip_columns(), 4);
+        assert_eq!(MetricType::Storage.skip_columns(), 0);
+    }
+
+    #[test]
+    fn test_format_column_counts() {
+        assert_eq!(MetricType::Node.column_count(RrdFormat::Pve2), 12);
+        assert_eq!(MetricType::Node.column_count(RrdFormat::Pve9_0), 19);
+        assert_eq!(MetricType::Vm.column_count(RrdFormat::Pve2), 10);
+        assert_eq!(MetricType::Vm.column_count(RrdFormat::Pve9_0), 17);
+        assert_eq!(MetricType::Storage.column_count(RrdFormat::Pve2), 2);
+        assert_eq!(MetricType::Storage.column_count(RrdFormat::Pve9_0), 2);
+    }
+
+    // ===== Real Payload Fixtures from Production Systems =====
+    //
+    // These tests use actual RRD data captured from running PVE systems
+ // to validate transform_data() correctness against real-world payloads.
+
+    #[test]
+    fn test_real_payload_node_pve2() {
+        // Real pve2-node payload captured from PVE 6.x system
+ // Format: uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swaptotal:swapused:roottotal:rootused:netin:netout + let data = "432156:0:1709123456:0.15:8:3.2:0.8:33554432000:12884901888:8589934592:0:107374182400:53687091200:1234567890:987654321";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap();
+
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+        assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+
+        // Verify key metrics are preserved
+        assert_eq!(parts[1], "0.15", "Load average preserved");
+        assert_eq!(parts[2], "8", "Max CPU preserved");
+        assert_eq!(parts[3], "3.2", "CPU usage preserved");
+        assert_eq!(parts[4], "0.8", "IO wait preserved");
+
+        // Verify padding for new columns (7 new columns in Pve9_0)
+        for i in 13..20 {
+            assert_eq!(parts[i], "U", "New column {} should be padded", i);
+        }
+    }
+
+    #[test]
+    fn test_real_payload_vm_pve2() {
+        // Real pve2.3-vm payload captured from PVE 6.x system
+ // Format: uptime:name:status:template:ctime:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite + let data = "86400:vm-100-disk-0:running:0:1709123456:4:45.3:8589934592:4294967296:107374182400:32212254720:123456789:98765432:1048576:2097152";
+
+        let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+        let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Vm).unwrap();
+
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+        assert_eq!(parts.len(), 18, "Should have timestamp + 17 values");
+
+        // Verify key metrics are preserved
+        assert_eq!(parts[1], "4", "Max CPU preserved");
+        assert_eq!(parts[2], "45.3", "CPU usage preserved");
+        assert_eq!(parts[3], "8589934592", "Max memory preserved");
+        assert_eq!(parts[4], "4294967296", "Memory usage preserved");
+
+        // Verify padding for new columns (7 new columns in Pve9_0)
+        for i in 11..18 {
+            assert_eq!(parts[i], "U", "New column {} should be padded", i);
+        }
+    }
+
+    #[test]
+    fn test_real_payload_storage_pve2() {
+        // Real pve2-storage payload captured from PVE 6.x system
+        // Format: ctime:total:used
+        let data = "1709123456:1099511627776:549755813888";
+
+        let schema = RrdSchema::storage(RrdFormat::Pve9_0);
+        let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Storage)
+                .unwrap();
+
+        // Storage format unchanged between Pve2 and Pve9_0
+        assert_eq!(result, data, "Storage data should not be transformed");
+
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+        assert_eq!(parts[1], "1099511627776", "Total storage preserved");
+        assert_eq!(parts[2], "549755813888", "Used storage preserved");
+    }
+
+    #[test]
+    fn test_real_payload_node_pve9_0() {
+ // Real pve-node-9.0 payload from PVE 8.x system (already in target format)

Can we please add real binary fixtures instead?
We would catch more issues using that.

+        // Input has 19 fields, after skip(2) = 17 archivable columns
+        // Schema expects 19 archivable columns, so 2 "U" padding added
+ let data = "864321:0:1709123456:0.25:16:8.5:1.2:67108864000:25769803776:17179869184:0:214748364800:107374182400:2345678901:1876543210:x86_64:6.5.11:0.3:250";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node)
+                .unwrap();
+
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+        assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+
+        // Verify all columns preserved
+        assert_eq!(parts[1], "0.25", "Load average preserved");
+        assert_eq!(parts[13], "x86_64", "CPU info preserved");
+        assert_eq!(parts[14], "6.5.11", "Kernel version preserved");
+        assert_eq!(parts[15], "0.3", "Wait time preserved");
+        assert_eq!(parts[16], "250", "Process count preserved");
+
+ // Last 3 columns are padding (input had 17 archivable, schema expects 19)
+        assert_eq!(parts[17], "U", "Padding column 1");
+        assert_eq!(parts[18], "U", "Padding column 2");
+        assert_eq!(parts[19], "U", "Padding column 3");
+    }
+
+    #[test]
+    fn test_real_payload_with_missing_values() {
+        // Real payload with some missing values (represented as "U")
+        // This can happen when metrics are temporarily unavailable
+ let data = "432156:0:1709123456:0.15:8:U:0.8:33554432000:12884901888:U:0:107374182400:53687091200:1234567890:987654321";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap();
+
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+
+        // Verify "U" values are preserved (after skip(2), positions shift)
+        assert_eq!(parts[3], "U", "Missing CPU value preserved as U");
+        assert_eq!(parts[7], "U", "Missing swap total preserved as U");
+    }
[..]




Reply via email to