The commit is pushed to "branch-rh9-5.14.0-427.44.1.vz9.80.x-ovz" and will 
appear at g...@bitbucket.org:openvz/vzkernel.git
after rh9-5.14.0-427.44.1.vz9.80.23
------>
commit 55d45425282dd8e1b1a126fe5ebcc2b7c0177c5d
Author: Alexey Kuznetsov <kuz...@virtuozzo.com>
Date:   Fri Mar 28 20:01:08 2025 +0800

    fs/fuse/kio: create krpc request in special thread
    
    Overhead of mapping an rpc request is pretty high, and in cases
    when the vstorage-mount event loop is saturated it makes sense
    to create a shadow kernel thread whose mm/files are shared with
    user space.
    
    This is one of a series of patches (others are in user space),
    which together increase RAID read IOPS more than twofold.
    
    It is en/disabled with module parameter "pcs_krpc_use_thread",
    which can be tuned at run time.
    
    Additionally, the patch fixes some old bugs of various fatality
    found during development and testing: buffer overflow and wrong error
    code returned. This part is the reason we push the patch to release,
    it is too intertwined with the new request processing to be considered
    standalone.
    
    The patch is a combo of two patches, merged together because the second
    one moves chunks of code around and it does not make sense to review
    these code movements separately. Comment from the second patch:
    
    fs/fuse kio: properly return errors from sendmsg over kRPC
    
    Unifies the return of errors in sendmsg by passing the error as the
    result of krpc request completion. The error will then be returned
    to userspace in recvmsg. However, a Linux error will be returned
    as the return value of the ioctl call, whereas a PCS error will be
    returned as the result of recvmsg.
    
    Signed-off-by: Alexey Kuznetsov <kuz...@virtuozzo.com>
    Signed-off-by: Liu Kui <kui....@virtuozzo.com>
    
    Feature: vStorage
---
 fs/fuse/kio/pcs/pcs_krpc.c | 145 ++++++++++++++++++++++++++++++++++++++-------
 fs/fuse/kio/pcs/pcs_krpc.h |   7 ++-
 2 files changed, 130 insertions(+), 22 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
index 323205a3e2df..58a9ceebfee2 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.c
+++ b/fs/fuse/kio/pcs/pcs_krpc.c
@@ -10,6 +10,7 @@
 #include <linux/file.h>
 #include <linux/anon_inodes.h>
 #include <linux/delay.h>
+#include <linux/fdtable.h>
 #include <linux/module.h>
 
 #include "pcs_types.h"
@@ -24,6 +25,10 @@ unsigned int pcs_krpc_csaccel;
 module_param(pcs_krpc_csaccel, uint, 0644);
 MODULE_PARM_DESC(pcs_krpc_csaccel, "Enable krpc local cs bypass");
 
+unsigned int pcs_krpc_use_thread = 1;
+module_param(pcs_krpc_use_thread, uint, 0644);
+MODULE_PARM_DESC(pcs_krpc_use_thread, "Offload creating the request to a 
thread");
+
 extern unsigned int pcs_krpc_version;
 
 struct kmem_cache *krpc_req_cachep;
@@ -82,7 +87,8 @@ static void krpc_req_complete(struct krpc_req *kreq, int 
error)
        comp->result = error;
 
        kreq_release_data_chunks(kreq);
-       pcs_mr_put(kreq->hdr_chunk.mr);
+       if (kreq->hdr_chunk.mr)
+               pcs_mr_put(kreq->hdr_chunk.mr);
 
        spin_lock(&krpc->lock);
        list_del(&kreq->link);
@@ -292,7 +298,10 @@ static int pcs_krpc_ioctl_recv_msg(struct pcs_krpc *krpc, 
struct pcs_krpc_ioc_re
                if (copy_to_user((void __user *)iocmsg->buf.addr, 
comp->_data_buf, comp->data_len))
                        res = -EFAULT;
        } else { /* response */
-               iocmsg->result = comp->result;
+               if (comp->result >= 0)
+                       iocmsg->result = comp->result;
+               else
+                       res = comp->result; /* internal error */
        }
 
        krpc_completion_free(comp);
@@ -449,25 +458,23 @@ static int try_local_bypass(struct pcs_krpc *krpc, struct 
krpc_req *kreq)
        return 0;
 }
 
