The overhead of mapping an rpc request is pretty high. In cases
when the vstorage-mount event loop is saturated, it makes sense
to create a shadow kernel thread which shares mm/files with
user space.

This is one of a series of patches (the others are in user space),
which increase raid read iops more than twofold.

It is enabled/disabled with the module parameter "pcs_krpc_use_thread",
which can be tuned at run time.

Additionally, the patch fixes some old bugs of varying severity
found during development and testing: a buffer overflow and a wrong error
code being returned. This part is the reason we push the patch to release;
it is too entangled with the new request processing to be considered
standalone.

The patch is a combination of two patches, merged together because the second
one moves chunks of code around and it does not make sense to expose
these internal code movements. Comment from the second patch:

fs/fuse kio: properly return errors from sendmsg over kRPC

Unifies the return of errors in sendmsg by passing the error as the
result of krpc request completion. The error will then be returned
to userspace in recvmsg. However, a Linux error will be returned
as the return value of the ioctl call, whereas a PCS error will be
returned as the result of recvmsg.

Signed-off-by: Alexey Kuznetsov <kuz...@virtuozzo.com>
Signed-off-by: Liu Kui <kui....@virtuozzo.com>
---
 fs/fuse/kio/pcs/pcs_krpc.c | 145 ++++++++++++++++++++++++++++++++++++++-------
 fs/fuse/kio/pcs/pcs_krpc.h |   7 ++-
 2 files changed, 130 insertions(+), 22 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
index 323205a..a0e0799 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.c
+++ b/fs/fuse/kio/pcs/pcs_krpc.c
@@ -10,6 +10,7 @@
 #include <linux/file.h>
 #include <linux/anon_inodes.h>
 #include <linux/delay.h>
+#include <linux/fdtable.h>
 #include <linux/module.h>
 
 #include "pcs_types.h"
@@ -24,6 +25,10 @@
 module_param(pcs_krpc_csaccel, uint, 0644);
 MODULE_PARM_DESC(pcs_krpc_csaccel, "Enable krpc local cs bypass");
 
+unsigned int pcs_krpc_use_thread = 1;
+module_param(pcs_krpc_use_thread, uint, 0644);
+MODULE_PARM_DESC(pcs_krpc_use_thread, "Offload creating the request to a 
thread");
+
 extern unsigned int pcs_krpc_version;
 
 struct kmem_cache *krpc_req_cachep;
@@ -82,7 +87,8 @@ static void krpc_req_complete(struct krpc_req *kreq, int 
error)
        comp->result = error;
 
        kreq_release_data_chunks(kreq);
-       pcs_mr_put(kreq->hdr_chunk.mr);
+       if (kreq->hdr_chunk.mr)
+               pcs_mr_put(kreq->hdr_chunk.mr);
 
        spin_lock(&krpc->lock);
        list_del(&kreq->link);
@@ -292,7 +298,10 @@ static int pcs_krpc_ioctl_recv_msg(struct pcs_krpc *krpc, 
struct pcs_krpc_ioc_re
                if (copy_to_user((void __user *)iocmsg->buf.addr, 
comp->_data_buf, comp->data_len))
                        res = -EFAULT;
        } else { /* response */
-               iocmsg->result = comp->result;
+               if (comp->result >= 0)
+                       iocmsg->result = comp->result;
+               else
+                       res = comp->result; /* internal error */
        }
 
        krpc_completion_free(comp);
@@ -449,25 +458,23 @@ static int try_local_bypass(struct pcs_krpc *krpc, struct 
krpc_req *kreq)
        return 0;
 }
 
