Use an explicit AnyEvent::Handle similar to websocket proxying.

Needs some special care to make sure we apply backpressure correctly to
avoid caching too much data. Note that because of AnyEvent restrictions,
specifying a "fh" to point to a file or a packet-based socket may result
in unwanted behaviour[0].

[0]: https://metacpan.org/pod/AnyEvent::Handle#DESCRIPTION

Signed-off-by: Stefan Reiter <s.rei...@proxmox.com>
---
 PVE/APIServer/AnyEvent.pm | 97 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 93 insertions(+), 4 deletions(-)

diff --git a/PVE/APIServer/AnyEvent.pm b/PVE/APIServer/AnyEvent.pm
index 60a2a1c..643ae88 100644
--- a/PVE/APIServer/AnyEvent.pm
+++ b/PVE/APIServer/AnyEvent.pm
@@ -189,7 +189,7 @@ sub finish_response {
 }
 
 sub response {
-    my ($self, $reqstate, $resp, $mtime, $nocomp, $delay) = @_;
+    my ($self, $reqstate, $resp, $mtime, $nocomp, $delay, $stream_fh) = @_;
 
     #print "$$: send response: " . Dumper($resp);
 
@@ -231,7 +231,7 @@ sub response {
     $resp->header('Server' => "pve-api-daemon/3.0");
 
     my $content_length;
-    if ($content) {
+    if ($content && !$stream_fh) {
 
        $content_length = length($content);
 
@@ -258,11 +258,93 @@ sub response {
     #print "SEND(without content) $res\n" if $self->{debug};
 
     $res .= "\015\012";
-    $res .= $content if $content;
+    $res .= $content if $content && !$stream_fh;
 
     $self->log_request($reqstate, $reqstate->{request});
 
-    if ($delay && $delay > 0) {
+    if ($stream_fh) {
+       # write headers and preamble
+       $reqstate->{hdl}->push_write($res);
+
+       # then attach an AnyEvent::Handle to pass through the data
+       my $buf_size = 4*1024*1024;
+
+        my $on_read;
+       $on_read = sub {
+           my ($hdl) = @_;
+           my $reqhdl = $reqstate->{hdl};
+           return if !$reqhdl;
+
+           my $wbuf_len = length($reqhdl->{wbuf});
+           my $rbuf_len = length($hdl->{rbuf});
+           # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
+           # that's unbounded, so just assume $buf_size
+           my $to_read = $buf_size - $wbuf_len;
+           $to_read = $rbuf_len if $rbuf_len < $to_read;
+           if ($to_read > 0) {
+               my $data = substr($hdl->{rbuf}, 0, $to_read, '');
+               $reqhdl->push_write($data);
+               $rbuf_len -= $to_read;
+           } elsif ($hdl->{_eof}) {
+               # workaround: AnyEvent gives us a fake EPIPE if we don't consume
+               # any data when called at EOF, so unregister ourselves - data is
+               # flushed by on_eof anyway
+               # see: 
https://sources.debian.org/src/libanyevent-perl/7.170-2/lib/AnyEvent/Handle.pm/#L1329
+               $hdl->on_read();
+               return;
+           }
+
+           # apply backpressure so we don't accept any more data into
+           # buffer if the client isn't downloading fast enough
+           # note: read_size can double upon read, and we also need to
+           # account for one more read after start_read, so *4
+           if ($rbuf_len + $hdl->{read_size}*4 > $buf_size) {
+               # stop reading
+               $hdl->on_read();
+               my $prev_on_drain = $reqhdl->{on_drain};
+               $reqhdl->on_drain(sub {
+                   my ($wrhdl) = @_;
+                   # write buffer is empty, continue reading
+                   $hdl->on_read($on_read);
+                   if ($prev_on_drain) {
+                       $wrhdl->on_drain($prev_on_drain);
+                       $prev_on_drain->($wrhdl);
+                   }
+               });
+           }
+       };
+
+       $reqstate->{proxyhdl} = AnyEvent::Handle->new(
+           fh => $stream_fh,
+           rbuf_max => $buf_size,
+           timeout => 0,
+           on_read => $on_read,
+           on_eof => sub {
+               my ($hdl) = @_;
+               eval {
+                   if (my $reqhdl = $reqstate->{hdl}) {
+                       $self->log_aborted_request($reqstate);
+                       # write out any remaining data
+                       $reqhdl->push_write($hdl->{rbuf}) if 
length($hdl->{rbuf}) > 0;
+                       $hdl->{rbuf} = "";
+                       $reqhdl->push_shutdown();
+                       $self->finish_response($reqstate);
+                   }
+               };
+               if (my $err = $@) { syslog('err', "$err"); }
+               $on_read = undef;
+           },
+           on_error => sub {
+               my ($hdl, $fatal, $message) = @_;
+               eval {
+                   $self->log_aborted_request($reqstate, $message);
+                   $self->client_do_disconnect($reqstate);
+               };
+               if (my $err = $@) { syslog('err', "$err"); }
+               $on_read = undef;
+           },
+       );
+    } elsif ($delay && $delay > 0) {
        my $w; $w = AnyEvent->timer(after => $delay, cb => sub {
            undef $w; # delete reference
            $reqstate->{hdl}->push_write($res);
@@ -322,6 +404,13 @@ sub send_file_start {
            if (ref($download) eq 'HASH') {
                $fh = $download->{fh};
                $mime = $download->{'content-type'};
+
+               if ($download->{stream}) {
+                   my $header = HTTP::Headers->new(Content_Type => $mime);
+                   my $resp = HTTP::Response->new(200, "OK", $header);
+                   $self->response($reqstate, $resp, undef, 1, 0, $fh);
+                   return;
+               }
            } else {
                my $filename = $download;
                $fh = IO::File->new($filename, '<') ||
-- 
2.20.1



_______________________________________________
pve-devel mailing list
pve-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel

Reply via email to