* David Gibson (da...@gibson.dropbear.id.au) wrote:
> On Fri, Mar 20, 2015 at 06:17:31PM +0000, Dr. David Alan Gilbert wrote:
> > * David Gibson (da...@gibson.dropbear.id.au) wrote:
> > > On Wed, Feb 25, 2015 at 04:51:35PM +0000, Dr. David Alan Gilbert (git) wrote:
> > > > From: "Dr. David Alan Gilbert" <dgilb...@redhat.com>
> > > >
> > > > Open a return path, and handle messages that are received upon it.
> > > >
> > > > Signed-off-by: Dr. David Alan Gilbert <dgilb...@redhat.com>
> > > > ---
> > > > include/migration/migration.h | 8 ++
> > > > migration/migration.c | 178 +++++++++++++++++++++++++++++++++++++++++-
> > > > trace-events | 13 +++
> > > > 3 files changed, 198 insertions(+), 1 deletion(-)
> > > >
> > > > diff --git a/include/migration/migration.h b/include/migration/migration.h
> > > > index 6775747..5242ead 100644
> > > > --- a/include/migration/migration.h
> > > > +++ b/include/migration/migration.h
> > > > @@ -73,6 +73,14 @@ struct MigrationState
> > > >
> > > >      int state;
> > > >      MigrationParams params;
> > > > +
> > > > +    /* State related to return path */
> > > > +    struct {
> > > > +        QEMUFile *file;
> > > > +        QemuThread rp_thread;
> > > > +        bool error;
> > > > +    } rp_state;
> > > > +
> > > >      double mbps;
> > > >      int64_t total_time;
> > > >      int64_t downtime;
> > > > diff --git a/migration/migration.c b/migration/migration.c
> > > > index 80d234c..34cd4fe 100644
> > > > --- a/migration/migration.c
> > > > +++ b/migration/migration.c
> > > > @@ -237,6 +237,23 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
> > > >      return head;
> > > >  }
> > > >
> > > > +/*
> > > > + * Return true if we're already in the middle of a migration
> > > > + * (i.e. any of the active or setup states)
> > > > + */
> > > > +static bool migration_already_active(MigrationState *ms)
> > > > +{
> > > > +    switch (ms->state) {
> > > > +    case MIG_STATE_ACTIVE:
> > > > +    case MIG_STATE_SETUP:
> > > > +        return true;
> > > > +
> > > > +    default:
> > > > +        return false;
> > > > +
> > > > +    }
> > > > +}
> > > > +
> > > >  static void get_xbzrle_cache_stats(MigrationInfo *info)
> > > >  {
> > > >      if (migrate_use_xbzrle()) {
> > > > @@ -362,6 +379,21 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state)
> > > >      }
> > > >  }
> > > >
> > > > +static void migrate_fd_cleanup_src_rp(MigrationState *ms)
> > > > +{
> > > > +    QEMUFile *rp = ms->rp_state.file;
> > > > +
> > > > +    /*
> > > > +     * When stuff goes wrong (e.g. failing destination) on the rp, it can get
> > > > +     * cleaned up from a few threads; make sure not to do it twice in parallel
> > > > +     */
> > > > +    rp = atomic_cmpxchg(&ms->rp_state.file, rp, NULL);
> > >
> > > A cmpxchg seems dangerously subtle for such a basic and infrequent
> > > operation, but ok.
> >
> > I'll take other suggestions; but I'm trying to just do
> > 'if the qemu_file still exists, close it', and it didn't seem
> > worth introducing another state variable to atomically update
> > when we've already got the file pointer itself.
>
> Yes, I see the rationale. My concern is just that the more atomicity
> mechanisms are scattered through the code, the harder it is to analyze
> and be sure you haven't missed race cases (or introduced them with a
> future change).
>
> In short, I prefer to see a simple-as-possible, and preferably
> documented, consistent overall concurrency scheme for a data
> structure, rather than scattered atomic ops for various variables where
> it's difficult to see how all the pieces might relate together.
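For anyone weighing the cmpxchg question, here is a minimal standalone sketch of the same "close at most once" idea, written with C11 <stdatomic.h> rather than QEMU's atomic_cmpxchg() macro. FakeFile, FakeRPState, fake_close() and cleanup_once() are hypothetical stand-ins, not QEMU code; the sketch only assumes that the file slot is set once and afterwards only ever moved to NULL. As in migrate_fd_cleanup_src_rp(), the pointer is read first and then swapped for NULL, and only the caller whose compare-and-swap succeeds performs the close.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a QEMUFile: only records whether it was closed. */
typedef struct FakeFile {
    bool closed;
} FakeFile;

/* Hypothetical stand-in for the rp_state.file slot in MigrationState. */
typedef struct FakeRPState {
    _Atomic(FakeFile *) file;
} FakeRPState;

/* Number of real closes performed; should never exceed one per file. */
static int close_count;

static void fake_close(FakeFile *f)
{
    assert(!f->closed); /* a double close would trip this assertion */
    f->closed = true;
    close_count++;
}

/*
 * Close the file at most once, however many threads race through here.
 * Read the pointer, then try to swap it for NULL; only the caller whose
 * compare-and-swap succeeds owns the close. On failure C11 writes the
 * current slot value back into 'rp', which is simply ignored here.
 */
static bool cleanup_once(FakeRPState *s)
{
    FakeFile *rp = atomic_load(&s->file);

    if (rp && atomic_compare_exchange_strong(&s->file, &rp, NULL)) {
        fake_close(rp);
        return true;
    }
    return false; /* slot was already NULL, or another caller won */
}
```

Calling cleanup_once() twice on the same state closes the file once and returns false the second time. A single atomic_exchange(&s->file, NULL) would give the same at-most-once guarantee without the separate load; whether either form is easier to reason about than an explicit state variable is exactly the trade-off being debated here.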
>
> > > > +    if (rp) {
> > > > +        trace_migrate_fd_cleanup_src_rp();
> > > > +        qemu_fclose(rp);
> > > > +    }
> > > > +}
> > > > +
> > > >  static void migrate_fd_cleanup(void *opaque)
> > > >  {
> > > >      MigrationState *s = opaque;
> > > > @@ -369,6 +401,8 @@ static void migrate_fd_cleanup(void *opaque)
> > > >      qemu_bh_delete(s->cleanup_bh);
> > > >      s->cleanup_bh = NULL;
> > > >
> > > > +    migrate_fd_cleanup_src_rp(s);
> > > > +
> > > >      if (s->file) {
> > > >          trace_migrate_fd_cleanup();
> > > >          qemu_mutex_unlock_iothread();
> > > > @@ -406,6 +440,11 @@ static void migrate_fd_cancel(MigrationState *s)
> > > >      QEMUFile *f = migrate_get_current()->file;
> > > >      trace_migrate_fd_cancel();
> > > >
> > > > +    if (s->rp_state.file) {
> > > > +        /* shutdown the rp socket, so causing the rp thread to shutdown */
> > > > +        qemu_file_shutdown(s->rp_state.file);
> > >
> > > I missed where qemu_file_shutdown() was implemented. Does this
> > > introduce a leftover socket dependency?
> >
> > No, it shouldn't. The shutdown() causes a shutdown(2) syscall to
> > be issued on the socket, stopping anything blocking on it; it then
> > gets closed at the end after the rp thread has exited.
>
> Sorry, that's not what I meant. I mean: is this a hole in the
> abstraction of the QemuFile, because it assumes that what you're
> dealing with here is indeed a socket, rather than something else?

It's just a dependency that we have a shutdown method on the qemu_file we're
using; if it's not a socket, then whatever it is, if we're going to use it for
an RP then it needs to implement something equivalent.

> > > > +    }
> > > > +
> > > >      do {
> > > >          old_state = s->state;
> > > >          if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) {
> > > > @@ -658,8 +697,145 @@ int64_t migrate_xbzrle_cache_size(void)
> > > >      return s->xbzrle_cache_size;
> > > >  }
> > > >
> > > > -/* migration thread support */
> > > > +/*
> > > > + * Something bad happened to the RP stream, mark an error
> > > > + * The caller shall print something to indicate why
> > > > + */
> > > > +static void source_return_path_bad(MigrationState *s)
> > > > +{
> > > > +    s->rp_state.error = true;
> > > > +    migrate_fd_cleanup_src_rp(s);
> > > > +}
> > > > +
> > > > +/*
> > > > + * Handles messages sent on the return path towards the source VM
> > > > + *
> > > > + */
> > > > +static void *source_return_path_thread(void *opaque)
> > > > +{
> > > > +    MigrationState *ms = opaque;
> > > > +    QEMUFile *rp = ms->rp_state.file;
> > > > +    uint16_t expected_len, header_len, header_com;
> > > > +    const int max_len = 512;
> > > > +    uint8_t buf[max_len];
> > > > +    uint32_t tmp32;
> > > > +    int res;
> > > > +
> > > > +    trace_source_return_path_thread_entry();
> > > > +    while (rp && !qemu_file_get_error(rp) &&
> > > > +           migration_already_active(ms)) {
> > > > +        trace_source_return_path_thread_loop_top();
> > > > +        header_com = qemu_get_be16(rp);
> > > > +        header_len = qemu_get_be16(rp);
> > > > +
> > > > +        switch (header_com) {
> > > > +        case MIG_RP_CMD_SHUT:
> > > > +        case MIG_RP_CMD_PONG:
> > > > +            expected_len = 4;
> > >
> > > Could the knowledge of expected lengths be folded into the switch
> > > below? Switching twice on the same thing is a bit icky.
> >
> > No, because the length at this point is used to validate the
> > length field in the header prior to reading the body.
> > The other switch processes the contents of the body that
> > have been read.
>
> Ok.
>
> > > > +            break;
> > > > +
> > > > +        default:
> > > > +            error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
> > > > +                         header_com, header_len);
> > > > +            source_return_path_bad(ms);
> > > > +            goto out;
> > > > +        }
> > > >
> > > > +        if (header_len > expected_len) {
> > > > +            error_report("RP: Received command 0x%04x with"
> > > > +                         "incorrect length %d expecting %d",
> > > > +                         header_com, header_len,
> > > > +                         expected_len);
> > > > +            source_return_path_bad(ms);
> > > > +            goto out;
> > > > +        }
> > > > +
> > > > +        /* We know we've got a valid header by this point */
> > > > +        res = qemu_get_buffer(rp, buf, header_len);
> > > > +        if (res != header_len) {
> > > > +            trace_source_return_path_thread_failed_read_cmd_data();
> > > > +            source_return_path_bad(ms);
> > > > +            goto out;
> > > > +        }
> > > > +
> > > > +        /* OK, we have the command and the data */
> > > > +        switch (header_com) {
> > > > +        case MIG_RP_CMD_SHUT:
> > > > +            tmp32 = be32_to_cpup((uint32_t *)buf);
> > > > +            trace_source_return_path_thread_shut(tmp32);
> > > > +            if (tmp32) {
> > > > +                error_report("RP: Sibling indicated error %d", tmp32);
> > > > +                source_return_path_bad(ms);
> > > > +            }
> > > > +            /*
> > > > +             * We'll let the main thread deal with closing the RP
> > > > +             * we could do a shutdown(2) on it, but we're the only user
> > > > +             * anyway, so there's nothing gained.
> > > > +             */
> > > > +            goto out;
> > > > +
> > > > +        case MIG_RP_CMD_PONG:
> > > > +            tmp32 = be32_to_cpup((uint32_t *)buf);
> > > > +            trace_source_return_path_thread_pong(tmp32);
> > > > +            break;
> > > > +
> > > > +        default:
> > > > +            /* This shouldn't happen because we should catch this above */
> > > > +            trace_source_return_path_bad_header_com();
> > > > +        }
> > > > +        /* Latest command processed, now leave a gap for the next one */
> > > > +        header_com = MIG_RP_CMD_INVALID;
> > >
> > > This assignment will always get overwritten.
> >
> > Thanks; gone. It's a leftover from an old version.
> >
> > > > +    }
> > > > +    if (rp && qemu_file_get_error(rp)) {
> > > > +        trace_source_return_path_thread_bad_end();
> > > > +        source_return_path_bad(ms);
> > > > +    }
> > > > +
> > > > +    trace_source_return_path_thread_end();
> > > > +out:
> > > > +    return NULL;
> > > > +}
> > > > +
> > > > +__attribute__ (( unused )) /* Until later in patch series */
> > > > +static int open_outgoing_return_path(MigrationState *ms)
> > >
> > > Uh.. surely this should be open_incoming_return_path(); it's designed
> > > to be used on the source side, AFAICT.
> > >
> > > > +{
> > > > +
> > > > +    ms->rp_state.file = qemu_file_get_return_path(ms->file);
> > > > +    if (!ms->rp_state.file) {
> > > > +        return -1;
> > > > +    }
> > > > +
> > > > +    trace_open_outgoing_return_path();
> > > > +    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
> > > > +                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
> > > > +
> > > > +    trace_open_outgoing_return_path_continue();
> > > > +
> > > > +    return 0;
> > > > +}
> > > > +
> > > > +__attribute__ (( unused )) /* Until later in patch series */
> > > > +static void await_outgoing_return_path_close(MigrationState *ms)
> > >
> > > Likewise "incoming" here, surely.
> >
> > I've changed those two to open_source_return_path() which seems less
> > ambiguous; is that OK?
>
> Uh.. not really, it just moves the ambiguity to a different place (is
> "source return path" the return path *on* the source or *to* the
> source).
>
> Perhaps "open_return_path_on_source" and
> "await_return_path_close_on_source"? I'm not particularly fond of
> those, but they're the best I've come up with yet.

Done.

Dave

>
> -- 
> David Gibson                    | I'll have my music baroque, and my code
> david AT gibson.dropbear.id.au  | minimalist, thank you.  NOT _the_ _other_
>                                 | _way_ _around_!
> http://www.ozlabs.org/~dgibson
--
Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK