* Daniel P. Berrange (berra...@redhat.com) wrote: > This converts the RDMA code to provide a subclass of > QIOChannel that uses RDMA for the data transport. > > This implementation of RDMA does not correctly > handle non-blocking mode. Reads might block > if there was not already some pending data > and writes will block until all data is sent. > This flawed behaviour was already present in > the existing impl, so it appears not to be a > critical problem at this time. It should be > on the list of things to fix in the future > though. > > The RDMA code would be much better off if it could > be split up into a generic RDMA layer, a QIOChannel > impl based on RDMA, and then the RDMA migration > glue. This is left as a future exercise for the brave. > > Signed-off-by: Daniel P. Berrange <berra...@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilb...@redhat.com> (I don't know that much about the GSource stuff, but it looks consistent) > +static gboolean > +qio_channel_rdma_source_prepare(GSource *source, > + gint *timeout) > +{ > + QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; > + RDMAContext *rdma = rsource->rioc->rdma; > + GIOCondition cond = 0; > + *timeout = -1; > + > + if (rdma->wr_data[0].control_len) { > + cond |= G_IO_IN; > + } > + cond |= G_IO_OUT; > + > + return cond & rsource->condition; > +} I guess you could make the body of prepare just: *timeout = -1; return qio_channel_rdma_source_check(source); (check would then need to be moved above prepare, or forward-declared, since it is currently defined after it). Dave > +static gboolean > +qio_channel_rdma_source_check(GSource *source) > +{ > + QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; > + RDMAContext *rdma = rsource->rioc->rdma; > + GIOCondition cond = 0; > + > + if (rdma->wr_data[0].control_len) { > + cond |= G_IO_IN; > + } > + cond |= G_IO_OUT; > + > + return cond & rsource->condition; > +} > + > +static gboolean > +qio_channel_rdma_source_dispatch(GSource *source, > + GSourceFunc callback, > + gpointer user_data) > +{ > + QIOChannelFunc func = (QIOChannelFunc)callback; > + QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; > + RDMAContext *rdma = rsource->rioc->rdma; > + GIOCondition cond = 0; > + > + if (rdma->wr_data[0].control_len) { > + cond |= G_IO_IN; > + } > + cond |= G_IO_OUT; > + > + return (*func)(QIO_CHANNEL(rsource->rioc), > + (cond & rsource->condition), > + user_data); > +} > + > +static void > +qio_channel_rdma_source_finalize(GSource *source) > +{ > + QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source; > + > + object_unref(OBJECT(ssource->rioc)); > +} > + > +GSourceFuncs qio_channel_rdma_source_funcs = { > + qio_channel_rdma_source_prepare, > + qio_channel_rdma_source_check, > + qio_channel_rdma_source_dispatch, > + qio_channel_rdma_source_finalize > +}; > + > +static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, > + GIOCondition condition) > { 
> + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); > + QIOChannelRDMASource *ssource; > + GSource *source; > + > + source = g_source_new(&qio_channel_rdma_source_funcs, > + sizeof(QIOChannelRDMASource)); > + ssource = (QIOChannelRDMASource *)source; > + > + ssource->rioc = rioc; > + object_ref(OBJECT(rioc)); > + > + ssource->condition = condition; > + > + return source; > +} > + > + > +static int qio_channel_rdma_close(QIOChannel *ioc, > + Error **errp) > +{ > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); > trace_qemu_rdma_close(); > - QEMUFileRDMA *r = opaque; > - if (r->rdma) { > - qemu_rdma_cleanup(r->rdma); > - g_free(r->rdma); > + if (rioc->rdma) { > + qemu_rdma_cleanup(rioc->rdma); > + g_free(rioc->rdma); > + rioc->rdma = NULL; > } > - g_free(r); > return 0; > } > > @@ -2694,8 +2850,8 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void > *opaque, > ram_addr_t block_offset, ram_addr_t offset, > size_t size, uint64_t *bytes_sent) > { > - QEMUFileRDMA *rfile = opaque; > - RDMAContext *rdma = rfile->rdma; > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); > + RDMAContext *rdma = rioc->rdma; > int ret; > > CHECK_ERROR_STATE(); > @@ -2949,8 +3105,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, > void *opaque) > }; > RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, > .repeat = 1 }; > - QEMUFileRDMA *rfile = opaque; > - RDMAContext *rdma = rfile->rdma; > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); > + RDMAContext *rdma = rioc->rdma; > RDMALocalBlocks *local = &rdma->local_ram_blocks; > RDMAControlHeader head; > RDMARegister *reg, *registers; > @@ -3205,9 +3361,10 @@ out: > * We've already built our local RAMBlock list, but not yet sent the list to > * the source. 
> */ > -static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char > *name) > +static int > +rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) > { > - RDMAContext *rdma = rfile->rdma; > + RDMAContext *rdma = rioc->rdma; > int curr; > int found = -1; > > @@ -3249,8 +3406,8 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, > uint64_t flags, void *data) > static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, > uint64_t flags, void *data) > { > - QEMUFileRDMA *rfile = opaque; > - RDMAContext *rdma = rfile->rdma; > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); > + RDMAContext *rdma = rioc->rdma; > > CHECK_ERROR_STATE(); > > @@ -3269,8 +3426,8 @@ static int qemu_rdma_registration_stop(QEMUFile *f, > void *opaque, > uint64_t flags, void *data) > { > Error *local_err = NULL, **errp = &local_err; > - QEMUFileRDMA *rfile = opaque; > - RDMAContext *rdma = rfile->rdma; > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); > + RDMAContext *rdma = rioc->rdma; > RDMAControlHeader head = { .len = 0, .repeat = 1 }; > int ret = 0; > > @@ -3366,55 +3523,74 @@ err: > return ret; > } > > -static int qemu_rdma_get_fd(void *opaque) > -{ > - QEMUFileRDMA *rfile = opaque; > - RDMAContext *rdma = rfile->rdma; > - > - return rdma->comp_channel->fd; > -} > - > -static const QEMUFileOps rdma_read_ops = { > - .get_buffer = qemu_rdma_get_buffer, > - .get_fd = qemu_rdma_get_fd, > - .close = qemu_rdma_close, > -}; > - > static const QEMUFileHooks rdma_read_hooks = { > .hook_ram_load = rdma_load_hook, > }; > > -static const QEMUFileOps rdma_write_ops = { > - .put_buffer = qemu_rdma_put_buffer, > - .close = qemu_rdma_close, > -}; > - > static const QEMUFileHooks rdma_write_hooks = { > .before_ram_iterate = qemu_rdma_registration_start, > .after_ram_iterate = qemu_rdma_registration_stop, > .save_page = qemu_rdma_save_page, > }; > > -static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) > + > +static void 
qio_channel_rdma_finalize(Object *obj) > +{ > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); > + if (rioc->rdma) { > + qemu_rdma_cleanup(rioc->rdma); > + g_free(rioc->rdma); > + rioc->rdma = NULL; > + } > +} > + > +static void qio_channel_rdma_class_init(ObjectClass *klass, > + void *class_data G_GNUC_UNUSED) > +{ > + QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); > + > + ioc_klass->io_writev = qio_channel_rdma_writev; > + ioc_klass->io_readv = qio_channel_rdma_readv; > + ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; > + ioc_klass->io_close = qio_channel_rdma_close; > + ioc_klass->io_create_watch = qio_channel_rdma_create_watch; > +} > + > +static const TypeInfo qio_channel_rdma_info = { > + .parent = TYPE_QIO_CHANNEL, > + .name = TYPE_QIO_CHANNEL_RDMA, > + .instance_size = sizeof(QIOChannelRDMA), > + .instance_finalize = qio_channel_rdma_finalize, > + .class_init = qio_channel_rdma_class_init, > +}; > + > +static void qio_channel_rdma_register_types(void) > +{ > + type_register_static(&qio_channel_rdma_info); > +} > + > +type_init(qio_channel_rdma_register_types); > + > +static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) > { > - QEMUFileRDMA *r; > + QIOChannelRDMA *rioc; > > if (qemu_file_mode_is_not_valid(mode)) { > return NULL; > } > > - r = g_new0(QEMUFileRDMA, 1); > - r->rdma = rdma; > + rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); > + rioc->rdma = rdma; > > if (mode[0] == 'w') { > - r->file = qemu_fopen_ops(r, &rdma_write_ops); > - qemu_file_set_hooks(r->file, &rdma_write_hooks); > + rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc)); > + qemu_file_set_hooks(rioc->file, &rdma_write_hooks); > } else { > - r->file = qemu_fopen_ops(r, &rdma_read_ops); > - qemu_file_set_hooks(r->file, &rdma_read_hooks); > + rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc)); > + qemu_file_set_hooks(rioc->file, &rdma_read_hooks); > } > > - return r->file; > + return rioc->file; > } > > static void 
rdma_accept_incoming_migration(void *opaque) > -- > 2.5.0 > -- Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK