Signed-off-by: Thomas Lamprecht <t.lampre...@proxmox.com>
---
 PVE/ExtMetric.pm       | 39 ++++++++-------------------------------
 PVE/Status/Graphite.pm | 14 +++++++-------
 PVE/Status/InfluxDB.pm | 21 ++++++++++++---------
 PVE/Status/Plugin.pm   | 36 ++++++++++++++++++++++++++++++++++++
 4 files changed, 63 insertions(+), 47 deletions(-)

diff --git a/PVE/ExtMetric.pm b/PVE/ExtMetric.pm
index 14d98317..448d3925 100644
--- a/PVE/ExtMetric.pm
+++ b/PVE/ExtMetric.pm
@@ -28,17 +28,10 @@ sub update_all($$@) {
 
     my $method = "update_${subsystem}_status";
 
-    my (undef, $fn, $line, $subr) = caller(1);
     for my $txn (@$transactions) {
        my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
 
        $plugin->$method($txn, @params);
-
-       if (length($txn->{data}) > 48000) {
-           # UDP stack cannot handle messages > 65k, if we've alot of data we
-           # do smaller batch sends then, but keep the connection alive
-           transaction_flush($txn, 1);
-       }
     }
 }
 
@@ -69,36 +62,20 @@ sub transactions_start {
     return $transactions;
 }
 
-sub transaction_flush {
-    my ($txn, $keepconnected) = @_;
+sub transactions_finish {
+    my ($transactions) = @_;
 
-    if (!$txn->{connection}) {
-       return if !$txn->{data}; # OK, if data was already sent/flushed
-       die "cannot flush metric data, no connection available!\n";
-    }
-    return if !defined($txn->{data}) || $txn->{data} eq '';
+    for my $txn (@$transactions) {
+       my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
 
-    my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
+       eval { $plugin->flush_data($txn) };
+       my $flush_err = $@;
+       warn "$flush_err" if $flush_err;
 
-    my $data = delete $txn->{data};
-    eval { $plugin->send($txn->{connection}, $data) };
-    my $senderr = $@;
-
-    if (!$keepconnected) {
        $plugin->_disconnect($txn->{connection});
        $txn->{connection} = undef;
        # avoid log spam, already got a send error; disconnect would fail too
-       warn "disconnect failed: $@" if $@ && !$senderr;
-    }
-    die "metrics send error '$txn->{id}': $senderr" if $senderr;
-};
-
-sub transactions_finish {
-    my ($transactions) = @_;
-
-    for my $txn (@$transactions) {
-       eval { transaction_flush($txn) };
-       warn "$@" if $@;
+       warn "disconnect failed: $@" if $@ && !$flush_err;
     }
 }
 
diff --git a/PVE/Status/Graphite.pm b/PVE/Status/Graphite.pm
index 28fa65fd..ecab5583 100644
--- a/PVE/Status/Graphite.pm
+++ b/PVE/Status/Graphite.pm
@@ -61,25 +61,26 @@ sub options {
 sub update_node_status {
     my ($class, $txn, $node, $data, $ctime) = @_;
 
-    assemble($txn, $data, $ctime, "nodes.$node");
+    return assemble($class, $txn, $data, $ctime, "nodes.$node");
 
 }
 
 sub update_qemu_status {
     my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
-    assemble($txn, $data, $ctime, "qemu.$vmid");
+
+    return assemble($class, $txn, $data, $ctime, "qemu.$vmid");
 }
 
 sub update_lxc_status {
     my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
 
-    assemble($txn, $data, $ctime, "lxc.$vmid");
+    return assemble($class, $txn, $data, $ctime, "lxc.$vmid");
 }
 
 sub update_storage_status {
     my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
 
-    assemble($txn, $data, $ctime, "storages.$nodename.$storeid");
+    return assemble($class, $txn, $data, $ctime, 
"storages.$nodename.$storeid");
 }
 
 sub _connect {
@@ -108,7 +109,7 @@ sub _connect {
 }
 
 sub assemble {
-    my ($txn, $data, $ctime, $object) = @_;
+    my ($class, $txn, $data, $ctime, $object) = @_;
 
     my $path = $txn->{cfg}->{path} // 'proxmox';
     $path .= ".$object";
@@ -121,7 +122,6 @@ sub assemble {
        'serial' => 1,
     };
 
-    $txn->{data} //= '';
     my $assemble_graphite_data;
     $assemble_graphite_data = sub {
        my ($metric, $path) = @_;
@@ -136,7 +136,7 @@ sub assemble {
            if (ref($value) eq 'HASH') {
                $assemble_graphite_data->($value, $metricpath);
            } elsif ($value =~ m/^[+-]?[0-9]*\.?[0-9]+$/ && 
!$key_blacklist->{$key}) {
-               $txn->{data} .= "$metricpath $value $ctime\n";
+               $class->add_metric_data($txn, "$metricpath $value $ctime\n");
            }
        }
     };
diff --git a/PVE/Status/InfluxDB.pm b/PVE/Status/InfluxDB.pm
index 21949400..c7bc15a9 100644
--- a/PVE/Status/InfluxDB.pm
+++ b/PVE/Status/InfluxDB.pm
@@ -5,6 +5,7 @@ use warnings;
 
 use POSIX qw(isnan isinf);
 use Scalar::Util 'looks_like_number';
+use IO::Socket::IP;
 
 use PVE::SafeSyslog;
 
@@ -37,7 +38,7 @@ sub update_node_status {
 
     $ctime *= 1000000000;
 
-    build_influxdb_payload(\$txn->{data}, $data, $ctime, 
"object=nodes,host=$node");
+    build_influxdb_payload($class, $txn, $data, $ctime, 
"object=nodes,host=$node");
 }
 
 sub update_qemu_status {
@@ -51,7 +52,7 @@ sub update_qemu_status {
     }
     $object =~ s/\s/\\ /g;
 
-    build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
+    build_influxdb_payload($class, $txn, $data, $ctime, $object);
 }
 
 sub update_lxc_status {
@@ -65,7 +66,7 @@ sub update_lxc_status {
     }
     $object =~ s/\s/\\ /g;
 
-    build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
+    build_influxdb_payload($class, $txn, $data, $ctime, $object);
 }
 
 sub update_storage_status {
@@ -79,7 +80,7 @@ sub update_storage_status {
     }
     $object =~ s/\s/\\ /g;
 
-    build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
+    build_influxdb_payload($class, $txn, $data, $ctime, $object);
 }
 
 sub _connect {
@@ -94,11 +95,13 @@ sub _connect {
         Proto       => 'udp',
     ) || die "couldn't create influxdb socket [$host]:$port - $@\n";
 
+    $socket->blocking(0);
+
     return $socket;
 }
 
 sub build_influxdb_payload {
-    my ($payload, $data, $ctime, $tags, $measurement, $instance) = @_;
+    my ($class, $txn, $data, $ctime, $tags, $measurement, $instance) = @_;
 
     my @values = ();
 
@@ -116,9 +119,9 @@ sub build_influxdb_payload {
            # value is a hash
 
            if (!defined($measurement)) {
-               build_influxdb_payload($payload, $value, $ctime, $tags, $key);
+               build_influxdb_payload($class, $txn, $value, $ctime, $tags, 
$key);
            } elsif(!defined($instance)) {
-               build_influxdb_payload($payload, $value, $ctime, $tags, 
$measurement, $key);
+               build_influxdb_payload($class, $txn, $value, $ctime, $tags, 
$measurement, $key);
            } else {
                push @values, get_recursive_values($value);
            }
@@ -129,8 +132,8 @@ sub build_influxdb_payload {
        my $mm = $measurement // 'system';
        my $tagstring = $tags;
        $tagstring .= ",instance=$instance" if defined($instance);
-       my $valuestr =  join(',', @values);
-       $$payload .= "$mm,$tagstring $valuestr $ctime\n";
+       my $valuestr = join(',', @values);
+       $class->add_metric_data($txn, "$mm,$tagstring $valuestr $ctime\n");
     }
 }
 
diff --git a/PVE/Status/Plugin.pm b/PVE/Status/Plugin.pm
index 402c5b4a..b1c91f8e 100644
--- a/PVE/Status/Plugin.pm
+++ b/PVE/Status/Plugin.pm
@@ -66,6 +66,42 @@ sub _disconnect {
     $connection->close(); # overwrite if not a simple socket
 }
 
+# UDP cannot do more than 64k at once. Overwrite for different protocol limits.
+sub _send_batch_size {
+    my ($class, $cfg) = @_;
+    return 48000;
+}
+
+# call with the smalles $data chunks possible
+sub add_metric_data {
+    my ($class, $txn, $data) = @_;
+    return if !defined($data);
+
+    my $batch_size = $class->_send_batch_size();
+    my $data_length = length($data) // 0;
+    my $dataq_len = length($txn->{data}) // 0;
+
+    if ($dataq_len > ($batch_size / 2) && ($dataq_len + $data_length) > 
$batch_size) {
+       $class->flush_data($txn);
+    }
+    $txn->{data} //= '';
+    $txn->{data} .= "$data";
+}
+
+sub flush_data {
+    my ($class, $txn) = @_;
+
+    if (!$txn->{connection}) {
+       return if !$txn->{data}; # OK, if data was already sent/flushed
+       die "cannot flush metric data, no connection available!\n";
+    }
+    return if !defined($txn->{data}) || $txn->{data} eq '';
+
+    my $data = delete $txn->{data};
+    eval { $class->send($txn->{connection}, $data) };
+    die "metrics send error '$txn->{id}': $@" if $@;
+}
+
 sub send {
     my ($class, $connection, $data) = @_;
 
-- 
2.20.1


_______________________________________________
pve-devel mailing list
pve-devel@pve.proxmox.com
https://pve.proxmox.com/cgi-bin/mailman/listinfo/pve-devel

Reply via email to