Signed-off-by: Aaron Lauterer <a.laute...@proxmox.com>
---
 .cargo/config.toml      |   8 +
 .gitignore              |   5 +
 Cargo.toml              |  20 ++
 build.rs                |  29 +++
 src/lib.rs              |   5 +
 src/main.rs             | 504 ++++++++++++++++++++++++++++++++++++++++
 src/parallel_handler.rs | 162 +++++++++++++
 wrapper.h               |   1 +
 8 files changed, 734 insertions(+)
 create mode 100644 .cargo/config.toml
 create mode 100644 .gitignore
 create mode 100644 Cargo.toml
 create mode 100644 build.rs
 create mode 100644 src/lib.rs
 create mode 100644 src/main.rs
 create mode 100644 src/parallel_handler.rs
 create mode 100644 wrapper.h

diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 0000000..a439c97
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,8 @@
+[source]
+[source.debian-packages]
+directory = "/usr/share/cargo/registry"
+[source.crates-io]
+replace-with = "debian-packages"
+
+[profile.release]
+debug=true
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7741e63
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+/target
+/build
+
+Cargo.lock
+
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..d3523f3
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "proxmox_rrd_migration_8-9"
+version = "0.1.0"
+edition = "2021"
+authors = [
+    "Aaron Lauterer <a.laute...@proxmox.com>",
+    "Proxmox Support Team <supp...@proxmox.com>",
+]
+license = "AGPL-3"
+homepage = "https://www.proxmox.com"
+
+[dependencies]
+anyhow = "1.0.86"
+pico-args = "0.5.0"
+proxmox-async = "0.4"
+crossbeam-channel = "0.5"
+
+[build-dependencies]
+bindgen = "0.66.1"
+pkg-config = "0.3"
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..56d07cc
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,29 @@
+use std::env;
+use std::path::PathBuf;
+
+/// Build script: links against `librrd` and generates the Rust FFI bindings
+/// for everything included by `wrapper.h`.
+fn main() {
+    // Cargo directives: link librrd, re-run this script when the wrapper header changes.
+    println!("cargo:rustc-link-lib=rrd");
+    println!("cargo:rerun-if-changed=wrapper.h");
+
+    let builder = bindgen::Builder::default()
+        // The header we want bindings for.
+        .header("wrapper.h")
+        // Also invalidate the built crate whenever an included header changes.
+        .parse_callbacks(Box::new(bindgen::CargoCallbacks));
+    // A build script can only fail loudly, so panic if bindgen fails.
+    let generated = builder.generate().expect("Unable to generate bindings");
+
+    // The bindings end up in $OUT_DIR/bindings.rs.
+    let mut bindings_file = PathBuf::from(env::var("OUT_DIR").unwrap());
+    bindings_file.push("bindings.rs");
+    generated
+        .write_to_file(bindings_file)
+        .expect("Couldn't write bindings!");
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..a38a13a
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,5 @@
+// The included bindings are generated from C headers, so their identifiers
+// follow C naming conventions; silence the Rust naming lints for them.
+#![allow(non_upper_case_globals)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+
+// Pull in the bindings that build.rs writes to $OUT_DIR/bindings.rs.
+include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..43f181c
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,504 @@
+use anyhow::{bail, Error, Result};
+use proxmox_rrd_migration_8_9::{rrd_clear_error, rrd_create_r2, 
rrd_get_context, rrd_get_error};
+use std::ffi::{CStr, CString, OsString};
+use std::fs;
+use std::os::unix::ffi::OsStrExt;
+use std::os::unix::fs::PermissionsExt;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use crate::parallel_handler::ParallelHandler;
+
+pub mod parallel_handler;
+
+/// Directory below which all source and target RRD sub-directories live.
+const BASE_DIR: &str = "/var/lib/rrdcached/db";
+/// Sub-directories of `BASE_DIR` holding the existing (source) RRD files.
+const SOURCE_SUBDIR_NODE: &str = "pve2-node";
+const SOURCE_SUBDIR_GUEST: &str = "pve2-vm";
+const SOURCE_SUBDIR_STORAGE: &str = "pve2-storage";
+/// Sub-directories of `BASE_DIR` into which the migrated RRD files are written.
+const TARGET_SUBDIR_NODE: &str = "pve9-node";
+const TARGET_SUBDIR_GUEST: &str = "pve9-vm";
+const TARGET_SUBDIR_STORAGE: &str = "pve9-storage";
+/// Upper bound for the automatically chosen number of migration threads.
+const MAX_THREADS: usize = 4;
+/// Step size passed to `rrd_create_r2` for every newly created RRD file.
+const RRD_STEP_SIZE: usize = 60;
+
+// RRAs are defined in the following way:
+//
+// RRA:CF:xff:steps:rows
+// CF: AVERAGE or MAX
+// xff: 0.5
+// steps: how many base steps are aggregated into one row. The step size itself is
+//     defined on RRD file creation! Example with a 60 seconds step size:
+//     1 => 60 sec, 30 => 1800 seconds or 30 min
+// rows: how many aggregated rows are kept, as in how far back in time we store data
+//
+// how many seconds are covered per RRA: steps * stepsize * rows
+// how many hours are covered per RRA: steps * stepsize * rows / 3600
+// how many days are covered per RRA: steps * stepsize * rows / 3600 / 24
+// https://oss.oetiker.ch/rrdtool/tut/rrd-beginners.en.html#Understanding_by_an_example
+
+/// Definition of a guest RRD file in the new format.
+///
+/// The entries are passed as the argument vector to `rrd_create_r2` when
+/// migrating guest files: first the data sources (`DS:`), then the
+/// round-robin archives (`RRA:`), once for AVERAGE and once for MAX.
+const RRD_VM_DEF: [&CStr; 25] = [
+    c"DS:maxcpu:GAUGE:120:0:U",
+    c"DS:cpu:GAUGE:120:0:U",
+    c"DS:maxmem:GAUGE:120:0:U",
+    c"DS:mem:GAUGE:120:0:U",
+    c"DS:maxdisk:GAUGE:120:0:U",
+    c"DS:disk:GAUGE:120:0:U",
+    c"DS:netin:DERIVE:120:0:U",
+    c"DS:netout:DERIVE:120:0:U",
+    c"DS:diskread:DERIVE:120:0:U",
+    c"DS:diskwrite:DERIVE:120:0:U",
+    c"DS:memhost:GAUGE:120:0:U",
+    c"DS:pressurecpusome:GAUGE:120:0:U",
+    c"DS:pressurecpufull:GAUGE:120:0:U",
+    c"DS:pressureiosome:GAUGE:120:0:U",
+    c"DS:pressureiofull:GAUGE:120:0:U",
+    c"DS:pressurememorysome:GAUGE:120:0:U",
+    c"DS:pressurememoryfull:GAUGE:120:0:U",
+    c"RRA:AVERAGE:0.5:1:1440",    // 1 min * 1440 => 1 day
+    c"RRA:AVERAGE:0.5:30:1440",   // 30 min * 1440 => 30 day
+    c"RRA:AVERAGE:0.5:360:1440",  // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+    c"RRA:MAX:0.5:1:1440",        // 1 min * 1440 => 1 day
+    c"RRA:MAX:0.5:30:1440",       // 30 min * 1440 => 30 day
+    c"RRA:MAX:0.5:360:1440",      // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:MAX:0.5:10080:570",     // 1 week * 570 => ~10 years
+];
+
+/// Definition of a node RRD file in the new format.
+///
+/// The entries are passed as the argument vector to `rrd_create_r2` when
+/// migrating node files: first the data sources (`DS:`), then the
+/// round-robin archives (`RRA:`), once for AVERAGE and once for MAX.
+const RRD_NODE_DEF: [&CStr; 29] = [
+    c"DS:loadavg:GAUGE:120:0:U",
+    c"DS:maxcpu:GAUGE:120:0:U",
+    c"DS:cpu:GAUGE:120:0:U",
+    c"DS:iowait:GAUGE:120:0:U",
+    c"DS:memtotal:GAUGE:120:0:U",
+    c"DS:memused:GAUGE:120:0:U",
+    c"DS:swaptotal:GAUGE:120:0:U",
+    c"DS:swapused:GAUGE:120:0:U",
+    c"DS:roottotal:GAUGE:120:0:U",
+    c"DS:rootused:GAUGE:120:0:U",
+    c"DS:netin:DERIVE:120:0:U",
+    c"DS:netout:DERIVE:120:0:U",
+    c"DS:memfree:GAUGE:120:0:U",
+    c"DS:membuffers:GAUGE:120:0:U",
+    c"DS:memcached:GAUGE:120:0:U",
+    c"DS:arcsize:GAUGE:120:0:U",
+    c"DS:pressurecpusome:GAUGE:120:0:U",
+    c"DS:pressureiosome:GAUGE:120:0:U",
+    c"DS:pressureiofull:GAUGE:120:0:U",
+    c"DS:pressurememorysome:GAUGE:120:0:U",
+    c"DS:pressurememoryfull:GAUGE:120:0:U",
+    c"RRA:AVERAGE:0.5:1:1440",    // 1 min * 1440 => 1 day
+    c"RRA:AVERAGE:0.5:30:1440",   // 30 min * 1440 => 30 day
+    c"RRA:AVERAGE:0.5:360:1440",  // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+    c"RRA:MAX:0.5:1:1440",        // 1 min * 1440 => 1 day
+    c"RRA:MAX:0.5:30:1440",       // 30 min * 1440 => 30 day
+    c"RRA:MAX:0.5:360:1440",      // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:MAX:0.5:10080:570",     // 1 week * 570 => ~10 years
+];
+
+/// Definition of a storage RRD file in the new format.
+///
+/// The entries are passed as the argument vector to `rrd_create_r2` when
+/// migrating storage files: first the data sources (`DS:`), then the
+/// round-robin archives (`RRA:`), once for AVERAGE and once for MAX.
+const RRD_STORAGE_DEF: [&CStr; 10] = [
+    c"DS:total:GAUGE:120:0:U",
+    c"DS:used:GAUGE:120:0:U",
+    c"RRA:AVERAGE:0.5:1:1440",    // 1 min * 1440 => 1 day
+    c"RRA:AVERAGE:0.5:30:1440",   // 30 min * 1440 => 30 day
+    c"RRA:AVERAGE:0.5:360:1440",  // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+    c"RRA:MAX:0.5:1:1440",        // 1 min * 1440 => 1 day
+    c"RRA:MAX:0.5:30:1440",       // 30 min * 1440 => 30 day
+    c"RRA:MAX:0.5:360:1440",      // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:MAX:0.5:10080:570",     // 1 week * 570 => ~10 years
+];
+
+/// Usage text printed when `-h` or `--help` is passed on the command line.
+const HELP: &str = "\
+proxmox-rrd-migration tool
+
+Migrates existing RRD graph data to the new format.
+
+Use this only in the process of upgrading from Proxmox VE 8 to 9 according to the upgrade guide!
+
+USAGE:
+    proxmox-rrd-migration [OPTIONS]
+
+    FLAGS:
+        -h, --help                  Prints this help information
+
+    OPTIONS:
+        --force                     Migrate, even if the target already exists.
+                                    This will overwrite any migrated RRD files!
+
+        --threads THREADS           Number of parallel threads. Default from 1 to 4.
+
+        --test                      For internal use only.
+                                    Tests parallel guest migration only!
+        --source                    For internal use only. Source directory.
+        --target                    For internal use only. Target directory.
+      ";
+
+/// Command line options as parsed by `parse_args`.
+#[derive(Debug)]
+struct Args {
+    /// `--force`: migrate even if a target directory already exists.
+    force: bool,
+    /// `--threads`: fixed number of threads; `None` lets `set_threads` decide.
+    threads: Option<usize>,
+    /// `--test`: only migrate guests, using `source` and `target` as directories.
+    test: bool,
+    /// `--source`: guest source directory, read in test mode.
+    source: Option<PathBuf>,
+    /// `--target`: guest target directory, read in test mode.
+    target: Option<PathBuf>,
+}
+
+/// Parse the command line into [`Args`].
+///
+/// Prints the help text and exits with status 0 if `-h` or `--help` is given.
+///
+/// # Errors
+///
+/// Returns an error if an option is given without a value, if a value cannot
+/// be parsed (e.g. `--threads abc`), or if unknown arguments are left over.
+fn parse_args() -> Result<Args, Error> {
+    let mut pargs = pico_args::Arguments::from_env();
+
+    // Help has a higher priority and should be handled separately.
+    if pargs.contains(["-h", "--help"]) {
+        print!("{}", HELP);
+        std::process::exit(0);
+    }
+
+    // Options with values are consumed before the plain flags. Invalid user
+    // input is reported as an error instead of panicking.
+    let threads: Option<usize> = pargs.opt_value_from_str("--threads")?;
+    let source: Option<PathBuf> = pargs.opt_value_from_str("--source")?;
+    let target: Option<PathBuf> = pargs.opt_value_from_str("--target")?;
+    let test = pargs.contains("--test");
+    let force = pargs.contains("--force");
+
+    // Anything still left is an argument this tool does not know about.
+    let remaining = pargs.finish();
+    if !remaining.is_empty() {
+        bail!("Warning: unused arguments left: {:?}", remaining);
+    }
+
+    Ok(Args {
+        force,
+        threads,
+        test,
+        source,
+        target,
+    })
+}
+
+/// Entry point: parses the arguments, refuses to overwrite existing target
+/// directories unless `--force` is given, then migrates nodes, storages and guests.
+///
+/// In `--test` mode only guests are migrated, between the directories given
+/// with `--source` and `--target`. Every failure is printed to stderr and ends
+/// the process with exit status 1.
+fn main() {
+    let args = match parse_args() {
+        Ok(v) => v,
+        Err(e) => {
+            eprintln!("Error: {}.", e);
+            std::process::exit(1);
+        }
+    };
+
+    let mut source_dir_guests: PathBuf = [BASE_DIR, SOURCE_SUBDIR_GUEST].iter().collect();
+    let mut target_dir_guests: PathBuf = [BASE_DIR, TARGET_SUBDIR_GUEST].iter().collect();
+    let source_dir_nodes: PathBuf = [BASE_DIR, SOURCE_SUBDIR_NODE].iter().collect();
+    let target_dir_nodes: PathBuf = [BASE_DIR, TARGET_SUBDIR_NODE].iter().collect();
+    let source_dir_storage: PathBuf = [BASE_DIR, SOURCE_SUBDIR_STORAGE].iter().collect();
+    let target_dir_storage: PathBuf = [BASE_DIR, TARGET_SUBDIR_STORAGE].iter().collect();
+
+    if args.test {
+        // Test mode needs explicit directories; report a usage error instead of panicking.
+        match (&args.source, &args.target) {
+            (Some(source), Some(target)) => {
+                source_dir_guests = source.clone();
+                target_dir_guests = target.clone();
+            }
+            _ => {
+                eprintln!("Error: '--test' requires both '--source' and '--target'.");
+                std::process::exit(1);
+            }
+        }
+    }
+
+    if !args.force && target_dir_guests.exists() {
+        eprintln!(
+            "Aborting! Target path for guests already exists. Use '--force' to still migrate. It will overwrite existing files!"
+        );
+        std::process::exit(1);
+    }
+    if !args.force && target_dir_nodes.exists() {
+        eprintln!(
+            "Aborting! Target path for nodes already exists. Use '--force' to still migrate. It will overwrite existing files!"
+        );
+        std::process::exit(1);
+    }
+    if !args.force && target_dir_storage.exists() {
+        eprintln!(
+            "Aborting! Target path for storages already exists. Use '--force' to still migrate. It will overwrite existing files!"
+        );
+        std::process::exit(1);
+    }
+
+    if !args.test {
+        if let Err(e) = migrate_nodes(source_dir_nodes, target_dir_nodes) {
+            eprintln!("Error migrating nodes: {}", e);
+            std::process::exit(1);
+        }
+        if let Err(e) = migrate_storage(source_dir_storage, target_dir_storage) {
+            eprintln!("Error migrating storage: {}", e);
+            std::process::exit(1);
+        }
+    }
+    if let Err(e) = migrate_guests(source_dir_guests, target_dir_guests, set_threads(&args)) {
+        eprintln!("Error migrating guests: {}", e);
+        std::process::exit(1);
+    }
+}
+
+/// Set number of threads
+///
+/// Either the fixed `--threads` parameter (at least 1), or one thread per
+/// eight CPUs available to this process, limited to the range from 1 to
+/// `MAX_THREADS`.
+fn set_threads(args: &Args) -> usize {
+    if let Some(threads) = args.threads {
+        // A pool without worker threads could not process any file.
+        return threads.max(1);
+    }
+    // This counts the CPUs usable by this process, not physical cores. If the
+    // count cannot be determined we fall back to a single CPU.
+    let cpus = std::thread::available_parallelism()
+        .map(|n| n.get())
+        .unwrap_or(1);
+
+    // 1..=7 CPUs => 1 thread, 8..=15 => 1, 16..=23 => 2, 24..=31 => 3, 32 and more => 4
+    (cpus / 8).clamp(1, MAX_THREADS)
+}
+
+/// Migrate guest RRD files
+///
+/// In parallel to speed up the process as most time is spent on converting the
+/// data to the new format.
+///
+/// Every regular file in `source_dir_guests` is used as data source for a new
+/// RRD file of the same name in `target_dir_guests`, created with
+/// `RRD_VM_DEF`. The work is spread over `threads` worker threads.
+///
+/// # Errors
+///
+/// Fails if a directory cannot be read or created, or if creating one of the
+/// RRD files failed (the first such error is reported).
+fn migrate_guests(
+    source_dir_guests: PathBuf,
+    target_dir_guests: PathBuf,
+    threads: usize,
+) -> Result<(), Error> {
+    println!("Migrating RRD data for guests…");
+    println!("Using {} thread(s)", threads);
+
+    // Pair the full source path (as C string) with the bare file name for every
+    // regular file. Directory entries that cannot be read are skipped.
+    let guest_source_files: Vec<(CString, OsString)> = fs::read_dir(&source_dir_guests)?
+        .filter_map(|entry| entry.ok())
+        .map(|entry| entry.path())
+        .filter(|path| path.is_file())
+        .map(|path| {
+            let c_path = CString::new(path.as_os_str().as_bytes())
+                .expect("Could not convert path to CString.");
+            let fname = path
+                .file_name()
+                .map(|v| v.to_os_string())
+                .expect("Could not convert fname to OsString.");
+            (c_path, fname)
+        })
+        .collect();
+
+    if !target_dir_guests.exists() {
+        println!("Creating new directory: '{}'", target_dir_guests.display());
+        std::fs::create_dir(&target_dir_guests)?;
+    }
+
+    let total_guests = guest_source_files.len();
+    let guests = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+    let guests2 = guests.clone();
+    // Monotonic clock: measuring with it cannot fail when the wall clock is adjusted.
+    let start_time = std::time::Instant::now();
+
+    let migration_pool = ParallelHandler::new(
+        "guest rrd migration",
+        threads,
+        move |(path, fname): (CString, OsString)| {
+            // List of source files handed to librrd; the second slot stays NULL.
+            let mut source: [*const std::ffi::c_char; 2] = [path.as_ptr(), std::ptr::null()];
+
+            // Build the C string from the raw bytes so that file names which are
+            // not valid UTF-8 do not cause a panic.
+            let target_path = target_dir_guests.join(fname);
+            let target_path = CString::new(target_path.as_os_str().as_bytes())?;
+
+            // SAFETY: `target_path`, `path` and the `RRD_VM_DEF` entries are
+            // NUL-terminated C strings that stay alive for the whole call, and the
+            // argument count matches the length of the argument array.
+            unsafe {
+                rrd_get_context();
+                rrd_clear_error();
+                let res = rrd_create_r2(
+                    target_path.as_ptr(),
+                    RRD_STEP_SIZE as u64,
+                    0,
+                    0,
+                    source.as_mut_ptr(),
+                    std::ptr::null(),
+                    RRD_VM_DEF.len() as i32,
+                    RRD_VM_DEF.map(|v| v.as_ptr()).as_mut_ptr(),
+                );
+                if res != 0 {
+                    bail!(
+                        "RRD create Error: {}",
+                        CStr::from_ptr(rrd_get_error()).to_string_lossy()
+                    );
+                }
+            }
+            // `fetch_add` returns the previous value; add one to include this guest.
+            let migrated = guests2.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
+            if migrated % 200 == 0 {
+                println!("Migrated {} of {} guests", migrated, total_guests);
+            }
+            Ok(())
+        },
+    );
+    let migration_channel = migration_pool.channel();
+
+    for file in guest_source_files {
+        migration_channel.send(file)?;
+    }
+
+    // Drop our sender so the workers can finish once the queue is empty.
+    drop(migration_channel);
+    migration_pool.complete()?;
+
+    let elapsed = start_time.elapsed().as_secs_f64();
+    let guests = guests.load(std::sync::atomic::Ordering::SeqCst);
+    println!("Migrated {} guests in {:.2}s", guests, elapsed,);
+
+    Ok(())
+}
+
+/// Migrate node RRD files
+///
+/// In serial as the number of nodes will not be high.
+///
+/// Every regular file in `source_dir_nodes` is used as data source for a new
+/// RRD file of the same name in `target_dir_nodes`, created with `RRD_NODE_DEF`.
+///
+/// # Errors
+///
+/// Fails if a directory cannot be read or created, or if librrd fails to
+/// create one of the files.
+fn migrate_nodes(source_dir_nodes: PathBuf, target_dir_nodes: PathBuf) -> Result<(), Error> {
+    println!("Migrating RRD data for nodes…");
+
+    if !target_dir_nodes.exists() {
+        println!("Creating new directory: '{}'", target_dir_nodes.display());
+        std::fs::create_dir(&target_dir_nodes)?;
+    }
+
+    // Collect all regular files first; unreadable directory entries are skipped.
+    let node_source_files: Vec<PathBuf> = fs::read_dir(&source_dir_nodes)?
+        .filter_map(|entry| entry.ok())
+        .map(|entry| entry.path())
+        .filter(|path| path.is_file())
+        .collect();
+
+    for source_path in node_source_files {
+        let Some(node_name) = source_path.file_name() else {
+            continue;
+        };
+        println!("Node: '{}'", PathBuf::from(node_name).display());
+
+        // Build the C strings from the raw bytes so that file names which are not
+        // valid UTF-8 do not cause a panic.
+        let source_c = CString::new(source_path.as_os_str().as_bytes())?;
+        let target_c = CString::new(target_dir_nodes.join(node_name).as_os_str().as_bytes())?;
+
+        // List of source files handed to librrd; the second slot stays NULL.
+        let mut source: [*const std::ffi::c_char; 2] = [source_c.as_ptr(), std::ptr::null()];
+
+        // SAFETY: `target_c`, `source_c` and the `RRD_NODE_DEF` entries are
+        // NUL-terminated C strings that stay alive for the whole call, and the
+        // argument count matches the length of the argument array.
+        unsafe {
+            rrd_get_context();
+            rrd_clear_error();
+            let res = rrd_create_r2(
+                target_c.as_ptr(),
+                RRD_STEP_SIZE as u64,
+                0,
+                0,
+                source.as_mut_ptr(),
+                std::ptr::null(),
+                RRD_NODE_DEF.len() as i32,
+                RRD_NODE_DEF.map(|v| v.as_ptr()).as_mut_ptr(),
+            );
+            if res != 0 {
+                bail!(
+                    "RRD create Error: {}",
+                    CStr::from_ptr(rrd_get_error()).to_string_lossy()
+                );
+            }
+        }
+    }
+    println!("Migrated all nodes");
+
+    Ok(())
+}
+
+/// Migrate storage RRD files
+///
+/// In serial as the number of storage will not be that high.
+///
+/// The source directory contains one sub-directory per node. For each of them a
+/// directory of the same name (mode 0755) is set up below `target_dir_storage`,
+/// and every regular file in it is used as data source for a new RRD file of
+/// the same name, created with `RRD_STORAGE_DEF`.
+///
+/// # Errors
+///
+/// Fails if a directory cannot be read, created or have its permissions set, or
+/// if librrd fails to create one of the files.
+fn migrate_storage(source_dir_storage: PathBuf, target_dir_storage: PathBuf) -> Result<(), Error> {
+    println!("Migrating RRD data for storages…");
+
+    if !target_dir_storage.exists() {
+        println!("Creating new directory: '{}'", target_dir_storage.display());
+        std::fs::create_dir(&target_dir_storage)?;
+    }
+
+    // storage has another layer of directories per node over which we need to iterate
+    let node_dirs = fs::read_dir(&source_dir_storage)?
+        .filter_map(|entry| entry.ok())
+        .map(|entry| entry.path())
+        .filter(|path| path.is_dir());
+
+    for source_node_subdir in node_dirs {
+        let Some(node_name) = source_node_subdir.file_name() else {
+            continue;
+        };
+        let target_node_subdir = target_dir_storage.join(node_name);
+
+        // When re-running with '--force' the per-node directory may already exist.
+        if !target_node_subdir.exists() {
+            fs::create_dir(&target_node_subdir)?;
+        }
+        // Write the mode to disk; changing a `Permissions` value alone has no effect.
+        fs::set_permissions(&target_node_subdir, fs::Permissions::from_mode(0o755))?;
+
+        // Collect all regular files first; unreadable directory entries are skipped.
+        let storage_source_files: Vec<PathBuf> = fs::read_dir(&source_node_subdir)?
+            .filter_map(|entry| entry.ok())
+            .map(|entry| entry.path())
+            .filter(|path| path.is_file())
+            .collect();
+
+        for source_path in storage_source_files {
+            let Some(storage_name) = source_path.file_name() else {
+                continue;
+            };
+            println!("Storage: '{}'", PathBuf::from(storage_name).display());
+
+            // Build the C strings from the raw bytes so that file names which are
+            // not valid UTF-8 do not cause a panic.
+            let source_c = CString::new(source_path.as_os_str().as_bytes())?;
+            let target_c =
+                CString::new(target_node_subdir.join(storage_name).as_os_str().as_bytes())?;
+
+            // List of source files handed to librrd; the second slot stays NULL.
+            let mut source: [*const std::ffi::c_char; 2] = [source_c.as_ptr(), std::ptr::null()];
+
+            // SAFETY: `target_c`, `source_c` and the `RRD_STORAGE_DEF` entries are
+            // NUL-terminated C strings that stay alive for the whole call, and the
+            // argument count matches the length of the argument array.
+            unsafe {
+                rrd_get_context();
+                rrd_clear_error();
+                let res = rrd_create_r2(
+                    target_c.as_ptr(),
+                    RRD_STEP_SIZE as u64,
+                    0,
+                    0,
+                    source.as_mut_ptr(),
+                    std::ptr::null(),
+                    RRD_STORAGE_DEF.len() as i32,
+                    RRD_STORAGE_DEF.map(|v| v.as_ptr()).as_mut_ptr(),
+                );
+                if res != 0 {
+                    bail!(
+                        "RRD create Error: {}",
+                        CStr::from_ptr(rrd_get_error()).to_string_lossy()
+                    );
+                }
+            }
+        }
+    }
+    println!("Migrated all storages");
+
+    Ok(())
+}
diff --git a/src/parallel_handler.rs b/src/parallel_handler.rs
new file mode 100644
index 0000000..787742a
--- /dev/null
+++ b/src/parallel_handler.rs
@@ -0,0 +1,162 @@
+//! A thread pool which run a closure in parallel.
+
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+
+use anyhow::{Error, bail, format_err};
+use crossbeam_channel::{Sender, bounded};
+
+/// A handle to send data to the worker thread (implements clone)
+pub struct SendHandle<I> {
+    /// Sending side of the channel the worker threads receive from.
+    input: Sender<I>,
+    /// Message of the first error returned by a handler, shared with the workers.
+    abort: Arc<Mutex<Option<String>>>,
+}
+
+/// Returns the first error happened, if any
+pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
+    let guard = abort.lock().unwrap();
+    if let Some(err_msg) = &*guard {
+        return Err(format_err!("{}", err_msg));
+    }
+    Ok(())
+}
+
+impl<I: Send> SendHandle<I> {
+    /// Queue `input` for processing by the worker threads.
+    ///
+    /// Fails with the recorded message if a handler already reported an error,
+    /// or if the channel has been closed.
+    pub fn send(&self, input: I) -> Result<(), Error> {
+        check_abort(&self.abort)?;
+        self.input
+            .send(input)
+            .map_err(|_| format_err!("send failed - channel closed"))
+    }
+}
+
+/// A thread pool which run the supplied closure
+///
+/// The send command sends data to the worker threads. If one handler
+/// returns an error, we mark the channel as failed and it is no
+/// longer possible to send data.
+///
+/// When done, the 'complete()' method needs to be called to check for
+/// outstanding errors.
+pub struct ParallelHandler<I> {
+    /// Join handles of the spawned worker threads.
+    handles: Vec<JoinHandle<()>>,
+    /// Pool name, used for the thread names and in panic reports.
+    name: String,
+    /// Sending side of the pool; `None` once it has been taken on completion or drop.
+    input: Option<SendHandle<I>>,
+}
+
+// Implemented by hand so that cloning a handle does not require `I: Clone`.
+impl<I> Clone for SendHandle<I> {
+    /// Returns another handle that feeds the same channel and shares the same abort state.
+    fn clone(&self) -> Self {
+        let Self { input, abort } = self;
+        Self {
+            input: input.clone(),
+            abort: abort.clone(),
+        }
+    }
+}
+
+impl<I: Send + 'static> ParallelHandler<I> {
+    /// Create a new thread pool, each thread processing incoming data
+    /// with 'handler_fn'.
+    ///
+    /// The threads are named `"<name> (<index>)"`. At least one thread is
+    /// always spawned, even if `threads` is 0. Panics if a thread cannot be
+    /// spawned.
+    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+    where
+        F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
+    {
+        // Without any worker the channel would have no receiver and every send would fail.
+        let threads = threads.max(1);
+
+        let mut handles = Vec::with_capacity(threads);
+        let (input_tx, input_rx) = bounded::<I>(threads);
+
+        let abort = Arc::new(Mutex::new(None));
+
+        for i in 0..threads {
+            let input_rx = input_rx.clone();
+            let abort = Arc::clone(&abort);
+            let handler_fn = handler_fn.clone();
+
+            handles.push(
+                std::thread::Builder::new()
+                    .name(format!("{} ({})", name, i))
+                    .spawn(move || {
+                        // Runs until receiving fails, i.e. the channel got disconnected.
+                        while let Ok(data) = input_rx.recv() {
+                            if let Err(err) = (handler_fn)(data) {
+                                let mut guard = abort.lock().unwrap();
+                                // Only the first error is kept.
+                                if guard.is_none() {
+                                    *guard = Some(err.to_string());
+                                }
+                            }
+                        }
+                    })
+                    .unwrap(),
+            );
+        }
+        Self {
+            handles,
+            name: name.to_string(),
+            input: Some(SendHandle {
+                input: input_tx,
+                abort,
+            }),
+        }
+    }
+
+    /// Returns a cloneable channel to send data to the worker threads
+    pub fn channel(&self) -> SendHandle<I> {
+        self.input.as_ref().unwrap().clone()
+    }
+
+    /// Send data to the worker threads
+    pub fn send(&self, input: I) -> Result<(), Error> {
+        self.input.as_ref().unwrap().send(input)?;
+        Ok(())
+    }
+
+    /// Wait for worker threads to complete and check for errors
+    ///
+    /// Returns the first handler error if there was one, otherwise an error
+    /// listing all worker threads that panicked, otherwise `Ok(())`.
+    pub fn complete(mut self) -> Result<(), Error> {
+        let input = self.input.take().unwrap();
+        let abort = Arc::clone(&input.abort);
+        check_abort(&abort)?;
+        // Drop our sender before joining the worker threads.
+        drop(input);
+
+        let msg_list = self.join_threads();
+
+        // an error might be encountered while waiting for the join
+        check_abort(&abort)?;
+
+        if msg_list.is_empty() {
+            return Ok(());
+        }
+        Err(format_err!("{}", msg_list.join("\n")))
+    }
+
+    /// Joins all worker threads and returns one message per panicked thread.
+    fn join_threads(&mut self) -> Vec<String> {
+        let mut msg_list = Vec::new();
+
+        // Join in spawn order so that `i` is the same index as in the thread's name.
+        for (i, handle) in self.handles.drain(..).enumerate() {
+            if let Err(panic) = handle.join() {
+                if let Some(panic_msg) = panic.downcast_ref::<&str>() {
+                    msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+                } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
+                    msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+                } else {
+                    msg_list.push(format!("thread {} ({i}) panicked", self.name));
+                }
+            }
+        }
+        msg_list
+    }
+}
+
+// Note: dropping the pool always joins every remaining worker thread.
+impl<I> Drop for ParallelHandler<I> {
+    fn drop(&mut self) {
+        // Release our sending side first, then wait for the workers (last spawned first).
+        self.input = None;
+        for handle in self.handles.drain(..).rev() {
+            // A panicked worker is ignored here; `complete()` is the place to learn about it.
+            let _ = handle.join();
+        }
+    }
+}
diff --git a/wrapper.h b/wrapper.h
new file mode 100644
index 0000000..64d0aa6
--- /dev/null
+++ b/wrapper.h
@@ -0,0 +1 @@
+#include <rrd.h>
-- 
2.39.5



_______________________________________________
pve-devel mailing list
pve-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel

Reply via email to