remote migration uses a websocket connection to a task worker running on
the target node instead of commands via SSH to control the migration.
this websocket tunnel is started earlier than the SSH tunnel, and allows
adding UNIX-socket forwarding over additional websocket connections
on-demand.

the main differences to regular intra-cluster migration are:
- source VM config and disks are only removed upon request via --delete
- shared storages are treated like local storages, since we can't
assume they are shared across clusters (with potential to extend this
by marking storages as shared)
- NBD migrated disks are explicitly pre-allocated on the target node via
tunnel command before starting the target VM instance
- in addition to storages, network bridges and the VMID itself are
transformed via a user-defined mapping
- all commands and migration data streams are sent via a WS tunnel proxy

Signed-off-by: Fabian Grünbichler <f.gruenbich...@proxmox.com>
---

Notes:
    requires proxmox-websocket-tunnel

 PVE/API2/Qemu.pm   |   4 +-
 PVE/QemuMigrate.pm | 647 +++++++++++++++++++++++++++++++++++++++------
 PVE/QemuServer.pm  |   8 +-
 3 files changed, 575 insertions(+), 84 deletions(-)

diff --git a/PVE/API2/Qemu.pm b/PVE/API2/Qemu.pm
index a1a1813..24f5b98 100644
--- a/PVE/API2/Qemu.pm
+++ b/PVE/API2/Qemu.pm
@@ -4610,7 +4610,7 @@ __PACKAGE__->register_method({
                    # bump/reset both for breaking changes
                    # bump tunnel only for opt-in changes
                    return {
-                       api => 2,
+                       api => $PVE::QemuMigrate::WS_TUNNEL_VERSION,
                        age => 0,
                    };
                },
@@ -4897,7 +4897,7 @@ __PACKAGE__->register_method({
                            PVE::Firewall::remove_vmfw_conf($vmid);
                        }
 
-                       if (my @volumes = keys 
$state->{cleanup}->{volumes}->$%) {
+                       if (my @volumes = keys 
$state->{cleanup}->{volumes}->%*) {
                            PVE::Storage::foreach_volid(@volumes, sub {
                                my ($volid, $sid, $volname, $d) = @_;
 
diff --git a/PVE/QemuMigrate.pm b/PVE/QemuMigrate.pm
index 07b56eb..7378551 100644
--- a/PVE/QemuMigrate.pm
+++ b/PVE/QemuMigrate.pm
@@ -7,9 +7,15 @@ use IO::File;
 use IPC::Open2;
 use POSIX qw( WNOHANG );
 use Time::HiRes qw( usleep );
+use JSON qw(encode_json decode_json);
+use IO::Socket::UNIX;
+use Socket qw(SOCK_STREAM);
+use Storable qw(dclone);
+use URI::Escape;
 
-use PVE::Format qw(render_bytes);
+use PVE::APIClient::LWP;
 use PVE::Cluster;
+use PVE::Format qw(render_bytes);
 use PVE::GuestHelpers qw(safe_boolean_ne safe_string_ne);
 use PVE::INotify;
 use PVE::RPCEnvironment;
@@ -30,6 +36,9 @@ use PVE::QemuServer;
 use PVE::AbstractMigrate;
 use base qw(PVE::AbstractMigrate);
 
+# compared against remote end's minimum version
+our $WS_TUNNEL_VERSION = 2;
+
 sub fork_command_pipe {
     my ($self, $cmd) = @_;
 
@@ -85,7 +94,7 @@ sub finish_command_pipe {
        }
     }
 
-    $self->log('info', "ssh tunnel still running - terminating now with 
SIGTERM\n");
+    $self->log('info', "tunnel still running - terminating now with 
SIGTERM\n");
     kill(15, $cpid);
 
     # wait again
@@ -94,11 +103,11 @@ sub finish_command_pipe {
        sleep(1);
     }
 
-    $self->log('info', "ssh tunnel still running - terminating now with 
SIGKILL\n");
+    $self->log('info', "tunnel still running - terminating now with 
SIGKILL\n");
     kill 9, $cpid;
     sleep 1;
 
-    $self->log('err', "ssh tunnel child process (PID $cpid) couldn't be 
collected\n")
+    $self->log('err', "tunnel child process (PID $cpid) couldn't be 
collected\n")
        if !&$collect_child_process();
 }
 
@@ -115,18 +124,28 @@ sub read_tunnel {
     };
     die "reading from tunnel failed: $@\n" if $@;
 
-    chomp $output;
+    chomp $output if defined($output);
 
     return $output;
 }
 
 sub write_tunnel {
-    my ($self, $tunnel, $timeout, $command) = @_;
+    my ($self, $tunnel, $timeout, $command, $params) = @_;
 
     $timeout = 60 if !defined($timeout);
 
     my $writer = $tunnel->{writer};
 
+    if ($tunnel->{version} && $tunnel->{version} >= 2) {
+       my $object = defined($params) ? dclone($params) : {};
+       $object->{cmd} = $command;
+
+       $command = eval { JSON::encode_json($object) };
+
+       die "failed to encode command as JSON - $@\n"
+           if $@;
+    }
+
     eval {
        PVE::Tools::run_with_timeout($timeout, sub {
            print $writer "$command\n";
@@ -136,13 +155,29 @@ sub write_tunnel {
     die "writing to tunnel failed: $@\n" if $@;
 
     if ($tunnel->{version} && $tunnel->{version} >= 1) {
-       my $res = eval { $self->read_tunnel($tunnel, 10); };
+       my $res = eval { $self->read_tunnel($tunnel, $timeout); };
        die "no reply to command '$command': $@\n" if $@;
 
-       if ($res eq 'OK') {
-           return;
+       if ($tunnel->{version} == 1) {
+           if ($res eq 'OK') {
+               return;
+           } else {
+               die "tunnel replied '$res' to command '$command'\n";
+           }
        } else {
-           die "tunnel replied '$res' to command '$command'\n";
+           my $parsed = eval { JSON::decode_json($res) };
+           die "failed to decode tunnel reply '$res' (command '$command') - 
$@\n"
+               if $@;
+
+           if (!$parsed->{success}) {
+               if (defined($parsed->{msg})) {
+                   die "error - tunnel command '$command' failed - 
$parsed->{msg}\n";
+               } else {
+                   die "error - tunnel command '$command' failed\n";
+               }
+           }
+
+           return $parsed;
        }
     }
 }
@@ -185,10 +220,150 @@ sub fork_tunnel {
     return $tunnel;
 }
 
+my $forward_unix_socket = sub {
+    my ($self, $local, $remote) = @_;
+
+    my $params = dclone($self->{tunnel}->{params});
+    $params->{unix} = $local;
+    $params->{url} = $params->{url} ."socket=$remote&";
+    $params->{ticket} = { path => $remote };
+
+    my $cmd = encode_json({
+       control => JSON::true,
+       cmd => 'forward',
+       data => $params,
+    });
+
+    my $writer = $self->{tunnel}->{writer};
+    eval {
+       unlink $local;
+       PVE::Tools::run_with_timeout(15, sub {
+           print $writer "$cmd\n";
+           $writer->flush();
+       });
+    };
+    die "failed to write forwarding command - $@\n" if $@;
+
+    $self->read_tunnel($self->{tunnel});
+
+    $self->log('info', "Forwarded local unix socket '$local' to remote 
'$remote' via websocket tunnel");
+};
+
+sub fork_websocket_tunnel {
+    my ($self, $storages) = @_;
+
+    my $remote = $self->{opts}->{remote};
+    my $conn = $remote->{conn};
+
+    my $websocket_url = 
"https://$conn->{host}:$conn->{port}/api2/json/nodes/$self->{node}/qemu/$remote->{vmid}/mtunnelwebsocket";
+
+    my $params = {
+       url => $websocket_url,
+    };
+
+    if (my $apitoken = $conn->{apitoken}) {
+       $params->{headers} = [["Authorization", "$apitoken"]];
+    } else {
+       die "can't connect to remote host without credentials\n";
+    }
+
+    if (my $fps = $conn->{cached_fingerprints}) {
+       $params->{fingerprint} = (keys %$fps)[0];
+    }
+
+    my $api_client = PVE::APIClient::LWP->new(%$conn);
+    my $storage_list = join(',', keys %$storages);
+    my $res = 
$api_client->post("/nodes/$self->{node}/qemu/$remote->{vmid}/mtunnel", { 
storages => $storage_list });
+    $self->log('info', "remote: started migration tunnel worker 
'$res->{upid}'");
+    $params->{url} .= "?ticket=".uri_escape($res->{ticket});
+    $params->{url} .= "&socket=$res->{socket}";
+
+    my $reader = IO::Pipe->new();
+    my $writer = IO::Pipe->new();
+
+    my $cpid = fork();
+    if ($cpid) {
+       $writer->writer();
+       $reader->reader();
+       my $tunnel = { writer => $writer, reader => $reader, pid => $cpid };
+
+       eval {
+           my $writer = $tunnel->{writer};
+           my $cmd = encode_json({
+               control => JSON::true,
+               cmd => 'connect',
+               data => $params,
+           });
+
+           eval {
+               PVE::Tools::run_with_timeout(15, sub {
+                   print {$writer} "$cmd\n";
+                   $writer->flush();
+               });
+           };
+           die "failed to write tunnel connect command - $@\n" if $@;
+       };
+       die "failed to connect via WS: $@\n" if $@;
+
+       my $err;
+        eval {
+           my $writer = $tunnel->{writer};
+           my $cmd = encode_json({
+               cmd => 'version',
+           });
+
+           eval {
+               PVE::Tools::run_with_timeout(15, sub {
+                   print {$writer} "$cmd\n";
+                   $writer->flush();
+               });
+           };
+           $err = "failed to write tunnel version command - $@\n" if $@;
+           my $res = $self->read_tunnel($tunnel, 10);
+           $res = JSON::decode_json($res);
+           my $version = $res->{api};
+
+           if ($version =~ /^(\d+)$/) {
+               $tunnel->{version} = $1;
+               $tunnel->{age} = $res->{age};
+               $self->log('info', "tunnel info: $version\n");
+           } else {
+               $err = "received invalid tunnel version string '$version'\n" if 
!$err;
+           }
+       };
+       $err = $@ if !$err;
+
+       if ($err) {
+           $self->finish_command_pipe($tunnel);
+           die "can't open migration tunnel - $err";
+       }
+
+       $params->{url} = "$websocket_url?";
+       $tunnel->{params} = $params; # for forwarding
+
+       return $tunnel;
+    } else {
+       eval {
+           $writer->reader();
+           $reader->writer();
+           PVE::Tools::run_command(
+               ['proxmox-websocket-tunnel'],
+               input => "<&".fileno($writer),
+               output => ">&".fileno($reader),
+               errfunc => sub { my $line = shift; print "tunnel: $line\n"; },
+           );
+       };
+       warn "CMD websocket tunnel died: $@\n" if $@;
+       exit 0;
+    }
+}
+
 sub finish_tunnel {
-    my ($self, $tunnel) = @_;
+    my ($self, $tunnel, $cleanup) = @_;
 
-    eval { $self->write_tunnel($tunnel, 30, 'quit'); };
+    $cleanup = $cleanup ? 1 : 0;
+
+    eval { $self->write_tunnel($tunnel, 30, 'quit', { cleanup => $cleanup }); 
};
     my $err = $@;
 
     $self->finish_command_pipe($tunnel, 30);
@@ -338,23 +513,34 @@ sub prepare {
     }
 
     my $vollist = PVE::QemuServer::get_vm_volumes($conf);
+
+    my $storages = {};
     foreach my $volid (@$vollist) {
        my ($sid, $volname) = PVE::Storage::parse_volume_id($volid, 1);
 
-       # check if storage is available on both nodes
+       # check if storage is available on source node
        my $scfg = PVE::Storage::storage_check_enabled($storecfg, $sid);
 
        my $targetsid = $sid;
-       # NOTE: we currently ignore shared source storages in mappings so skip 
here too for now
-       if (!$scfg->{shared}) {
+       # NOTE: local ignores shared mappings, remote maps them
+       if (!$scfg->{shared} || $self->{opts}->{remote}) {
            $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, 
$sid);
        }
 
-       my $target_scfg = PVE::Storage::storage_check_enabled($storecfg, 
$targetsid, $self->{node});
-       my ($vtype) = PVE::Storage::parse_volname($storecfg, $volid);
+       $storages->{$targetsid} = 1;
+
+       if (!$self->{opts}->{remote}) {
+           # check if storage is available on target node
+           my $target_scfg = PVE::Storage::storage_check_enabled(
+               $storecfg,
+               $targetsid,
+               $self->{node},
+           );
+           my ($vtype) = PVE::Storage::parse_volname($storecfg, $volid);
 
-       die "$volid: content type '$vtype' is not available on storage 
'$targetsid'\n"
-           if !$target_scfg->{content}->{$vtype};
+           die "$volid: content type '$vtype' is not available on storage 
'$targetsid'\n"
+               if !$target_scfg->{content}->{$vtype};
+       }
 
        if ($scfg->{shared}) {
            # PVE::Storage::activate_storage checks this for non-shared storages
@@ -364,10 +550,23 @@ sub prepare {
        }
     }
 
-    # test ssh connection
-    my $cmd = [ @{$self->{rem_ssh}}, '/bin/true' ];
-    eval { $self->cmd_quiet($cmd); };
-    die "Can't connect to destination address using public key\n" if $@;
+    if ($self->{opts}->{remote}) {
+       # test & establish websocket connection
+       my $tunnel = $self->fork_websocket_tunnel($storages);
+       my $min_version = $tunnel->{version} - $tunnel->{age};
+       die "Remote tunnel endpoint not compatible, upgrade required (current: 
$WS_TUNNEL_VERSION, required: $min_version)\n"
+           if $WS_TUNNEL_VERSION < $min_version;
+        die "Remote tunnel endpoint too old, upgrade required (local: 
$WS_TUNNEL_VERSION, remote: $tunnel->{version})"
+           if $WS_TUNNEL_VERSION > $tunnel->{version};
+
+       print "websocket tunnel started\n";
+       $self->{tunnel} = $tunnel;
+    } else {
+       # test ssh connection
+       my $cmd = [ @{$self->{rem_ssh}}, '/bin/true' ];
+       eval { $self->cmd_quiet($cmd); };
+       die "Can't connect to destination address using public key\n" if $@;
+    }
 
     return $running;
 }
@@ -405,7 +604,7 @@ sub scan_local_volumes {
        my @sids = PVE::Storage::storage_ids($storecfg);
        foreach my $storeid (@sids) {
            my $scfg = PVE::Storage::storage_config($storecfg, $storeid);
-           next if $scfg->{shared};
+           next if $scfg->{shared} && !$self->{opts}->{remote};
            next if !PVE::Storage::storage_check_enabled($storecfg, $storeid, 
undef, 1);
 
            # get list from PVE::Storage (for unused volumes)
@@ -414,19 +613,24 @@ sub scan_local_volumes {
            next if @{$dl->{$storeid}} == 0;
 
            my $targetsid = 
PVE::QemuServer::map_id($self->{opts}->{storagemap}, $storeid);
-           # check if storage is available on target node
-           my $target_scfg = PVE::Storage::storage_check_enabled(
-               $storecfg,
-               $targetsid,
-               $self->{node},
-           );
-
-           die "content type 'images' is not available on storage 
'$targetsid'\n"
-               if !$target_scfg->{content}->{images};
+           my $bwlimit_sids = [$storeid];
+           if (!$self->{opts}->{remote}) {
+               # check if storage is available on target node
+               my $target_scfg = PVE::Storage::storage_check_enabled(
+                   $storecfg,
+                   $targetsid,
+                   $self->{node},
+               );
+
+               die "content type 'images' is not available on storage 
'$targetsid'\n"
+                   if !$target_scfg->{content}->{images};
+
+               push @$bwlimit_sids, $targetsid;
+           }
 
            my $bwlimit = PVE::Storage::get_bandwidth_limit(
                'migration',
-               [$targetsid, $storeid],
+               $bwlimit_sids,
                $self->{opts}->{bwlimit},
            );
 
@@ -482,14 +686,17 @@ sub scan_local_volumes {
            my $scfg = PVE::Storage::storage_check_enabled($storecfg, $sid);
 
            my $targetsid = $sid;
-           # NOTE: we currently ignore shared source storages in mappings so 
skip here too for now
-           if (!$scfg->{shared}) {
+           # NOTE: local ignores shared mappings, remote maps them
+           if (!$scfg->{shared} || $self->{opts}->{remote}) {
                $targetsid = 
PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
            }
 
-           PVE::Storage::storage_check_enabled($storecfg, $targetsid, 
$self->{node});
+           # check target storage on target node if intra-cluster migration
+           if (!$self->{opts}->{remote}) {
+               PVE::Storage::storage_check_enabled($storecfg, $targetsid, 
$self->{node});
 
-           return if $scfg->{shared};
+               return if $scfg->{shared};
+           }
 
            $local_volumes->{$volid}->{ref} = $attr->{referenced_in_config} ? 
'config' : 'snapshot';
            $local_volumes->{$volid}->{ref} = 'storage' if $attr->{is_unused};
@@ -578,6 +785,9 @@ sub scan_local_volumes {
 
            my $migratable = $scfg->{type} =~ 
/^(?:dir|btrfs|zfspool|lvmthin|lvm)$/;
 
+           # TODO: what is this even here for?
+           $migratable = 1 if $self->{opts}->{remote};
+
            die "can't migrate '$volid' - storage type '$scfg->{type}' not 
supported\n"
                if !$migratable;
 
@@ -612,6 +822,10 @@ sub handle_replication {
     my $local_volumes = $self->{local_volumes};
 
     return if !$self->{replication_jobcfg};
+
+    die "can't migrate VM with replicated volumes to remote cluster/node\n"
+       if $self->{opts}->{remote};
+
     if ($self->{running}) {
 
        my $version = PVE::QemuServer::kvm_user_version();
@@ -709,26 +923,133 @@ sub sync_offline_local_volumes {
     my $opts = $self->{opts};
 
     $self->log('info', "copying local disk images") if scalar(@volids);
-
+    my $forwarded = 0;
     foreach my $volid (@volids) {
        my $targetsid = $local_volumes->{$volid}->{targetsid};
-       my $bwlimit = $local_volumes->{$volid}->{bwlimit};
-       $bwlimit = $bwlimit * 1024 if defined($bwlimit); # storage_migrate uses 
bps
-
-       my $storage_migrate_opts = {
-           'ratelimit_bps' => $bwlimit,
-           'insecure' => $opts->{migration_type} eq 'insecure',
-           'with_snapshots' => $local_volumes->{$volid}->{snapshots},
-           'allow_rename' => !$local_volumes->{$volid}->{is_vmstate},
-       };
 
-       my $logfunc = sub { $self->log('info', $_[0]); };
-       my $new_volid = eval {
-           PVE::Storage::storage_migrate($storecfg, $volid, $self->{ssh_info},
-                                         $targetsid, $storage_migrate_opts, 
$logfunc);
-       };
-       if (my $err = $@) {
-           die "storage migration for '$volid' to storage '$targetsid' failed 
- $err\n";
+       my $new_volid;
+       
+       my $opts = $self->{opts};
+       if (my $remote = $opts->{remote}) {
+           my $remote_vmid = $remote->{vmid};
+           my ($sid, undef) = PVE::Storage::parse_volume_id($volid);
+           my (undef, $name, undef, undef, undef, undef, $format) = 
PVE::Storage::parse_volname($storecfg, $volid);
+           my $scfg = PVE::Storage::storage_config($storecfg, $sid);
+           PVE::Storage::activate_volumes($storecfg, [$volid]);
+
+           # use 'migrate' limit for transfer to other node
+           my $bwlimit_opts = {
+               storage => $targetsid,
+               bwlimit => $opts->{bwlimit},
+           };
+           my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', 
[$sid], $opts->{bwlimit});
+           my $remote_bwlimit = $self->write_tunnel($self->{tunnel}, 10, 
'bwlimit', $bwlimit_opts);
+           $remote_bwlimit = $remote_bwlimit->{bwlimit};
+           if (defined($remote_bwlimit)) {
+               $bwlimit = $remote_bwlimit if !defined($bwlimit);
+               $bwlimit = $remote_bwlimit if $remote_bwlimit < $bwlimit;
+           }
+
+           # JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
+           $bwlimit = $bwlimit * 1024 if defined($bwlimit);
+
+           my $with_snapshots = $local_volumes->{$volid}->{snapshots} ? 1 : 0;
+           my $snapshot;
+           if ($scfg->{type} eq 'zfspool') {
+               $snapshot = '__migration__';
+               $with_snapshots = 1;
+               PVE::Storage::volume_snapshot($storecfg, $volid, $snapshot);
+           }
+
+           if ($self->{vmid} != $remote_vmid) {
+               $name =~ s/-$self->{vmid}-/-$remote_vmid-/g;
+               $name =~ s/^$self->{vmid}\//$remote_vmid\//;
+           }
+
+           my @export_formats = PVE::Storage::volume_export_formats($storecfg, 
$volid, undef, undef, $with_snapshots);
+
+           my $storage_migrate_opts = {
+               format => $format,
+               storage => $targetsid,
+               'with-snapshots' => $with_snapshots,
+               'allow-rename' => !$local_volumes->{$volid}->{is_vmstate},
+               'export-formats' => @export_formats,
+               volname => $name,
+           };
+           my $res = $self->write_tunnel($self->{tunnel}, 600, 'disk-import', 
$storage_migrate_opts);
+           my $local = "/run/qemu-server/$self->{vmid}.storage";
+           if (!$forwarded) {
+               $forward_unix_socket->($self, $local, $res->{socket});
+               $forwarded = 1;
+           }
+           my $socket = IO::Socket::UNIX->new(Peer => $local, Type => 
SOCK_STREAM())
+               or die "failed to connect to websocket tunnel at $local\n";
+           # we won't be reading from the socket
+           shutdown($socket, 0);
+           my $send = ['pvesm', 'export', $volid, $res->{format}, '-', 
'-with-snapshots', $with_snapshots];
+           push @$send, '-snapshot', $snapshot if $snapshot;
+
+           my @cstream;
+           if (defined($bwlimit)) {
+               @cstream = ([ '/usr/bin/cstream', '-t', $bwlimit ]);
+               $self->log('info', "using a bandwidth limit of $bwlimit bps for 
transferring '$volid'");
+           }
+
+           eval {
+               PVE::Tools::run_command(
+                   [$send, @cstream],
+                   output => '>&'.fileno($socket),
+                   errfunc => sub { my $line = shift; $self->log('warn', 
$line); },
+               );
+           };
+           my $send_error = $@;
+
+           # don't close the connection entirely otherwise the
+           # receiving end might not get all buffered data (and
+           # fails with 'connection reset by peer')
+           shutdown($socket, 1);
+
+           # wait for the remote process to finish
+           while ($res = $self->write_tunnel($self->{tunnel}, 10, 
'query-disk-import')) {
+               if ($res->{status} eq 'pending') {
+                   $self->log('info', "waiting for disk import to finish..\n");
+                   sleep(1)
+               } elsif ($res->{status} eq 'complete') {
+                   $new_volid = $res->{volid};
+                   last;
+               } else {
+                   die "unknown query-disk-import result: $res->{status}\n";
+               }
+           }
+
+           # now close the socket
+           close($socket);
+           die $send_error if $send_error;
+       } else {
+           my $bwlimit = $local_volumes->{$volid}->{bwlimit};
+           $bwlimit = $bwlimit * 1024 if defined($bwlimit); # storage_migrate 
uses bps
+
+           my $storage_migrate_opts = {
+               'ratelimit_bps' => $bwlimit,
+               'insecure' => $opts->{migration_type} eq 'insecure',
+               'with_snapshots' => $local_volumes->{$volid}->{snapshots},
+               'allow_rename' => !$local_volumes->{$volid}->{is_vmstate},
+           };
+
+           my $logfunc = sub { $self->log('info', $_[0]); };
+           $new_volid = eval {
+               PVE::Storage::storage_migrate(
+                   $storecfg,
+                   $volid,
+                   $self->{ssh_info},
+                   $targetsid,
+                   $storage_migrate_opts,
+                   $logfunc,
+               );
+           };
+           if (my $err = $@) {
+               die "storage migration for '$volid' to storage '$targetsid' 
failed - $err\n";
+           }
        }
 
        $self->{volume_map}->{$volid} = $new_volid;
@@ -744,6 +1065,12 @@ sub sync_offline_local_volumes {
 sub cleanup_remotedisks {
     my ($self) = @_;
 
+    if ($self->{opts}->{remote}) {
+       $self->finish_tunnel($self->{tunnel}, 1);
+       delete $self->{tunnel};
+       return;
+    }
+
     my $local_volumes = $self->{local_volumes};
 
     foreach my $volid (values %{$self->{volume_map}}) {
@@ -793,8 +1120,84 @@ sub phase1 {
     $self->handle_replication($vmid);
 
     $self->sync_offline_local_volumes();
+    $self->phase1_remote($vmid) if $self->{opts}->{remote};
 };
 
+sub phase1_remote {
+    my ($self, $vmid) = @_;
+
+    my $remote_conf = PVE::QemuConfig->load_config($vmid);
+    PVE::QemuConfig->update_volume_ids($remote_conf, $self->{volume_map});
+
+    # TODO: check bridge availability earlier?
+    my $bridgemap = $self->{opts}->{bridgemap};
+    foreach my $opt (keys %$remote_conf) {
+       next if $opt !~ m/^net\d+$/;
+
+       next if !$remote_conf->{$opt};
+       my $d = PVE::QemuServer::parse_net($remote_conf->{$opt});
+       next if !$d || !$d->{bridge};
+
+       my $target_bridge = PVE::QemuServer::map_id($bridgemap, $d->{bridge});
+       $self->log('info', "mapped: $opt from $d->{bridge} to $target_bridge");
+       $d->{bridge} = $target_bridge;
+       $remote_conf->{$opt} = PVE::QemuServer::print_net($d);
+    }
+
+    my @online_local_volumes = $self->filter_local_volumes('online');
+
+    my $storage_map = $self->{opts}->{storagemap};
+    $self->{nbd} = {};
+    PVE::QemuConfig->foreach_volume($remote_conf, sub {
+       my ($ds, $drive) = @_;
+
+       # TODO eject CDROM?
+       return if PVE::QemuServer::drive_is_cdrom($drive);
+
+       my $volid = $drive->{file};
+       return if !$volid;
+
+       return if !grep { $_ eq $volid} @online_local_volumes;
+
+       my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);
+       my $scfg = PVE::Storage::storage_config($self->{storecfg}, $storeid);
+       my $source_format = PVE::QemuServer::qemu_img_format($scfg, $volname);
+
+       # set by target cluster
+       my $oldvolid = delete $drive->{file};
+       delete $drive->{format};
+
+       my $targetsid = PVE::QemuServer::map_id($storage_map, $storeid);
+
+       my $params = {
+           format => $source_format,
+           storage => $targetsid,
+           drive => $drive,
+       };
+
+       $self->log('info', "Allocating volume for drive '$ds' on remote storage 
'$targetsid'..");
+       my $res = $self->write_tunnel($self->{tunnel}, 600, 'disk', $params);
+
+       $self->log('info', "volume '$oldvolid' os '$res->{volid}' on the 
target\n");
+       $remote_conf->{$ds} = $res->{drivestr};
+       $self->{nbd}->{$ds} = $res;
+    });
+
+    my $conf_str = PVE::QemuServer::write_vm_config("remote", $remote_conf);
+
+    # TODO expose in PVE::Firewall?
+    my $vm_fw_conf_path = "/etc/pve/firewall/$vmid.fw";
+    my $fw_conf_str;
+    $fw_conf_str = PVE::Tools::file_get_contents($vm_fw_conf_path)
+       if -e $vm_fw_conf_path;
+    my $params = {
+       conf => $conf_str,
+       'firewall-config' => $fw_conf_str,
+    };
+
+    $self->write_tunnel($self->{tunnel}, 10, 'config', $params);
+}
+
 sub phase1_cleanup {
     my ($self, $vmid, $err) = @_;
 
@@ -825,7 +1228,6 @@ sub phase2_start_local_cluster {
     my $local_volumes = $self->{local_volumes};
     my @online_local_volumes = $self->filter_local_volumes('online');
 
-    $self->{storage_migration} = 1 if scalar(@online_local_volumes);
     my $start = $params->{start_params};
     my $migrate = $params->{migrate_opts};
 
@@ -948,10 +1350,34 @@ sub phase2_start_local_cluster {
     return ($tunnel_info, $spice_port);
 }
 
+sub phase2_start_remote_cluster {
+    my ($self, $vmid, $params) = @_;
+
+    die "insecure migration to remote cluster not implemented\n"
+       if $params->{migrate_opts}->{type} ne 'websocket';
+
+    my $remote_vmid = $self->{opts}->{remote}->{vmid};
+
+    my $res = $self->write_tunnel($self->{tunnel}, 10, "start", $params);
+
+    foreach my $drive (keys %{$res->{drives}}) {
+       $self->{stopnbd} = 1;
+       $self->{target_drive}->{$drive}->{drivestr} = 
$res->{drives}->{$drive}->{drivestr};
+       my $nbd_uri = $res->{drives}->{$drive}->{nbd_uri};
+       die "unexpected NBD uri for '$drive': $nbd_uri\n"
+           if $nbd_uri !~ 
s!/run/qemu-server/$remote_vmid\_!/run/qemu-server/$vmid\_!;
+
+       $self->{target_drive}->{$drive}->{nbd_uri} = $nbd_uri;
+    }
+
+    return ($res->{migrate}, $res->{spice_port});
+}
+
 sub phase2 {
     my ($self, $vmid) = @_;
 
     my $conf = $self->{vmconf};
+    my $local_volumes = $self->{local_volumes};
 
     # version > 0 for unix socket support
     my $nbd_protocol_version = 1;
@@ -983,10 +1409,42 @@ sub phase2 {
        },
     };
 
-    my ($tunnel_info, $spice_port) = $self->phase2_start_local_cluster($vmid, 
$params);
+    my ($tunnel_info, $spice_port);
+
+    my @online_local_volumes = $self->filter_local_volumes('online');
+    $self->{storage_migration} = 1 if scalar(@online_local_volumes);
+
+    if (my $remote = $self->{opts}->{remote}) {
+       my $remote_vmid = $remote->{vmid};
+       $params->{migrate_opts}->{remote_node} = $self->{node};
+       ($tunnel_info, $spice_port) = $self->phase2_start_remote_cluster($vmid, 
$params);
+       die "only UNIX sockets are supported for remote migration\n"
+           if $tunnel_info->{proto} ne 'unix';
+
+       my $forwarded = {};
+       my $remote_socket = $tunnel_info->{addr};
+       my $local_socket = $remote_socket;
+       $local_socket =~ s/$remote_vmid/$vmid/g;
+       $tunnel_info->{addr} = $local_socket;
+
+       $self->log('info', "Setting up tunnel for '$local_socket'");
+       $forward_unix_socket->($self, $local_socket, $remote_socket);
+       $forwarded->{$local_socket} = 1;
+
+       foreach my $remote_socket (@{$tunnel_info->{unix_sockets}}) {
+           my $local_socket = $remote_socket;
+           $local_socket =~ s/$remote_vmid/$vmid/g;
+           next if $forwarded->{$local_socket};
+           $self->log('info', "Setting up tunnel for '$local_socket'");
+           $forward_unix_socket->($self, $local_socket, $remote_socket);
+           $forwarded->{$local_socket} = 1;
+       }
+    } else {
+       ($tunnel_info, $spice_port) = $self->phase2_start_local_cluster($vmid, 
$params);
 
-    $self->log('info', "start remote tunnel");
-    $self->start_remote_tunnel($tunnel_info);
+       $self->log('info', "start remote tunnel");
+       $self->start_remote_tunnel($tunnel_info);
+    }
 
     my $migrate_uri = "$tunnel_info->{proto}:$tunnel_info->{addr}";
     $migrate_uri .= ":$tunnel_info->{port}"
@@ -996,8 +1454,6 @@ sub phase2 {
        $self->{storage_migration_jobs} = {};
        $self->log('info', "starting storage migration");
 
-       my @online_local_volumes = $self->filter_local_volumes('online');
-
        die "The number of local disks does not match between the source and 
the destination.\n"
            if (scalar(keys %{$self->{target_drive}}) != 
scalar(@online_local_volumes));
        foreach my $drive (keys %{$self->{target_drive}}){
@@ -1070,7 +1526,7 @@ sub phase2 {
     };
     $self->log('info', "migrate-set-parameters error: $@") if $@;
 
-    if (PVE::QemuServer::vga_conf_has_spice($conf->{vga})) {
+    if (PVE::QemuServer::vga_conf_has_spice($conf->{vga} && 
!$self->{opts}->{remote})) {
        my $rpcenv = PVE::RPCEnvironment::get();
        my $authuser = $rpcenv->get_user();
 
@@ -1267,11 +1723,15 @@ sub phase2_cleanup {
 
     my $nodename = PVE::INotify::nodename();
 
-    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'stop', $vmid, '--skiplock', 
'--migratedfrom', $nodename];
-    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) 
};
-    if (my $err = $@) {
-        $self->log('err', $err);
-        $self->{errors} = 1;
+    if ($self->{tunnel} && $self->{tunnel}->{version} >= 2) {
+       $self->write_tunnel($self->{tunnel}, 10, 'stop');
+    } else {
+       my $cmd = [@{$self->{rem_ssh}}, 'qm', 'stop', $vmid, '--skiplock', 
'--migratedfrom', $nodename];
+       eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub 
{}) };
+       if (my $err = $@) {
+           $self->log('err', $err);
+           $self->{errors} = 1;
+       }
     }
 
     # cleanup after stopping, otherwise disks might be in-use by target VM!
@@ -1304,7 +1764,7 @@ sub phase3_cleanup {
 
     my $tunnel = $self->{tunnel};
 
-    if ($self->{volume_map}) {
+    if ($self->{volume_map} && !$self->{opts}->{remote}) {
        my $target_drives = $self->{target_drive};
 
        # FIXME: for NBD storage migration we now only update the volid, and
@@ -1321,26 +1781,33 @@ sub phase3_cleanup {
 
     # transfer replication state before move config
     $self->transfer_replication_state() if $self->{is_replicated};
-    PVE::QemuConfig->move_config_to_node($vmid, $self->{node});
+    if (!$self->{opts}->{remote}) {
+       PVE::QemuConfig->move_config_to_node($vmid, $self->{node});
+    }
     $self->switch_replication_job_target() if $self->{is_replicated};
 
     if ($self->{livemigration}) {
        if ($self->{stopnbd}) {
            $self->log('info', "stopping NBD storage migration server on 
target.");
            # stop nbd server on remote vm - requirement for resume since 2.9
-           my $cmd = [@{$self->{rem_ssh}}, 'qm', 'nbdstop', $vmid];
+           if ($tunnel && $tunnel->{version} && $tunnel->{version} >= 2) {
+               $self->write_tunnel($tunnel, 30, 'nbdstop');
+           } else {
+               my $cmd = [@{$self->{rem_ssh}}, 'qm', 'nbdstop', $vmid];
 
-           eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => 
sub {}) };
-           if (my $err = $@) {
-               $self->log('err', $err);
-               $self->{errors} = 1;
+               eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc 
=> sub {}) };
+               if (my $err = $@) {
+                   $self->log('err', $err);
+                   $self->{errors} = 1;
+               }
            }
        }
 
        # config moved and nbd server stopped - now we can resume vm on target
        if ($tunnel && $tunnel->{version} && $tunnel->{version} >= 1) {
+           my $cmd = $tunnel->{version} == 1 ? "resume $vmid" : "resume";
            eval {
-               $self->write_tunnel($tunnel, 30, "resume $vmid");
+               $self->write_tunnel($tunnel, 30, $cmd);
            };
            if (my $err = $@) {
                $self->log('err', $err);
@@ -1360,18 +1827,24 @@ sub phase3_cleanup {
        }
 
        if ($self->{storage_migration} && 
PVE::QemuServer::parse_guest_agent($conf)->{fstrim_cloned_disks} && 
$self->{running}) {
-           my $cmd = [@{$self->{rem_ssh}}, 'qm', 'guest', 'cmd', $vmid, 
'fstrim'];
-           eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => 
sub {}) };
+           if ($self->{opts}->{remote}) {
+               $self->write_tunnel($self->{tunnel}, 600, 'fstrim');
+           } else {
+               my $cmd = [@{$self->{rem_ssh}}, 'qm', 'guest', 'cmd', $vmid, 
'fstrim'];
+               eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc 
=> sub {}) };
+           }
        }
     }
 
     # close tunnel on successful migration, on error phase2_cleanup closed it
-    if ($tunnel) {
+    if ($tunnel && $tunnel->{version} == 1) {
        eval { finish_tunnel($self, $tunnel);  };
        if (my $err = $@) {
            $self->log('err', $err);
            $self->{errors} = 1;
        }
+       $tunnel = undef;
+       delete $self->{tunnel};
     }
 
     eval {
@@ -1409,6 +1882,9 @@ sub phase3_cleanup {
 
     # destroy local copies
     foreach my $volid (@not_replicated_volumes) {
+       # remote is cleaned up below
+       next if $self->{opts}->{remote};
+
        eval { PVE::Storage::vdisk_free($self->{storecfg}, $volid); };
        if (my $err = $@) {
            $self->log('err', "removing local copy of '$volid' failed - $err");
@@ -1418,8 +1894,19 @@ sub phase3_cleanup {
     }
 
     # clear migrate lock
-    my $cmd = [ @{$self->{rem_ssh}}, 'qm', 'unlock', $vmid ];
-    $self->cmd_logerr($cmd, errmsg => "failed to clear migrate lock");
+    if ($tunnel && $tunnel->{version} >= 2) {
+       $self->write_tunnel($tunnel, 10, "unlock");
+
+       $self->finish_tunnel($tunnel);
+    } else {
+       my $cmd = [ @{$self->{rem_ssh}}, 'qm', 'unlock', $vmid ];
+       $self->cmd_logerr($cmd, errmsg => "failed to clear migrate lock");
+    }
+
+    if ($self->{opts}->{remote} && $self->{opts}->{delete}) {
+       eval { PVE::QemuServer::destroy_vm($self->{storecfg}, $vmid, 1, undef, 
0) };
+       warn "Failed to remove source VM - $@\n" if $@;
+    }
 }
 
 sub final_cleanup {
diff --git a/PVE/QemuServer.pm b/PVE/QemuServer.pm
index d494cc0..bf05da2 100644
--- a/PVE/QemuServer.pm
+++ b/PVE/QemuServer.pm
@@ -5384,7 +5384,11 @@ sub vm_start_nolock {
     my $defaults = load_defaults();
 
     # set environment variable useful inside network script
-    $ENV{PVE_MIGRATED_FROM} = $migratedfrom if $migratedfrom;
+    if ($migrate_opts->{remote_node}) {
+       $ENV{PVE_MIGRATED_FROM} = $migrate_opts->{remote_node};
+    } elsif ($migratedfrom) {
+       $ENV{PVE_MIGRATED_FROM} = $migratedfrom;
+    }
 
     PVE::GuestHelpers::exec_hookscript($conf, $vmid, 'pre-start', 1);
 
@@ -5621,7 +5625,7 @@ sub vm_start_nolock {
 
        my $migrate_storage_uri;
        # nbd_protocol_version > 0 for unix socket support
-       if ($nbd_protocol_version > 0 && $migration_type eq 'secure') {
+       if ($nbd_protocol_version > 0 && ($migration_type eq 'secure' || 
$migration_type eq 'websocket')) {
            my $socket_path = "/run/qemu-server/$vmid\_nbd.migrate";
            mon_cmd($vmid, "nbd-server-start", addr => { type => 'unix', data 
=> { path => $socket_path } } );
            $migrate_storage_uri = "nbd:unix:$socket_path";
-- 
2.30.2



_______________________________________________
pve-devel mailing list
pve-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel

Reply via email to