Signed-off-by: Thomas Lamprecht <t.lampre...@proxmox.com> --- PVE/ExtMetric.pm | 39 ++++++++------------------------------- PVE/Status/Graphite.pm | 14 +++++++------- PVE/Status/InfluxDB.pm | 21 ++++++++++++--------- PVE/Status/Plugin.pm | 36 ++++++++++++++++++++++++++++++++++++ 4 files changed, 63 insertions(+), 47 deletions(-)
diff --git a/PVE/ExtMetric.pm b/PVE/ExtMetric.pm index 14d98317..448d3925 100644 --- a/PVE/ExtMetric.pm +++ b/PVE/ExtMetric.pm @@ -28,17 +28,10 @@ sub update_all($$@) { my $method = "update_${subsystem}_status"; - my (undef, $fn, $line, $subr) = caller(1); for my $txn (@$transactions) { my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type}); $plugin->$method($txn, @params); - - if (length($txn->{data}) > 48000) { - # UDP stack cannot handle messages > 65k, if we've alot of data we - # do smaller batch sends then, but keep the connection alive - transaction_flush($txn, 1); - } } } @@ -69,36 +62,20 @@ sub transactions_start { return $transactions; } -sub transaction_flush { - my ($txn, $keepconnected) = @_; +sub transactions_finish { + my ($transactions) = @_; - if (!$txn->{connection}) { - return if !$txn->{data}; # OK, if data was already sent/flushed - die "cannot flush metric data, no connection available!\n"; - } - return if !defined($txn->{data}) || $txn->{data} eq ''; + for my $txn (@$transactions) { + my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type}); - my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type}); + eval { $plugin->flush_data($txn) }; + my $flush_err = $@; + warn "$flush_err" if $flush_err; - my $data = delete $txn->{data}; - eval { $plugin->send($txn->{connection}, $data) }; - my $senderr = $@; - - if (!$keepconnected) { $plugin->_disconnect($txn->{connection}); $txn->{connection} = undef; # avoid log spam, already got a send error; disconnect would fail too - warn "disconnect failed: $@" if $@ && !$senderr; - } - die "metrics send error '$txn->{id}': $senderr" if $senderr; -}; - -sub transactions_finish { - my ($transactions) = @_; - - for my $txn (@$transactions) { - eval { transaction_flush($txn) }; - warn "$@" if $@; + warn "disconnect failed: $@" if $@ && !$flush_err; } } diff --git a/PVE/Status/Graphite.pm b/PVE/Status/Graphite.pm index 28fa65fd..ecab5583 100644 --- a/PVE/Status/Graphite.pm +++ 
b/PVE/Status/Graphite.pm @@ -61,25 +61,26 @@ sub options { sub update_node_status { my ($class, $txn, $node, $data, $ctime) = @_; - assemble($txn, $data, $ctime, "nodes.$node"); + return assemble($class, $txn, $data, $ctime, "nodes.$node"); } sub update_qemu_status { my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_; - assemble($txn, $data, $ctime, "qemu.$vmid"); + + return assemble($class, $txn, $data, $ctime, "qemu.$vmid"); } sub update_lxc_status { my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_; - assemble($txn, $data, $ctime, "lxc.$vmid"); + return assemble($class, $txn, $data, $ctime, "lxc.$vmid"); } sub update_storage_status { my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_; - assemble($txn, $data, $ctime, "storages.$nodename.$storeid"); + return assemble($class, $txn, $data, $ctime, "storages.$nodename.$storeid"); } sub _connect { @@ -108,7 +109,7 @@ sub _connect { } sub assemble { - my ($txn, $data, $ctime, $object) = @_; + my ($class, $txn, $data, $ctime, $object) = @_; my $path = $txn->{cfg}->{path} // 'proxmox'; $path .= ".$object"; @@ -121,7 +122,6 @@ sub assemble { 'serial' => 1, }; - $txn->{data} //= ''; my $assemble_graphite_data; $assemble_graphite_data = sub { my ($metric, $path) = @_; @@ -136,7 +136,7 @@ sub assemble { if (ref($value) eq 'HASH') { $assemble_graphite_data->($value, $metricpath); } elsif ($value =~ m/^[+-]?[0-9]*\.?[0-9]+$/ && !$key_blacklist->{$key}) { - $txn->{data} .= "$metricpath $value $ctime\n"; + $class->add_metric_data($txn, "$metricpath $value $ctime\n"); } } }; diff --git a/PVE/Status/InfluxDB.pm b/PVE/Status/InfluxDB.pm index 21949400..c7bc15a9 100644 --- a/PVE/Status/InfluxDB.pm +++ b/PVE/Status/InfluxDB.pm @@ -5,6 +5,7 @@ use warnings; use POSIX qw(isnan isinf); use Scalar::Util 'looks_like_number'; +use IO::Socket::IP; use PVE::SafeSyslog; @@ -37,7 +38,7 @@ sub update_node_status { $ctime *= 1000000000; - build_influxdb_payload(\$txn->{data}, $data, $ctime, "object=nodes,host=$node"); + 
build_influxdb_payload($class, $txn, $data, $ctime, "object=nodes,host=$node"); } sub update_qemu_status { @@ -51,7 +52,7 @@ sub update_qemu_status { } $object =~ s/\s/\\ /g; - build_influxdb_payload(\$txn->{data}, $data, $ctime, $object); + build_influxdb_payload($class, $txn, $data, $ctime, $object); } sub update_lxc_status { @@ -65,7 +66,7 @@ sub update_lxc_status { } $object =~ s/\s/\\ /g; - build_influxdb_payload(\$txn->{data}, $data, $ctime, $object); + build_influxdb_payload($class, $txn, $data, $ctime, $object); } sub update_storage_status { @@ -79,7 +80,7 @@ sub update_storage_status { } $object =~ s/\s/\\ /g; - build_influxdb_payload(\$txn->{data}, $data, $ctime, $object); + build_influxdb_payload($class, $txn, $data, $ctime, $object); } sub _connect { @@ -94,11 +95,13 @@ sub _connect { Proto => 'udp', ) || die "couldn't create influxdb socket [$host]:$port - $@\n"; + $socket->blocking(0); + return $socket; } sub build_influxdb_payload { - my ($payload, $data, $ctime, $tags, $measurement, $instance) = @_; + my ($class, $txn, $data, $ctime, $tags, $measurement, $instance) = @_; my @values = (); @@ -116,9 +119,9 @@ sub build_influxdb_payload { # value is a hash if (!defined($measurement)) { - build_influxdb_payload($payload, $value, $ctime, $tags, $key); + build_influxdb_payload($class, $txn, $value, $ctime, $tags, $key); } elsif(!defined($instance)) { - build_influxdb_payload($payload, $value, $ctime, $tags, $measurement, $key); + build_influxdb_payload($class, $txn, $value, $ctime, $tags, $measurement, $key); } else { push @values, get_recursive_values($value); } @@ -129,8 +132,8 @@ sub build_influxdb_payload { my $mm = $measurement // 'system'; my $tagstring = $tags; $tagstring .= ",instance=$instance" if defined($instance); - my $valuestr = join(',', @values); - $$payload .= "$mm,$tagstring $valuestr $ctime\n"; + my $valuestr = join(',', @values); + $class->add_metric_data($txn, "$mm,$tagstring $valuestr $ctime\n"); } } diff --git a/PVE/Status/Plugin.pm 
b/PVE/Status/Plugin.pm index 402c5b4a..b1c91f8e 100644 --- a/PVE/Status/Plugin.pm +++ b/PVE/Status/Plugin.pm @@ -66,6 +66,42 @@ sub _disconnect { $connection->close(); # overwrite if not a simple socket } +# UDP cannot do more than 64k at once. Overwrite for different protocol limits. +sub _send_batch_size { + my ($class, $cfg) = @_; + return 48000; +} + +# call with the smallest $data chunks possible +sub add_metric_data { + my ($class, $txn, $data) = @_; + return if !defined($data); + + my $batch_size = $class->_send_batch_size(); + my $data_length = length($data) // 0; + my $dataq_len = length($txn->{data}) // 0; + + if ($dataq_len > ($batch_size / 2) && ($dataq_len + $data_length) > $batch_size) { + $class->flush_data($txn); + } + $txn->{data} //= ''; + $txn->{data} .= "$data"; +} + +sub flush_data { + my ($class, $txn) = @_; + + if (!$txn->{connection}) { + return if !$txn->{data}; # OK, if data was already sent/flushed + die "cannot flush metric data, no connection available!\n"; + } + return if !defined($txn->{data}) || $txn->{data} eq ''; + + my $data = delete $txn->{data}; + eval { $class->send($txn->{connection}, $data) }; + die "metrics send error '$txn->{id}': $@" if $@; +} + sub send { my ($class, $connection, $data) = @_; -- 2.20.1 _______________________________________________ pve-devel mailing list pve-devel@pve.proxmox.com https://pve.proxmox.com/cgi-bin/mailman/listinfo/pve-devel