-static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct 
pcs_krpc_ioc_sendmsg *iocmsg)
+static int kreq_make_sendmsg(struct krpc_req *kreq)
 {
-       struct krpc_req *kreq;
+       struct pcs_krpc *krpc = kreq->krpc;
        struct pcs_msg *msg;
        struct pcs_krpc_buf_desc *chunk_bd;
        struct krpc_chunk *chunk;
        int res, i;
        struct bio_vec *bvec;
+       struct pcs_krpc_ioc_sendmsg *iocmsg;
 
-       kreq = krpc_req_alloc();
-       if (!kreq)
-               return -ENOMEM;
+       iocmsg = &kreq->iocmsg;
 
        if (iocmsg->nr_data_chunks > NR_KRPC_DATA_CHUNKS_INLINE) {
-               kreq->data_chunks = kzalloc(iocmsg->nr_data_chunks, GFP_NOIO);
-               if (!kreq->data_chunks) {
-                       res = -ENOMEM;
-                       goto err_free_kreq;
-               }
+               kreq->data_chunks = kcalloc(iocmsg->nr_data_chunks, 
sizeof(struct krpc_chunk),
+                                 GFP_NOIO);
+               if (!kreq->data_chunks)
+                       return -ENOMEM;
        } else {
                kreq->data_chunks = &kreq->inline_data_chunks[0];
        }
@@ -536,7 +543,7 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, 
struct pcs_krpc_ioc_se
                        chunk->addr = chunk_bdzc->offset;
                        chunk->req = fuse_dev_find_request(chunk_bdzc->devfd, 
chunk_bdzc->unique);
                        if (!chunk->req || chunk->req->args->killed) {
-                               res = PCS_ERR_NET;
+                               res = PCS_ERR_INV_PARAMS;
                                goto err_free_data_chunk;
                        }
                        break;
@@ -580,7 +587,6 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, 
struct pcs_krpc_ioc_se
                goto err_free_data_chunk;
        }
        atomic_inc(&krpc->iocount);
-       kreq->krpc = pcs_krpc_get(krpc);
        list_add_tail(&kreq->link, &krpc->pending_queue);
        spin_unlock(&krpc->lock);
 
@@ -600,12 +606,93 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, 
struct pcs_krpc_ioc_se
        kreq_release_data_chunks(kreq);
        if (kreq->hdr_chunk.mr)
                pcs_mr_put(kreq->hdr_chunk.mr);
-
-err_free_kreq:
-       krpc_req_free(kreq);
        return res;
 }
 
