* Zhimin Feng (fengzhim...@huawei.com) wrote:
> From: fengzhimin <fengzhim...@huawei.com>
> 
> In both sides. We still don't transmit anything through them,
> and we only build the RDMA connections.
> 
> Signed-off-by: fengzhimin <fengzhim...@huawei.com>
> ---
>  migration/rdma.c | 253 +++++++++++++++++++++++++++++++++++++++++------
>  1 file changed, 223 insertions(+), 30 deletions(-)
> 
> diff --git a/migration/rdma.c b/migration/rdma.c
> index 992e5abfed..5b780bef36 100644
> --- a/migration/rdma.c
> +++ b/migration/rdma.c
> @@ -403,6 +403,10 @@ typedef struct {
>      char *name;
>      /* channel thread id */
>      QemuThread thread;
> +    /* RDMAContext channel */
> +    RDMAContext *rdma;
> +    /* communication channel */
> +    QEMUFile *file;
>      /* sem where to wait for more work */
>      QemuSemaphore sem;
>      /* this mutex protects the following parameters */
> @@ -429,6 +433,10 @@ typedef struct {
>      char *name;
>      /* channel thread id */
>      QemuThread thread;
> +    /* RDMAContext channel */
> +    RDMAContext *rdma;
> +    /* communication channel */
> +    QEMUFile *file;
>      /* sem where to wait for more work */
>      QemuSemaphore sem;
>      /* this mutex protects the following parameters */
> @@ -3417,6 +3425,27 @@ static int qemu_rdma_accept(RDMAContext *rdma)
>          qemu_set_fd_handler(rdma->channel->fd,
>                              rdma_accept_incoming_migration,
>                              NULL,
>                              (void *)(intptr_t)rdma->return_path);
> +    } else if (migrate_use_multiRDMA()) {
> +        int thread_count;
> +        int i;
> +        RDMAContext *multi_rdma = NULL;
> +        thread_count = migrate_multiRDMA_channels();
> +        /* create the multi Thread RDMA channels */
> +        for (i = 0; i < thread_count; i++) {
> +            if (multiRDMA_recv_state->params[i].rdma->cm_id == NULL) {
> +                multi_rdma = multiRDMA_recv_state->params[i].rdma;
> +                break;
> +            }
> +        }
> +
> +        if (multi_rdma) {
> +            qemu_set_fd_handler(rdma->channel->fd,
> +                                rdma_accept_incoming_migration,
> +                                NULL, (void *)(intptr_t)multi_rdma);
> +        } else {
> +            qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
> +                                NULL, rdma);
> +        }
>      } else {
>          qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
>                              NULL, rdma);
> @@ -4029,6 +4058,58 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
>      return rioc->file;
>  }
>  
> +static void *multiRDMA_recv_thread(void *opaque)
> +{
> +    MultiRDMARecvParams *p = opaque;
> +
> +    while (true) {
> +        qemu_mutex_lock(&p->mutex);
> +        if (p->quit) {
> +            qemu_mutex_unlock(&p->mutex);
> +            break;
> +        }
> +        qemu_mutex_unlock(&p->mutex);
> +        qemu_sem_wait(&p->sem);
> +    }
> +
> +    qemu_mutex_lock(&p->mutex);
> +    p->running = false;
> +    qemu_mutex_unlock(&p->mutex);
> +
> +    return NULL;
> +}
> +
> +static void multiRDMA_recv_new_channel(QEMUFile *f, int id)
> +{
> +    MultiRDMARecvParams *p;
> +    Error *local_err = NULL;
> +
> +    p = &multiRDMA_recv_state->params[id];
> +    if (p->file != NULL) {
> +        error_setg(&local_err,
> +                   "multiRDMA: received id '%d' already setup'", id);
> +        return ;
> +    }
> +    p->file = f;
> +
> +    qemu_thread_create(&p->thread, p->name, multiRDMA_recv_thread, p,
> +                       QEMU_THREAD_JOINABLE);
> +    atomic_inc(&multiRDMA_recv_state->count);
> +}
> +
> +static void migration_multiRDMA_process_incoming(QEMUFile *f, RDMAContext *rdma)
> +{
> +    MigrationIncomingState *mis = migration_incoming_get_current();
> +
> +    if (!mis->from_src_file) {
> +        rdma->migration_started_on_destination = 1;
> +        migration_incoming_setup(f);
> +        migration_incoming_process();
> +    } else {
> +        multiRDMA_recv_new_channel(f, multiRDMA_recv_state->count);
> +    }
> +}
> +
>  static void rdma_accept_incoming_migration(void *opaque)
>  {
>      RDMAContext *rdma = opaque;
> @@ -4057,29 +4138,13 @@ static void rdma_accept_incoming_migration(void *opaque)
>          return;
>      }
>  
> -    rdma->migration_started_on_destination = 1;
> -    migration_fd_process_incoming(f);
> -}
> -
> -static void *multiRDMA_recv_thread(void *opaque)
> -{
> -    MultiRDMARecvParams *p = opaque;
> -
> -    while (true) {
> -        qemu_mutex_lock(&p->mutex);
> -        if (p->quit) {
> -            qemu_mutex_unlock(&p->mutex);
> -            break;
> -        }
> -        qemu_mutex_unlock(&p->mutex);
> -        qemu_sem_wait(&p->sem);
> +    if (migrate_use_multiRDMA()) {
> +        /* build the multiRDMA channels */
> +        migration_multiRDMA_process_incoming(f, rdma);
> +    } else {
> +        rdma->migration_started_on_destination = 1;
> +        migration_fd_process_incoming(f);
>      }
> -
> -    qemu_mutex_lock(&p->mutex);
> -    p->running = false;
> -    qemu_mutex_unlock(&p->mutex);
> -
> -    return NULL;
>  }
>  
>  static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
> @@ -4087,6 +4152,7 @@ static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
>  {
>      uint8_t i;
>      int thread_count;
> +    int idx;
>  
>      thread_count = migrate_multiRDMA_channels();
>      if (multiRDMA_recv_state == NULL) {
> @@ -4099,15 +4165,21 @@ static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
>          for (i = 0; i < thread_count; i++) {
>              MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
>  
> +            p->rdma = qemu_rdma_data_init(host_port, errp);
> +            for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
> +                p->rdma->wr_data[idx].control_len = 0;
> +                p->rdma->wr_data[idx].control_curr = NULL;
> +            }
> +            /* the CM channel and CM id is shared */
> +            p->rdma->channel = rdma->channel;
> +            p->rdma->listen_id = rdma->listen_id;
> +
>              qemu_mutex_init(&p->mutex);
>              qemu_sem_init(&p->sem, 0);
>              p->quit = false;
>              p->id = i;
>              p->running = true;
>              p->name = g_strdup_printf("multiRDMARecv_%d", i);
> -            qemu_thread_create(&p->thread, p->name, multiRDMA_recv_thread,
> -                               p, QEMU_THREAD_JOINABLE);
> -            atomic_inc(&multiRDMA_recv_state->count);
>          }
>      }
>  
> @@ -4155,6 +4227,7 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
>          qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
>      }
>  
> +    /* initialize the RDMAContext for multiRDMA */
>      if (migrate_use_multiRDMA()) {
>          if (multiRDMA_load_setup(host_port, rdma, &local_err) != 0) {
>              ERROR(errp, "init multiRDMA failure!");
> @@ -4193,10 +4266,29 @@ static void *multiRDMA_send_thread(void *opaque)
>      return NULL;
>  }
>  
> +static void multiRDMA_send_new_channel(QEMUFile *f, int id)
> +{
> +    MultiRDMASendParams *p;
> +    Error *local_err = NULL;
> +
> +    p = &multiRDMA_send_state->params[id];
> +    if (p->file != NULL) {
> +        error_setg(&local_err,
> +                   "multiRDMA: send id '%d' already setup'", id);
> +        return ;
> +    }
> +    p->file = f;
> +
> +    qemu_thread_create(&p->thread, p->name, multiRDMA_send_thread,
> +                       p, QEMU_THREAD_JOINABLE);
> +    atomic_inc(&multiRDMA_send_state->count);
> +}
> +
>  static int multiRDMA_save_setup(const char *host_port, Error **errp)
>  {
>      int thread_count;
>      uint8_t i;
> +    int ret;
>  
>      thread_count = migrate_multiRDMA_channels();
>      multiRDMA_send_state = g_malloc0(sizeof(*multiRDMA_send_state));
> @@ -4207,6 +4299,27 @@ static int multiRDMA_save_setup(const char *host_port, Error **errp)
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
> +        QEMUFile *f = NULL;
> +
> +        p->rdma = qemu_rdma_data_init(host_port, errp);
> +        if (p->rdma == NULL) {
> +            ERROR(errp, "init RDMA data failure for multi channel %d!", i);
> +            goto err;
> +        }
> +
> +        ret = qemu_rdma_source_init(p->rdma, migrate_use_rdma_pin_all(), errp);
> +        if (ret) {
> +            ERROR(errp, "init RDMA source failure for multi channel %d!", i);
> +            goto err;
> +        }
> +
> +        ret = qemu_rdma_connect(p->rdma, errp);
> +        if (ret) {
> +            ERROR(errp, "connect multi channel %d failure!", i);
> +            goto err;
> +        }
> +
> +        f = qemu_fopen_rdma(multiRDMA_send_state->params[i].rdma, "wb");
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem, 0);
>          p->quit = false;
> @@ -4214,12 +4327,20 @@ static int multiRDMA_save_setup(const char *host_port, Error **errp)
>          p->running = true;
>          p->name = g_strdup_printf("multiRDMASend_%d", i);
>  
> -        qemu_thread_create(&p->thread, p->name, multiRDMA_send_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -        atomic_inc(&multiRDMA_send_state->count);
> +        multiRDMA_send_new_channel(f, i);
>      }
>  
>      return 0;
> +
> +err:
> +    for (i = 0; i < thread_count; i++) {
> +        g_free(multiRDMA_send_state->params[i].rdma);
> +    }
> +
> +    g_free(multiRDMA_send_state->params);
> +    g_free(multiRDMA_send_state);
> +
> +    return -1;

This err path doesn't look like it does enough - don't you have to do
the equivalent of qemu_rdma_cleanup for each channel that did
successfully connect, and then also for the one that failed (perhaps
after that first step)?
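Something along these lines is the shape I'd expect - only a sketch,
and it assumes qemu_rdma_cleanup() copes with a context that only got
part way through qemu_rdma_connect():

err:
    /*
     * Unwind every channel that was at least allocated; the ones that
     * connected need the full disconnect/teardown, not just a g_free()
     * of the context.  The channel that failed part way is handled by
     * the same loop, since its p->rdma is still set; channels we never
     * reached are still NULL from g_malloc0() and get skipped.
     */
    for (i = 0; i < thread_count; i++) {
        MultiRDMASendParams *p = &multiRDMA_send_state->params[i];

        if (p->rdma) {
            qemu_rdma_cleanup(p->rdma);
            g_free(p->rdma);
            p->rdma = NULL;
        }
    }

    g_free(multiRDMA_send_state->params);
    g_free(multiRDMA_send_state);
    multiRDMA_send_state = NULL;

    return -1;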
>  }
>  
>  static void multiRDMA_send_terminate_threads(void)
> @@ -4268,6 +4389,8 @@ int multiRDMA_save_cleanup(void)
>          qemu_sem_destroy(&p->sem);
>          g_free(p->name);
>          p->name = NULL;
> +        qemu_rdma_cleanup(multiRDMA_send_state->params[i].rdma);
> +        g_free(multiRDMA_send_state->params[i].rdma);
>      }
>  
>      qemu_sem_destroy(&multiRDMA_send_state->sem_sync);
> @@ -4292,6 +4415,71 @@ static void multiRDMA_recv_terminate_threads(void)
>      }
>  }
>  
> +static void qemu_multiRDMA_load_cleanup(RDMAContext *rdma)
> +{
> +    int idx;
> +
> +    if (rdma->cm_id && rdma->connected) {
> +        if ((rdma->error_state ||
> +             migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
> +            !rdma->received_error) {
> +            RDMAControlHeader head = { .len = 0,
> +                                       .type = RDMA_CONTROL_ERROR,
> +                                       .repeat = 1,
> +                                     };
> +            error_report("Early error. Sending error.");
> +            qemu_rdma_post_send_control(rdma, NULL, &head);
> +        }
> +
> +        rdma_disconnect(rdma->cm_id);
> +        trace_qemu_rdma_cleanup_disconnect();
> +        rdma->connected = false;
> +    }
> +
> +    g_free(rdma->dest_blocks);
> +    rdma->dest_blocks = NULL;
> +
> +    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
> +        if (rdma->wr_data[idx].control_mr) {
> +            rdma->total_registrations--;
> +            ibv_dereg_mr(rdma->wr_data[idx].control_mr);
> +        }
> +        rdma->wr_data[idx].control_mr = NULL;
> +    }
> +
> +    if (rdma->local_ram_blocks.block) {
> +        while (rdma->local_ram_blocks.nb_blocks) {
> +            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
> +        }
> +    }
> +
> +    if (rdma->qp) {
> +        rdma_destroy_qp(rdma->cm_id);
> +        rdma->qp = NULL;
> +    }
> +    if (rdma->cq) {
> +        ibv_destroy_cq(rdma->cq);
> +        rdma->cq = NULL;
> +    }
> +    if (rdma->comp_channel) {
> +        ibv_destroy_comp_channel(rdma->comp_channel);
> +        rdma->comp_channel = NULL;
> +    }
> +    if (rdma->pd) {
> +        ibv_dealloc_pd(rdma->pd);
> +        rdma->pd = NULL;
> +    }
> +    if (rdma->cm_id) {
> +        rdma_destroy_id(rdma->cm_id);
> +        rdma->cm_id = NULL;
> +    }
> +
> +    rdma->listen_id = NULL;
> +    rdma->channel = NULL;
> +    g_free(rdma->host);
> +    rdma->host = NULL;
> +}
> +
>  int multiRDMA_load_cleanup(void)
>  {
>      int i;
> @@ -4323,6 +4511,8 @@ int multiRDMA_load_cleanup(void)
>          qemu_sem_destroy(&p->sem);
>          g_free(p->name);
>          p->name = NULL;
> +        qemu_multiRDMA_load_cleanup(multiRDMA_recv_state->params[i].rdma);
> +        g_free(multiRDMA_recv_state->params[i].rdma);
>      }
>  
>      qemu_sem_destroy(&multiRDMA_recv_state->sem_sync);
> @@ -4386,15 +4576,18 @@ void rdma_start_outgoing_migration(void *opaque,
>  
>      trace_rdma_start_outgoing_migration_after_rdma_connect();
>  
> +    s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
> +    /* create multiRDMA channel */
>      if (migrate_use_multiRDMA()) {
>          if (multiRDMA_save_setup(host_port, errp) != 0) {
>              ERROR(errp, "init multiRDMA channels failure!");
>              goto err;
>          }
> +        migrate_fd_connect(s, NULL);
> +    } else {
> +        migrate_fd_connect(s, NULL);
>      }

If that's on both sides of the 'if' then it should move to here.
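i.e. something like this (sketch only):

    /* create multiRDMA channels */
    if (migrate_use_multiRDMA()) {
        if (multiRDMA_save_setup(host_port, errp) != 0) {
            ERROR(errp, "init multiRDMA channels failure!");
            goto err;
        }
    }

    /* common to both the multiRDMA and plain RDMA paths */
    migrate_fd_connect(s, NULL);
    return;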
> -    s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
> -    migrate_fd_connect(s, NULL);
>      return;
>  err:
>      g_free(rdma);
> -- 
> 2.19.1
> 
> 

--
Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK