From: Stefan Hajnoczi <stefa...@redhat.com>

Send normal requests to the device and handle completions.

This is enough to get mount and basic I/O working.  The hiprio and
notifications queues still need to be implemented for full FUSE
functionality.

Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
---
 fs/fuse/fuse_i.h    |   3 +
 fs/fuse/virtio_fs.c | 529 +++++++++++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 501 insertions(+), 31 deletions(-)

diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index 60ebe3c2e2c3..3a91aa970566 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -452,6 +452,9 @@ struct fuse_req {
 
        /** Request is stolen from fuse_file->reserved_req */
        struct file *stolen_file;
+
+       /** virtio-fs's physically contiguous buffer for in and out args */
+       void *argbuf;
 };
 
 struct fuse_iqueue;
diff --git a/fs/fuse/virtio_fs.c b/fs/fuse/virtio_fs.c
index 8cdeb02f3778..fa99a31ee930 100644
--- a/fs/fuse/virtio_fs.c
+++ b/fs/fuse/virtio_fs.c
@@ -14,14 +14,35 @@
 static DEFINE_MUTEX(virtio_fs_mutex);
 static LIST_HEAD(virtio_fs_instances);
 
+/* Per-virtqueue state; virtio_fs->vqs[] holds one of these per virtqueue */
+struct virtio_fs_vq {
+       struct virtqueue *vq;     /* protected by fpq->lock */
+       struct work_struct done_work; /* scheduled by virtio_fs_vq_done() to process completions */
+       struct fuse_dev *fud;     /* set in virtio_fs_fill_super(), cleared in virtio_fs_free_devs() */
+       char name[24];            /* virtqueue name passed to virtio_find_vqs() */
+} ____cacheline_aligned_in_smp;
+
 /* A virtio-fs device instance */
 struct virtio_fs {
-       struct list_head list; /* on virtio_fs_instances */
+       struct list_head list;    /* on virtio_fs_instances */
        char *tag;
-       struct fuse_dev **fud; /* 1:1 mapping with request queues */
-       unsigned int num_queues;
+       struct virtio_fs_vq *vqs; /* nvqs entries: [0] notifications, [1] hiprio, [2..] requests */
+       unsigned nvqs;            /* number of virtqueues (2 + num_queues) */
+       unsigned num_queues;      /* number of request queues, read from the device config */
 };
 
+static inline struct virtio_fs_vq *vq_to_fsvq(struct virtqueue *vq)
+{
+       struct virtio_fs *fs = vq->vdev->priv; /* priv is pointed at the virtio_fs in virtio_fs_probe() */
+
+       return &fs->vqs[vq->index]; /* assumes vq->index matches the slot used in virtio_fs_setup_vqs() - TODO confirm */
+}
+
+static inline struct fuse_pqueue *vq_to_fpq(struct virtqueue *vq)
+{
+       return &vq_to_fsvq(vq)->fud->pq; /* NOTE(review): fud is only set by virtio_fs_fill_super(); callers must run after mount */
+}
+
 /* Add a new instance to the list or return -EEXIST if tag name exists*/
 static int virtio_fs_add_instance(struct virtio_fs *fs)
 {
@@ -71,18 +92,17 @@ static void virtio_fs_free_devs(struct virtio_fs *fs)
 
        /* TODO lock */
 
-       if (!fs->fud)
-               return;
+       for (i = 0; i < fs->nvqs; i++) {
+               struct virtio_fs_vq *fsvq = &fs->vqs[i];
 
-       for (i = 0; i < fs->num_queues; i++) {
-               struct fuse_dev *fud = fs->fud[i];
+               if (!fsvq->fud)
+                       continue;
 
-               if (fud)
-                       fuse_dev_free(fud); /* TODO need to quiesce/end_requests/decrement dev_count */
-       }
+               flush_work(&fsvq->done_work);
 
-       kfree(fs->fud);
-       fs->fud = NULL;
+               fuse_dev_free(fsvq->fud); /* TODO need to quiesce/end_requests/decrement dev_count */
+               fsvq->fud = NULL;
+       }
 }
 
 /* Read filesystem name from virtio config into fs->tag (must kfree()). */
@@ -109,6 +129,210 @@ static int virtio_fs_read_tag(struct virtio_device *vdev, struct virtio_fs *fs)
        return 0;
 }
 
+static void virtio_fs_notifications_done(struct virtqueue *vq)
+{
+       /* TODO: notification handling is not implemented; the callback only logs its own name */
+       dev_dbg(&vq->vdev->dev, "%s\n", __func__);
+}
+
+static void virtio_fs_notifications_done_work(struct work_struct *work)
+{
+       return; /* placeholder handler: nothing to process yet, but done_work needs a function for INIT_WORK() */
+}
+
+static void virtio_fs_hiprio_done(struct virtqueue *vq)
+{
+       /* TODO: hiprio completion handling is not implemented; the callback only logs its own name */
+       dev_dbg(&vq->vdev->dev, "%s\n", __func__);
+}
+
+/* Allocate req->argbuf for all non-page in and out args and copy the in args into it; 0 or -ENOMEM */
+static int copy_args_to_argbuf(struct fuse_req *req)
+{
+       unsigned offset = 0;
+       unsigned num_in;
+       unsigned num_out;
+       unsigned len;
+       unsigned i;
+
+       num_in = req->in.numargs - req->in.argpages; /* an arg carried in pages is not bounced */
+       num_out = req->out.numargs - req->out.argpages;
+       len = fuse_len_args(num_in, (struct fuse_arg *)req->in.args) +
+             fuse_len_args(num_out, req->out.args);
+
+       req->argbuf = kmalloc(len, GFP_ATOMIC); /* freed in copy_args_from_argbuf() or on enqueue failure */
+       if (!req->argbuf)
+               return -ENOMEM;
+
+       for (i = 0; i < num_in; i++) { /* pack in args back to back; the out-arg space follows them */
+               memcpy(req->argbuf + offset,
+                      req->in.args[i].value,
+                      req->in.args[i].size);
+               offset += req->in.args[i].size;
+       }
+
+       return 0;
+}
+
+/* Copy the non-page out args from req->argbuf into req->out.args[], then free req->argbuf */
+static void copy_args_from_argbuf(struct fuse_req *req)
+{
+       unsigned remaining;
+       unsigned offset;
+       unsigned num_in;
+       unsigned num_out;
+       unsigned i;
+
+       remaining = req->out.h.len - sizeof(req->out.h); /* NOTE(review): out.h.len is not validated; underflows if shorter than the header */
+       num_in = req->in.numargs - req->in.argpages;
+       num_out = req->out.numargs - req->out.argpages;
+       offset = fuse_len_args(num_in, (struct fuse_arg *)req->in.args); /* out args sit right after the packed in args */
+
+       for (i = 0; i < num_out; i++) {
+               unsigned argsize = req->out.args[i].size;
+
+               if (req->out.argvar &&
+                   i == req->out.numargs - 1 &&
+                   argsize > remaining) {
+                       argsize = remaining; /* variable-length last arg: copy only what the reply contains */
+               }
+
+               memcpy(req->out.args[i].value, req->argbuf + offset, argsize);
+               offset += argsize;
+
+               if (i != req->out.numargs - 1)
+                       remaining -= argsize; /* after the loop, remaining is what is left for the last arg */
+       }
+
+       /* Store the actual size of the variable-length arg */
+       if (req->out.argvar)
+               req->out.args[req->out.numargs - 1].size = remaining;
+
+       kfree(req->argbuf);
+       req->argbuf = NULL;
+}
+
+/* Work function for request completion: reap finished requests from the vq, copy out replies, end them */
+static void virtio_fs_requests_done_work(struct work_struct *work)
+{
+       struct virtio_fs_vq *fsvq = container_of(work, struct virtio_fs_vq,
+                                                done_work);
+       struct fuse_pqueue *fpq = &fsvq->fud->pq;
+       struct fuse_conn *fc = fsvq->fud->fc;
+       struct virtqueue *vq = fsvq->vq;
+       struct fuse_req *req;
+       struct fuse_req *next;
+       LIST_HEAD(reqs);
+
+       /* Collect completed requests off the virtqueue */
+       spin_lock(&fpq->lock);
+       do {
+               unsigned len;
+
+               virtqueue_disable_cb(vq);
+
+               while ((req = virtqueue_get_buf(vq, &len)) != NULL)
+                       list_move_tail(&req->list, &reqs); /* move to the local list so it can be ended without the lock */
+       } while (!virtqueue_enable_cb(vq) && likely(!virtqueue_is_broken(vq))); /* loop again if enabling callbacks reports more work */
+       spin_unlock(&fpq->lock);
+
+       /* End requests */
+       list_for_each_entry_safe(req, next, &reqs, list) {
+               /* TODO check unique */
+               /* TODO fuse_len_args(out) against oh.len */
+
+               copy_args_from_argbuf(req); /* also frees req->argbuf */
+
+               /* TODO zeroing? */
+
+               spin_lock(&fpq->lock);
+               clear_bit(FR_SENT, &req->flags);
+               list_del_init(&req->list);
+               spin_unlock(&fpq->lock);
+
+               fuse_request_end(fc, req); /* called with fpq->lock dropped */
+       }
+}
+
+/* Virtqueue interrupt handler: defers completion processing to the vq's done_work */
+static void virtio_fs_vq_done(struct virtqueue *vq)
+{
+       struct virtio_fs_vq *fsvq = vq_to_fsvq(vq);
+
+       dev_dbg(&vq->vdev->dev, "%s %s\n", __func__, fsvq->name);
+
+       schedule_work(&fsvq->done_work); /* done_work must have been set up with INIT_WORK() for this vq */
+}
+
+/* Initialize virtqueues: [0] notifications, [1] hiprio, [2..] requests; returns 0 or a negative errno */
+static int virtio_fs_setup_vqs(struct virtio_device *vdev,
+                              struct virtio_fs *fs)
+{
+       struct virtqueue **vqs;
+       vq_callback_t **callbacks;
+       const char **names;
+       unsigned i;
+       int ret;
+
+       virtio_cread(vdev, struct virtio_fs_config, num_queues,
+                    &fs->num_queues);
+       if (fs->num_queues == 0)
+               return -EINVAL;
+
+       fs->nvqs = 2 + fs->num_queues; /* notifications + hiprio + request queues */
+
+       fs->vqs = devm_kcalloc(&vdev->dev, fs->nvqs, sizeof(fs->vqs[0]),
+                              GFP_KERNEL);
+       if (!fs->vqs)
+               return -ENOMEM;
+
+       vqs = kmalloc_array(fs->nvqs, sizeof(vqs[0]), GFP_KERNEL); /* temporary arrays, freed at out: */
+       callbacks = kmalloc_array(fs->nvqs, sizeof(callbacks[0]), GFP_KERNEL);
+       names = kmalloc_array(fs->nvqs, sizeof(names[0]), GFP_KERNEL);
+       if (!vqs || !callbacks || !names) {
+               ret = -ENOMEM;
+               goto out;
+       }
+
+       callbacks[0] = virtio_fs_notifications_done;
+       snprintf(fs->vqs[0].name, sizeof(fs->vqs[0].name), "notifications");
+       INIT_WORK(&fs->vqs[0].done_work, virtio_fs_notifications_done_work);
+       names[0] = fs->vqs[0].name;
+
+       callbacks[1] = virtio_fs_hiprio_done; /* not virtio_fs_vq_done: hiprio has no completion work yet */
+       snprintf(fs->vqs[1].name, sizeof(fs->vqs[1].name), "hiprio");
+       INIT_WORK(&fs->vqs[1].done_work, virtio_fs_notifications_done_work); /* no-op handler so flush_work() in virtio_fs_free_devs() sees an initialized work item */
+       names[1] = fs->vqs[1].name;
+
+       for (i = 2; i < fs->nvqs; i++) { /* Initialize the requests virtqueues */
+               INIT_WORK(&fs->vqs[i].done_work, virtio_fs_requests_done_work);
+               snprintf(fs->vqs[i].name, sizeof(fs->vqs[i].name),
+                        "requests.%u", i - 2);
+               callbacks[i] = virtio_fs_vq_done;
+               names[i] = fs->vqs[i].name;
+       }
+
+       ret = virtio_find_vqs(vdev, fs->nvqs, vqs, callbacks, names, NULL);
+       if (ret < 0)
+               goto out;
+
+       for (i = 0; i < fs->nvqs; i++)
+               fs->vqs[i].vq = vqs[i];
+
+out:
+       kfree(names);
+       kfree(callbacks);
+       kfree(vqs);
+       return ret;
+}
+
+/* Free virtqueues (device must already be reset); callers in this file reset the device first */
+static void virtio_fs_cleanup_vqs(struct virtio_device *vdev,
+                                 struct virtio_fs *fs)
+{
+       vdev->config->del_vqs(vdev); /* fs is unused here; fs->vqs itself is devm-allocated */
+}
+
 static int virtio_fs_probe(struct virtio_device *vdev)
 {
        struct virtio_fs *fs;
@@ -119,23 +343,32 @@ static int virtio_fs_probe(struct virtio_device *vdev)
                return -ENOMEM;
        vdev->priv = fs;
 
-       virtio_cread(vdev, struct virtio_fs_config, num_queues,
-                    &fs->num_queues);
-       if (fs->num_queues == 0) {
-               ret = -EINVAL;
+       ret = virtio_fs_read_tag(vdev, fs);
+       if (ret < 0)
                goto out;
-       }
 
-       ret = virtio_fs_read_tag(vdev, fs);
+       ret = virtio_fs_setup_vqs(vdev, fs);
        if (ret < 0)
                goto out;
 
+       /* TODO vq affinity */
+       /* TODO populate notifications vq */
+
+       /* Bring the device online in case the filesystem is mounted and
+        * requests need to be sent before we return.
+        */
+       virtio_device_ready(vdev);
+
        ret = virtio_fs_add_instance(fs);
        if (ret < 0)
-               goto out;
+               goto out_vqs;
 
        return 0;
 
+out_vqs:
+       vdev->config->reset(vdev);
+       virtio_fs_cleanup_vqs(vdev, fs);
+
 out:
        vdev->priv = NULL;
        return ret;
@@ -148,6 +381,7 @@ static void virtio_fs_remove(struct virtio_device *vdev)
        virtio_fs_free_devs(fs);
 
        vdev->config->reset(vdev);
+       virtio_fs_cleanup_vqs(vdev, fs);
 
        mutex_lock(&virtio_fs_mutex);
        list_del(&fs->list);
@@ -190,6 +424,234 @@ static struct virtio_driver virtio_fs_driver = {
 #endif
 };
 
+static void virtio_fs_wake_forget_and_unlock(struct fuse_iqueue *fiq)
+__releases(fiq->waitq.lock)
+{
+       /* TODO: nothing is sent to the device yet; this only drops fiq->waitq.lock as the contract requires */
+       spin_unlock(&fiq->waitq.lock);
+}
+
+static void virtio_fs_wake_interrupt_and_unlock(struct fuse_iqueue *fiq)
+__releases(fiq->waitq.lock)
+{
+       /* TODO: nothing is sent to the device yet; this only drops fiq->waitq.lock as the contract requires */
+       spin_unlock(&fiq->waitq.lock);
+}
+
+/* Return the number of scatter-gather list elements virtio_fs_enqueue_req() needs for req */
+static unsigned sg_count_fuse_req(struct fuse_req *req)
+{
+       unsigned total_sgs = 1 /* fuse_in_header */;
+
+       if (req->in.numargs - req->in.argpages)
+               total_sgs += 1; /* all non-page in args share one element backed by req->argbuf */
+
+       if (req->in.argpages)
+               total_sgs += req->num_pages; /* one element per page */
+
+       if (!test_bit(FR_ISREPLY, &req->flags))
+               return total_sgs; /* no reply expected: no out elements */
+
+       total_sgs += 1 /* fuse_out_header */;
+
+       if (req->out.numargs - req->out.argpages)
+               total_sgs += 1; /* NOTE(review): sg_init_fuse_args() keys on total length, not arg count; zero-length args would disagree */
+
+       if (req->out.argpages)
+               total_sgs += req->num_pages;
+
+       return total_sgs;
+}
+
+/* Add pages to scatter-gather list and return number of elements used (always num_pages) */
+static unsigned sg_init_fuse_pages(struct scatterlist *sg,
+                                  struct page **pages,
+                                  struct fuse_page_desc *page_descs,
+                                  unsigned num_pages)
+{
+       unsigned i;
+
+       for (i = 0; i < num_pages; i++) {
+               sg_init_table(&sg[i], 1); /* each entry is its own one-element table */
+               sg_set_page(&sg[i], pages[i],
+                           page_descs[i].length,
+                           page_descs[i].offset);
+       }
+
+       return i;
+}
+
+/* Add args to scatter-gather list and return number of elements used; *len_used gets the argbuf bytes covered */
+static unsigned sg_init_fuse_args(struct scatterlist *sg,
+                                 struct fuse_req *req,
+                                 struct fuse_arg *args,
+                                 unsigned numargs,
+                                 bool argpages,
+                                 void *argbuf,
+                                 unsigned *len_used)
+{
+       unsigned total_sgs = 0;
+       unsigned len;
+
+       len = fuse_len_args(numargs - argpages, args); /* total size of the non-page args */
+       if (len)
+               sg_init_one(&sg[total_sgs++], argbuf, len); /* one element spans all of them in argbuf */
+
+       if (argpages)
+               total_sgs += sg_init_fuse_pages(&sg[total_sgs],
+                                               req->pages,
+                                               req->page_descs,
+                                               req->num_pages);
+
+       if (len_used)
+               *len_used = len; /* optional: callers may pass NULL */
+
+       return total_sgs;
+}
+
+/* Add a request to a virtqueue and kick the device; returns 0 or a negative errno */
+static int virtio_fs_enqueue_req(struct virtqueue *vq, struct fuse_req *req)
+{
+       struct scatterlist *stack_sgs[6 /* requests need at least 4 elements */];
+       struct scatterlist stack_sg[ARRAY_SIZE(stack_sgs)];
+       struct scatterlist **sgs = stack_sgs;
+       struct scatterlist *sg = stack_sg;
+       struct fuse_pqueue *fpq;
+       unsigned argbuf_used = 0;
+       unsigned out_sgs = 0;
+       unsigned in_sgs = 0;
+       unsigned total_sgs;
+       unsigned i;
+       int ret;
+       bool notify;
+
+       /* Does the sglist fit on the stack? */
+       total_sgs = sg_count_fuse_req(req);
+       if (total_sgs > ARRAY_SIZE(stack_sgs)) {
+               sgs = kmalloc_array(total_sgs, sizeof(sgs[0]), GFP_ATOMIC);
+               sg = kmalloc_array(total_sgs, sizeof(sg[0]), GFP_ATOMIC);
+               if (!sgs || !sg) {
+                       ret = -ENOMEM;
+                       goto out; /* out: frees whichever of the two allocations succeeded */
+               }
+       }
+
+       /* Use a bounce buffer since stack args cannot be mapped */
+       ret = copy_args_to_argbuf(req);
+       if (ret < 0)
+               goto out;
+
+       /* Request elements */
+       sg_init_one(&sg[out_sgs++], &req->in.h, sizeof(req->in.h));
+       out_sgs += sg_init_fuse_args(&sg[out_sgs], req,
+                                    (struct fuse_arg *)req->in.args,
+                                    req->in.numargs, req->in.argpages,
+                                    req->argbuf, &argbuf_used);
+
+       /* Reply elements */
+       if (test_bit(FR_ISREPLY, &req->flags)) {
+               sg_init_one(&sg[out_sgs + in_sgs++],
+                           &req->out.h, sizeof(req->out.h));
+               in_sgs += sg_init_fuse_args(&sg[out_sgs + in_sgs], req,
+                                           req->out.args, req->out.numargs,
+                                           req->out.argpages,
+                                           req->argbuf + argbuf_used, NULL); /* out args follow the in args in argbuf */
+       }
+
+       BUG_ON(out_sgs + in_sgs != total_sgs); /* sg_count_fuse_req() must agree with what was built */
+
+       for (i = 0; i < total_sgs; i++)
+               sgs[i] = &sg[i]; /* virtqueue_add_sgs() takes an array of sg list pointers */
+
+       fpq = vq_to_fpq(vq);
+       spin_lock(&fpq->lock);
+
+       ret = virtqueue_add_sgs(vq, sgs, out_sgs, in_sgs, req, GFP_ATOMIC); /* req is the token virtqueue_get_buf() returns */
+       if (ret < 0) {
+               /* TODO handle full virtqueue */
+               spin_unlock(&fpq->lock);
+               goto out;
+       }
+
+       notify = virtqueue_kick_prepare(vq);
+
+       spin_unlock(&fpq->lock);
+
+       if (notify)
+               virtqueue_notify(vq); /* notify outside fpq->lock */
+
+out:
+       if (ret < 0 && req->argbuf) {
+               kfree(req->argbuf); /* on success argbuf stays until copy_args_from_argbuf() */
+               req->argbuf = NULL;
+       }
+       if (sgs != stack_sgs) {
+               kfree(sgs);
+               kfree(sg);
+       }
+
+       return ret;
+}
+
+static void virtio_fs_wake_pending_and_unlock(struct fuse_iqueue *fiq)
+__releases(fiq->waitq.lock)
+{
+       unsigned queue_id = 2; /* first requests vq; TODO multiqueue */
+       struct virtio_fs *fs;
+       struct fuse_conn *fc;
+       struct fuse_req *req;
+       struct fuse_pqueue *fpq;
+       int ret;
+
+       BUG_ON(list_empty(&fiq->pending));
+       req = list_last_entry(&fiq->pending, struct fuse_req, list);
+       clear_bit(FR_PENDING, &req->flags);
+       list_del_init(&req->list);
+       BUG_ON(!list_empty(&fiq->pending)); /* exactly one pending request is expected per call */
+       spin_unlock(&fiq->waitq.lock);
+
+       fs = fiq->priv;
+       fc = fs->vqs[queue_id].fud->fc; /* only referenced by the disabled fuse_request_end() calls below */
+
+       dev_dbg(&fs->vqs[queue_id].vq->vdev->dev,
+               "%s: opcode %u unique %#llx nodeid %#llx in.len %u out.len %u\n",
+               __func__, req->in.h.opcode, req->in.h.unique, req->in.h.nodeid,
+               req->in.h.len, fuse_len_args(req->out.numargs, req->out.args));
+
+       /* TODO put request onto fpq->io list? */
+
+       fpq = &fs->vqs[queue_id].fud->pq;
+       spin_lock(&fpq->lock);
+       if (!fpq->connected) {
+               spin_unlock(&fpq->lock);
+               req->out.h.error = -ENODEV;
+               printk(KERN_ERR "%s: disconnected\n", __func__);
+/*             fuse_request_end(fc, req);  unsafe due to fc->lock */
+               return; /* NOTE(review): the request is never ended on this path */
+       }
+       list_add_tail(&req->list, fpq->processing);
+       spin_unlock(&fpq->lock);
+       set_bit(FR_SENT, &req->flags);
+       /* matches barrier in request_wait_answer() */
+       smp_mb__after_atomic();
+       /* TODO check for FR_INTERRUPTED? */
+
+       ret = virtio_fs_enqueue_req(fs->vqs[queue_id].vq, req);
+       if (ret < 0) {
+               req->out.h.error = ret;
+               printk(KERN_ERR "%s: virtio_fs_enqueue_req failed %d\n",
+                       __func__, ret);
+/*             fuse_request_end(fc, req);  unsafe due to fc->lock */
+               return; /* NOTE(review): req stays on fpq->processing with FR_SENT set and is never ended */
+       }
+}
+
+static const struct fuse_iqueue_ops virtio_fs_fiq_ops = { /* handed to fuse_fill_super_common() at mount */
+       .wake_forget_and_unlock         = virtio_fs_wake_forget_and_unlock,
+       .wake_interrupt_and_unlock      = virtio_fs_wake_interrupt_and_unlock,
+       .wake_pending_and_unlock        = virtio_fs_wake_pending_and_unlock,
+};
+
 static int virtio_fs_fill_super(struct super_block *sb, void *data,
                                int silent)
 {
@@ -220,30 +682,35 @@ static int virtio_fs_fill_super(struct super_block *sb, void *data,
        }
 
        /* TODO lock */
-       if (fs->fud) {
+       if (fs->vqs[2].fud) {
                printk(KERN_ERR "virtio-fs: device already in use\n");
                err = -EBUSY;
                goto err;
        }
-       fs->fud = kcalloc(fs->num_queues, sizeof(fs->fud[0]), GFP_KERNEL);
-       if (!fs->fud) {
-               err = -ENOMEM;
-               goto err_fud;
-       }
 
-       err = fuse_fill_super_common(sb, &d, (void **)&fs->fud[0]);
+       /* TODO this sends FUSE_INIT and could cause hiprio or notifications
+        * virtqueue races since they haven't been set up yet!
+        */
+       err = fuse_fill_super_common(sb, &d, &virtio_fs_fiq_ops, fs,
+                                    (void **)&fs->vqs[2].fud);
        if (err < 0)
                goto err_fud;
 
-       fc = fs->fud[0]->fc;
+       fc = fs->vqs[2].fud->fc;
 
-       /* Allocate remaining fuse_devs */
        err = -ENOMEM;
        /* TODO take fuse_mutex around this loop? */
-       for (i = 1; i < fs->num_queues; i++) {
-               fs->fud[i] = fuse_dev_alloc(fc);
-               if (!fs->fud[i]) {
+       for (i = 0; i < fs->nvqs; i++) {
+               struct virtio_fs_vq *fsvq = &fs->vqs[i];
+
+               if (i == 2)
+                       continue; /* already initialized */
+
+               fsvq->fud = fuse_dev_alloc(fc);
+               if (!fsvq->fud) {
                        /* TODO */
+                       printk(KERN_ERR "%s: fuse_dev_alloc failed\n",
+                              __func__);
                }
                atomic_inc(&fc->dev_count);
        }
-- 
2.13.6

Reply via email to