Signed-off-by: Aaron Lauterer <a.laute...@proxmox.com> --- .cargo/config.toml | 8 + .gitignore | 5 + Cargo.toml | 20 ++ build.rs | 29 +++ src/lib.rs | 5 + src/main.rs | 504 ++++++++++++++++++++++++++++++++++++++++ src/parallel_handler.rs | 162 +++++++++++++ wrapper.h | 1 + 8 files changed, 734 insertions(+) create mode 100644 .cargo/config.toml create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 build.rs create mode 100644 src/lib.rs create mode 100644 src/main.rs create mode 100644 src/parallel_handler.rs create mode 100644 wrapper.h
diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..a439c97 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,8 @@ +[source] +[source.debian-packages] +directory = "/usr/share/cargo/registry" +[source.crates-io] +replace-with = "debian-packages" + +[profile.release] +debug=true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7741e63 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +./target +./build + +Cargo.lock + diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..d3523f3 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "proxmox_rrd_migration_8-9" +version = "0.1.0" +edition = "2021" +authors = [ + "Aaron Lauterer <a.laute...@proxmox.com>", + "Proxmox Support Team <supp...@proxmox.com>", +] +license = "AGPL-3" +homepage = "https://www.proxmox.com" + +[dependencies] +anyhow = "1.0.86" +pico-args = "0.5.0" +proxmox-async = "0.4" +crossbeam-channel = "0.5" + +[build-dependencies] +bindgen = "0.66.1" +pkg-config = "0.3" diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..56d07cc --- /dev/null +++ b/build.rs @@ -0,0 +1,29 @@ +use std::env; +use std::path::PathBuf; + +fn main() { + println!("cargo:rustc-link-lib=rrd"); + + println!("cargo:rerun-if-changed=wrapper.h"); + // The bindgen::Builder is the main entry point + // to bindgen, and lets you build up options for + // the resulting bindings. + + let bindings = bindgen::Builder::default() + // The input header we would like to generate + // bindings for. + .header("wrapper.h") + // Tell cargo to invalidate the built crate whenever any of the + // included header files changed. + .parse_callbacks(Box::new(bindgen::CargoCallbacks)) + // Finish the builder and generate the bindings. + .generate() + // Unwrap the Result and panic on failure. + .expect("Unable to generate bindings"); + + // Write the bindings to the $OUT_DIR/bindings.rs file. + let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()); + bindings + .write_to_file(out_path.join("bindings.rs")) + .expect("Couldn't write bindings!"); +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..a38a13a --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,5 @@ +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] + +include!(concat!(env!("OUT_DIR"), "/bindings.rs")); diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..43f181c --- /dev/null +++ b/src/main.rs @@ -0,0 +1,504 @@ +use anyhow::{bail, Error, Result}; +use proxmox_rrd_migration_8_9::{rrd_clear_error, rrd_create_r2, rrd_get_context, rrd_get_error}; +use std::ffi::{CStr, CString, OsString}; +use std::fs; +use std::os::unix::ffi::OsStrExt; +use std::os::unix::fs::PermissionsExt; +use std::path::PathBuf; +use std::sync::Arc; + +use crate::parallel_handler::ParallelHandler; + +pub mod parallel_handler; + +const BASE_DIR: &str = "/var/lib/rrdcached/db"; +const SOURCE_SUBDIR_NODE: &str = "pve2-node"; +const SOURCE_SUBDIR_GUEST: &str = "pve2-vm"; +const SOURCE_SUBDIR_STORAGE: &str = "pve2-storage"; +const TARGET_SUBDIR_NODE: &str = "pve9-node"; +const TARGET_SUBDIR_GUEST: &str = "pve9-vm"; +const TARGET_SUBDIR_STORAGE: &str = "pve9-storage"; +const MAX_THREADS: usize = 4; +const RRD_STEP_SIZE: usize = 60; + +// RRAs are defined in the following way: +// +// RRA:CF:xff:step:rows +// CF: AVERAGE or MAX +// xff: 0.5 +// steps: stepsize is defined on rrd file creation! example: with 60 seconds step size: +// e.g. 1 => 60 sec, 30 => 1800 seconds or 30 min +// rows: how many aggregated rows are kept, as in how far back in time we store data +// +// how many seconds are aggregated per RRA: steps * stepsize * rows +// how many hours are aggregated per RRA: steps * stepsize * rows / 3600 +// how many days are aggregated per RRA: steps * stepsize * rows / 3600 / 24 +// https://oss.oetiker.ch/rrdtool/tut/rrd-beginners.en.html#Understanding_by_an_example + +const RRD_VM_DEF: [&CStr; 25] = [ + c"DS:maxcpu:GAUGE:120:0:U", + c"DS:cpu:GAUGE:120:0:U", + c"DS:maxmem:GAUGE:120:0:U", + c"DS:mem:GAUGE:120:0:U", + c"DS:maxdisk:GAUGE:120:0:U", + c"DS:disk:GAUGE:120:0:U", + c"DS:netin:DERIVE:120:0:U", + c"DS:netout:DERIVE:120:0:U", + c"DS:diskread:DERIVE:120:0:U", + c"DS:diskwrite:DERIVE:120:0:U", + c"DS:memhost:GAUGE:120:0:U", + c"DS:pressurecpusome:GAUGE:120:0:U", + c"DS:pressurecpufull:GAUGE:120:0:U", + c"DS:pressureiosome:GAUGE:120:0:U", + c"DS:pressureiofull:GAUGE:120:0:U", + c"DS:pressurememorysome:GAUGE:120:0:U", + c"DS:pressurememoryfull:GAUGE:120:0:U", + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years + c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year + c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years +]; + +const RRD_NODE_DEF: [&CStr; 29] = [ + c"DS:loadavg:GAUGE:120:0:U", + c"DS:maxcpu:GAUGE:120:0:U", + c"DS:cpu:GAUGE:120:0:U", + c"DS:iowait:GAUGE:120:0:U", + c"DS:memtotal:GAUGE:120:0:U", + c"DS:memused:GAUGE:120:0:U", + c"DS:swaptotal:GAUGE:120:0:U", + c"DS:swapused:GAUGE:120:0:U", + c"DS:roottotal:GAUGE:120:0:U", + c"DS:rootused:GAUGE:120:0:U", + c"DS:netin:DERIVE:120:0:U", + c"DS:netout:DERIVE:120:0:U", + c"DS:memfree:GAUGE:120:0:U", + c"DS:membuffers:GAUGE:120:0:U", + c"DS:memcached:GAUGE:120:0:U", + c"DS:arcsize:GAUGE:120:0:U", + c"DS:pressurecpusome:GAUGE:120:0:U", + c"DS:pressureiosome:GAUGE:120:0:U", + c"DS:pressureiofull:GAUGE:120:0:U", + c"DS:pressurememorysome:GAUGE:120:0:U", + c"DS:pressurememoryfull:GAUGE:120:0:U", + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years + c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year + c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years +]; + +const RRD_STORAGE_DEF: [&CStr; 10] = [ + c"DS:total:GAUGE:120:0:U", + c"DS:used:GAUGE:120:0:U", + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years + c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year + c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years +]; + +const HELP: &str = "\ +proxmox-rrd-migration tool + +Migrates existing RRD graph data to the new format. + +Use this only in the process of upgrading from Proxmox VE 8 to 9 according to the upgrade guide! + +USAGE: + proxmox-rrd-migration [OPTIONS] + + FLAGS: + -h, --help Prints this help information + + OPTIONS: + --force Migrate, even if the target already exists. + This will overwrite any migrated RRD files! + + --threads THREADS Number of paralell threads. Default from 1 to 4. + + --test For internal use only. + Tests parallel guest migration only! + --source For internal use only. Source directory. + --target For internal use only. Target directory. + "; + +#[derive(Debug)] +struct Args { + force: bool, + threads: Option<usize>, + test: bool, + source: Option<PathBuf>, + target: Option<PathBuf>, +} + +fn parse_args() -> Result<Args, Error> { + let mut pargs = pico_args::Arguments::from_env(); + + // Help has a higher priority and should be handled separately. + if pargs.contains(["-h", "--help"]) { + print!("{}", HELP); + std::process::exit(0); + } + + let mut args = Args { + threads: pargs.opt_value_from_str("--threads").unwrap(), + force: false, + test: false, + source: pargs.opt_value_from_str("--source").unwrap(), + target: pargs.opt_value_from_str("--target").unwrap(), + }; + + if pargs.contains("--test") { + args.test = true; + } + if pargs.contains("--force") { + args.force = true; + } + + // It's up to the caller what to do with the remaining arguments. + let remaining = pargs.finish(); + if !remaining.is_empty() { + bail!(format!("Warning: unused arguments left: {:?}", remaining)); + } + + Ok(args) +} + +fn main() { + let args = match parse_args() { + Ok(v) => v, + Err(e) => { + eprintln!("Error: {}.", e); + std::process::exit(1); + } + }; + + let mut source_dir_guests: PathBuf = [BASE_DIR, SOURCE_SUBDIR_GUEST].iter().collect(); + let mut target_dir_guests: PathBuf = [BASE_DIR, TARGET_SUBDIR_GUEST].iter().collect(); + let source_dir_nodes: PathBuf = [BASE_DIR, SOURCE_SUBDIR_NODE].iter().collect(); + let target_dir_nodes: PathBuf = [BASE_DIR, TARGET_SUBDIR_NODE].iter().collect(); + let source_dir_storage: PathBuf = [BASE_DIR, SOURCE_SUBDIR_STORAGE].iter().collect(); + let target_dir_storage: PathBuf = [BASE_DIR, TARGET_SUBDIR_STORAGE].iter().collect(); + + if args.test { + source_dir_guests = args.source.clone().unwrap(); + target_dir_guests = args.target.clone().unwrap(); + } + + if !args.force && target_dir_guests.exists() { + eprintln!( + "Aborting! Target path for guests already exists. Use '--force' to still migrate. It will overwrite existing files!" + ); + std::process::exit(1); + } + if !args.force && target_dir_nodes.exists() { + eprintln!( + "Aborting! Target path for nodes already exists. Use '--force' to still migrate. It will overwrite existing files!" + ); + std::process::exit(1); + } + if !args.force && target_dir_storage.exists() { + eprintln!( + "Aborting! Target path for storages already exists. Use '--force' to still migrate. It will overwrite existing files!" + ); + std::process::exit(1); + } + + if !args.test { + if let Err(e) = migrate_nodes(source_dir_nodes, target_dir_nodes) { + eprintln!("Error migrating nodes: {}", e); + std::process::exit(1); + } + if let Err(e) = migrate_storage(source_dir_storage, target_dir_storage) { + eprintln!("Error migrating storage: {}", e); + std::process::exit(1); + } + } + if let Err(e) = migrate_guests(source_dir_guests, target_dir_guests, set_threads(&args)) { + eprintln!("Error migrating guests: {}", e); + std::process::exit(1); + } +} + +/// Set number of threads +/// +/// Either a fixed parameter or determining a range between 1 to 4 threads +/// based on the number of CPU cores available in the system. +fn set_threads(args: &Args) -> usize { + if args.threads.is_some() { + return args.threads.unwrap(); + } + // check for a way to get physical cores and not threads? + let cpus: usize = String::from_utf8_lossy( + std::process::Command::new("nproc") + .output() + .expect("Error running nproc") + .stdout + .as_slice() + .trim_ascii(), + ) + .parse::<usize>() + .expect("Could not parse nproc output"); + + if cpus < 32 { + let threads = cpus / 8; + if threads == 0 { + return 1; + } + return threads; + } + return MAX_THREADS; +} + +/// Migrate guest RRD files +/// +/// In parallel to speed up the process as most time is spent on converting the +/// data to the new format. +fn migrate_guests( + source_dir_guests: PathBuf, + target_dir_guests: PathBuf, + threads: usize, +) -> Result<(), Error> { + println!("Migrating RRD data for guests…"); + println!("Using {} thread(s)", threads); + + let mut guest_source_files: Vec<(CString, OsString)> = Vec::new(); + + fs::read_dir(&source_dir_guests)? + .filter(|f| f.is_ok()) + .map(|f| f.unwrap().path()) + .filter(|f| f.is_file()) + .for_each(|file| { + let path = CString::new(file.as_path().as_os_str().as_bytes()) + .expect("Could not convert path to CString."); + let fname = file + .file_name() + .map(|v| v.to_os_string()) + .expect("Could not convert fname to OsString."); + guest_source_files.push((path, fname)) + }); + if !target_dir_guests.exists() { + println!("Creating new directory: '{}'", target_dir_guests.display()); + std::fs::create_dir(&target_dir_guests)?; + } + + let total_guests = guest_source_files.len(); + let guests = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let guests2 = guests.clone(); + let start_time = std::time::SystemTime::now(); + + let migration_pool = ParallelHandler::new( + "guest rrd migration", + threads, + move |(path, fname): (CString, OsString)| { + let mut source: [*const i8; 2] = [std::ptr::null(); 2]; + source[0] = path.as_ptr(); + + let node_name = fname; + let mut target_path = target_dir_guests.clone(); + target_path.push(node_name); + + let target_path = CString::new(target_path.to_str().unwrap()).unwrap(); + + unsafe { + rrd_get_context(); + rrd_clear_error(); + let res = rrd_create_r2( + target_path.as_ptr(), + RRD_STEP_SIZE as u64, + 0, + 0, + source.as_mut_ptr(), + std::ptr::null(), + RRD_VM_DEF.len() as i32, + RRD_VM_DEF.map(|v| v.as_ptr()).as_mut_ptr(), + ); + if res != 0 { + bail!( + "RRD create Error: {}", + CStr::from_ptr(rrd_get_error()).to_string_lossy() + ); + } + } + let current_guests = guests2.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if current_guests > 0 && current_guests % 200 == 0 { + println!("Migrated {} of {} guests", current_guests, total_guests); + } + Ok(()) + }, + ); + let migration_channel = migration_pool.channel(); + + for file in guest_source_files { + let migration_channel = migration_channel.clone(); + migration_channel.send(file)?; + } + + drop(migration_channel); + migration_pool.complete()?; + + let elapsed = start_time.elapsed()?.as_secs_f64(); + let guests = guests.load(std::sync::atomic::Ordering::SeqCst); + println!("Migrated {} guests in {:.2}s", guests, elapsed,); + + Ok(()) +} + +/// Migrate node RRD files +/// +/// In serial as the number of nodes will not be high. +fn migrate_nodes(source_dir_nodes: PathBuf, target_dir_nodes: PathBuf) -> Result<(), Error> { + println!("Migrating RRD data for nodes…"); + + if !target_dir_nodes.exists() { + println!("Creating new directory: '{}'", target_dir_nodes.display()); + std::fs::create_dir(&target_dir_nodes)?; + } + + let mut node_source_files: Vec<(CString, OsString)> = Vec::new(); + fs::read_dir(&source_dir_nodes)? + .filter(|f| f.is_ok()) + .map(|f| f.unwrap().path()) + .filter(|f| f.is_file()) + .for_each(|file| { + let path = CString::new(file.as_path().as_os_str().as_bytes()) + .expect("Could not convert path to CString."); + let fname = file + .file_name() + .map(|v| v.to_os_string()) + .expect("Could not convert fname to OsString."); + node_source_files.push((path, fname)) + }); + + for file in node_source_files { + println!("Node: '{}'", PathBuf::from(file.1.clone()).display()); + let mut source: [*const i8; 2] = [std::ptr::null(); 2]; + + source[0] = file.0.as_ptr(); + + let node_name = file.1; + let mut target_path = target_dir_nodes.clone(); + target_path.push(node_name); + + let target_path = CString::new(target_path.to_str().unwrap()).unwrap(); + + unsafe { + rrd_get_context(); + rrd_clear_error(); + let res = rrd_create_r2( + target_path.as_ptr(), + RRD_STEP_SIZE as u64, + 0, + 0, + source.as_mut_ptr(), + std::ptr::null(), + RRD_NODE_DEF.len() as i32, + RRD_NODE_DEF.map(|v| v.as_ptr()).as_mut_ptr(), + ); + if res != 0 { + bail!( + "RRD create Error: {}", + CStr::from_ptr(rrd_get_error()).to_string_lossy() + ); + } + } + } + println!("Migrated all nodes"); + + Ok(()) +} + +/// Migrate storage RRD files +/// +/// In serial as the number of storage will not be that high. +fn migrate_storage(source_dir_storage: PathBuf, target_dir_storage: PathBuf) -> Result<(), Error> { + println!("Migrating RRD data for storages…"); + + if !target_dir_storage.exists() { + println!("Creating new directory: '{}'", target_dir_storage.display()); + std::fs::create_dir(&target_dir_storage)?; + } + + // storage has another layer of directories per node over which we need to iterate + fs::read_dir(&source_dir_storage)? + .filter(|f| f.is_ok()) + .map(|f| f.unwrap().path()) + .filter(|f| f.is_dir()) + .try_for_each(|node| { + let mut storage_source_files: Vec<(CString, OsString)> = Vec::new(); + + let mut source_node_subdir = source_dir_storage.clone(); + source_node_subdir.push(&node.file_name().unwrap()); + + let mut target_node_subdir = target_dir_storage.clone(); + target_node_subdir.push(&node.file_name().unwrap()); + + fs::create_dir(target_node_subdir.as_path())?; + let metadata = target_node_subdir.metadata()?; + let mut permissions = metadata.permissions(); + permissions.set_mode(0o755); + + fs::read_dir(&source_node_subdir)? + .filter(|f| f.is_ok()) + .map(|f| f.unwrap().path()) + .filter(|f| f.is_file()) + .for_each(|file| { + let path = CString::new(file.as_path().as_os_str().as_bytes()) + .expect("Could not convert path to CString."); + let fname = file + .file_name() + .map(|v| v.to_os_string()) + .expect("Could not convert fname to OsString."); + storage_source_files.push((path, fname)) + }); + + for file in storage_source_files { + println!("Storage: '{}'", PathBuf::from(file.1.clone()).display()); + let mut source: [*const i8; 2] = [std::ptr::null(); 2]; + + source[0] = file.0.as_ptr(); + + let node_name = file.1; + let mut target_path = target_node_subdir.clone(); + target_path.push(node_name); + + let target_path = CString::new(target_path.to_str().unwrap()).unwrap(); + + unsafe { + rrd_get_context(); + rrd_clear_error(); + let res = rrd_create_r2( + target_path.as_ptr(), + RRD_STEP_SIZE as u64, + 0, + 0, + source.as_mut_ptr(), + std::ptr::null(), + RRD_STORAGE_DEF.len() as i32, + RRD_STORAGE_DEF.map(|v| v.as_ptr()).as_mut_ptr(), + ); + if res != 0 { + bail!( + "RRD create Error: {}", + CStr::from_ptr(rrd_get_error()).to_string_lossy() + ); + } + } + } + Ok(()) + })?; + println!("Migrated all nodes"); + + Ok(()) +} diff --git a/src/parallel_handler.rs b/src/parallel_handler.rs new file mode 100644 index 0000000..787742a --- /dev/null +++ b/src/parallel_handler.rs @@ -0,0 +1,162 @@ +//! A thread pool which run a closure in parallel. + +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; + +use anyhow::{Error, bail, format_err}; +use crossbeam_channel::{Sender, bounded}; + +/// A handle to send data to the worker thread (implements clone) +pub struct SendHandle<I> { + input: Sender<I>, + abort: Arc<Mutex<Option<String>>>, +} + +/// Returns the first error happened, if any +pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> { + let guard = abort.lock().unwrap(); + if let Some(err_msg) = &*guard { + return Err(format_err!("{}", err_msg)); + } + Ok(()) +} + +impl<I: Send> SendHandle<I> { + /// Send data to the worker threads + pub fn send(&self, input: I) -> Result<(), Error> { + check_abort(&self.abort)?; + match self.input.send(input) { + Ok(()) => Ok(()), + Err(_) => bail!("send failed - channel closed"), + } + } +} + +/// A thread pool which run the supplied closure +/// +/// The send command sends data to the worker threads. If one handler +/// returns an error, we mark the channel as failed and it is no +/// longer possible to send data. +/// +/// When done, the 'complete()' method needs to be called to check for +/// outstanding errors. +pub struct ParallelHandler<I> { + handles: Vec<JoinHandle<()>>, + name: String, + input: Option<SendHandle<I>>, +} + +impl<I> Clone for SendHandle<I> { + fn clone(&self) -> Self { + Self { + input: self.input.clone(), + abort: Arc::clone(&self.abort), + } + } +} + +impl<I: Send + 'static> ParallelHandler<I> { + /// Create a new thread pool, each thread processing incoming data + /// with 'handler_fn'. + pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self + where + F: Fn(I) -> Result<(), Error> + Send + Clone + 'static, + { + let mut handles = Vec::new(); + let (input_tx, input_rx) = bounded::<I>(threads); + + let abort = Arc::new(Mutex::new(None)); + + for i in 0..threads { + let input_rx = input_rx.clone(); + let abort = Arc::clone(&abort); + let handler_fn = handler_fn.clone(); + + handles.push( + std::thread::Builder::new() + .name(format!("{} ({})", name, i)) + .spawn(move || { + loop { + let data = match input_rx.recv() { + Ok(data) => data, + Err(_) => return, + }; + if let Err(err) = (handler_fn)(data) { + let mut guard = abort.lock().unwrap(); + if guard.is_none() { + *guard = Some(err.to_string()); + } + } + } + }) + .unwrap(), + ); + } + Self { + handles, + name: name.to_string(), + input: Some(SendHandle { + input: input_tx, + abort, + }), + } + } + + /// Returns a cloneable channel to send data to the worker threads + pub fn channel(&self) -> SendHandle<I> { + self.input.as_ref().unwrap().clone() + } + + /// Send data to the worker threads + pub fn send(&self, input: I) -> Result<(), Error> { + self.input.as_ref().unwrap().send(input)?; + Ok(()) + } + + /// Wait for worker threads to complete and check for errors + pub fn complete(mut self) -> Result<(), Error> { + let input = self.input.take().unwrap(); + let abort = Arc::clone(&input.abort); + check_abort(&abort)?; + drop(input); + + let msg_list = self.join_threads(); + + // an error might be encountered while waiting for the join + check_abort(&abort)?; + + if msg_list.is_empty() { + return Ok(()); + } + Err(format_err!("{}", msg_list.join("\n"))) + } + + fn join_threads(&mut self) -> Vec<String> { + let mut msg_list = Vec::new(); + + let mut i = 0; + while let Some(handle) = self.handles.pop() { + if let Err(panic) = handle.join() { + if let Some(panic_msg) = panic.downcast_ref::<&str>() { + msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name)); + } else if let Some(panic_msg) = panic.downcast_ref::<String>() { + msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name)); + } else { + msg_list.push(format!("thread {} ({i}) panicked", self.name)); + } + } + i += 1; + } + msg_list + } +} + +// Note: We make sure that all threads will be joined +impl<I> Drop for ParallelHandler<I> { + fn drop(&mut self) { + drop(self.input.take()); + while let Some(handle) = self.handles.pop() { + let _ = handle.join(); + } + } +} diff --git a/wrapper.h b/wrapper.h new file mode 100644 index 0000000..64d0aa6 --- /dev/null +++ b/wrapper.h @@ -0,0 +1 @@ +#include <rrd.h> -- 2.39.5 _______________________________________________ pve-devel mailing list pve-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel