Signed-off-by: Zhimin Feng <fengzhim...@huawei.com> Signed-off-by: Chuan Zheng <zhengch...@huawei.com> --- migration/multifd.c | 3 ++ migration/rdma.c | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 95 insertions(+), 2 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c index 9439b3c..c4d90ef 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -534,6 +534,9 @@ void multifd_send_terminate_threads(Error *err) qemu_mutex_lock(&p->mutex); p->quit = true; qemu_sem_post(&p->sem); + if (migrate_use_rdma()) { + qemu_sem_post(&p->sem_sync); + } qemu_mutex_unlock(&p->mutex); } } diff --git a/migration/rdma.c b/migration/rdma.c index a366849..3210e6e 100644 --- a/migration/rdma.c +++ b/migration/rdma.c @@ -3837,6 +3837,19 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data) return rdma_block_notification_handle(opaque, data); case RAM_CONTROL_HOOK: + if (migrate_use_multifd()) { + int i; + MultiFDRecvParams *multifd_recv_param = NULL; + int thread_count = migrate_multifd_channels(); + /* Inform dest recv_thread to poll */ + for (i = 0; i < thread_count; i++) { + if (get_multifd_recv_param(i, &multifd_recv_param)) { + return -1; + } + qemu_sem_post(&multifd_recv_param->sem_sync); + } + } + return qemu_rdma_registration_handle(f, opaque); default: @@ -3909,6 +3922,24 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST; trace_qemu_rdma_registration_stop_ram(); + if (migrate_use_multifd()) { + /* + * Inform the multifd channels to register memory + */ + int i; + int thread_count = migrate_multifd_channels(); + MultiFDSendParams *multifd_send_param = NULL; + for (i = 0; i < thread_count; i++) { + ret = get_multifd_send_param(i, &multifd_send_param); + if (ret) { + error_report("rdma: error getting multifd(%d)", i); + return ret; + } + + qemu_sem_post(&multifd_send_param->sem_sync); + } + } + /* * Make sure that we parallelize the pinning on both sides. * For very large guests, doing this serially takes a really @@ -3967,6 +3998,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, rdma->dest_blocks[i].remote_host_addr; local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; } + /* Wait for all multifd channels to complete registration */ + if (migrate_use_multifd()) { + int i; + int thread_count = migrate_multifd_channels(); + MultiFDSendParams *multifd_send_param = NULL; + for (i = 0; i < thread_count; i++) { + ret = get_multifd_send_param(i, &multifd_send_param); + if (ret) { + error_report("rdma: error getting multifd(%d)", i); + return ret; + } + + qemu_sem_wait(&multifd_send_param->sem); + } + } } trace_qemu_rdma_registration_stop(flags); @@ -3978,6 +4024,24 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, goto err; } + if (migrate_use_multifd()) { + /* + * Inform src send_thread to send FINISHED signal. + * Wait for multifd RDMA send threads to poll the CQE. + */ + int i; + int thread_count = migrate_multifd_channels(); + MultiFDSendParams *multifd_send_param = NULL; + for (i = 0; i < thread_count; i++) { + ret = get_multifd_send_param(i, &multifd_send_param); + if (ret < 0) { + goto err; + } + + qemu_sem_post(&multifd_send_param->sem_sync); + } + } + return 0; err: rdma->error_state = ret; @@ -4355,20 +4419,39 @@ static void *multifd_rdma_send_thread(void *opaque) { MultiFDSendParams *p = opaque; Error *local_err = NULL; + int ret = 0; + RDMAControlHeader head = { .len = 0, .repeat = 1 }; trace_multifd_send_thread_start(p->id); if (multifd_send_initial_packet(p, &local_err) < 0) { goto out; } + /* wait for semaphore notification to register memory */ + qemu_sem_wait(&p->sem_sync); + if (qemu_rdma_registration(p->rdma) < 0) { + goto out; + } + /* + * Inform the main RDMA thread to run when multifd + * RDMA thread have completed registration. + */ + qemu_sem_post(&p->sem); while (true) { + qemu_sem_wait(&p->sem_sync); qemu_mutex_lock(&p->mutex); if (p->quit) { qemu_mutex_unlock(&p->mutex); break; } qemu_mutex_unlock(&p->mutex); - qemu_sem_wait(&p->sem); + + /* Send FINISHED to the destination */ + head.type = RDMA_CONTROL_REGISTER_FINISHED; + ret = qemu_rdma_exchange_send(p->rdma, &head, NULL, NULL, NULL, NULL); + if (ret < 0) { + return NULL; + } } out: @@ -4406,15 +4489,22 @@ static void multifd_rdma_send_channel_setup(MultiFDSendParams *p) static void *multifd_rdma_recv_thread(void *opaque) { MultiFDRecvParams *p = opaque; + int ret = 0; while (true) { + qemu_sem_wait(&p->sem_sync); + qemu_mutex_lock(&p->mutex); if (p->quit) { qemu_mutex_unlock(&p->mutex); break; } qemu_mutex_unlock(&p->mutex); - qemu_sem_wait(&p->sem_sync); + ret = qemu_rdma_registration_handle(p->file, p->c); + if (ret < 0) { + qemu_file_set_error(p->file, ret); + break; + } } qemu_mutex_lock(&p->mutex); -- 1.8.3.1