From: fengzhimin <fengzhim...@huawei.com>

Send the RAM block through the multiRDMA channels when the rdma-pin-all
option is enabled. The channel used to send each chunk of data is chosen
by cycling through the multiRDMA send threads in round-robin order.
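For reference, the channel selection implemented by get_multiRDMA_channel()
below is plain round-robin over the configured number of channels. A minimal
standalone sketch of the same scheme (the names next_channel and
channel_count, and the fixed count of 4, are illustrative only and not part
of QEMU):

    /* Round-robin channel selection, sketched outside QEMU. */
    #include <stdio.h>

    static int current_index;            /* last channel handed out */
    static const int channel_count = 4;  /* e.g. migrate_multiRDMA_channels() */

    /* Return the next channel index, cycling 0..channel_count-1. */
    static int next_channel(void)
    {
        current_index = (current_index + 1) % channel_count;
        return current_index;
    }

    int main(void)
    {
        int i;

        for (i = 0; i < 8; i++) {
            printf("chunk %d -> channel %d\n", i, next_channel());
        }
        return 0;
    }

Keeping a single shared index means successive RAM chunks are spread evenly
across all send channels.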
Signed-off-by: fengzhimin <fengzhim...@huawei.com>
---
 migration/rdma.c | 66 +++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 63 insertions(+), 3 deletions(-)

diff --git a/migration/rdma.c b/migration/rdma.c
index 425dfa709d..36261f1fc8 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -427,6 +427,8 @@ typedef struct {
     QEMUFile *file;
     /* sem where to wait for more work */
     QemuSemaphore sem;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
     /* this mutex protects the following parameters */
     QemuMutex mutex;
     /* is this channel thread running */
@@ -439,6 +441,8 @@ struct {
     MultiRDMASendParams *params;
     /* number of created threads */
     int count;
+    /* index of current RDMA channels */
+    int current_RDMA_index;
     /* this mutex protects the following parameters */
     QemuMutex mutex_sync;
     /* number of registered multiRDMA channels */
@@ -2043,6 +2047,18 @@ static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
     return 0;
 }
 
+/*
+ * Get the multiRDMA channel used to send data.
+ */
+static int get_multiRDMA_channel(void)
+{
+    int thread_count = migrate_multiRDMA_channels();
+    multiRDMA_send_state->current_RDMA_index++;
+    multiRDMA_send_state->current_RDMA_index %= thread_count;
+
+    return multiRDMA_send_state->current_RDMA_index;
+}
+
 /*
  * Write an actual chunk of memory using RDMA.
  *
@@ -2068,6 +2084,16 @@ static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
                                .repeat = 1,
                              };
 
+    if (migrate_use_multiRDMA() &&
+        migrate_use_rdma_pin_all()) {
+        /* The multiRDMA threads only send ram block */
+        if (strcmp(block->block_name, "mach-virt.ram") == 0) {
+            int channel = get_multiRDMA_channel();
+            rdma = multiRDMA_send_state->params[channel].rdma;
+            block = &(rdma->local_ram_blocks.block[current_index]);
+        }
+    }
+
 retry:
     sge.addr = (uintptr_t)(block->local_host_addr +
                             (current_addr - block->offset));
@@ -2285,8 +2311,22 @@ static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
     }
 
     if (ret == 0) {
-        rdma->nb_sent++;
-        trace_qemu_rdma_write_flush(rdma->nb_sent);
+        if (migrate_use_multiRDMA() &&
+            migrate_use_rdma_pin_all()) {
+            /* The multiRDMA threads only send ram block */
+            RDMALocalBlock *block = NULL;
+            block = &(rdma->local_ram_blocks.block[rdma->current_index]);
+            if (strcmp(block->block_name, "mach-virt.ram") == 0) {
+                int current_RDMA = multiRDMA_send_state->current_RDMA_index;
+                multiRDMA_send_state->params[current_RDMA].rdma->nb_sent++;
+            } else {
+                rdma->nb_sent++;
+                trace_qemu_rdma_write_flush(rdma->nb_sent);
+            }
+        } else {
+            rdma->nb_sent++;
+            trace_qemu_rdma_write_flush(rdma->nb_sent);
+        }
     }
 
     rdma->current_length = 0;
@@ -4015,11 +4055,15 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
 
     if (migrate_use_multiRDMA()) {
-        /* Inform src send_thread to send FINISHED signal */
+        /*
+         * Inform src send_thread to send FINISHED signal.
+         * Wait for multiRDMA send threads to poll the CQE.
+         */
         int i;
         int thread_count = migrate_multiRDMA_channels();
         for (i = 0; i < thread_count; i++) {
             qemu_sem_post(&multiRDMA_send_state->params[i].sem);
+            qemu_sem_wait(&multiRDMA_send_state->params[i].sem_sync);
         }
     }
 
@@ -4592,12 +4636,25 @@ static void *multiRDMA_send_thread(void *opaque)
         }
         qemu_mutex_unlock(&p->mutex);
 
+        /* To complete polling(CQE) */
+        while (rdma->nb_sent) {
+            ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
+            if (ret < 0) {
+                error_report("multiRDMA migration: "
+                             "complete polling error!");
+                return NULL;
+            }
+        }
+
         /* Send FINISHED to the destination */
         head.type = RDMA_CONTROL_REGISTER_FINISHED;
         ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
         if (ret < 0) {
            return NULL;
         }
+
+        /* sync main thread */
+        qemu_sem_post(&p->sem_sync);
     }
 
     qemu_mutex_lock(&p->mutex);
@@ -4637,6 +4694,7 @@ static int multiRDMA_save_setup(const char *host_port, Error **errp)
                                           thread_count);
     atomic_set(&multiRDMA_send_state->count, 0);
     atomic_set(&multiRDMA_send_state->reg_mr_channels, 0);
+    atomic_set(&multiRDMA_send_state->current_RDMA_index, 0);
     qemu_mutex_init(&multiRDMA_send_state->mutex_sync);
     qemu_sem_init(&multiRDMA_send_state->sem_sync, 0);
 
@@ -4665,6 +4723,7 @@ static int multiRDMA_save_setup(const char *host_port, Error **errp)
         f = qemu_fopen_rdma(multiRDMA_send_state->params[i].rdma, "wb");
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->id = i;
         p->running = true;
@@ -4730,6 +4789,7 @@ int multiRDMA_save_cleanup(void)
         MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
         qemu_rdma_cleanup(multiRDMA_send_state->params[i].rdma);
-- 
2.19.1
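Note on the synchronization added above: qemu_rdma_registration_stop() posts
each channel's sem to request FINISHED, then waits on that channel's
sem_sync, which the send thread posts only after it has drained its
outstanding writes and sent RDMA_CONTROL_REGISTER_FINISHED. A standalone
sketch of that post/wait handshake using POSIX threads and semaphores
(worker, start, done and NR_CHANNELS are illustrative names, not QEMU code;
build with gcc -pthread):

    #include <pthread.h>
    #include <semaphore.h>
    #include <stdio.h>

    #define NR_CHANNELS 2

    typedef struct {
        int id;
        sem_t start;   /* main -> worker, like p->sem      */
        sem_t done;    /* worker -> main, like p->sem_sync */
    } Channel;

    static Channel channels[NR_CHANNELS];

    static void *worker(void *opaque)
    {
        Channel *c = opaque;

        sem_wait(&c->start);        /* wait for the "finish now" kick */
        /* ... drain outstanding work (the CQE polling loop) ... */
        printf("channel %d: FINISHED sent\n", c->id);
        sem_post(&c->done);         /* tell the main thread we are done */
        return NULL;
    }

    int main(void)
    {
        pthread_t tid[NR_CHANNELS];
        int i;

        for (i = 0; i < NR_CHANNELS; i++) {
            channels[i].id = i;
            sem_init(&channels[i].start, 0, 0);
            sem_init(&channels[i].done, 0, 0);
            pthread_create(&tid[i], NULL, worker, &channels[i]);
        }

        /* Kick every channel, then wait until each has acknowledged. */
        for (i = 0; i < NR_CHANNELS; i++) {
            sem_post(&channels[i].start);
            sem_wait(&channels[i].done);
        }

        for (i = 0; i < NR_CHANNELS; i++) {
            pthread_join(tid[i], NULL);
        }
        return 0;
    }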