+static void kreq_submit(struct krpc_req *kreq)
+{
+       int res;
+
+       res = kreq_make_sendmsg(kreq);
+       if (res) {
+               kreq->data_chunks = &kreq->inline_data_chunks[0];
+               kreq->data_len = 0;
+               kreq->nr_data_chunks = 0;
+               kreq->nr_data_bvecs = 0;
+               kreq->hdr_chunk.mr = NULL;
+               kreq->completion.xid = kreq->iocmsg.xid;
+               kreq->completion.private = kreq;
+               INIT_LIST_HEAD(&kreq->link);
+
+               krpc_req_complete(kreq, res);
+       }
+}
+
+static int krpc_threadfn(void *data)
+{
+       struct pcs_krpc_set *krpcs = data;
+
+       for (;;) {
+               struct llist_node *ll;
+
+               set_current_state(TASK_INTERRUPTIBLE);
+
+               ll = llist_del_all(&krpcs->req_llist);
+
+               if (ll == NULL) {
+                       if (kthread_should_stop()) {
+                               __set_current_state(TASK_RUNNING);
+                               return 0;
+                       }
+                       schedule();
+                       continue;
+               }
+
+               __set_current_state(TASK_RUNNING);
+
+               while (ll) {
+                       struct llist_node *next = ll->next;
+                       struct krpc_req *kreq = container_of(ll, struct 
krpc_req, llist_link);
+
+                       kreq_submit(kreq);
+
+                       ll = next;
+               }
+       }
+}
+
+static int pcs_krpc_ioctl_send_msg(struct krpc_req *kreq)
+{
+       struct task_struct *tsk;
+       struct pcs_cluster_core *cc;
+
+       if (pcs_krpc_use_thread) {
+               cc = container_of(kreq->krpc->krpcs, struct pcs_cluster_core, 
krpcs);
+               tsk = cc->krpcs.krpc_task;
+               if (unlikely(tsk == NULL)) {
+                       tsk = kthread_create(krpc_threadfn, &cc->krpcs, 
"krpc_send");
+                       if (tsk && !IS_ERR(tsk)) {
+                               cc->krpcs.krpc_task = get_task_struct(tsk);
+                               mmget(current->mm);
+                               tsk->mm = current->mm;
+                               tsk->active_mm = current->mm;
+                               atomic_inc(&current->files->count);
+                               tsk->files = current->files;
+                       }
+               }
+
+               if (likely(tsk)) {
+                       llist_add(&kreq->llist_link, 
&kreq->krpc->krpcs->req_llist);
+                       wake_up_process(tsk);
+                       return 0;
+               }
+       }
+
+       kreq_submit(kreq);
+
+       return 0;
+}
+
 static int pcs_krpc_abort(struct pcs_krpc *krpc)
 {
        struct krpc_req *kreq, *tmp;
@@ -733,12 +820,23 @@ static long pcs_krpc_ioctl(struct file *file, unsigned 
int cmd, unsigned long ar
 
        switch (cmd) {
        case PCS_KRPC_IOC_SEND_MSG: {
-               struct pcs_krpc_ioc_sendmsg req;
+               struct krpc_req *kreq;
 
-               if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+               kreq = krpc_req_alloc();
+               if (!kreq)
+                       return -ENOMEM;
+
+               if (copy_from_user(&kreq->iocmsg, (void __user *)arg, 
sizeof(kreq->iocmsg))) {
+                       krpc_req_free(kreq);
                        return -EFAULT;
+               }
 
-               res = pcs_krpc_ioctl_send_msg(krpc, &req);
+               kreq->krpc = pcs_krpc_get(krpc);
+               res = pcs_krpc_ioctl_send_msg(kreq);
+               if (res) {
+                       pcs_krpc_put(krpc);
+                       krpc_req_free(kreq);
+               }
                break;
        }
        case PCS_KRPC_IOC_RECV_MSG: {
@@ -1068,7 +1166,8 @@ void pcs_krpcset_init(struct pcs_krpc_set *krpcs)
 
        INIT_LIST_HEAD(&krpcs->list);
        krpcs->nkrpc = 0;
-
+       krpcs->krpc_task = NULL;
+       init_llist_head(&krpcs->req_llist);
        spin_lock_init(&krpcs->lock);
 }
 
@@ -1094,6 +1193,10 @@ void pcs_krpcset_fini(struct pcs_krpc_set *krpcs)
        }
        spin_unlock(&krpcs->lock);
 
+       if (krpcs->krpc_task) {
+               kthread_stop(krpcs->krpc_task);
+               put_task_struct(krpcs->krpc_task);
+       }
        BUG_ON(!list_empty(&krpcs->list));
        BUG_ON(krpcs->nkrpc != 0);
 }
diff --git a/fs/fuse/kio/pcs/pcs_krpc.h b/fs/fuse/kio/pcs/pcs_krpc.h
index c6b867b5fa75..8021b0262560 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.h
+++ b/fs/fuse/kio/pcs/pcs_krpc.h
@@ -36,7 +36,9 @@ struct pcs_krpc_set {
        struct list_head                list;
        unsigned int                    nkrpc;
 
-       spinlock_t                              lock;
+       spinlock_t                      lock;
+       struct task_struct              *krpc_task;
+       struct llist_head               req_llist;
 };
 
 enum {
@@ -127,6 +129,9 @@ struct krpc_req {
        struct bio_vec data_bvecs[KRPC_MAX_DATA_PAGES];
 
        struct krpc_completion completion;
+
+       struct llist_node           llist_link;
+       struct pcs_krpc_ioc_sendmsg iocmsg;
 };
 
 static inline u32 pcs_krpc_msg_size(u32 size, u8 flags)
_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to