From: fengzhimin <fengzhim...@huawei.com>

Create the multiRDMA channels on both sides. We still don't transmit
anything through them; for now we only build the RDMA connections.
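As an overview for reviewers, the per-channel setup on the source side
reduces to the sketch below. This is an illustrative condensation of
multiRDMA_save_setup() from the diff, with error handling elided; it is
not a literal hunk:

    /* One multiRDMA send channel: a private RDMAContext plus QEMUFile.
     * The spawned thread only parks on its semaphore for now, since
     * nothing is transmitted through the channel yet. */
    MultiRDMASendParams *p = &multiRDMA_send_state->params[i];

    p->rdma = qemu_rdma_data_init(host_port, errp);      /* resolve address */
    qemu_rdma_source_init(p->rdma,
                          migrate_use_rdma_pin_all(), errp); /* PD/CQ/QP */
    qemu_rdma_connect(p->rdma, errp);                    /* RDMA CM connect */

    /* Wrap the connection in a QEMUFile and start the channel thread. */
    multiRDMA_send_new_channel(qemu_fopen_rdma(p->rdma, "wb"), i);

The destination mirrors this: qemu_rdma_accept() hands each incoming
connection to an idle per-channel RDMAContext, and
migration_multiRDMA_process_incoming() starts the matching recv thread.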
Signed-off-by: fengzhimin <fengzhim...@huawei.com>
---
 migration/rdma.c | 253 +++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 223 insertions(+), 30 deletions(-)

diff --git a/migration/rdma.c b/migration/rdma.c
index 992e5abfed..5b780bef36 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -403,6 +403,10 @@ typedef struct {
     char *name;
     /* channel thread id */
     QemuThread thread;
+    /* RDMAContext channel */
+    RDMAContext *rdma;
+    /* communication channel */
+    QEMUFile *file;
     /* sem where to wait for more work */
     QemuSemaphore sem;
     /* this mutex protects the following parameters */
@@ -429,6 +433,10 @@ typedef struct {
     char *name;
     /* channel thread id */
     QemuThread thread;
+    /* RDMAContext channel */
+    RDMAContext *rdma;
+    /* communication channel */
+    QEMUFile *file;
     /* sem where to wait for more work */
     QemuSemaphore sem;
     /* this mutex protects the following parameters */
@@ -3417,6 +3425,27 @@ static int qemu_rdma_accept(RDMAContext *rdma)
         qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                             NULL,
                             (void *)(intptr_t)rdma->return_path);
+    } else if (migrate_use_multiRDMA()) {
+        int thread_count;
+        int i;
+        RDMAContext *multi_rdma = NULL;
+        thread_count = migrate_multiRDMA_channels();
+        /* find an unconnected multiRDMA channel to take the connection */
+        for (i = 0; i < thread_count; i++) {
+            if (multiRDMA_recv_state->params[i].rdma->cm_id == NULL) {
+                multi_rdma = multiRDMA_recv_state->params[i].rdma;
+                break;
+            }
+        }
+
+        if (multi_rdma) {
+            qemu_set_fd_handler(rdma->channel->fd,
+                                rdma_accept_incoming_migration,
+                                NULL, (void *)(intptr_t)multi_rdma);
+        } else {
+            qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
+                                NULL, rdma);
+        }
     } else {
         qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
                             NULL, rdma);
@@ -4029,6 +4058,58 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
     return rioc->file;
 }
 
+static void *multiRDMA_recv_thread(void *opaque)
+{
+    MultiRDMARecvParams *p = opaque;
+
+    while (true) {
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_wait(&p->sem);
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
+    return NULL;
+}
+
+static void multiRDMA_recv_new_channel(QEMUFile *f, int id)
+{
+    MultiRDMARecvParams *p;
+    Error *local_err = NULL;
+
+    p = &multiRDMA_recv_state->params[id];
+    if (p->file != NULL) {
+        error_setg(&local_err,
+                   "multiRDMA: received id '%d' already setup", id);
+        return;
+    }
+    p->file = f;
+
+    qemu_thread_create(&p->thread, p->name, multiRDMA_recv_thread, p,
+                       QEMU_THREAD_JOINABLE);
+    atomic_inc(&multiRDMA_recv_state->count);
+}
+
+static void migration_multiRDMA_process_incoming(QEMUFile *f, RDMAContext *rdma)
+{
+    MigrationIncomingState *mis = migration_incoming_get_current();
+
+    if (!mis->from_src_file) {
+        rdma->migration_started_on_destination = 1;
+        migration_incoming_setup(f);
+        migration_incoming_process();
+    } else {
+        multiRDMA_recv_new_channel(f, multiRDMA_recv_state->count);
+    }
+}
+
 static void rdma_accept_incoming_migration(void *opaque)
 {
     RDMAContext *rdma = opaque;
@@ -4057,29 +4138,13 @@ static void rdma_accept_incoming_migration(void *opaque)
         return;
     }
 
-    rdma->migration_started_on_destination = 1;
-    migration_fd_process_incoming(f);
-}
-
-static void *multiRDMA_recv_thread(void *opaque)
-{
-    MultiRDMARecvParams *p = opaque;
-
-    while (true) {
-        qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
-        }
-        qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+    if (migrate_use_multiRDMA()) {
+        /* build the multiRDMA channels */
+        migration_multiRDMA_process_incoming(f, rdma);
+    } else {
+        rdma->migration_started_on_destination = 1;
+        migration_fd_process_incoming(f);
     }
-
-    qemu_mutex_lock(&p->mutex);
-    p->running = false;
-    qemu_mutex_unlock(&p->mutex);
-
-    return NULL;
 }
 
 static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
@@ -4087,6 +4152,7 @@ static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
 {
     uint8_t i;
     int thread_count;
+    int idx;
 
     thread_count = migrate_multiRDMA_channels();
     if (multiRDMA_recv_state == NULL) {
@@ -4099,15 +4165,21 @@ static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
 
         for (i = 0; i < thread_count; i++) {
             MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
+            p->rdma = qemu_rdma_data_init(host_port, errp);
+            for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+                p->rdma->wr_data[idx].control_len = 0;
+                p->rdma->wr_data[idx].control_curr = NULL;
+            }
+            /* the CM channel and CM id are shared */
+            p->rdma->channel = rdma->channel;
+            p->rdma->listen_id = rdma->listen_id;
+
             qemu_mutex_init(&p->mutex);
             qemu_sem_init(&p->sem, 0);
             p->quit = false;
             p->id = i;
             p->running = true;
             p->name = g_strdup_printf("multiRDMARecv_%d", i);
-            qemu_thread_create(&p->thread, p->name, multiRDMA_recv_thread,
-                               p, QEMU_THREAD_JOINABLE);
-            atomic_inc(&multiRDMA_recv_state->count);
         }
     }
 
@@ -4155,6 +4227,7 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
     }
 
+    /* initialize the RDMAContext for multiRDMA */
     if (migrate_use_multiRDMA()) {
         if (multiRDMA_load_setup(host_port, rdma, &local_err) != 0) {
             ERROR(errp, "init multiRDMA failure!");
@@ -4193,10 +4266,29 @@ static void *multiRDMA_send_thread(void *opaque)
     return NULL;
 }
 
+static void multiRDMA_send_new_channel(QEMUFile *f, int id)
+{
+    MultiRDMASendParams *p;
+    Error *local_err = NULL;
+
+    p = &multiRDMA_send_state->params[id];
+    if (p->file != NULL) {
+        error_setg(&local_err,
+                   "multiRDMA: send id '%d' already setup", id);
+        return;
+    }
+    p->file = f;
+
+    qemu_thread_create(&p->thread, p->name, multiRDMA_send_thread,
+                       p, QEMU_THREAD_JOINABLE);
+    atomic_inc(&multiRDMA_send_state->count);
+}
+
 static int multiRDMA_save_setup(const char *host_port, Error **errp)
 {
     int thread_count;
     uint8_t i;
+    int ret;
 
     thread_count = migrate_multiRDMA_channels();
     multiRDMA_send_state = g_malloc0(sizeof(*multiRDMA_send_state));
@@ -4207,6 +4299,27 @@ static int multiRDMA_save_setup(const char *host_port, Error **errp)
     for (i = 0; i < thread_count; i++) {
         MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
+        QEMUFile *f = NULL;
+
+        p->rdma = qemu_rdma_data_init(host_port, errp);
+        if (p->rdma == NULL) {
+            ERROR(errp, "init RDMA data failure for multi channel %d!", i);
+            goto err;
+        }
+
+        ret = qemu_rdma_source_init(p->rdma, migrate_use_rdma_pin_all(), errp);
+        if (ret) {
+            ERROR(errp, "init RDMA source failure for multi channel %d!", i);
+            goto err;
+        }
+
+        ret = qemu_rdma_connect(p->rdma, errp);
+        if (ret) {
+            ERROR(errp, "connect multi channel %d failure!", i);
+            goto err;
+        }
+
+        f = qemu_fopen_rdma(multiRDMA_send_state->params[i].rdma, "wb");
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
@@ -4214,12 +4327,20 @@ static int multiRDMA_save_setup(const char *host_port, Error **errp)
         p->running = true;
         p->name = g_strdup_printf("multiRDMASend_%d", i);
 
-        qemu_thread_create(&p->thread, p->name,
-                           multiRDMA_send_thread, p,
-                           QEMU_THREAD_JOINABLE);
-        atomic_inc(&multiRDMA_send_state->count);
+        multiRDMA_send_new_channel(f, i);
     }
 
     return 0;
+
+err:
+    for (i = 0; i < thread_count; i++) {
+        g_free(multiRDMA_send_state->params[i].rdma);
+    }
+
+    g_free(multiRDMA_send_state->params);
+    g_free(multiRDMA_send_state);
+
+    return -1;
 }
 
 static void multiRDMA_send_terminate_threads(void)
@@ -4268,6 +4389,8 @@ int multiRDMA_save_cleanup(void)
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
         p->name = NULL;
+        qemu_rdma_cleanup(multiRDMA_send_state->params[i].rdma);
+        g_free(multiRDMA_send_state->params[i].rdma);
     }
 
     qemu_sem_destroy(&multiRDMA_send_state->sem_sync);
@@ -4292,6 +4415,71 @@ static void multiRDMA_recv_terminate_threads(void)
     }
 }
 
+static void qemu_multiRDMA_load_cleanup(RDMAContext *rdma)
+{
+    int idx;
+
+    if (rdma->cm_id && rdma->connected) {
+        if ((rdma->error_state ||
+             migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
+            !rdma->received_error) {
+            RDMAControlHeader head = { .len = 0,
+                                       .type = RDMA_CONTROL_ERROR,
+                                       .repeat = 1,
+                                     };
+            error_report("Early error. Sending error.");
+            qemu_rdma_post_send_control(rdma, NULL, &head);
+        }
+
+        rdma_disconnect(rdma->cm_id);
+        trace_qemu_rdma_cleanup_disconnect();
+        rdma->connected = false;
+    }
+
+    g_free(rdma->dest_blocks);
+    rdma->dest_blocks = NULL;
+
+    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+        if (rdma->wr_data[idx].control_mr) {
+            rdma->total_registrations--;
+            ibv_dereg_mr(rdma->wr_data[idx].control_mr);
+        }
+        rdma->wr_data[idx].control_mr = NULL;
+    }
+
+    if (rdma->local_ram_blocks.block) {
+        while (rdma->local_ram_blocks.nb_blocks) {
+            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
+        }
+    }
+
+    if (rdma->qp) {
+        rdma_destroy_qp(rdma->cm_id);
+        rdma->qp = NULL;
+    }
+    if (rdma->cq) {
+        ibv_destroy_cq(rdma->cq);
+        rdma->cq = NULL;
+    }
+    if (rdma->comp_channel) {
+        ibv_destroy_comp_channel(rdma->comp_channel);
+        rdma->comp_channel = NULL;
+    }
+    if (rdma->pd) {
+        ibv_dealloc_pd(rdma->pd);
+        rdma->pd = NULL;
+    }
+    if (rdma->cm_id) {
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = NULL;
+    }
+
+    rdma->listen_id = NULL;
+    rdma->channel = NULL;
+    g_free(rdma->host);
+    rdma->host = NULL;
+}
+
 int multiRDMA_load_cleanup(void)
 {
     int i;
@@ -4323,6 +4511,8 @@ int multiRDMA_load_cleanup(void)
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
         p->name = NULL;
+        qemu_multiRDMA_load_cleanup(multiRDMA_recv_state->params[i].rdma);
+        g_free(multiRDMA_recv_state->params[i].rdma);
    }
 
    qemu_sem_destroy(&multiRDMA_recv_state->sem_sync);
@@ -4386,15 +4576,18 @@ void rdma_start_outgoing_migration(void *opaque,
 
     trace_rdma_start_outgoing_migration_after_rdma_connect();
 
+    s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
+    /* create the multiRDMA channels */
     if (migrate_use_multiRDMA()) {
         if (multiRDMA_save_setup(host_port, errp) != 0) {
             ERROR(errp, "init multiRDMA channels failure!");
             goto err;
         }
+        migrate_fd_connect(s, NULL);
+    } else {
+        migrate_fd_connect(s, NULL);
    }
-    s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
-    migrate_fd_connect(s, NULL);
 
     return;
 err:
     g_free(rdma);
-- 
2.19.1