-static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct 
pcs_krpc_ioc_sendmsg *iocmsg)
+static int kreq_make_sendmsg(struct krpc_req *kreq)
 {
-       struct krpc_req *kreq;
+       struct pcs_krpc *krpc = kreq->krpc;
        struct pcs_msg *msg;
        struct pcs_krpc_buf_desc *chunk_bd;
        struct krpc_chunk *chunk;
        int res, i;
        struct bio_vec *bvec;
+       struct pcs_krpc_ioc_sendmsg *iocmsg;
 
-       kreq = krpc_req_alloc();
-       if (!kreq)
-               return -ENOMEM;
+       iocmsg = &kreq->iocmsg;
 
        if (iocmsg->nr_data_chunks > NR_KRPC_DATA_CHUNKS_INLINE) {
-               kreq->data_chunks = kzalloc(iocmsg->nr_data_chunks, GFP_NOIO);
-               if (!kreq->data_chunks) {
-                       res = -ENOMEM;
-                       goto err_free_kreq;
-               }
+               kreq->data_chunks = kcalloc(iocmsg->nr_data_chunks, 
sizeof(struct krpc_chunk),
+                                 GFP_NOIO);
+               if (!kreq->data_chunks)
+                       return -ENOMEM;
        } else {
                kreq->data_chunks = &kreq->inline_data_chunks[0];
        }
@@ -536,7 +543,7 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, 
struct pcs_krpc_ioc_se
                        chunk->addr = chunk_bdzc->offset;
                        chunk->req = fuse_dev_find_request(chunk_bdzc->devfd, 
chunk_bdzc->unique);
                        if (!chunk->req || chunk->req->args->killed) {
-                               res = PCS_ERR_NET;
+                               res = PCS_ERR_INV_PARAMS;
                                goto err_free_data_chunk;
                        }
                        break;
@@ -580,7 +587,6 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, 
struct pcs_krpc_ioc_se
                goto err_free_data_chunk;
        }
        atomic_inc(&krpc->iocount);
-       kreq->krpc = pcs_krpc_get(krpc);
        list_add_tail(&kreq->link, &krpc->pending_queue);
        spin_unlock(&krpc->lock);
 
@@ -600,12 +606,93 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, 
struct pcs_krpc_ioc_se
        kreq_release_data_chunks(kreq);
        if (kreq->hdr_chunk.mr)
                pcs_mr_put(kreq->hdr_chunk.mr);
-
-err_free_kreq:
-       krpc_req_free(kreq);
        return res;
 }
 
