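This ensures that no receive thread handles any incoming message until all threads have been created: each receive thread waits on a per-channel can_start semaphore, and all of these semaphores are posted once the last channel has been set up.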
Signed-off-by: Juan Quintela <quint...@redhat.com> --- migration/ram.c | 24 ++++++++++++++++++++++-- migration/trace-events | 1 + 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 4a6ae677a9..f1aec95f83 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -702,6 +702,8 @@ typedef struct { uint64_t num_pages; /* syncs main thread and channels */ QemuSemaphore sem_sync; + /* thread can continue */ + QemuSemaphore can_start; } MultiFDRecvParams; static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) @@ -1313,6 +1315,7 @@ int multifd_load_cleanup(Error **errp) p->c = NULL; qemu_mutex_destroy(&p->mutex); qemu_sem_destroy(&p->sem_sync); + qemu_sem_destroy(&p->can_start); g_free(p->name); p->name = NULL; multifd_pages_clear(p->pages); @@ -1366,6 +1369,9 @@ static void *multifd_recv_thread(void *opaque) trace_multifd_recv_thread_start(p->id); rcu_register_thread(); + qemu_sem_wait(&p->can_start); + trace_multifd_recv_thread_can_start(p->id); + while (true) { uint32_t used; uint32_t flags; @@ -1445,6 +1451,7 @@ int multifd_load_setup(void) qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem_sync, 0); + qemu_sem_init(&p->can_start, 0); p->quit = false; p->id = i; p->pages = multifd_pages_init(page_count); @@ -1477,6 +1484,7 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) { MultiFDRecvParams *p; Error *local_err = NULL; + bool last_one; int id; id = multifd_recv_initial_packet(ioc, &local_err); @@ -1506,8 +1514,20 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, QEMU_THREAD_JOINABLE); atomic_inc(&multifd_recv_state->count); - return atomic_read(&multifd_recv_state->count) == - migrate_multifd_channels(); + + last_one = atomic_read(&multifd_recv_state->count) + == migrate_multifd_channels(); + + if (last_one) { + int i; + + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = 
&multifd_recv_state->params[i]; + + qemu_sem_post(&p->can_start); + } + } + return last_one; } /** diff --git a/migration/trace-events b/migration/trace-events index dd13a5c4b1..9fbef614ab 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -86,6 +86,7 @@ multifd_recv_sync_main(long packet_num) "packet num %ld" multifd_recv_sync_main_signal(uint8_t id) "channel %d" multifd_recv_sync_main_wait(uint8_t id) "channel %d" multifd_recv_terminate_threads(bool error) "error %d" +multifd_recv_thread_can_start(uint8_t id) "channel %d" multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64 multifd_recv_thread_start(uint8_t id) "%d" multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uint32_t next_packet_size) "channel %d packet_num %" PRIu64 " pages %d flags 0x%x next packet size %d" -- 2.21.0