From: "Dr. David Alan Gilbert" <dgilb...@redhat.com> Open a return path, and handle messages that are received upon it.
Signed-off-by: Dr. David Alan Gilbert <dgilb...@redhat.com>
---
 include/migration/migration.h |  10 +++
 migration.c                   | 181 +++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 190 insertions(+), 1 deletion(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 12e640d..b87c289 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -47,6 +47,14 @@ enum mig_rpcomm_cmd {
     MIG_RPCOMM_ACK,          /* data (seq: be32 ) */
     MIG_RPCOMM_AFTERLASTVALID
 };
+
+/* Source side RP state */
+struct MigrationRetPathState {
+    uint32_t latest_ack;
+    QemuThread rp_thread;
+    bool error;
+};
+
 typedef struct MigrationState MigrationState;
 
 /* State for the incoming migration */
@@ -69,9 +77,11 @@ struct MigrationState
     QemuThread thread;
     QEMUBH *cleanup_bh;
     QEMUFile *file;
+    QEMUFile *return_path;
 
     int state;
     MigrationParams params;
+    struct MigrationRetPathState rp_state;
     double mbps;
     int64_t total_time;
     int64_t downtime;
diff --git a/migration.c b/migration.c
index 5ba8f3e..ee6db1d 100644
--- a/migration.c
+++ b/migration.c
@@ -246,6 +246,23 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
     return head;
 }
 
+/*
+ * Return true if we're already in the middle of a migration
+ * (i.e. any of the active or setup states)
+ */
+static bool migration_already_active(MigrationState *ms)
+{
+    switch (ms->state) {
+    case MIG_STATE_ACTIVE:
+    case MIG_STATE_SETUP:
+        return true;
+
+    default:
+        return false;
+
+    }
+}
+
 static void get_xbzrle_cache_stats(MigrationInfo *info)
 {
     if (migrate_use_xbzrle()) {
@@ -371,6 +388,21 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state)
     }
 }
 
+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
+{
+    QEMUFile *rp = ms->return_path;
+
+    /*
+     * When stuff goes wrong (e.g. failing destination) on the rp, it can get
+     * cleaned up from a few threads; make sure not to do it twice in parallel
+     */
+    rp = atomic_cmpxchg(&ms->return_path, rp, NULL);
+    if (rp) {
+        DPRINTF("cleaning up return path\n");
+        qemu_fclose(rp);
+    }
+}
+
 static void migrate_fd_cleanup(void *opaque)
 {
     MigrationState *s = opaque;
@@ -378,6 +410,8 @@ static void migrate_fd_cleanup(void *opaque)
     qemu_bh_delete(s->cleanup_bh);
     s->cleanup_bh = NULL;
 
+    migrate_fd_cleanup_src_rp(s);
+
     if (s->file) {
         trace_migrate_fd_cleanup();
         qemu_mutex_unlock_iothread();
@@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s)
     int old_state ;
     trace_migrate_fd_cancel();
 
+    if (s->return_path) {
+        /* shutdown the rp socket, so causing the rp thread to shutdown */
+        qemu_file_shutdown(s->return_path);
+    }
+
     do {
         old_state = s->state;
         if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) {
@@ -655,8 +694,148 @@ int64_t migrate_xbzrle_cache_size(void)
     return s->xbzrle_cache_size;
 }
 
-/* migration thread support */
+/*
+ * Something bad happened to the RP stream, mark an error
+ * The caller shall print something to indicate why
+ */
+static void source_return_path_bad(MigrationState *s)
+{
+    s->rp_state.error = true;
+    migrate_fd_cleanup_src_rp(s);
+}
 
+/*
+ * Handles messages sent on the return path towards the source VM
+ * Each message is a be16 command and a be16 length, then that much data.
+ */
+static void *source_return_path_thread(void *opaque)
+{
+    MigrationState *ms = opaque;
+    QEMUFile *rp = ms->return_path;
+    uint16_t expected_len, header_len, header_com;
+    const int max_len = 512;
+    uint8_t buf[max_len];
+    uint32_t tmp32;
+    int res;
+
+    DPRINTF("RP: %s entry", __func__);
+    while (rp && !qemu_file_get_error(rp) &&
+           migration_already_active(ms)) {
+        DPRINTF("RP: %s top of loop", __func__);
+        header_com = qemu_get_be16(rp);
+        header_len = qemu_get_be16(rp);
+
+        switch (header_com) {
+        case MIG_RPCOMM_SHUT:
+        case MIG_RPCOMM_ACK:
+            expected_len = 4;
+            break;
+
+        default:
+            error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
+                         header_com, header_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        if (header_len != expected_len) {
+            error_report("RP: Received command 0x%04x with "
+                         "incorrect length %d expecting %d",
+                         header_com, header_len,
+                         expected_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* We know we've got a valid header by this point */
+        res = qemu_get_buffer(rp, buf, header_len);
+        if (res != header_len) {
+            DPRINTF("RP: Failed to read command data");
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* OK, we have the command and the data */
+        switch (header_com) {
+        case MIG_RPCOMM_SHUT:
+            tmp32 = be32_to_cpup((uint32_t *)buf);
+            if (tmp32) {
+                error_report("RP: Sibling indicated error %d", tmp32);
+                source_return_path_bad(ms);
+            } else {
+                DPRINTF("RP: SHUT received");
+            }
+            /*
+             * We'll let the main thread deal with closing the RP
+             * we could do a shutdown(2) on it, but we're the only user
+             * anyway, so there's nothing gained.
+             */
+            goto out;
+
+        case MIG_RPCOMM_ACK:
+            tmp32 = be32_to_cpup((uint32_t *)buf);
+            DPRINTF("RP: Received ACK 0x%x", tmp32);
+            atomic_xchg(&ms->rp_state.latest_ack, tmp32);
+            break;
+
+        default:
+            /* This shouldn't happen because we should catch this above */
+            DPRINTF("RP: Bad header_com in dispatch");
+        }
+        /* Latest command processed, now leave a gap for the next one */
+        header_com = MIG_RPCOMM_INVALID;
+    }
+    if (rp && qemu_file_get_error(rp)) {
+        DPRINTF("%s: rp bad at end", __func__);
+        source_return_path_bad(ms);
+    }
+
+    DPRINTF("%s: Bottom exit", __func__);
+
+out:
+    return NULL;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+static int open_outgoing_return_path(MigrationState *ms)
+{
+
+    ms->return_path = qemu_file_get_return_path(ms->file);
+    if (!ms->return_path) {
+        return -1;
+    }
+
+    DPRINTF("%s: starting thread", __func__);
+    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
+                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
+
+    DPRINTF("%s: continuing", __func__);
+
+    return 0;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+static void await_outgoing_return_path_close(MigrationState *ms)
+{
+    /*
+     * If this is a normal exit then the destination will send a SHUT and the
+     * rp_thread will exit, however if there's an error we need to cause
+     * it to exit, which we can do by a shutdown.
+     * (canceling must also shutdown to stop us getting stuck here if
+     * the destination died at just the wrong place)
+     */
+    if (qemu_file_get_error(ms->file) && ms->return_path) {
+        qemu_file_shutdown(ms->return_path);
+    }
+    DPRINTF("%s: Joining", __func__);
+    qemu_thread_join(&ms->rp_state.rp_thread);
+    DPRINTF("%s: Exit", __func__);
+}
+
+/*
+ * Master migration thread on the source VM.
+ * It drives the migration and pumps the data down the outgoing channel.
+ */
 static void *migration_thread(void *opaque)
 {
     MigrationState *s = opaque;
-- 
1.9.3