+static void kreq_submit(struct krpc_req *kreq)
+{
+       int res;
+
+       res = kreq_make_sendmsg(kreq);
+       if (res) {
+               kreq->data_chunks = &kreq->inline_data_chunks[0];
+               kreq->data_len = 0;
+               kreq->nr_data_chunks = 0;
+               kreq->nr_data_bvecs = 0;
+               kreq->hdr_chunk.mr = NULL;
+               kreq->completion.xid = kreq->iocmsg.xid;
+               kreq->completion.private = kreq;
+               INIT_LIST_HEAD(&kreq->link);
+
+               krpc_req_complete(kreq, res);
+       }
+}
+
+static int krpc_threadfn(void *data)
+{
+       struct pcs_krpc_set *krpcs = data;
+
+       for (;;) {
+               struct llist_node *ll;
+
+               set_current_state(TASK_INTERRUPTIBLE);
+
+               ll = llist_del_all(&krpcs->req_llist);
+
+               if (ll == NULL) {
+                       if (kthread_should_stop()) {
+                               __set_current_state(TASK_RUNNING);
+                               return 0;
+                       }
+                       schedule();
+                       continue;
+               }
+
+               __set_current_state(TASK_RUNNING);
+
+               while (ll) {
+                       struct llist_node *next = ll->next;
+                       struct krpc_req *kreq = container_of(ll, struct 
krpc_req, llist_link);
+
+                       kreq_submit(kreq);
+
+                       ll = next;
+               }
+       }
+}
+
+static int pcs_krpc_ioctl_send_msg(struct krpc_req *kreq)
+{
+       struct task_struct *tsk;
+       struct pcs_cluster_core *cc;
+
+       if (pcs_krpc_use_thread) {
+               cc = container_of(kreq->krpc->krpcs, struct pcs_cluster_core, 
krpcs);
+               tsk = cc->krpcs.krpc_task;
+               if (unlikely(tsk == NULL)) {
+                       tsk = kthread_create(krpc_threadfn, &cc->krpcs, 
"krpc_send");
+                       if (tsk && !IS_ERR(tsk)) {
+                               cc->krpcs.krpc_task = get_task_struct(tsk);
+                               mmget(current->mm);
+                               tsk->mm = current->mm;
+                               tsk->active_mm = current->mm;
+                               atomic_inc(&current->files->count);
+                               tsk->files = current->files;
+                       }
+               }
+
+               if (likely(tsk)) {
+                       llist_add(&kreq->llist_link, 
&kreq->krpc->krpcs->req_llist);
+                       wake_up_process(tsk);
+                       return 0;
+               }
+       }
+
+       kreq_submit(kreq);
+
+       return 0;
+}
+
 static int pcs_krpc_abort(struct pcs_krpc *krpc)
 {
        struct krpc_req *kreq, *tmp;
@@ -733,12 +820,23 @@ static long pcs_krpc_ioctl(struct file *file, unsigned 
int cmd, unsigned long ar
 
        switch (cmd) {
        case PCS_KRPC_IOC_SEND_MSG: {
-               struct pcs_krpc_ioc_sendmsg req;
+               struct krpc_req *kreq;
 
-               if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+               kreq = krpc_req_alloc();
+               if (!kreq)
+                       return -ENOMEM;
+
+               if (copy_from_user(&kreq->iocmsg, (void __user *)arg, 
sizeof(kreq->iocmsg))) {
+                       krpc_req_free(kreq);
                        return -EFAULT;
+               }
 
-               res = pcs_krpc_ioctl_send_msg(krpc, &req);
+               kreq->krpc = pcs_krpc_get(krpc);
+               res = pcs_krpc_ioctl_send_msg(kreq);
+               if (res) {
+                       pcs_krpc_put(krpc);
+                       krpc_req_free(kreq);
+               }
                break;
        }
        case PCS_KRPC_IOC_RECV_MSG: {
@@ -1068,7 +1166,8 @@ void pcs_krpcset_init(struct pcs_krpc_set *krpcs)
 
        INIT_LIST_HEAD(&krpcs->list);
        krpcs->nkrpc = 0;
-
+       krpcs->krpc_task = NULL;
+       init_llist_head(&krpcs->req_llist);
        spin_lock_init(&krpcs->lock);
 }
 
@@ -1094,6 +1193,10 @@ void pcs_krpcset_fini(struct pcs_krpc_set *krpcs)
        }
        spin_unlock(&krpcs->lock);
 
+       if (krpcs->krpc_task) {
+               kthread_stop(krpcs->krpc_task);
+               put_task_struct(krpcs->krpc_task);
+       }
        BUG_ON(!list_empty(&krpcs->list));
        BUG_ON(krpcs->nkrpc != 0);
 }
diff --git a/fs/fuse/kio/pcs/pcs_krpc.h b/fs/fuse/kio/pcs/pcs_krpc.h
index c6b867b..8021b02 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.h
+++ b/fs/fuse/kio/pcs/pcs_krpc.h
@@ -36,7 +36,9 @@ struct pcs_krpc_set {
        struct list_head                list;
        unsigned int                    nkrpc;
 
-       spinlock_t                              lock;
+       spinlock_t                      lock;
+       struct task_struct              *krpc_task;
+       struct llist_head               req_llist;
 };
 
 enum {
@@ -127,6 +129,9 @@ struct krpc_req {
        struct bio_vec data_bvecs[KRPC_MAX_DATA_PAGES];
 
        struct krpc_completion completion;
+
+       struct llist_node           llist_link;
+       struct pcs_krpc_ioc_sendmsg iocmsg;
 };
 
 static inline u32 pcs_krpc_msg_size(u32 size, u8 flags)
-- 
1.8.3.1

_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to