This creates the multifd channels on both sides. We still don't transmit
anything through them; we only build the RDMA connections.
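The two new thread functions are placeholders for now: they just block on
their semaphores and exit once p->quit is set. A follow-up patch is expected
to wire them into channel creation, roughly like the sketch below
(illustrative only, not part of this patch; qemu_thread_create() as declared
in include/qemu/thread.h):

    /* Sketch of a hypothetical call site, not in this patch: start the
     * placeholder RDMA thread instead of the socket-based send thread
     * once the RDMA channel is connected. */
    if (migrate_use_rdma()) {
        qemu_thread_create(&p->thread, p->name, multifd_rdma_send_thread,
                           p, QEMU_THREAD_JOINABLE);
    } else {
        qemu_thread_create(&p->thread, p->name, multifd_send_thread,
                           p, QEMU_THREAD_JOINABLE);
    }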
Signed-off-by: Zhimin Feng <fengzhim...@huawei.com>
---
 migration/multifd.c | 103 ++++++++++++++++++++++++++++++++++++---
 migration/multifd.h |  10 ++++
 migration/rdma.c    | 115 ++++++++++++++++++++++++++++++++------------
 migration/rdma.h    |   4 +-
 4 files changed, 189 insertions(+), 43 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 63678d7fdd..acdfd3d5b3 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -248,6 +248,19 @@ struct {
     int exiting;
 } *multifd_send_state;
 
+int get_multifd_send_param(int id, MultiFDSendParams **param)
+{
+    int ret = 0;
+
+    if (id < 0 || id >= migrate_multifd_channels()) {
+        ret = -1;
+    } else {
+        *param = &(multifd_send_state->params[id]);
+    }
+
+    return ret;
+}
+
 /*
  * How we use multifd_send_state->pages and channel->pages?
  *
@@ -410,6 +423,9 @@ void multifd_save_cleanup(void)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        if (migrate_use_rdma()) {
+            g_free(p->rdma);
+        }
     }
     qemu_sem_destroy(&multifd_send_state->channels_ready);
     g_free(multifd_send_state->params);
@@ -464,6 +480,27 @@ void multifd_send_sync_main(QEMUFile *f)
     trace_multifd_send_sync_main(multifd_send_state->packet_num);
 }
 
+static void *multifd_rdma_send_thread(void *opaque)
+{
+    MultiFDSendParams *p = opaque;
+
+    while (true) {
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_wait(&p->sem);
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
+    return NULL;
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
@@ -566,6 +603,12 @@ static void rdma_send_channel_create(MultiFDSendParams *p)
 {
     Error *local_err = NULL;
 
+    if (multifd_channel_rdma_connect(p)) {
+        error_setg(&local_err, "multifd: rdma channel %d not established",
+                   p->id);
+        return ;
+    }
+
     if (p->quit) {
         error_setg(&local_err, "multifd: send id %d already quit", p->id);
         return ;
@@ -654,6 +697,19 @@ struct {
     uint64_t packet_num;
 } *multifd_recv_state;
 
+int get_multifd_recv_param(int id, MultiFDRecvParams **param)
+{
+    int ret = 0;
+
+    if (id < 0 || id >= migrate_multifd_channels()) {
+        ret = -1;
+    } else {
+        *param = &(multifd_recv_state->params[id]);
+    }
+
+    return ret;
+}
+
 static void multifd_recv_terminate_threads(Error *err)
 {
     int i;
@@ -724,6 +780,9 @@ int multifd_load_cleanup(Error **errp)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        if (migrate_use_rdma()) {
+            g_free(p->rdma);
+        }
     }
     qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
@@ -761,6 +820,27 @@ void multifd_recv_sync_main(void)
     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
 }
 
+static void *multifd_rdma_recv_thread(void *opaque)
+{
+    MultiFDRecvParams *p = opaque;
+
+    while (true) {
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_wait(&p->sem_sync);
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
+    return NULL;
+}
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
@@ -880,18 +960,24 @@ bool multifd_recv_all_channels_created(void)
 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
 {
     MultiFDRecvParams *p;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
     Error *local_err = NULL;
     int id;
 
-    id = multifd_recv_initial_packet(ioc, &local_err);
-    if (id < 0) {
-        multifd_recv_terminate_threads(local_err);
-        error_propagate_prepend(errp, local_err,
-                                "failed to receive packet"
-                                " via multifd channel %d: ",
-                                atomic_read(&multifd_recv_state->count));
-        return false;
+    if (migrate_use_rdma()) {
+        id = multifd_recv_state->count;
+    } else {
+        id = multifd_recv_initial_packet(ioc, &local_err);
+        if (id < 0) {
+            multifd_recv_terminate_threads(local_err);
+            error_propagate_prepend(errp, local_err,
+                                    "failed to receive packet"
+                                    " via multifd channel %d: ",
+                                    atomic_read(&multifd_recv_state->count));
+            return false;
+        }
     }
+
     trace_multifd_recv_new_channel(id);
 
     p = &multifd_recv_state->params[id];
@@ -903,6 +989,7 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
         return false;
     }
     p->c = ioc;
+    p->file = rioc->file;
     object_ref(OBJECT(ioc));
     /* initial packet */
     p->num_packets = 1;
diff --git a/migration/multifd.h b/migration/multifd.h
index c9c11ad140..1eae427f8c 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -67,6 +67,10 @@ typedef struct {
     char *name;
     /* channel thread id */
     QemuThread thread;
+    /* RDMAContext channel */
+    RDMAContext *rdma;
+    /* communication channel */
+    QEMUFile *file;
     /* communication channel */
     QIOChannel *c;
     /* sem where to wait for more work */
@@ -108,6 +112,10 @@ typedef struct {
     char *name;
     /* channel thread id */
    QemuThread thread;
+    /* RDMAContext channel */
+    RDMAContext *rdma;
+    /* communication channel */
+    QEMUFile *file;
     /* communication channel */
     QIOChannel *c;
     /* this mutex protects the following parameters */
@@ -137,5 +145,7 @@ typedef struct {
     QemuSemaphore sem_sync;
 } MultiFDRecvParams;
 
+int get_multifd_send_param(int id, MultiFDSendParams **param);
+int get_multifd_recv_param(int id, MultiFDRecvParams **param);
 #endif
 
diff --git a/migration/rdma.c b/migration/rdma.c
index a76823986e..48615fcaad 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -94,6 +94,8 @@ static const char *wrid_desc[] = {
     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
 };
 
+static const char *rdma_host_port;
+
 /*
  * Negotiate RDMA capabilities during connection-setup time.
  */
@@ -3122,6 +3124,33 @@ static int qemu_rdma_accept(RDMAContext *rdma)
         qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                             NULL,
                             (void *)(intptr_t)rdma->return_path);
+    } else if (migrate_use_multifd()) {
+        int thread_count;
+        int i;
+        MultiFDRecvParams *multifd_recv_param;
+        RDMAContext *multifd_rdma = NULL;
+        thread_count = migrate_multifd_channels();
+        /* create the multifd channels for RDMA */
+        for (i = 0; i < thread_count; i++) {
+            if (get_multifd_recv_param(i, &multifd_recv_param) < 0) {
+                error_report("rdma: error getting multifd_recv_param(%d)", i);
+                goto err_rdma_dest_wait;
+            }
+
+            if (multifd_recv_param->rdma->cm_id == NULL) {
+                multifd_rdma = multifd_recv_param->rdma;
+                break;
+            }
+        }
+
+        if (multifd_rdma) {
+            qemu_set_fd_handler(rdma->channel->fd,
+                                rdma_accept_incoming_migration,
+                                NULL, (void *)(intptr_t)multifd_rdma);
+        } else {
+            qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
+                                NULL, rdma);
+        }
     } else {
         qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
                             NULL, rdma);
@@ -3744,7 +3773,7 @@ static void migration_rdma_process_incoming(QEMUFile *f, Error **errp)
         mis->from_src_file = f;
         qemu_file_set_blocking(f, false);
-        start_migration = migrate_use_multifd();
+        start_migration = !migrate_use_multifd();
     } else {
         ioc = QIO_CHANNEL(getQIOChannel(f));
         /* Multiple connections */
 
@@ -3847,6 +3876,30 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
         goto err;
     }
 
+    if (migrate_use_multifd()) {
+        int thread_count;
+        int i;
+        int idx;
+        MultiFDRecvParams *multifd_recv_param;
+        thread_count = migrate_multifd_channels();
+        for (i = 0; i < thread_count; i++) {
+            if (get_multifd_recv_param(i, &multifd_recv_param) < 0) {
+                error_report("rdma: error getting multifd_recv_param(%d)", i);
+                goto err;
+            }
+
+            multifd_recv_param->rdma = qemu_rdma_data_init(host_port,
+                                                           &local_err);
+            for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+                multifd_recv_param->rdma->wr_data[idx].control_len = 0;
+                multifd_recv_param->rdma->wr_data[idx].control_curr = NULL;
+            }
+            /* the CM channel and CM id is shared */
+            multifd_recv_param->rdma->channel = rdma->channel;
+            multifd_recv_param->rdma->listen_id = rdma->listen_id;
+        }
+    }
+
     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                         NULL, (void *)(intptr_t)rdma);
     return;
@@ -3868,6 +3921,10 @@ void rdma_start_outgoing_migration(void *opaque,
         goto err;
     }
 
+    if (migrate_use_multifd()) {
+        rdma_host_port = g_strdup(host_port);
+    }
+
     ret = qemu_rdma_source_init(rdma,
         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL],
         errp);
@@ -3918,44 +3975,38 @@ err:
     g_free(rdma_return_path);
 }
 
-void *multifd_rdma_recv_thread(void *opaque)
+int multifd_channel_rdma_connect(void *opaque)
 {
-    MultiFDRecvParams *p = opaque;
+    MultiFDSendParams *p = opaque;
+    Error *local_err = NULL;
+    int ret = 0;
 
-    while (true) {
-        qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
-        }
-        qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem_sync);
+    p->rdma = qemu_rdma_data_init(rdma_host_port, &local_err);
+    if (p->rdma == NULL) {
+        goto out;
     }
 
-    qemu_mutex_lock(&p->mutex);
-    p->running = false;
-    qemu_mutex_unlock(&p->mutex);
-
-    return NULL;
-}
+    ret = qemu_rdma_source_init(p->rdma,
+                                migrate_use_rdma_pin_all(),
+                                &local_err);
+    if (ret) {
+        goto out;
+    }
 
-void *multifd_rdma_send_thread(void *opaque)
-{
-    MultiFDSendParams *p = opaque;
+    ret = qemu_rdma_connect(p->rdma, &local_err);
+    if (ret) {
+        goto out;
+    }
 
-    while (true) {
-        qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
-        }
-        qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+    p->file = qemu_fopen_rdma(p->rdma, "wb");
+    if (p->file == NULL) {
+        goto out;
     }
 
-    qemu_mutex_lock(&p->mutex);
-    p->running = false;
-    qemu_mutex_unlock(&p->mutex);
+out:
+    if (local_err) {
+        trace_multifd_send_error(p->id);
+    }
 
-    return NULL;
+    return ret;
 }
diff --git a/migration/rdma.h b/migration/rdma.h
index cb206c7004..ace6e5be90 100644
--- a/migration/rdma.h
+++ b/migration/rdma.h
@@ -263,9 +263,7 @@ struct QIOChannelRDMA {
     bool blocking; /* XXX we don't actually honour this yet */
 };
 
-
-void *multifd_rdma_recv_thread(void *opaque);
-void *multifd_rdma_send_thread(void *opaque);
+int multifd_channel_rdma_connect(void *opaque);
 void rdma_start_outgoing_migration(void *opaque,
                                    const char *host_port, Error **errp);
 
-- 
2.19.1