On Fri, Apr 27, 2018 at 1:36 AM, Dr. David Alan Gilbert
<dgilb...@redhat.com> wrote:
> * Lidong Chen (jemmy858...@gmail.com) wrote:
>> This patch implements a bi-directional RDMA QIOChannel. Because different
>> threads may access the RDMA QIOChannel concurrently, this patch uses RCU
>> to protect it.
>>
>> Signed-off-by: Lidong Chen <lidongc...@tencent.com>
>
> I'm a bit confused by this.
>
> I can see it's adding RCU to protect the rdma structures against
> deletion from multiple threads; that I'm OK with in principle; is that
> the only locking we need?  (I guess the two directions are actually
> separate RDMAContext's so maybe).
qio_channel_rdma_close() may be invoked by the migration thread and the
return path thread concurrently, so I use a mutex to protect it.

If one thread invokes qio_channel_rdma_writev() while another thread
invokes qio_channel_rdma_readv(), the two threads use separate
RDMAContexts, so no lock is needed there.

If two threads invoked qio_channel_rdma_writev() concurrently, a lock
would be needed; but in the source qemu only the migration thread
invokes qio_channel_rdma_writev(), and only the return path thread
invokes qio_channel_rdma_readv(). The destination qemu invokes
qio_channel_rdma_readv() only from the main thread before postcopy, and
only from the listen thread after postcopy. The destination qemu
already protects its writes to the source qemu with
qemu_mutex_lock(&mis->rp_mutex).

Still, should we use qemu_mutex_lock to protect qio_channel_rdma_writev()
and qio_channel_rdma_readv() anyway, to guard against some future change
that invokes them concurrently?

>
> But is there nothing else to make the QIOChannel bidirectional?
>
> Also, a lot seems dependent on listen_id; can you explain how that's
> being used.

The destination qemu is the server side, so its listen_id is non-zero;
the source qemu is the client side, so its listen_id is zero. I use
listen_id to determine whether this qemu is the destination or the
source.

For the destination qemu, writing data to the source needs to use the
return path RDMAContext, like this:

    if (rdma->listen_id) {
        rdma = rdma->return_path;
    }

For the source qemu, reading data from the destination also needs to use
the return path RDMAContext:

    if (!rdma->listen_id) {
        rdma = rdma->return_path;
    }

(A combined sketch of this selection follows below, after the quoted
remarks.)

>
> Finally, I don't think you have anywhere that destroys the new mutex you
> added.

I will fix this in the next version.

>
> Dave
> P.S. Please cc Daniel Berrange on this series, since it's so much
> IOChannel stuff.
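Here is that combined selection written as a single helper, for
illustration only. This is an untested sketch, and the helper name
qio_channel_rdma_get_context() is hypothetical; it is not part of this
patch:

    /*
     * Hypothetical helper (not in this patch): pick the RDMAContext
     * for one direction of the channel.  The destination side
     * (listen_id != NULL) writes through the return path, while the
     * source side (listen_id == NULL) reads through the return path.
     * Must be called under rcu_read_lock(); may return NULL if the
     * channel is already closed or the return path is not set up.
     */
    static RDMAContext *qio_channel_rdma_get_context(QIOChannelRDMA *rioc,
                                                     bool writing)
    {
        RDMAContext *rdma = atomic_rcu_read(&rioc->rdma);

        if (rdma && writing == (rdma->listen_id != NULL)) {
            rdma = rdma->return_path;
        }
        return rdma;
    }

qio_channel_rdma_writev() would call it with writing == true,
qio_channel_rdma_readv() with writing == false, and both would keep
their NULL check and -EIO return.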
>
>> ---
>>  migration/rdma.c | 162 +++++++++++++++++++++++++++++++++++++++++++++++++------
>>  1 file changed, 146 insertions(+), 16 deletions(-)
>>
>> diff --git a/migration/rdma.c b/migration/rdma.c
>> index f5c1d02..0652224 100644
>> --- a/migration/rdma.c
>> +++ b/migration/rdma.c
>> @@ -86,6 +86,7 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
>>                           " to abort!"); \
>>              rdma->error_reported = 1; \
>>          } \
>> +        rcu_read_unlock(); \
>>          return rdma->error_state; \
>>      } \
>>  } while (0)
>> @@ -405,6 +406,7 @@ struct QIOChannelRDMA {
>>      RDMAContext *rdma;
>>      QEMUFile *file;
>>      bool blocking; /* XXX we don't actually honour this yet */
>> +    QemuMutex lock;
>>  };
>>
>>  /*
>> @@ -2635,12 +2637,29 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>>      QEMUFile *f = rioc->file;
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      int ret;
>>      ssize_t done = 0;
>>      size_t i;
>>      size_t len = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>> +    if (rdma->listen_id) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      /*
>> @@ -2650,6 +2669,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>      ret = qemu_rdma_write_flush(f, rdma);
>>      if (ret < 0) {
>>          rdma->error_state = ret;
>> +        rcu_read_unlock();
>>          return ret;
>>      }
>>
>> @@ -2669,6 +2689,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>
>>          if (ret < 0) {
>>              rdma->error_state = ret;
>> +            rcu_read_unlock();
>>              return ret;
>>          }
>>
>> @@ -2677,6 +2698,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>          }
>>      }
>>
>> +    rcu_read_unlock();
>>      return done;
>>  }
>>
>> @@ -2710,12 +2732,29 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>>                                        Error **errp)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      RDMAControlHeader head;
>>      int ret = 0;
>>      ssize_t i;
>>      size_t done = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>> +    if (!rdma->listen_id) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      for (i = 0; i < niov; i++) {
>> @@ -2727,7 +2766,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>>           * were given and dish out the bytes until we run
>>           * out of bytes.
>>           */
>> -        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
>> +        ret = qemu_rdma_fill(rdma, data, want, 0);
>>          done += ret;
>>          want -= ret;
>>          /* Got what we needed, so go to next iovec */
>> @@ -2749,25 +2788,28 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>>
>>          if (ret < 0) {
>>              rdma->error_state = ret;
>> +            rcu_read_unlock();
>>              return ret;
>>          }
>>
>>          /*
>>           * SEND was received with new bytes, now try again.
>>           */
>> -        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
>> +        ret = qemu_rdma_fill(rdma, data, want, 0);
>>          done += ret;
>>          want -= ret;
>>
>>          /* Still didn't get enough, so lets just return */
>>          if (want) {
>>              if (done == 0) {
>> +                rcu_read_unlock();
>>                  return QIO_CHANNEL_ERR_BLOCK;
>>              } else {
>>                  break;
>>              }
>>          }
>>      }
>> +    rcu_read_unlock();
>>      return done;
>>  }
>>
>> @@ -2823,6 +2865,16 @@ qio_channel_rdma_source_prepare(GSource *source,
>>      GIOCondition cond = 0;
>>      *timeout = -1;
>>
>> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> +        (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        error_report("RDMAContext is NULL when prepare Gsource");
>> +        return FALSE;
>> +    }
>> +
>>      if (rdma->wr_data[0].control_len) {
>>          cond |= G_IO_IN;
>>      }
>> @@ -2838,6 +2890,16 @@ qio_channel_rdma_source_check(GSource *source)
>>      RDMAContext *rdma = rsource->rioc->rdma;
>>      GIOCondition cond = 0;
>>
>> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> +        (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        error_report("RDMAContext is NULL when check Gsource");
>> +        return FALSE;
>> +    }
>> +
>>      if (rdma->wr_data[0].control_len) {
>>          cond |= G_IO_IN;
>>      }
>> @@ -2856,6 +2918,16 @@ qio_channel_rdma_source_dispatch(GSource *source,
>>      RDMAContext *rdma = rsource->rioc->rdma;
>>      GIOCondition cond = 0;
>>
>> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> +        (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        error_report("RDMAContext is NULL when dispatch Gsource");
>> +        return FALSE;
>> +    }
>> +
>>      if (rdma->wr_data[0].control_len) {
>>          cond |= G_IO_IN;
>>      }
>> @@ -2905,15 +2977,29 @@ static int qio_channel_rdma_close(QIOChannel *ioc,
>>                                    Error **errp)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>> +    RDMAContext *rdma;
>>      trace_qemu_rdma_close();
>> -    if (rioc->rdma) {
>> -        if (!rioc->rdma->error_state) {
>> -            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
>> -        }
>> -        qemu_rdma_cleanup(rioc->rdma);
>> -        g_free(rioc->rdma);
>> -        rioc->rdma = NULL;
>> +
>> +    qemu_mutex_lock(&rioc->lock);
>> +    rdma = rioc->rdma;
>> +    if (!rdma) {
>> +        qemu_mutex_unlock(&rioc->lock);
>> +        return 0;
>> +    }
>> +    atomic_rcu_set(&rioc->rdma, NULL);
>> +    qemu_mutex_unlock(&rioc->lock);
>> +
>> +    if (!rdma->error_state) {
>> +        rdma->error_state = qemu_file_get_error(rioc->file);
>> +    }
>> +    qemu_rdma_cleanup(rdma);
>> +
>> +    if (rdma->return_path) {
>> +        qemu_rdma_cleanup(rdma->return_path);
>> +        g_free(rdma->return_path);
>>      }
>> +
>> +    g_free(rdma);
>>      return 0;
>>  }
>>
>> @@ -2956,12 +3042,21 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
>>                                    size_t size, uint64_t *bytes_sent)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      int ret;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> +        rcu_read_unlock();
>>          return RAM_SAVE_CONTROL_NOT_SUPP;
>>      }
>>
>> @@ -3046,9 +3141,11 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
>>          }
>>      }
>>
>> +    rcu_read_unlock();
>>      return RAM_SAVE_CONTROL_DELAYED;
>>  err:
>>      rdma->error_state = ret;
>> +    rcu_read_unlock();
>>      return ret;
>>  }
>>
>> @@ -3224,8 +3321,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
>>      RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
>>                                   .repeat = 1 };
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> -    RDMALocalBlocks *local = &rdma->local_ram_blocks;
>> +    RDMAContext *rdma;
>> +    RDMALocalBlocks *local;
>>      RDMAControlHeader head;
>>      RDMARegister *reg, *registers;
>>      RDMACompress *comp;
>> @@ -3238,8 +3335,17 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
>>      int count = 0;
>>      int i = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>> +    local = &rdma->local_ram_blocks;
>>      do {
>>          trace_qemu_rdma_registration_handle_wait();
>>
>> @@ -3469,6 +3575,7 @@ out:
>>      if (ret < 0) {
>>          rdma->error_state = ret;
>>      }
>> +    rcu_read_unlock();
>>      return ret;
>>  }
>>
>> @@ -3525,11 +3632,19 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
>>                                          uint64_t flags, void *data)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>> +
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>>
>>      CHECK_ERROR_STATE();
>>
>>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> +        rcu_read_unlock();
>>          return 0;
>>      }
>>
>> @@ -3537,6 +3652,7 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
>>      qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
>>      qemu_fflush(f);
>>
>> +    rcu_read_unlock();
>>      return 0;
>>  }
>>
>> @@ -3549,13 +3665,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>>  {
>>      Error *local_err = NULL, **errp = &local_err;
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      RDMAControlHeader head = { .len = 0, .repeat = 1 };
>>      int ret = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> +        rcu_read_unlock();
>>          return 0;
>>      }
>>
>> @@ -3587,6 +3711,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>>                  qemu_rdma_reg_whole_ram_blocks : NULL);
>>      if (ret < 0) {
>>          ERROR(errp, "receiving remote info!");
>> +        rcu_read_unlock();
>>          return ret;
>>      }
>>
>> @@ -3610,6 +3735,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>>                      "not identical on both the source and destination.",
>>                      local->nb_blocks, nb_dest_blocks);
>>          rdma->error_state = -EINVAL;
>> +        rcu_read_unlock();
>>          return -EINVAL;
>>      }
>>
>> @@ -3626,6 +3752,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>>                          local->block[i].length,
>>                          rdma->dest_blocks[i].length);
>>              rdma->error_state = -EINVAL;
>> +            rcu_read_unlock();
>>              return -EINVAL;
>>          }
>>          local->block[i].remote_host_addr =
>> @@ -3643,9 +3770,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>>          goto err;
>>      }
>>
>> +    rcu_read_unlock();
>>      return 0;
>>  err:
>>      rdma->error_state = ret;
>> +    rcu_read_unlock();
>>      return ret;
>>  }
>>
>> @@ -3707,6 +3836,7 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
>>
>>      rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
>>      rioc->rdma = rdma;
>> +    qemu_mutex_init(&rioc->lock);
>>
>>      if (mode[0] == 'w') {
>>          rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
>> --
>> 1.8.3.1
>>
> --
> Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK