Migration ends correctly, but there is still a race between cleanup and the last synchronization.
Signed-off-by: Juan Quintela <quint...@redhat.com> --- migration/ram.c | 132 ++++++++++++++++++++++++++++++++++++++++++++++--- migration/trace-events | 3 +- 2 files changed, 126 insertions(+), 9 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 264d2e462a..577b448db3 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -398,6 +398,19 @@ static void compress_threads_save_setup(void) /* Multiple fd's */ +#define MULTIFD_MAGIC 0x112233d +#define MULTIFD_VERSION 1 + +typedef struct { + uint32_t magic; + uint32_t version; + uint32_t size; + uint32_t used; + uint32_t seq; + char ramblock[256]; + ram_addr_t offset[]; +} __attribute__((packed)) MultiFDPacket_t; + typedef struct { /* number of used pages */ uint32_t used; @@ -407,6 +420,8 @@ typedef struct { uint32_t seq; struct iovec *iov; RAMBlock *block; + uint32_t packet_len; + MultiFDPacket_t *packet; } multifd_pages_t; struct MultiFDSendParams { @@ -447,6 +462,8 @@ static void multifd_pages_init(multifd_pages_t **ppages, size_t size) pages->allocated = size; pages->iov = g_new0(struct iovec, size); + pages->packet_len = sizeof(MultiFDPacket_t) + sizeof(ram_addr_t) * size; + pages->packet = g_malloc0(pages->packet_len); *ppages = pages; } @@ -458,6 +475,9 @@ static void multifd_pages_clear(multifd_pages_t *pages) pages->block = NULL; g_free(pages->iov); pages->iov = NULL; + pages->packet_len = 0; + g_free(pages->packet); + pages->packet = NULL; g_free(pages); } @@ -499,7 +519,6 @@ int multifd_save_cleanup(Error **errp) if (p->running) { qemu_thread_join(&p->thread); - p->running = false; } socket_send_channel_destroy(p->c); p->c = NULL; @@ -535,7 +554,16 @@ static void multifd_send_sync_main(void) qemu_sem_post(&p->sem); } for (i = 0; i < migrate_multifd_channels(); i++) { - qemu_sem_wait(&multifd_send_state->sem_main); + MultiFDSendParams *p = &multifd_send_state->params[i]; + bool wait = true; + + qemu_mutex_lock(&p->mutex); + wait = p->running; + qemu_mutex_unlock(&p->mutex); + + if (wait) { + 
qemu_sem_wait(&multifd_send_state->sem_main); + } } trace_multifd_send_sync_main(); } @@ -575,16 +603,37 @@ static void *multifd_send_thread(void *opaque) continue; } if (p->quit) { + p->running = false; qemu_mutex_unlock(&p->mutex); break; } if (p->pages->used) { + MultiFDPacket_t *packet = p->pages->packet; + Error *local_err = NULL; + size_t ret; + + packet->used = p->pages->used; p->pages->used = 0; qemu_mutex_unlock(&p->mutex); + packet->magic = MULTIFD_MAGIC; + packet->version = MULTIFD_VERSION; - trace_multifd_send(p->id, p->pages->seq, p->pages->used); - /* ToDo: send page here */ - + strncpy(packet->ramblock, p->pages->block->idstr, 256); + packet->size = migrate_multifd_page_count(); + packet->seq = p->pages->seq; + ret = qio_channel_write_all(p->c, (void *)packet, + p->pages->packet_len, &local_err); + if (ret != 0) { + terminate_multifd_send_threads(local_err); + return NULL; + } + trace_multifd_send(p->id, p->pages->seq, packet->used); + ret = qio_channel_writev_all(p->c, p->pages->iov, + packet->used, &local_err); + if (ret != 0) { + terminate_multifd_send_threads(local_err); + return NULL; + } qemu_mutex_lock(&multifd_send_state->mutex); p->done = true; p->packets_sent++; @@ -763,7 +812,6 @@ int multifd_load_cleanup(Error **errp) if (p->running) { qemu_thread_join(&p->thread); - p->running = false; } socket_recv_channel_unref(p->c); p->c = NULL; @@ -801,17 +849,48 @@ static void multifd_recv_sync_main(void) qemu_sem_post(&p->sem); } for (i = 0; i < migrate_multifd_channels(); i++) { - qemu_sem_wait(&multifd_recv_state->sem_main); + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + bool wait = true; + + qemu_mutex_lock(&p->mutex); + wait = p->running && !p->quit; + qemu_mutex_unlock(&p->mutex); + + if (wait) { + qemu_sem_wait(&multifd_recv_state->sem_main); + } } trace_multifd_recv_sync_main(); } +static gboolean recv_channel_ready(QIOChannel *ioc, + GIOCondition condition, + gpointer opaque) +{ + MultiFDRecvParams *p = opaque; + + if 
(condition != G_IO_IN) { + return G_SOURCE_REMOVE; + } + + qemu_mutex_lock(&p->mutex); + p->done = false; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); + + return G_SOURCE_CONTINUE; + +} + static void *multifd_recv_thread(void *opaque) { MultiFDRecvParams *p = opaque; trace_multifd_recv_thread_start(p->id); + qio_channel_add_watch(p->c, G_IO_IN | G_IO_HUP | G_IO_ERR, + recv_channel_ready, p, NULL); + while (true) { qemu_sem_wait(&p->sem); qemu_mutex_lock(&p->mutex); @@ -821,14 +900,50 @@ static void *multifd_recv_thread(void *opaque) qemu_sem_post(&multifd_recv_state->sem_main); continue; } + if (!p->done) { + MultiFDPacket_t *packet = p->pages->packet; + RAMBlock *block; + Error *local_err = NULL; + size_t ret; + int i; + + qemu_mutex_unlock(&p->mutex); + + ret = qio_channel_read_all(p->c, (void *)packet, + p->pages->packet_len, &local_err); + if (ret != 0) { + terminate_multifd_recv_threads(local_err); + return NULL; + } + block = qemu_ram_block_by_name(packet->ramblock); + p->pages->seq = packet->seq; + for (i = 0; i < packet->used; i++) { + p->pages->iov[i].iov_base = block->host + packet->offset[i]; + p->pages->iov[i].iov_len = TARGET_PAGE_SIZE; + } + trace_multifd_recv(p->id, p->pages->seq, packet->used); + ret = qio_channel_readv_all(p->c, p->pages->iov, + packet->used, &local_err); + if (ret != 0) { + terminate_multifd_recv_threads(local_err); + return NULL; + } + qemu_mutex_lock(&p->mutex); + p->done = true; + p->packets_recv++; + qemu_mutex_unlock(&p->mutex); + + continue; + } if (p->quit) { + p->running = false; qemu_mutex_unlock(&p->mutex); break; } qemu_mutex_unlock(&p->mutex); } - trace_multifd_recv_thread_end(p->id); + trace_multifd_recv_thread_end(p->id, p->packets_recv); return NULL; } @@ -854,6 +969,7 @@ int multifd_load_setup(void) qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); p->quit = false; + p->done = true; p->id = i; p->name = g_strdup_printf("multifdrecv_%d", i); multifd_pages_init(&p->pages, 
migrate_multifd_page_count()); diff --git a/migration/trace-events b/migration/trace-events index f6ab2c7bcb..e9f1aae985 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -82,8 +82,9 @@ multifd_recv_sync_main(void) "" multifd_send_thread_start(int id) "%d" multifd_send_thread_end(char id, uint32_t packets) "channel %d packets %d" multifd_recv_thread_start(int id) "%d" -multifd_recv_thread_end(int id) "%d" +multifd_recv_thread_end(char id, uint32_t packets) "channel %d packets %d" multifd_send(char id, int seq, int num) "channel %d sequence %d num pages %d" +multifd_recv(char id, int seq, int num) "channel %d sequence %d num pages %d" # migration/migration.c await_return_path_close_on_source_close(void) "" -- 2.14.3