> > +static coroutine_fn void vu_client_next_trip(VuClient *client);
> > +
> > +static coroutine_fn void vu_client_trip(void *opaque)
> > +{
> > +    VuClient *client = opaque;
> > +
> > +    vu_dispatch(&client->parent);
> > +    client->co_trip = NULL;
> > +    if (!client->closed) {
> > +        vu_client_next_trip(client);
> > +    }
> > +}
>
> The last part is very untypical coroutine code: It says that we want to
> spawn a new coroutine with vu_client_trip() as its entry point, and then
> terminates the current one.
>
> Why don't we just put the whole thing in a while (!client->closed) loop
> and stay in the same coroutine instead of terminating the old one and
> starting a new one all the time?
>
> > +static coroutine_fn void vu_client_next_trip(VuClient *client)
> > +{
> > +    if (!client->co_trip) {
> > +        client->co_trip = qemu_coroutine_create(vu_client_trip, client);
> > +        aio_co_schedule(client->ioc->ctx, client->co_trip);
> > +    }
> > +}
> > +
> > +static void vu_client_start(VuClient *client)
> > +{
> > +    client->co_trip = qemu_coroutine_create(vu_client_trip, client);
> > +    aio_co_enter(client->ioc->ctx, client->co_trip);
> > +}
>
> This is essentially a duplicate of vu_client_next_trip(). The only
> place where it is called (vu_accept()) knows that client->co_trip is
> already NULL, so it could just call vu_client_next_trip().
>
> Or in fact, if vu_client_trip() gets turned into a loop, it's
> vu_client_next_trip() that becomes unnecessary.

This part of the code imitates nbd_client_trip in nbd/server.c. I think the
reason for repeatedly creating, starting and terminating vu_client_trip is to
support BlockBackendAioNotifier. In v5, I will keep the spawned coroutine
running in a loop until it is told that the AioContext of the block device
backend has changed, i.e. vu_client_trip will only be restarted when the
block device backend is attached to a different AioContext (a rough sketch is
appended at the end of this mail).

> > +    if (rc != sizeof(eventfd_t)) {
> > +        if (errno == EAGAIN) {
> > +            qio_channel_yield(data->ioc, G_IO_IN);
> > +        } else if (errno != EINTR) {
> > +            data->co = NULL;
> > +            return;
> > +        }
> > +    } else {
> > +        vq->handler(dev, index);
> > +    }
> > +    data->co = NULL;
> > +    vu_kick_cb_next(client, data);
>
> This can be a loop, too, instead of terminating the coroutine and
> starting a new one for the same function.

In v5, I plan to use aio_set_fd_handler to install a read handler that wraps
vu_kick_cb to deal with kick events, since an eventfd does not have the
short-read issue that a socket has. Thus vu_kick_cb in libvhost-user can be
re-used (also sketched at the end of this mail). My only concern is whether
this could lead to worse performance compared with keeping reading from the
eventfd until getting EAGAIN.

On Tue, Feb 25, 2020 at 11:44 PM Kevin Wolf <kw...@redhat.com> wrote:
>
> On 18.02.2020 at 06:07, Coiby Xu wrote:
> > Sharing QEMU devices via vhost-user protocol
> >
> > Signed-off-by: Coiby Xu <coiby...@gmail.com>
> > ---
> >  util/Makefile.objs       |   3 +
> >  util/vhost-user-server.c | 427 +++++++++++++++++++++++++++++++++++++++
> >  util/vhost-user-server.h |  56 +++++
> >  3 files changed, 486 insertions(+)
> >  create mode 100644 util/vhost-user-server.c
> >  create mode 100644 util/vhost-user-server.h
> >
> > diff --git a/util/vhost-user-server.h b/util/vhost-user-server.h
> > new file mode 100644
> > index 0000000000..ff6d3145cd
> > --- /dev/null
> > +++ b/util/vhost-user-server.h
> > @@ -0,0 +1,56 @@
> > +#include "io/channel-socket.h"
> > +#include "io/channel-file.h"
> > +#include "io/net-listener.h"
> > +#include "contrib/libvhost-user/libvhost-user.h"
> > +#include "standard-headers/linux/virtio_blk.h"
> > +#include "qemu/error-report.h"
> > +
> > +typedef struct VuClient VuClient;
>
> I find the terminology a bit confusing here: VuClient is really the
> connection to a single client, but it's part of the server. The name
> gives the impression as if this were client-side code.
> (This is something that already tends to confuse me in the NBD code.)
>
> I'm not sure what a better name could be, though. Maybe
> VuServerConnection or VuExportClient or VuExportConnection?
>
> > +typedef struct VuServer {
> > +    QIONetListener *listener;
> > +    AioContext *ctx;
> > +    QTAILQ_HEAD(, VuClient) clients;
> > +    void (*device_panic_notifier)(struct VuClient *client) ;
> > +    int max_queues;
> > +    const VuDevIface *vu_iface;
> > +    /*
> > +     * @ptr_in_device: VuServer pointer memory location in vhost-user device
> > +     * struct, so later container_of can be used to get device destruct
> > +     */
> > +    void *ptr_in_device;
> > +    bool close;
> > +} VuServer;
> > +
> > +typedef struct kick_info {
> > +    VuDev *vu_dev;
>
> I suppose this could specifically be VuClient?
>
> > +    int fd; /*kick fd*/
> > +    long index; /*queue index*/
> > +    QIOChannel *ioc; /*I/O channel for kick fd*/
> > +    QIOChannelFile *fioc; /*underlying data channel for kick fd*/
> > +    Coroutine *co;
> > +} kick_info;
> > +
> > +struct VuClient {
> > +    VuDev parent;
> > +    VuServer *server;
> > +    QIOChannel *ioc; /* The current I/O channel */
> > +    QIOChannelSocket *sioc; /* The underlying data channel */
> > +    Coroutine *co_trip;
> > +    struct kick_info *kick_info;
>
> If each struct kick_info (btw, QEMU coding style requires CamelCase) has
> exactly one VuClient and each VuClient has exactly one kick_info, should
> this be a single struct containing both?
>
> [ Coming back from reading the code below - it's because this is in
> fact an array. This should be made clear in the definition. ]
>
> > +    QTAILQ_ENTRY(VuClient) next;
> > +    bool closed;
> > +};
> > +
> > +
> > +VuServer *vhost_user_server_start(uint16_t max_queues,
> > +                                  SocketAddress *unix_socket,
> > +                                  AioContext *ctx,
> > +                                  void *server_ptr,
> > +                                  void *device_panic_notifier,
> > +                                  const VuDevIface *vu_iface,
> > +                                  Error **errp);
> > +
> > +void vhost_user_server_stop(VuServer *server);
> > +
> > +void change_vu_context(AioContext *ctx, VuServer *server);
>
> Let's call this vhost_user_server_set_aio_context() for consistency.
>
> > diff --git a/util/Makefile.objs b/util/Makefile.objs
> > index 11262aafaf..5e450e501c 100644
> > --- a/util/Makefile.objs
> > +++ b/util/Makefile.objs
> > @@ -36,6 +36,9 @@ util-obj-y += readline.o
> >  util-obj-y += rcu.o
> >  util-obj-$(CONFIG_MEMBARRIER) += sys_membarrier.o
> >  util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
> > +ifdef CONFIG_LINUX
> > +util-obj-y += vhost-user-server.o
> > +endif
> >  util-obj-y += qemu-coroutine-sleep.o
> >  util-obj-y += qemu-co-shared-resource.o
> >  util-obj-y += coroutine-$(CONFIG_COROUTINE_BACKEND).o
> > diff --git a/util/vhost-user-server.c b/util/vhost-user-server.c
> > new file mode 100644
> > index 0000000000..70ff6d6701
> > --- /dev/null
> > +++ b/util/vhost-user-server.c
> > @@ -0,0 +1,427 @@
> > +/*
> > + * Sharing QEMU devices via vhost-user protocol
> > + *
> > + * Author: Coiby Xu <coiby...@gmail.com>
> > + *
> > + * This work is licensed under the terms of the GNU GPL, version 2 or
> > + * later. See the COPYING file in the top-level directory.
> > + */
> > +#include "qemu/osdep.h"
> > +#include <sys/eventfd.h>
> > +#include "qemu/main-loop.h"
> > +#include "vhost-user-server.h"
> > +
> > +static void vmsg_close_fds(VhostUserMsg *vmsg)
> > +{
> > +    int i;
> > +    for (i = 0; i < vmsg->fd_num; i++) {
> > +        close(vmsg->fds[i]);
> > +    }
> > +}
> > +
> > +static void vmsg_unblock_fds(VhostUserMsg *vmsg)
> > +{
> > +    int i;
> > +    for (i = 0; i < vmsg->fd_num; i++) {
> > +        qemu_set_nonblock(vmsg->fds[i]);
> > +    }
> > +}
> > +
> > +
> > +static void close_client(VuClient *client)
> > +{
> > +    vu_deinit(&client->parent);
> > +    client->sioc = NULL;
> > +    object_unref(OBJECT(client->ioc));
> > +    client->closed = true;
> > +
> > +}
> > +
> > +static void panic_cb(VuDev *vu_dev, const char *buf)
> > +{
> > +    if (buf) {
> > +        error_report("vu_panic: %s", buf);
> > +    }
> > +
> > +    VuClient *client = container_of(vu_dev, VuClient, parent);
> > +    VuServer *server = client->server;
>
> Please put declarations at the start of the block.
>
> > +    if (!client->closed) {
> > +        close_client(client);
> > +        QTAILQ_REMOVE(&server->clients, client, next);
> > +    }
> > +
> > +    if (server->device_panic_notifier) {
> > +        server->device_panic_notifier(client);
> > +    }
> > +}
> > +
> > +
> > +
> > +static bool coroutine_fn
> > +vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
> > +{
> > +    struct iovec iov = {
> > +        .iov_base = (char *)vmsg,
> > +        .iov_len = VHOST_USER_HDR_SIZE,
> > +    };
> > +    int rc, read_bytes = 0;
> > +    /*
> > +     * VhostUserMsg is a packed structure, gcc will complain about passing
> > +     * pointer to a packed structure member if we pass &VhostUserMsg.fd_num
> > +     * and &VhostUserMsg.fds directly when calling qio_channel_readv_full,
> > +     * thus two temporary variables nfds and fds are used here.
> > +     */
> > +    size_t nfds = 0, nfds_t = 0;
> > +    int *fds = NULL, *fds_t = NULL;
> > +    VuClient *client = container_of(vu_dev, VuClient, parent);
> > +    QIOChannel *ioc = client->ioc;
> > +
> > +    Error *erp;
>
> The convention is to call this local_err. It should be initialised as
> NULL.
>
> > +    assert(qemu_in_coroutine());
> > +    do {
> > +        /*
> > +         * qio_channel_readv_full may have short reads, keeping calling it
> > +         * until getting VHOST_USER_HDR_SIZE or 0 bytes in total
> > +         */
> > +        rc = qio_channel_readv_full(ioc, &iov, 1, &fds_t, &nfds_t, &erp);
> > +        if (rc < 0) {
> > +            if (rc == QIO_CHANNEL_ERR_BLOCK) {
> > +                qio_channel_yield(ioc, G_IO_IN);
> > +                continue;
> > +            } else {
> > +                error_report("Error while recvmsg: %s", strerror(errno));
>
> I don't think, qio_channel_*() promise anything about the value in
> errno. (They also don't promise to use recvmsg().)
>
> Instead, use error_report_err() because erp contains the real error
> message.
> > +                return false;
> > +            }
> > +        }
> > +        read_bytes += rc;
> > +        fds = g_renew(int, fds_t, nfds + nfds_t);
> > +        memcpy(fds + nfds, fds_t, nfds_t);
> > +        nfds += nfds_t;
> > +        if (read_bytes == VHOST_USER_HDR_SIZE || rc == 0) {
> > +            break;
> > +        }
> > +    } while (true);
> > +
> > +    vmsg->fd_num = nfds;
> > +    memcpy(vmsg->fds, fds, nfds * sizeof(int));
> > +    g_free(fds);
> > +    /* qio_channel_readv_full will make socket fds blocking, unblock them */
> > +    vmsg_unblock_fds(vmsg);
> > +    if (vmsg->size > sizeof(vmsg->payload)) {
> > +        error_report("Error: too big message request: %d, "
> > +                     "size: vmsg->size: %u, "
> > +                     "while sizeof(vmsg->payload) = %zu",
> > +                     vmsg->request, vmsg->size, sizeof(vmsg->payload));
> > +        goto fail;
> > +    }
> > +
> > +    struct iovec iov_payload = {
> > +        .iov_base = (char *)&vmsg->payload,
> > +        .iov_len = vmsg->size,
> > +    };
> > +    if (vmsg->size) {
> > +        rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &erp);
> > +        if (rc == -1) {
> > +            error_report("Error while reading: %s", strerror(errno));
>
> error_report_err() again.
>
> > +            goto fail;
> > +        }
> > +    }
> > +
> > +    return true;
> > +
> > +fail:
> > +    vmsg_close_fds(vmsg);
> > +
> > +    return false;
> > +}
> > +
> > +
> > +static coroutine_fn void vu_client_next_trip(VuClient *client);
> > +
> > +static coroutine_fn void vu_client_trip(void *opaque)
> > +{
> > +    VuClient *client = opaque;
> > +
> > +    vu_dispatch(&client->parent);
> > +    client->co_trip = NULL;
> > +    if (!client->closed) {
> > +        vu_client_next_trip(client);
> > +    }
> > +}
>
> The last part is very untypical coroutine code: It says that we want to
> spawn a new coroutine with vu_client_trip() as its entry point, and then
> terminates the current one.
>
> Why don't we just put the whole thing in a while (!client->closed) loop
> and stay in the same coroutine instead of terminating the old one and
> starting a new one all the time?
>
> > +static coroutine_fn void vu_client_next_trip(VuClient *client)
> > +{
> > +    if (!client->co_trip) {
> > +        client->co_trip = qemu_coroutine_create(vu_client_trip, client);
> > +        aio_co_schedule(client->ioc->ctx, client->co_trip);
> > +    }
> > +}
> > +
> > +static void vu_client_start(VuClient *client)
> > +{
> > +    client->co_trip = qemu_coroutine_create(vu_client_trip, client);
> > +    aio_co_enter(client->ioc->ctx, client->co_trip);
> > +}
>
> This is essentially a duplicate of vu_client_next_trip(). The only
> place where it is called (vu_accept()) knows that client->co_trip is
> already NULL, so it could just call vu_client_next_trip().
>
> Or in fact, if vu_client_trip() gets turned into a loop, it's
> vu_client_next_trip() that becomes unnecessary.
>
> > +static void coroutine_fn vu_kick_cb_next(VuClient *client,
> > +                                         kick_info *data);
> > +
> > +static void coroutine_fn vu_kick_cb(void *opaque)
> > +{
> > +    kick_info *data = (kick_info *) opaque;
> > +    int index = data->index;
> > +    VuDev *dev = data->vu_dev;
> > +    VuClient *client;
> > +    client = container_of(dev, VuClient, parent);
> > +    VuVirtq *vq = &dev->vq[index];
> > +    int sock = vq->kick_fd;
> > +    if (sock == -1) {
> > +        return;
> > +    }
> > +    assert(sock == data->fd);
> > +    eventfd_t kick_data;
> > +    ssize_t rc;
> > +    /*
> > +     * When eventfd is closed, the revent is POLLNVAL (=G_IO_NVAL) and
> > +     * reading eventfd will return errno=EBADF (Bad file number).
> > +     * Calling qio_channel_yield(ioc, G_IO_IN) will set reading handler
> > +     * for QIOChannel, but aio_dispatch_handlers will only dispatch
> > +     * G_IO_IN | G_IO_HUP | G_IO_ERR revents while ignoring
> > +     * G_IO_NVAL (POLLNVAL) revents.
> > +     *
> > +     * Thus when eventfd is closed by vhost-user client, QEMU will ignore
> > +     * G_IO_NVAL and keeping polling by repeatedly calling qemu_poll_ns which
> > +     * will lead to 100% CPU usage.
> > +     *
> > +     * To aovid this issue, make sure set_watch and remove_watch use the same
>
> s/aovid/avoid/
>
> > +     * AIOContext for QIOChannel. Thus remove_watch will eventually succefully
> > +     * remove eventfd from the set of file descriptors polled for
> > +     * corresponding GSource.
> > +     */
> > +    rc = read(sock, &kick_data, sizeof(eventfd_t));
>
> Why not a QIOChannel function like for vu_message_read() above?
>
> > +    if (rc != sizeof(eventfd_t)) {
> > +        if (errno == EAGAIN) {
> > +            qio_channel_yield(data->ioc, G_IO_IN);
> > +        } else if (errno != EINTR) {
> > +            data->co = NULL;
> > +            return;
> > +        }
> > +    } else {
> > +        vq->handler(dev, index);
> > +    }
> > +    data->co = NULL;
> > +    vu_kick_cb_next(client, data);
>
> This can be a loop, too, instead of terminating the coroutine and
> starting a new one for the same function.
>
> > +}
> > +
> > +static void coroutine_fn vu_kick_cb_next(VuClient *client,
> > +                                         kick_info *cb_data)
> > +{
> > +    if (!cb_data->co) {
> > +        cb_data->co = qemu_coroutine_create(vu_kick_cb, cb_data);
> > +        aio_co_schedule(client->ioc->ctx, cb_data->co);
> > +    }
> > +}
>
> Kevin

--
Best regards,
Coiby
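P.S. For reference, here is a rough and untested sketch of the v5
vu_client_trip() loop I have in mind. The aio_context_changed flag, and the
idea that the BlockBackendAioNotifier callbacks set it before waking the
coroutine up, are my assumptions and not part of this patch:

static coroutine_fn void vu_client_trip(void *opaque)
{
    VuClient *client = opaque;

    /*
     * Stay in this coroutine until the client disconnects or the block
     * device backend is moved to a different AioContext.
     * client->aio_context_changed is an assumed new field set by the
     * attached/detached AioContext notifiers.
     */
    while (!client->closed && !client->aio_context_changed) {
        vu_dispatch(&client->parent);
    }

    client->co_trip = NULL;
    if (client->aio_context_changed && !client->closed) {
        client->aio_context_changed = false;
        /* Restart the trip coroutine in the new AioContext. */
        client->co_trip = qemu_coroutine_create(vu_client_trip, client);
        aio_co_schedule(client->ioc->ctx, client->co_trip);
    }
}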
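And a rough, untested sketch of handling kick events with aio_set_fd_handler()
instead of a per-kick coroutine. The KickInfo layout below, with cb/cb_data
storing the vu_watch_cb that libvhost-user passes to set_watch (i.e. its
internal vu_kick_cb), is an assumed v5 change, not what this patch does:

typedef struct KickInfo {
    VuDev *vu_dev;
    int fd;          /* kick fd */
    void *cb_data;   /* opaque pointer handed to set_watch by libvhost-user */
    vu_watch_cb cb;  /* libvhost-user's vu_kick_cb */
} KickInfo;

/* Called by the AioContext whenever the kick fd becomes readable. */
static void kick_handler(void *opaque)
{
    KickInfo *info = opaque;

    /*
     * An eventfd read never returns a short count, so libvhost-user's
     * vu_kick_cb, which reads the eventfd itself, can be reused unchanged.
     */
    info->cb(info->vu_dev, 0, info->cb_data);
}

/* set_watch callback passed to vu_init(). */
static void set_watch(VuDev *vu_dev, int fd, int vu_evt,
                      vu_watch_cb cb, void *pvt)
{
    VuClient *client = container_of(vu_dev, VuClient, parent);
    KickInfo *info = g_new0(KickInfo, 1);

    info->vu_dev = vu_dev;
    info->fd = fd;
    info->cb = cb;
    info->cb_data = pvt;

    aio_set_fd_handler(client->ioc->ctx, fd, true /* is_external */,
                       kick_handler, NULL, NULL, info);
}

This keeps vu_kick_cb itself untouched; whether going back through the event
loop for every kick is slower than draining the eventfd until EAGAIN is the
open question mentioned above.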