From: fengzhimin <fengzhim...@huawei.com> Creation of the RDMA threads, nothing inside yet.
Signed-off-by: fengzhimin <fengzhim...@huawei.com> --- migration/migration.c | 1 + migration/migration.h | 2 + migration/rdma.c | 283 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 286 insertions(+) diff --git a/migration/migration.c b/migration/migration.c index 5756a4806e..f8d4eb657e 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -1546,6 +1546,7 @@ static void migrate_fd_cleanup(MigrationState *s) qemu_mutex_lock_iothread(); multifd_save_cleanup(); + multiRDMA_save_cleanup(); qemu_mutex_lock(&s->qemu_file_lock); tmp = s->to_dst_file; s->to_dst_file = NULL; diff --git a/migration/migration.h b/migration/migration.h index 4192c22d8c..d69a3fe4e9 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -272,6 +272,8 @@ void migration_incoming_process(void); bool migration_has_all_channels(void); int migrate_multiRDMA_channels(void); +int multiRDMA_save_cleanup(void); +int multiRDMA_load_cleanup(void); uint64_t migrate_max_downtime(void); diff --git a/migration/rdma.c b/migration/rdma.c index e241dcb992..992e5abfed 100644 --- a/migration/rdma.c +++ b/migration/rdma.c @@ -395,6 +395,58 @@ typedef struct RDMAContext { bool is_return_path; } RDMAContext; +typedef struct { + /* this fields are not changed once the thread is created */ + /* channel number */ + uint8_t id; + /* channel thread name */ + char *name; + /* channel thread id */ + QemuThread thread; + /* sem where to wait for more work */ + QemuSemaphore sem; + /* this mutex protects the following parameters */ + QemuMutex mutex; + /* is this channel thread running */ + bool running; + /* should this thread finish */ + bool quit; +} MultiRDMASendParams; + +struct { + MultiRDMASendParams *params; + /* number of created threads */ + int count; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; +} *multiRDMA_send_state; + +typedef struct { + /* this fields are not changed once the thread is created */ + /* channel number */ + uint8_t id; + /* channel thread name */ + char *name; + /* channel thread id */ + QemuThread thread; + /* sem where to wait for more work */ + QemuSemaphore sem; + /* this mutex protects the following parameters */ + QemuMutex mutex; + /* is this channel thread running */ + bool running; + /* should this thread finish */ + bool quit; +} MultiRDMARecvParams; + +struct { + MultiRDMARecvParams *params; + /* number of created threads */ + int count; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; +} *multiRDMA_recv_state; + #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" #define QIO_CHANNEL_RDMA(obj) \ OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA) @@ -3018,6 +3070,7 @@ static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu) if (rcu->rdmaout) { qemu_rdma_cleanup(rcu->rdmaout); } + multiRDMA_load_cleanup(); g_free(rcu->rdmain); g_free(rcu->rdmaout); @@ -3919,6 +3972,7 @@ static void qio_channel_rdma_finalize(Object *obj) g_free(rioc->rdmaout); rioc->rdmaout = NULL; } + multiRDMA_load_cleanup(); } static void qio_channel_rdma_class_init(ObjectClass *klass, @@ -4007,6 +4061,59 @@ static void rdma_accept_incoming_migration(void *opaque) migration_fd_process_incoming(f); } +static void *multiRDMA_recv_thread(void *opaque) +{ + MultiRDMARecvParams *p = opaque; + + while (true) { + qemu_mutex_lock(&p->mutex); + if (p->quit) { + qemu_mutex_unlock(&p->mutex); + break; + } + qemu_mutex_unlock(&p->mutex); + qemu_sem_wait(&p->sem); + } + + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + + return NULL; +} + +static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma, + Error **errp) +{ + uint8_t i; + int thread_count; + + thread_count = migrate_multiRDMA_channels(); + if (multiRDMA_recv_state == NULL) { + multiRDMA_recv_state = g_malloc0(sizeof(*multiRDMA_recv_state)); + multiRDMA_recv_state->params = g_new0(MultiRDMARecvParams, + thread_count); + atomic_set(&multiRDMA_recv_state->count, 0); + qemu_sem_init(&multiRDMA_recv_state->sem_sync, 0); + + for (i = 0; i < thread_count; i++) { + MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i]; + + qemu_mutex_init(&p->mutex); + qemu_sem_init(&p->sem, 0); + p->quit = false; + p->id = i; + p->running = true; + p->name = g_strdup_printf("multiRDMARecv_%d", i); + qemu_thread_create(&p->thread, p->name, multiRDMA_recv_thread, + p, QEMU_THREAD_JOINABLE); + atomic_inc(&multiRDMA_recv_state->count); + } + } + + return 0; +} + void rdma_start_incoming_migration(const char *host_port, Error **errp) { int ret; @@ -4048,6 +4155,13 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp) qemu_rdma_return_path_dest_init(rdma_return_path, rdma); } + if (migrate_use_multiRDMA()) { + if (multiRDMA_load_setup(host_port, rdma, &local_err) != 0) { + ERROR(errp, "init multiRDMA failure!"); + goto err; + } + } + qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, NULL, (void *)(intptr_t)rdma); return; @@ -4055,6 +4169,167 @@ err: error_propagate(errp, local_err); g_free(rdma); g_free(rdma_return_path); + multiRDMA_load_cleanup(); +} + +static void *multiRDMA_send_thread(void *opaque) +{ + MultiRDMASendParams *p = opaque; + + while (true) { + qemu_mutex_lock(&p->mutex); + if (p->quit) { + qemu_mutex_unlock(&p->mutex); + break; + } + qemu_mutex_unlock(&p->mutex); + qemu_sem_wait(&p->sem); + } + + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + + return NULL; +} + +static int multiRDMA_save_setup(const char *host_port, Error **errp) +{ + int thread_count; + uint8_t i; + + thread_count = migrate_multiRDMA_channels(); + multiRDMA_send_state = g_malloc0(sizeof(*multiRDMA_send_state)); + multiRDMA_send_state->params = g_new0(MultiRDMASendParams, + thread_count); + atomic_set(&multiRDMA_send_state->count, 0); + qemu_sem_init(&multiRDMA_send_state->sem_sync, 0); + + for (i = 0; i < thread_count; i++) { + MultiRDMASendParams *p = &multiRDMA_send_state->params[i]; + qemu_mutex_init(&p->mutex); + qemu_sem_init(&p->sem, 0); + p->quit = false; + p->id = i; + p->running = true; + p->name = g_strdup_printf("multiRDMASend_%d", i); + + qemu_thread_create(&p->thread, p->name, multiRDMA_send_thread, p, + QEMU_THREAD_JOINABLE); + atomic_inc(&multiRDMA_send_state->count); + } + + return 0; +} + +static void multiRDMA_send_terminate_threads(void) +{ + int i; + int thread_count = migrate_multiRDMA_channels(); + + for (i = 0; i < thread_count; i++) { + MultiRDMASendParams *p = &multiRDMA_send_state->params[i]; + + qemu_mutex_lock(&p->mutex); + p->quit = true; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); + } +} + +int multiRDMA_save_cleanup(void) +{ + int i; + int ret = 0; + int thread_count = migrate_multiRDMA_channels(); + + if (!migrate_use_multiRDMA()) { + return 0; + } + + /* prevent double free */ + if (multiRDMA_send_state == NULL) { + return 0; + } + + /* notify multi RDMA threads to exit */ + multiRDMA_send_terminate_threads(); + + /* wait for multi RDMA send threads to be exit */ + for (i = 0; i < thread_count; i++) { + MultiRDMASendParams *p = &multiRDMA_send_state->params[i]; + + qemu_thread_join(&p->thread); + } + + for (i = 0; i < thread_count; i++) { + MultiRDMASendParams *p = &multiRDMA_send_state->params[i]; + qemu_mutex_destroy(&p->mutex); + qemu_sem_destroy(&p->sem); + g_free(p->name); + p->name = NULL; + } + + qemu_sem_destroy(&multiRDMA_send_state->sem_sync); + g_free(multiRDMA_send_state); + multiRDMA_send_state = NULL; + + return ret; +} + +static void multiRDMA_recv_terminate_threads(void) +{ + int i; + int thread_count = migrate_multiRDMA_channels(); + + for (i = 0; i < thread_count; i++) { + MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i]; + + qemu_mutex_lock(&p->mutex); + p->quit = true; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); + } +} + +int multiRDMA_load_cleanup(void) +{ + int i; + int ret = 0; + int thread_count = migrate_multiRDMA_channels(); + + if (!migrate_use_multiRDMA()) { + return 0; + } + + /* prevent double free */ + if (multiRDMA_recv_state == NULL) { + return 0; + } + + /* notify multi RDMA recv threads to exit */ + multiRDMA_recv_terminate_threads(); + + /* wait for multi RDMA threads to be exit */ + for (i = 0; i < thread_count; i++) { + MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i]; + + qemu_thread_join(&p->thread); + } + + for (i = 0; i < thread_count; i++) { + MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i]; + qemu_mutex_destroy(&p->mutex); + qemu_sem_destroy(&p->sem); + g_free(p->name); + p->name = NULL; + } + + qemu_sem_destroy(&multiRDMA_recv_state->sem_sync); + g_free(multiRDMA_recv_state); + multiRDMA_recv_state = NULL; + + return ret; } void rdma_start_outgoing_migration(void *opaque, @@ -4111,10 +4386,18 @@ void rdma_start_outgoing_migration(void *opaque, trace_rdma_start_outgoing_migration_after_rdma_connect(); + if (migrate_use_multiRDMA()) { + if (multiRDMA_save_setup(host_port, errp) != 0) { + ERROR(errp, "init multiRDMA channels failure!"); + goto err; + } + } + s->to_dst_file = qemu_fopen_rdma(rdma, "wb"); migrate_fd_connect(s, NULL); return; err: g_free(rdma); g_free(rdma_return_path); + multiRDMA_save_cleanup(); } -- 2.19.1