> > +    unsigned int num_reqs;
> > +    QemuMutex num_reqs_lock;
> 
> OK, the only reason this lock is needed is that you want to drain
> outside the thread.  Wouldn't it be better to queue the drain request
> for processing through the thread?  You won't need any locks then.
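If I read the suggestion right, it amounts to something like the sketch
below: the drain becomes a message to the data-plane thread, which acks
once its (now thread-private) request count reaches zero.  Plain POSIX
pipes stand in for the QEMU primitives, and every name here is
hypothetical rather than taken from the patch:

    #include <unistd.h>

    /* Hypothetical sketch, not part of the patch: the drain request is
     * queued to the data-plane thread instead of sharing a counter and
     * lock with it. */
    struct drain_channel {
        int req_fd[2];   /* caller -> thread: "please drain" */
        int ack_fd[2];   /* thread -> caller: "no requests in flight" */
    };

    /* Runs outside the data-plane thread */
    static void drain_via_thread(struct drain_channel *c)
    {
        char b = 'D';
        (void)write(c->req_fd[1], &b, 1);  /* queue the drain request */
        (void)read(c->ack_fd[0], &b, 1);   /* block until the thread acks */
    }

    /* Runs inside the data-plane thread's event loop, once a drain
     * request is pending and its private request count reaches zero */
    static void drain_ack(struct drain_channel *c)
    {
        char b = 'A';
        (void)write(c->ack_fd[1], &b, 1);
    }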
Draining is processed in the thread.  The lock is only needed because
it is used together with no_reqs_cond: userspace threads do not have
anything like wait_event.  Using futexes directly would let you remove
the lock, but that is not portable.

Paolo

> > +    QemuCond no_reqs_cond;
> > +};
> > +
> > +/* Raise an interrupt to signal guest, if necessary */
> > +static void notify_guest(VirtIOBlockDataPlane *s)
> > +{
> > +    if (!vring_should_notify(s->vdev, &s->vring)) {
> > +        return;
> > +    }
> > +
> > +    event_notifier_set(s->guest_notifier);
> > +}
> > +
> > +static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
> > +{
> > +    VirtIOBlockDataPlane *s = opaque;
> > +    VirtIOBlockRequest *req = container_of(iocb, VirtIOBlockRequest, iocb);
> > +    struct virtio_blk_inhdr hdr;
> > +    int len;
> > +
> > +    if (likely(ret >= 0)) {
> > +        hdr.status = VIRTIO_BLK_S_OK;
> > +        len = ret;
> > +    } else {
> > +        hdr.status = VIRTIO_BLK_S_IOERR;
> > +        len = 0;
> > +    }
> > +
> > +    trace_virtio_blk_data_plane_complete_request(s, req->head, ret);
> > +
> > +    qemu_iovec_from_buf(req->inhdr, 0, &hdr, sizeof(hdr));
> > +    qemu_iovec_destroy(req->inhdr);
> > +    g_slice_free(QEMUIOVector, req->inhdr);
> > +
> > +    /* According to the virtio specification len should be the number of bytes
> > +     * written to, but for virtio-blk it seems to be the number of bytes
> > +     * transferred plus the status bytes.
> > +     */
> > +    vring_push(&s->vring, req->head, len + sizeof(hdr));
> > +
> > +    qemu_mutex_lock(&s->num_reqs_lock);
> > +    if (--s->num_reqs == 0) {
> > +        qemu_cond_broadcast(&s->no_reqs_cond);
> > +    }
> > +    qemu_mutex_unlock(&s->num_reqs_lock);
> > +}
> > +
> > +static void fail_request_early(VirtIOBlockDataPlane *s, unsigned int head,
> > +                               QEMUIOVector *inhdr, unsigned char status)
> > +{
> > +    struct virtio_blk_inhdr hdr = {
> > +        .status = status,
> > +    };
> > +
> > +    qemu_iovec_from_buf(inhdr, 0, &hdr, sizeof(hdr));
> > +    qemu_iovec_destroy(inhdr);
> > +    g_slice_free(QEMUIOVector, inhdr);
> > +
> > +    vring_push(&s->vring, head, sizeof(hdr));
> > +    notify_guest(s);
> > +}
> > +
> > +static int process_request(IOQueue *ioq, struct iovec iov[],
> > +                           unsigned int out_num, unsigned int in_num,
> > +                           unsigned int head)
> > +{
> > +    VirtIOBlockDataPlane *s = container_of(ioq, VirtIOBlockDataPlane, ioqueue);
> > +    struct iovec *in_iov = &iov[out_num];
> > +    struct virtio_blk_outhdr outhdr;
> > +    QEMUIOVector *inhdr;
> > +    size_t in_size;
> > +
> > +    /* Copy in outhdr */
> > +    if (unlikely(iov_to_buf(iov, out_num, 0, &outhdr,
> > +                            sizeof(outhdr)) != sizeof(outhdr))) {
> > +        error_report("virtio-blk request outhdr too short");
> > +        return -EFAULT;
> > +    }
> > +    iov_discard(&iov, &out_num, sizeof(outhdr));
> > +
> > +    /* Grab inhdr for later */
> > +    in_size = iov_size(in_iov, in_num);
> > +    if (in_size < sizeof(struct virtio_blk_inhdr)) {
> > +        error_report("virtio_blk request inhdr too short");
> > +        return -EFAULT;
> > +    }
> > +    inhdr = g_slice_new(QEMUIOVector);
> > +    qemu_iovec_init(inhdr, 1);
> > +    qemu_iovec_concat_iov(inhdr, in_iov, in_num,
> > +                          in_size - sizeof(struct virtio_blk_inhdr),
> > +                          sizeof(struct virtio_blk_inhdr));
> > +    iov_discard(&in_iov, &in_num, -sizeof(struct virtio_blk_inhdr));
> > +
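As an aside for anyone following the iovec juggling in
process_request(): the two headers being copied around are, per the
virtio spec (cf. Linux's <linux/virtio_blk.h>), simply the following.
Reproduced for reference, not part of the patch:

    #include <stdint.h>

    /* A virtio-blk request is laid out as: outhdr, data buffers, inhdr */
    struct virtio_blk_outhdr {
        uint32_t type;     /* VIRTIO_BLK_T_* request type */
        uint32_t ioprio;   /* I/O priority hint */
        uint64_t sector;   /* offset in 512-byte sectors */
    };

    struct virtio_blk_inhdr {
        unsigned char status;   /* VIRTIO_BLK_S_OK, S_IOERR or S_UNSUPP */
    };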
> > +    /* TODO Linux sets the barrier bit even when not advertised! */
> > +    outhdr.type &= ~VIRTIO_BLK_T_BARRIER;
> > +
> > +    struct iocb *iocb;
> > +    switch (outhdr.type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_SCSI_CMD |
> > +                           VIRTIO_BLK_T_FLUSH)) {
> > +    case VIRTIO_BLK_T_IN:
> > +        iocb = ioq_rdwr(ioq, true, in_iov, in_num, outhdr.sector * 512);
> > +        break;
> > +
> > +    case VIRTIO_BLK_T_OUT:
> > +        iocb = ioq_rdwr(ioq, false, iov, out_num, outhdr.sector * 512);
> > +        break;
> > +
> > +    case VIRTIO_BLK_T_SCSI_CMD:
> > +        /* TODO support SCSI commands */
> > +        fail_request_early(s, head, inhdr, VIRTIO_BLK_S_UNSUPP);
> > +        return 0;
> > +
> > +    case VIRTIO_BLK_T_FLUSH:
> > +        /* TODO fdsync not supported by Linux AIO, do it synchronously here! */
> > +        fdatasync(s->fd);
> > +        fail_request_early(s, head, inhdr, VIRTIO_BLK_S_OK);
> > +        return 0;
> > +
> > +    default:
> > +        error_report("virtio-blk unsupported request type %#x", outhdr.type);
> > +        qemu_iovec_destroy(inhdr);
> > +        g_slice_free(QEMUIOVector, inhdr);
> > +        return -EFAULT;
> > +    }
> > +
> > +    /* Fill in virtio block metadata needed for completion */
> > +    VirtIOBlockRequest *req = container_of(iocb, VirtIOBlockRequest, iocb);
> > +    req->head = head;
> > +    req->inhdr = inhdr;
> > +    return 0;
> > +}
> > +
> > +static bool handle_notify(EventHandler *handler)
> > +{
> > +    VirtIOBlockDataPlane *s = container_of(handler, VirtIOBlockDataPlane,
> > +                                           notify_handler);
> > +
> > +    /* There is one array of iovecs into which all new requests are extracted
> > +     * from the vring.  Requests are read from the vring and the translated
> > +     * descriptors are written to the iovecs array.  The iovecs do not have to
> > +     * persist across handle_notify() calls because the kernel copies the
> > +     * iovecs on io_submit().
> > +     *
> > +     * Handling io_submit() EAGAIN may require storing the requests across
> > +     * handle_notify() calls until the kernel has sufficient resources to
> > +     * accept more I/O.  This is not implemented yet.
> > +     */
> > +    struct iovec iovec[VRING_MAX];
> > +    struct iovec *end = &iovec[VRING_MAX];
> > +    struct iovec *iov = iovec;
> > +
> > +    /* When a request is read from the vring, the index of the first descriptor
> > +     * (aka head) is returned so that the completed request can be pushed onto
> > +     * the vring later.
> > +     *
> > +     * The number of hypervisor read-only iovecs is out_num.  The number of
> > +     * hypervisor write-only iovecs is in_num.
> > +     */
> > +    int head;
> > +    unsigned int out_num = 0, in_num = 0;
> > +    unsigned int num_queued;
> > +
> > +    for (;;) {
> > +        /* Disable guest->host notifies to avoid unnecessary vmexits */
> > +        vring_set_notification(s->vdev, &s->vring, false);
> > +
> > +        for (;;) {
> > +            head = vring_pop(s->vdev, &s->vring, iov, end, &out_num, &in_num);
> > +            if (head < 0) {
> > +                break; /* no more requests */
> > +            }
> > +
> > +            trace_virtio_blk_data_plane_process_request(s, out_num, in_num,
> > +                                                        head);
> > +
> > +            if (process_request(&s->ioqueue, iov, out_num, in_num, head) < 0) {
> > +                vring_set_broken(&s->vring);
> > +                break;
> > +            }
> > +            iov += out_num + in_num;
> > +        }
> > +
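The branch that follows re-enables notifications and re-checks the
ring.  Distilled to its skeleton (stub names are hypothetical and the
-ENOBUFS branch is omitted), the outer loop is the usual
suppress-and-recheck pattern:

    #include <stdbool.h>

    /* Stubs standing in for the real vring helpers; names hypothetical */
    static void disable_guest_notifies(void);
    static void enable_guest_notifies(void);
    static void pop_and_submit_all(void);
    static bool more_avail(void);
    static void barrier(void);   /* stands in for smp_mb() */

    static void poll_vring(void)
    {
        for (;;) {
            disable_guest_notifies();  /* we poll below, save the vmexits */
            pop_and_submit_all();

            /* Re-enable notifies, then re-check: a descriptor added
             * between the last pop and the re-enable would otherwise
             * be stranded until the next guest kick. */
            enable_guest_notifies();
            barrier();
            if (!more_avail()) {
                break;
            }
        }
    }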
> > +        if (likely(head == -EAGAIN)) { /* vring emptied */
> > +            /* Re-enable guest->host notifies and stop processing the vring.
> > +             * But if the guest has snuck in more descriptors, keep processing.
> > +             */
> > +            vring_set_notification(s->vdev, &s->vring, true);
> > +            smp_mb();
> > +            if (!vring_more_avail(&s->vring)) {
> > +                break;
> > +            }
> > +        } else { /* head == -ENOBUFS or fatal error, iovecs[] is depleted */
> > +            /* Since there are no iovecs[] left, stop processing for now.  Do
> > +             * not re-enable guest->host notifies since the I/O completion
> > +             * handler knows to check for more vring descriptors anyway.
> > +             */
> > +            break;
> > +        }
> > +    }
> > +
> > +    num_queued = ioq_num_queued(&s->ioqueue);
> > +    if (num_queued > 0) {
> > +        qemu_mutex_lock(&s->num_reqs_lock);
> > +        s->num_reqs += num_queued;
> > +        qemu_mutex_unlock(&s->num_reqs_lock);
> > +
> > +        int rc = ioq_submit(&s->ioqueue);
> > +        if (unlikely(rc < 0)) {
> > +            fprintf(stderr, "ioq_submit failed %d\n", rc);
> > +            exit(1);
> > +        }
> > +    }
> > +    return true;
> > +}
> > +
> > +static bool handle_io(EventHandler *handler)
> > +{
> > +    VirtIOBlockDataPlane *s = container_of(handler, VirtIOBlockDataPlane,
> > +                                           io_handler);
> > +
> > +    if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
> > +        notify_guest(s);
> > +    }
> > +
> > +    /* If there were more requests than iovecs, the vring will not be empty yet
> > +     * so check again.  There should now be enough resources to process more
> > +     * requests.
> > +     */
> > +    if (unlikely(vring_more_avail(&s->vring))) {
> > +        return handle_notify(&s->notify_handler);
> > +    }
> > +
> > +    return true;
> > +}
> > +
> > +static void *data_plane_thread(void *opaque)
> > +{
> > +    VirtIOBlockDataPlane *s = opaque;
> > +    event_poll_run(&s->event_poll);
> > +    return NULL;
> > +}
> > +
> > +static void start_data_plane_bh(void *opaque)
> > +{
> > +    VirtIOBlockDataPlane *s = opaque;
> > +
> > +    qemu_bh_delete(s->start_bh);
> > +    s->start_bh = NULL;
> > +    qemu_thread_create(&s->thread, data_plane_thread,
> > +                       s, QEMU_THREAD_JOINABLE);
> > +}
> > +
> > +VirtIOBlockDataPlane *virtio_blk_data_plane_create(VirtIODevice *vdev, int fd)
> > +{
> > +    VirtIOBlockDataPlane *s;
> > +
> > +    s = g_new0(VirtIOBlockDataPlane, 1);
> > +    s->vdev = vdev;
> > +    s->fd = fd;
> > +    return s;
> > +}
> > +
> > +void virtio_blk_data_plane_destroy(VirtIOBlockDataPlane *s)
> > +{
> > +    if (!s) {
> > +        return;
> > +    }
> > +    virtio_blk_data_plane_stop(s);
> > +    g_free(s);
> > +}
> > +
> > +/* Block until pending requests have completed
> > + *
> > + * The vring continues to be serviced so ensure no new requests will be added
> > + * to avoid races.
> 
> This comment confuses me: "avoid races" is vague and does not really
> help.
> 
> This function does not seem to ensure that no new requests are added -
> it simply waits until the number of in-flight requests reaches 0.
> Requests could be added right afterwards, and then the drain won't
> help.
> 
> Could the comment be made clearer, please?
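For context, the caller of the drain in this series is the stop path
further down, where the host notifier is only torn down after the
drain, so the guest can indeed still add requests while we wait.
Condensed from virtio_blk_data_plane_stop() below (not new code):

    virtio_blk_data_plane_drain(s);    /* wait for num_reqs to reach 0 */
    event_poll_stop(&s->event_poll);   /* stop the data-plane event loop */
    qemu_thread_join(&s->thread);      /* join the data-plane thread */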
> > + */
> > +void virtio_blk_data_plane_drain(VirtIOBlockDataPlane *s)
> > +{
> > +    qemu_mutex_lock(&s->num_reqs_lock);
> > +    while (s->num_reqs > 0) {
> > +        qemu_cond_wait(&s->no_reqs_cond, &s->num_reqs_lock);
> > +    }
> > +    qemu_mutex_unlock(&s->num_reqs_lock);
> > +}
> > +
> > +void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
> > +{
> > +    VirtQueue *vq;
> > +    int i;
> > +
> > +    if (s->started) {
> > +        return;
> > +    }
> > +
> > +    vq = virtio_get_queue(s->vdev, 0);
> > +    if (!vring_setup(&s->vring, s->vdev, 0)) {
> > +        return;
> > +    }
> > +
> > +    event_poll_init(&s->event_poll);
> > +
> > +    /* Set up guest notifier (irq) */
> > +    if (s->vdev->binding->set_guest_notifiers(s->vdev->binding_opaque,
> > +                                              true) != 0) {
> > +        fprintf(stderr, "virtio-blk failed to set guest notifier, "
> > +                "ensure -enable-kvm is set\n");
> > +        exit(1);
> > +    }
> > +    s->guest_notifier = virtio_queue_get_guest_notifier(vq);
> > +
> > +    /* Set up virtqueue notify */
> > +    if (s->vdev->binding->set_host_notifier(s->vdev->binding_opaque,
> > +                                            0, true) != 0) {
> > +        fprintf(stderr, "virtio-blk failed to set host notifier\n");
> > +        exit(1);
> > +    }
> > +    event_poll_add(&s->event_poll, &s->notify_handler,
> > +                   virtio_queue_get_host_notifier(vq),
> > +                   handle_notify);
> > +
> > +    /* Set up ioqueue */
> > +    ioq_init(&s->ioqueue, s->fd, REQ_MAX);
> > +    for (i = 0; i < ARRAY_SIZE(s->requests); i++) {
> > +        ioq_put_iocb(&s->ioqueue, &s->requests[i].iocb);
> > +    }
> > +    event_poll_add(&s->event_poll, &s->io_handler,
> > +                   ioq_get_notifier(&s->ioqueue), handle_io);
> > +
> > +    s->started = true;
> > +    trace_virtio_blk_data_plane_start(s);
> > +
> > +    /* Kick right away to begin processing requests already in vring */
> > +    event_notifier_set(virtio_queue_get_host_notifier(vq));
> > +
> > +    /* Spawn thread in BH so it inherits iothread cpusets */
> > +    s->start_bh = qemu_bh_new(start_data_plane_bh, s);
> > +    qemu_bh_schedule(s->start_bh);
> > +}
> > +
> > +void virtio_blk_data_plane_stop(VirtIOBlockDataPlane *s)
> > +{
> > +    if (!s->started) {
> > +        return;
> > +    }
> > +    s->started = false;
> > +    trace_virtio_blk_data_plane_stop(s);
> > +
> > +    /* Stop thread or cancel pending thread creation BH */
> > +    if (s->start_bh) {
> > +        qemu_bh_delete(s->start_bh);
> > +        s->start_bh = NULL;
> > +    } else {
> > +        virtio_blk_data_plane_drain(s);
> > +        event_poll_stop(&s->event_poll);
> > +        qemu_thread_join(&s->thread);
> > +    }
> > +
> > +    ioq_cleanup(&s->ioqueue);
> > +
> > +    s->vdev->binding->set_host_notifier(s->vdev->binding_opaque, 0, false);
> > +
> > +    event_poll_cleanup(&s->event_poll);
> > +
> > +    /* Clean up guest notifier (irq) */
> > +    s->vdev->binding->set_guest_notifiers(s->vdev->binding_opaque, false);
> > +
> > +    vring_teardown(&s->vring);
> > +}
> > diff --git a/hw/dataplane/virtio-blk.h b/hw/dataplane/virtio-blk.h
> > new file mode 100644
> > index 0000000..ddf1115
> > --- /dev/null
> > +++ b/hw/dataplane/virtio-blk.h
> > @@ -0,0 +1,41 @@
> > +/*
> > + * Dedicated thread for virtio-blk I/O processing
> > + *
> > + * Copyright 2012 IBM, Corp.
> > + * Copyright 2012 Red Hat, Inc. and/or its affiliates
> > + *
> > + * Authors:
> > + *   Stefan Hajnoczi <stefa...@redhat.com>
> > + *
> > + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> > + * See the COPYING file in the top-level directory.
> > + *
> > + */
> > +
> > +#ifndef HW_DATAPLANE_VIRTIO_BLK_H
> > +#define HW_DATAPLANE_VIRTIO_BLK_H
> > +
> > +#include "hw/virtio.h"
> > +
> > +typedef struct VirtIOBlockDataPlane VirtIOBlockDataPlane;
> > +
> > +#ifdef CONFIG_VIRTIO_BLK_DATA_PLANE
> > +VirtIOBlockDataPlane *virtio_blk_data_plane_create(VirtIODevice *vdev, int fd);
> > +void virtio_blk_data_plane_destroy(VirtIOBlockDataPlane *s);
> > +void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s);
> > +void virtio_blk_data_plane_stop(VirtIOBlockDataPlane *s);
> > +void virtio_blk_data_plane_drain(VirtIOBlockDataPlane *s);
> > +#else
> > +static inline VirtIOBlockDataPlane *virtio_blk_data_plane_create(
> > +    VirtIODevice *vdev, int fd)
> > +{
> > +    return NULL;
> > +}
> > +
> > +static inline void virtio_blk_data_plane_destroy(VirtIOBlockDataPlane *s) {}
> > +static inline void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s) {}
> > +static inline void virtio_blk_data_plane_stop(VirtIOBlockDataPlane *s) {}
> > +static inline void virtio_blk_data_plane_drain(VirtIOBlockDataPlane *s) {}
> > +#endif
> > +
> > +#endif /* HW_DATAPLANE_VIRTIO_BLK_H */
> > diff --git a/trace-events b/trace-events
> > index a9a791b..1edc2ae 100644
> > --- a/trace-events
> > +++ b/trace-events
> > @@ -98,6 +98,12 @@ virtio_blk_rw_complete(void *req, int ret) "req %p ret %d"
> >  virtio_blk_handle_write(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
> >  virtio_blk_handle_read(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
> > 
> > +# hw/dataplane/virtio-blk.c
> > +virtio_blk_data_plane_start(void *s) "dataplane %p"
> > +virtio_blk_data_plane_stop(void *s) "dataplane %p"
> > +virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
> > +virtio_blk_data_plane_complete_request(void *s, unsigned int head, int ret) "dataplane %p head %u ret %d"
> > +
> >  # hw/dataplane/vring.c
> >  vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
> > 
> > --
> > 1.8.0
> 
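One small note on the header: because the !CONFIG_VIRTIO_BLK_DATA_PLANE
branch provides inline no-op stubs, callers need no #ifdefs.  A
hypothetical caller, not from this series:

    #include "hw/dataplane/virtio-blk.h"

    /* Compiles in both configurations: with the feature disabled,
     * create() returns NULL and the remaining calls are no-ops;
     * destroy() is NULL-safe either way. */
    static VirtIOBlockDataPlane *dataplane;

    static void blk_device_init(VirtIODevice *vdev, int fd)
    {
        dataplane = virtio_blk_data_plane_create(vdev, fd);
        virtio_blk_data_plane_start(dataplane);
    }

    static void blk_device_exit(void)
    {
        virtio_blk_data_plane_destroy(dataplane);
        dataplane = NULL;
    }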