The overhead of mapping an rpc request is pretty high. And in cases when the vstorage-mount event loop is saturated, it makes sense to create a shadow kernel thread whose mm/files are shared with user space.
This is one of a series of patches (others are in user space), which increases raid read iops more than twice. It is enabled/disabled with the module parameter "pcs_krpc_use_thread", which can be tuned at run time. Additionally, the patch fixes some old bugs of varying severity found during development and testing: a buffer overflow and a wrong error code being returned. This part is the reason we push the patch to release; it is too entangled with the new request processing to be considered standalone. The patch is a combo of two patches, merged together because the second one moves chunks of code around and it does not make sense to document those code movements separately. Comment from the second patch: fs/fuse kio: properly return errors from sendmsg over kRPC Unifies the return of errors in sendmsg by passing the error as the result of krpc request completion. The error will then be returned to userspace in recvmsg. However, a linux error will be returned as the return value of the ioctl call, whereas a pcs error will be returned as the result of recvmsg. 
Signed-off-by: Alexey Kuznetsov <kuz...@virtuozzo.com> Signed-off-by: Liu Kui <kui....@virtuozzo.com> --- fs/fuse/kio/pcs/pcs_krpc.c | 145 ++++++++++++++++++++++++++++++++++++++------- fs/fuse/kio/pcs/pcs_krpc.h | 7 ++- 2 files changed, 130 insertions(+), 22 deletions(-) diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c index 323205a..a0e0799 100644 --- a/fs/fuse/kio/pcs/pcs_krpc.c +++ b/fs/fuse/kio/pcs/pcs_krpc.c @@ -10,6 +10,7 @@ #include <linux/file.h> #include <linux/anon_inodes.h> #include <linux/delay.h> +#include <linux/fdtable.h> #include <linux/module.h> #include "pcs_types.h" @@ -24,6 +25,10 @@ module_param(pcs_krpc_csaccel, uint, 0644); MODULE_PARM_DESC(pcs_krpc_csaccel, "Enable krpc local cs bypass"); +unsigned int pcs_krpc_use_thread = 1; +module_param(pcs_krpc_use_thread, uint, 0644); +MODULE_PARM_DESC(pcs_krpc_use_thread, "Offload creating the request to a thread"); + extern unsigned int pcs_krpc_version; struct kmem_cache *krpc_req_cachep; @@ -82,7 +87,8 @@ static void krpc_req_complete(struct krpc_req *kreq, int error) comp->result = error; kreq_release_data_chunks(kreq); - pcs_mr_put(kreq->hdr_chunk.mr); + if (kreq->hdr_chunk.mr) + pcs_mr_put(kreq->hdr_chunk.mr); spin_lock(&krpc->lock); list_del(&kreq->link); @@ -292,7 +298,10 @@ static int pcs_krpc_ioctl_recv_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_re if (copy_to_user((void __user *)iocmsg->buf.addr, comp->_data_buf, comp->data_len)) res = -EFAULT; } else { /* response */ - iocmsg->result = comp->result; + if (comp->result >= 0) + iocmsg->result = comp->result; + else + res = comp->result; /* internal error */ } krpc_completion_free(comp); @@ -449,25 +458,23 @@ static int try_local_bypass(struct pcs_krpc *krpc, struct krpc_req *kreq) return 0; } -static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_sendmsg *iocmsg) +static int kreq_make_sendmsg(struct krpc_req *kreq) { - struct krpc_req *kreq; + struct pcs_krpc *krpc = kreq->krpc; struct pcs_msg 
*msg; struct pcs_krpc_buf_desc *chunk_bd; struct krpc_chunk *chunk; int res, i; struct bio_vec *bvec; + struct pcs_krpc_ioc_sendmsg *iocmsg; - kreq = krpc_req_alloc(); - if (!kreq) - return -ENOMEM; + iocmsg = &kreq->iocmsg; if (iocmsg->nr_data_chunks > NR_KRPC_DATA_CHUNKS_INLINE) { - kreq->data_chunks = kzalloc(iocmsg->nr_data_chunks, GFP_NOIO); - if (!kreq->data_chunks) { - res = -ENOMEM; - goto err_free_kreq; - } + kreq->data_chunks = kcalloc(iocmsg->nr_data_chunks, sizeof(struct krpc_chunk), + GFP_NOIO); + if (!kreq->data_chunks) + return -ENOMEM; } else { kreq->data_chunks = &kreq->inline_data_chunks[0]; } @@ -536,7 +543,7 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se chunk->addr = chunk_bdzc->offset; chunk->req = fuse_dev_find_request(chunk_bdzc->devfd, chunk_bdzc->unique); if (!chunk->req || chunk->req->args->killed) { - res = PCS_ERR_NET; + res = PCS_ERR_INV_PARAMS; goto err_free_data_chunk; } break; @@ -580,7 +587,6 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se goto err_free_data_chunk; } atomic_inc(&krpc->iocount); - kreq->krpc = pcs_krpc_get(krpc); list_add_tail(&kreq->link, &krpc->pending_queue); spin_unlock(&krpc->lock); @@ -600,12 +606,93 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se kreq_release_data_chunks(kreq); if (kreq->hdr_chunk.mr) pcs_mr_put(kreq->hdr_chunk.mr); - -err_free_kreq: - krpc_req_free(kreq); return res; } +static void kreq_submit(struct krpc_req *kreq) +{ + int res; + + res = kreq_make_sendmsg(kreq); + if (res) { + kreq->data_chunks = &kreq->inline_data_chunks[0]; + kreq->data_len = 0; + kreq->nr_data_chunks = 0; + kreq->nr_data_bvecs = 0; + kreq->hdr_chunk.mr = NULL; + kreq->completion.xid = kreq->iocmsg.xid; + kreq->completion.private = kreq; + INIT_LIST_HEAD(&kreq->link); + + krpc_req_complete(kreq, res); + } +} + +static int krpc_threadfn(void *data) +{ + struct pcs_krpc_set *krpcs = data; + + for (;;) { + struct 
llist_node *ll; + + set_current_state(TASK_INTERRUPTIBLE); + + ll = llist_del_all(&krpcs->req_llist); + + if (ll == NULL) { + if (kthread_should_stop()) { + __set_current_state(TASK_RUNNING); + return 0; + } + schedule(); + continue; + } + + __set_current_state(TASK_RUNNING); + + while (ll) { + struct llist_node *next = ll->next; + struct krpc_req *kreq = container_of(ll, struct krpc_req, llist_link); + + kreq_submit(kreq); + + ll = next; + } + } +} + +static int pcs_krpc_ioctl_send_msg(struct krpc_req *kreq) +{ + struct task_struct *tsk; + struct pcs_cluster_core *cc; + + if (pcs_krpc_use_thread) { + cc = container_of(kreq->krpc->krpcs, struct pcs_cluster_core, krpcs); + tsk = cc->krpcs.krpc_task; + if (unlikely(tsk == NULL)) { + tsk = kthread_create(krpc_threadfn, &cc->krpcs, "krpc_send"); + if (tsk && !IS_ERR(tsk)) { + cc->krpcs.krpc_task = get_task_struct(tsk); + mmget(current->mm); + tsk->mm = current->mm; + tsk->active_mm = current->mm; + atomic_inc(¤t->files->count); + tsk->files = current->files; + } + } + + if (likely(tsk)) { + llist_add(&kreq->llist_link, &kreq->krpc->krpcs->req_llist); + wake_up_process(tsk); + return 0; + } + } + + kreq_submit(kreq); + + return 0; +} + static int pcs_krpc_abort(struct pcs_krpc *krpc) { struct krpc_req *kreq, *tmp; @@ -733,12 +820,23 @@ static long pcs_krpc_ioctl(struct file *file, unsigned int cmd, unsigned long ar switch (cmd) { case PCS_KRPC_IOC_SEND_MSG: { - struct pcs_krpc_ioc_sendmsg req; + struct krpc_req *kreq; - if (copy_from_user(&req, (void __user *)arg, sizeof(req))) + kreq = krpc_req_alloc(); + if (!kreq) + return -ENOMEM; + + if (copy_from_user(&kreq->iocmsg, (void __user *)arg, sizeof(kreq->iocmsg))) { + krpc_req_free(kreq); return -EFAULT; + } - res = pcs_krpc_ioctl_send_msg(krpc, &req); + kreq->krpc = pcs_krpc_get(krpc); + res = pcs_krpc_ioctl_send_msg(kreq); + if (res) { + pcs_krpc_put(krpc); + krpc_req_free(kreq); + } break; } case PCS_KRPC_IOC_RECV_MSG: { @@ -1068,7 +1166,8 @@ void 
pcs_krpcset_init(struct pcs_krpc_set *krpcs) INIT_LIST_HEAD(&krpcs->list); krpcs->nkrpc = 0; - + krpcs->krpc_task = NULL; + init_llist_head(&krpcs->req_llist); spin_lock_init(&krpcs->lock); } @@ -1094,6 +1193,10 @@ void pcs_krpcset_fini(struct pcs_krpc_set *krpcs) } spin_unlock(&krpcs->lock); + if (krpcs->krpc_task) { + kthread_stop(krpcs->krpc_task); + put_task_struct(krpcs->krpc_task); + } BUG_ON(!list_empty(&krpcs->list)); BUG_ON(krpcs->nkrpc != 0); } diff --git a/fs/fuse/kio/pcs/pcs_krpc.h b/fs/fuse/kio/pcs/pcs_krpc.h index c6b867b..8021b02 100644 --- a/fs/fuse/kio/pcs/pcs_krpc.h +++ b/fs/fuse/kio/pcs/pcs_krpc.h @@ -36,7 +36,9 @@ struct pcs_krpc_set { struct list_head list; unsigned int nkrpc; - spinlock_t lock; + spinlock_t lock; + struct task_struct *krpc_task; + struct llist_head req_llist; }; enum { @@ -127,6 +129,9 @@ struct krpc_req { struct bio_vec data_bvecs[KRPC_MAX_DATA_PAGES]; struct krpc_completion completion; + + struct llist_node llist_link; + struct pcs_krpc_ioc_sendmsg iocmsg; }; static inline u32 pcs_krpc_msg_size(u32 size, u8 flags) -- 1.8.3.1 _______________________________________________ Devel mailing list Devel@openvz.org https://lists.openvz.org/mailman/listinfo/devel