Currently there are two connections for every RPC: one in userspace
and one in the kernel. On the client hosts of a large cluster this
wastes a lot of resources. It is therefore desirable to eliminate the
userspace connection and use the kernel RPC directly from userspace.

The newly implemented pcs_krpc serves as the glue layer between
userspace RPC and kernel RPC. It provides the execution context for
forwarding messages between userspace RPC and kernel RPC, and it
provides APIs for userspace to send and receive RPC messages.
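
A minimal sketch of the intended userspace flow, assuming the krpc fd
has already been obtained through pcs_krpc_connect() (how the fd is
handed out to userspace is outside the scope of this patch):

    #include <stdint.h>
    #include <poll.h>
    #include <sys/ioctl.h>
    #include "pcs_krpc_prot.h"

    /* Send one prepared message, then wait for and fetch one completion. */
    static int krpc_xchg(int krpc_fd, struct pcs_krpc_ioc_sendmsg *snd)
    {
            char nbuf[4096];        /* room for unsolicited msgs (xid == 0) */
            struct pcs_krpc_ioc_recvmsg rcv = {
                    .buf = { .addr = (uintptr_t)nbuf, .len = sizeof(nbuf) },
            };
            struct pollfd pfd = { .fd = krpc_fd, .events = POLLIN };

            /* queue the message on the kernel RPC */
            if (ioctl(krpc_fd, PCS_KRPC_IOC_SEND_MSG, snd))
                    return -1;

            /* EPOLLIN is raised once a completion has been posted */
            if (poll(&pfd, 1, -1) < 0 || (pfd.revents & POLLERR))
                    return -1;

            /* 1: completion fetched (rcv.xid set), 0: queue was empty */
            return ioctl(krpc_fd, PCS_KRPC_IOC_RECV_MSG, &rcv);
    }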

https://pmc.acronis.work/browse/VSTOR-82613

Signed-off-by: Liu Kui <kui....@virtuozzo.com>
---
 fs/fuse/kio/pcs/pcs_krpc.c      | 817 ++++++++++++++++++++++++++++++++
 fs/fuse/kio/pcs/pcs_krpc.h      | 144 ++++++
 fs/fuse/kio/pcs/pcs_krpc_prot.h |  44 ++
 3 files changed, 1005 insertions(+)
 create mode 100644 fs/fuse/kio/pcs/pcs_krpc.c
 create mode 100644 fs/fuse/kio/pcs/pcs_krpc.h
 create mode 100644 fs/fuse/kio/pcs/pcs_krpc_prot.h

diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
new file mode 100644
index 000000000000..567d8a0c884f
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_krpc.c
@@ -0,0 +1,817 @@
+/*
+ * Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved.
+ */
+
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/rbtree.h>
+#include <linux/refcount.h>
+#include <linux/file.h>
+#include <linux/anon_inodes.h>
+
+#include "pcs_types.h"
+#include "pcs_cluster.h"
+#include "pcs_cs_prot.h"
+#include "pcs_req.h"
+#include "pcs_krpc.h"
+
+struct kmem_cache *krpc_req_cachep;
+
+static int krpc_completion_post(struct pcs_krpc *krpc, struct krpc_completion *comp)
+__releases(krpc->lock)
+{
+       /* post the completion only when connected */
+       if (krpc->state != PCS_KRPC_STATE_CONNECTED) {
+               spin_unlock(&krpc->lock);
+               return -1;
+       }
+       list_add_tail(&comp->link, &krpc->completion_queue);
+       krpc->nr_completion++;
+       spin_unlock(&krpc->lock);
+
+       wake_up_poll(&krpc->poll_wait, EPOLLIN);
+       return 0;
+}
+
+static struct krpc_req *krpc_req_alloc(void)
+{
+       struct krpc_req *kreq;
+
+       kreq = kmem_cache_alloc(krpc_req_cachep, GFP_NOIO);
+       if (!kreq)
+               return NULL;
+
+       spin_lock_init(&kreq->lock);
+       kreq->flags = 0;
+       memset(&kreq->completion, 0, sizeof(struct krpc_completion));
+
+       return kreq;
+}
+
+static void krpc_req_free(struct krpc_req *kreq)
+{
+       kmem_cache_free(krpc_req_cachep, kreq);
+}
+
+static void krpc_completion_free(struct krpc_completion *comp)
+{
+       if (comp->private)
+               krpc_req_free((struct krpc_req *)comp->private);
+       else
+               kfree(comp);
+}
+
+static void krpc_req_complete(struct krpc_req *kreq, int error)
+{
+       struct krpc_completion *comp = &kreq->completion;
+       struct pcs_krpc *krpc = kreq->krpc;
+       int i;
+
+       BUG_ON(!comp->xid);
+
+       comp->result = error;
+
+       pcs_mr_put(kreq->hdr_chunk.mr);
+
+       for (i = 0; i < kreq->nr_data_chunks; i++) {
+               struct krpc_chunk *chunk;
+
+               chunk = &kreq->data_chunks[i];
+               if (chunk->type == KRPC_CHUNK_TYPE_UMEM)
+                       pcs_umem_release(chunk->umem);
+               else
+                       pcs_mr_put(chunk->mr);
+       }
+
+       if (kreq->data_chunks != &kreq->inline_data_chunks[0])
+               kfree(kreq->data_chunks);
+
+       spin_lock(&krpc->lock);
+       list_del(&kreq->link);
+
+       if (kreq->flags & KRPC_REQ_F_ABORTED) {
+               krpc_req_free(kreq);
+               spin_unlock(&krpc->lock);
+       } else if (krpc_completion_post(krpc, comp)) {
+               krpc_req_free(kreq);
+       }
+
+       pcs_krpc_put(krpc);
+}
+
+static void krpc_msg_get_response_iter(struct pcs_msg *msg, int offset,
+                       struct iov_iter *it, unsigned int direction)
+{
+       struct pcs_msg *req = msg->private;
+       struct krpc_req *kreq = req->private2;
+
+       if (!(kreq->flags & KRPC_REQ_F_RESP_BUFF)) {
+               /* No data payload */
+               BUG_ON(msg->size > PAGE_SIZE);
+
+               kreq->hdr_kv.iov_base = (void *)kreq->hdr_buf;
+               kreq->hdr_kv.iov_len = msg->size;
+
+               iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, msg->size);
+               iov_iter_advance(it, offset);
+       } else {
+               /* With data payload */
+               int hdr_size = sizeof(struct pcs_cs_iohdr);
+
+               if (offset < hdr_size) {
+                       kreq->hdr_kv.iov_base = (void *)kreq->hdr_buf;
+                       kreq->hdr_kv.iov_len = hdr_size;
+                       iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, hdr_size);
+                       iov_iter_advance(it, offset);
+               } else {
+                       BUG_ON((offset - hdr_size) > kreq->data_len);
+                       BUG_ON(msg->size - hdr_size > kreq->data_len);
+
+                       iov_iter_bvec(it, direction, &kreq->data_bvecs[0],
+                                               kreq->nr_data_bvecs, kreq->data_len);
+                       iov_iter_truncate(it, msg->size - hdr_size);
+                       iov_iter_advance(it, offset - hdr_size);
+               }
+       }
+}
+
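+/*
+ * Input hook of the underlying RPC: match a response header against the
+ * outstanding request by xid and return an input message whose iterator
+ * (krpc_msg_get_response_iter) lands the payload directly in the buffers
+ * supplied by userspace.
+ */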
+struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
+{
+       struct pcs_msg *msg, *resp;
+       struct pcs_rpc_hdr *req_h;
+       struct krpc_req *kreq;
+
+       if (!RPC_IS_RESPONSE(h->type))
+               return NULL;
+
+       msg = pcs_rpc_lookup_xid(ep, &h->xid);
+       if (msg == NULL)
+               return NULL;
+
+       req_h = (struct pcs_rpc_hdr *)msg_inline_head(msg);
+       if (req_h->type != (h->type & ~PCS_RPC_DIRECTION))
+               return NULL;
+
+       kreq = msg->private2;
+
+       resp = pcs_rpc_alloc_input_msg(ep, sizeof(struct pcs_rpc_hdr));
+       if (!resp)
+               return NULL;
+
+       memcpy(resp->_inline_buffer, h, sizeof(struct pcs_rpc_hdr));
+       memcpy(kreq->hdr_buf, h, sizeof(struct pcs_rpc_hdr));
+       resp->size = h->len;
+       resp->private = msg;
+       resp->get_iter = krpc_msg_get_response_iter;
+       resp->done = rpc_work_input;
+       pcs_msg_del_calendar(msg);
+
+       return resp;
+}
+
+/*
+ * All request-type messages from a CS are forwarded directly to userspace.
+ * The PCS KRPC layer cannot process these requests on its own.
+ */
+static void krpc_handle_request(struct pcs_krpc *krpc, struct pcs_msg *msg)
+{
+       struct krpc_completion *comp;
+
+       if (krpc->state != PCS_KRPC_STATE_CONNECTED)
+               return;
+
+       comp = kzalloc(sizeof(*comp) + msg->size, GFP_NOIO);
+       if (!comp)
+               return;
+
+       memcpy(comp->_data_buf, msg->_inline_buffer, msg->size);
+       comp->data_len = msg->size;
+
+       spin_lock(&krpc->lock);
+       if (krpc_completion_post(krpc, comp))
+               kfree(comp);
+}
+
+/*
+ * Let userspace handle the cs_congestion_notify request.
+ */
+void krpc_handle_congestion(struct pcs_rpc *ep, struct pcs_msg *msg)
+{
+       krpc_handle_request(ep->clnt_krpc, msg);
+}
+
+/*
+ * Let userspace handle the keep_waiting request.
+ */
+void krpc_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
+{
+       krpc_handle_request(ep->clnt_krpc, msg);
+}
+
+unsigned int pcs_krpc_hash(PCS_NODE_ID_T *id)
+{
+       return *(unsigned int *)id % PCS_KRPC_HASH_SIZE;
+}
+
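+/*
+ * Build the output iterator for a queued message.  The wire layout is:
+ * the header kvec, optional alignment padding fed from the shared nil
+ * buffer, the data bvecs, then trailing padding up to the aligned data
+ * size.
+ */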
+static void krpc_msg_get_data(struct pcs_msg *msg, int offset,
+               struct iov_iter *it, unsigned int direction)
+{
+       struct krpc_req *kreq = msg->private2;
+       int hdr_size = pcs_krpc_msg_size(kreq->hdr_chunk.len, kreq->flags);
+       int data_size = pcs_krpc_msg_size(kreq->data_len, kreq->flags);
+       struct pcs_krpc *krpc = kreq->krpc;
+
+       if (offset < kreq->hdr_chunk.len) {
+               iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, kreq->hdr_chunk.len);
+               iov_iter_advance(it, offset);
+               return;
+       }
+
+       if (offset < hdr_size) {
+               iov_iter_kvec(it, direction, &(cc_from_krpcset(krpc->krpcs))->nilbuffer_kv,
+                                                1, hdr_size - offset);
+               return;
+       }
+
+       if (offset < hdr_size + kreq->data_len) {
+               iov_iter_bvec(it, direction, &kreq->data_bvecs[0],
+                               kreq->nr_data_bvecs, kreq->data_len);
+               iov_iter_advance(it, offset - hdr_size);
+               return;
+       }
+
+       BUG_ON(offset >= hdr_size + data_size);
+
+       iov_iter_kvec(it, direction, &(cc_from_krpcset(krpc->krpcs))->nilbuffer_kv,
+                                       1, (hdr_size + data_size - offset));
+}
+
+static void pcs_krpc_response_done(struct pcs_msg *msg)
+{
+       struct krpc_req *kreq = msg->private2;
+
+       if (msg->rpc) {
+               pcs_rpc_put(msg->rpc);
+               msg->rpc = NULL;
+       }
+
+       krpc_req_complete(kreq, msg->error.value);
+}
+
+static void pcs_krpc_msg_sent(struct pcs_msg *msg)
+{
+       msg->done = pcs_krpc_response_done;
+       if (pcs_if_error(&msg->error)) {
+               msg->done(msg);
+               return;
+       }
+       pcs_rpc_sent(msg);
+}
+
+static int pcs_krpc_ioctl_recv_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_recvmsg *iocmsg)
+{
+       struct krpc_completion *comp;
+       int res = 0;
+
+       spin_lock(&krpc->lock);
+       if (list_empty(&krpc->completion_queue)) {
+               spin_unlock(&krpc->lock);
+               return 0;
+       }
+
+       comp = list_first_entry(&krpc->completion_queue, struct krpc_completion, link);
+       list_del(&comp->link);
+       krpc->nr_completion--;
+       spin_unlock(&krpc->lock);
+
+       if (comp->result) {
+               res = -comp->result;
+               goto out;
+       }
+
+       res = 1;
+       iocmsg->xid = comp->xid;
+       if (comp->xid == 0) {
+               BUG_ON(!comp->data_len);
+               BUG_ON(iocmsg->buf.len < comp->data_len);
+               if (copy_to_user((void __user *)iocmsg->buf.addr, comp->_data_buf, comp->data_len))
+                       res = -EFAULT;
+       }
+
+out:
+       krpc_completion_free(comp);
+       return res;
+}
+
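+/*
+ * Submit one RPC message on behalf of userspace: look up (or pin) the
+ * header and data buffers, translate them into kvec/bvec form and queue
+ * the resulting pcs_msg on the underlying kernel RPC.  The completion is
+ * posted to the completion queue and picked up via PCS_KRPC_IOC_RECV_MSG.
+ */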
+static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_sendmsg *iocmsg)
+{
+       struct krpc_req *kreq;
+       struct pcs_msg *msg;
+       struct pcs_krpc_buf_desc *chunk_bd;
+       struct krpc_chunk *chunk;
+       int res, i;
+       struct bio_vec *bvec;
+
+       kreq = krpc_req_alloc();
+       if (!kreq)
+               return -ENOMEM;
+
+       if (iocmsg->nr_data_chunks > NR_KRPC_DATA_CHUNKS_INLINE) {
+               kreq->data_chunks = kcalloc(iocmsg->nr_data_chunks,
+                                       sizeof(struct krpc_chunk), GFP_NOIO);
+               if (!kreq->data_chunks) {
+                       res = -ENOMEM;
+                       goto err_free_kreq;
+               }
+       } else {
+               kreq->data_chunks = &kreq->inline_data_chunks[0];
+       }
+
+       /*
+        * The header buf is exactly one page, used for staging the message
+        * header.  It is also used for receiving the header of the response
+        * message.
+        */
+       BUG_ON(iocmsg->hdr_chunk.addr & (PAGE_SIZE - 1));
+
+       chunk_bd = &iocmsg->hdr_chunk;
+       chunk = &kreq->hdr_chunk;
+
+       chunk->addr = chunk_bd->addr;
+       chunk->len = chunk_bd->len;
+       chunk->type = KRPC_CHUNK_TYPE_MR;
+
+       chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, iocmsg->hdr_chunk.mr_id);
+       BUG_ON(!chunk->mr);
+
+       kreq->hdr_buf = (char *)kmap(pcs_umem_page(chunk->mr->umem, chunk->addr));
+       kreq->hdr_kv.iov_base = kreq->hdr_buf;
+       kreq->hdr_kv.iov_len = chunk->len;
+
+       /*
+        * Data chunk buf descriptors are placed at the end of the header buf
+        * and grow backwards.
+        */
+       kreq->data_len = 0;
+       kreq->nr_data_chunks = 0;
+       kreq->nr_data_bvecs = 0;
+
+       chunk_bd = (struct pcs_krpc_buf_desc *)(kreq->hdr_buf + PAGE_SIZE) - 1;
+       chunk = kreq->data_chunks;
+       bvec = &kreq->data_bvecs[0];
+
+       for (i = 0; i < iocmsg->nr_data_chunks; i++) {
+               struct page *page;
+               u64 addr;
+               int offset, len;
+
+               chunk->addr = chunk_bd->addr;
+               chunk->len = chunk_bd->len;
+               if (chunk_bd->mr_id) {
+                       chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, chunk_bd->mr_id);
+                       chunk->type = KRPC_CHUNK_TYPE_MR;
+                       if (!chunk->mr) {
+                               res = -ENXIO;
+                               goto err_free_data_chunk;
+                       }
+               } else {
+                       /* unregistered memory buf */
+                       chunk->umem = pcs_umem_get(chunk->addr, chunk->len);
+                       if (IS_ERR(chunk->umem)) {
+                               res = PTR_ERR(chunk->umem);
+                               goto err_free_data_chunk;
+                       }
+
+                       chunk->type = KRPC_CHUNK_TYPE_UMEM;
+               }
+
+               /* build the bvecs for the data chunk */
+               addr = chunk->addr;
+               len = chunk->len;
+
+               while (len) {
+                       /* data bvec array overflow? */
+                       BUG_ON(kreq->nr_data_bvecs >= KRPC_MAX_DATA_PAGES);
+
+                       if (chunk->type == KRPC_CHUNK_TYPE_MR)
+                               page = pcs_umem_page(chunk->mr->umem, addr);
+                       else
+                               page = pcs_umem_page(chunk->umem, addr);
+
+                       BUG_ON(!page);
+
+                       offset = addr & (PAGE_SIZE - 1);
+
+                       bvec->bv_page = page;
+                       bvec->bv_offset = offset;
+
+                       bvec->bv_len = min_t(int, len, PAGE_SIZE - offset);
+
+                       addr += bvec->bv_len;
+                       len -= bvec->bv_len;
+                       bvec++;
+                       kreq->nr_data_bvecs++;
+               }
+               BUG_ON(len != 0);
+
+               kreq->data_len += chunk->len;
+               chunk++;
+               chunk_bd--;
+               kreq->nr_data_chunks++;
+       }
+
+       kreq->completion.xid = iocmsg->xid;
+       kreq->completion.private = kreq;
+
+       kreq->flags = iocmsg->flags;
+
+       msg = &kreq->msg;
+       msg->private = krpc;
+       msg->private2 = kreq;
+
+       INIT_HLIST_NODE(&msg->kill_link);
+       pcs_clear_error(&msg->error);
+
+       msg->size = iocmsg->msg_size;
+       msg->timeout = iocmsg->timeout;
+
+       msg->rpc = NULL;
+       msg->done = pcs_krpc_msg_sent;
+       msg->get_iter = krpc_msg_get_data;
+
+       spin_lock(&krpc->lock);
+       kreq->krpc = pcs_krpc_get(krpc);
+       list_add_tail(&kreq->link, &krpc->pending_queue);
+       spin_unlock(&krpc->lock);
+       /* DTRACE to be added */
+       pcs_rpc_queue(krpc->rpc, msg);
+
+       return 0;
+
+err_free_data_chunk:
+       for (i = 0; i < kreq->nr_data_chunks; i++) {
+               chunk = &kreq->data_chunks[i];
+               if (chunk->type == KRPC_CHUNK_TYPE_UMEM)
+                       pcs_umem_release(chunk->umem);
+               else
+                       pcs_mr_put(chunk->mr);
+       }
+       pcs_mr_put(kreq->hdr_chunk.mr);
+
+       if (kreq->data_chunks != &kreq->inline_data_chunks[0])
+               kfree(kreq->data_chunks);
+
+err_free_kreq:
+       krpc_req_free(kreq);
+       return res;
+}
+
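+/*
+ * Abort the current connection: mark all in-flight requests so that their
+ * completions are dropped instead of being posted, and dispose of any
+ * completions still queued for userspace.
+ */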
+static int pcs_krpc_abort(struct pcs_krpc *krpc)
+{
+       struct list_head dispose_list;
+       struct krpc_req *kreq;
+       struct krpc_completion *comp;
+
+       INIT_LIST_HEAD(&dispose_list);
+
+       spin_lock(&krpc->lock);
+
+       if (krpc->state != PCS_KRPC_STATE_CONNECTED) {
+               spin_unlock(&krpc->lock);
+               return 0;
+       }
+
+       krpc->state = PCS_KRPC_STATE_ABORTED;
+
+       /* abort incomplete requests; they are freed when they complete */
+       list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue);
+       list_for_each_entry(kreq, &krpc->dispose_queue, link)
+               kreq->flags |= KRPC_REQ_F_ABORTED;
+
+       list_splice_tail_init(&krpc->completion_queue, &dispose_list);
+       krpc->nr_completion = 0;
+
+       /* dispose all unprocessed completions */
+       while (!list_empty(&dispose_list)) {
+               comp = list_first_entry(&dispose_list, struct krpc_completion, link);
+               list_del(&comp->link);
+               krpc_completion_free(comp);
+       }
+
+       spin_unlock(&krpc->lock);
+
+       return 0;
+}
+
+static long pcs_krpc_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
+{
+       struct pcs_krpc_context *ctx = file->private_data;
+       struct pcs_krpc *krpc = ctx->krpc;
+       int res = 0;
+
+       if (unlikely(ctx->gen != krpc->gen))
+               return -EFAULT;
+
+       switch (cmd) {
+       case PCS_KRPC_IOC_SEND_MSG: {
+               struct pcs_krpc_ioc_sendmsg req;
+
+               if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+                       return -EFAULT;
+
+               res = pcs_krpc_ioctl_send_msg(krpc, &req);
+               break;
+       }
+       case PCS_KRPC_IOC_RECV_MSG: {
+               struct pcs_krpc_ioc_recvmsg req;
+
+               if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+                       return -EFAULT;
+
+               res = pcs_krpc_ioctl_recv_msg(krpc, &req);
+
+               if (copy_to_user((void __user *)arg, &req, sizeof(req)))
+                       return -EFAULT;
+               break;
+       }
+       case PCS_KRPC_IOC_ABORT:
+               res = pcs_krpc_abort(krpc);
+               break;
+       default:
+               res = -ENOTTY;
+               break;
+       }
+
+       return res;
+}
+
+static __poll_t pcs_krpc_poll(struct file *file, poll_table *wait)
+{
+       struct pcs_krpc_context *ctx = file->private_data;
+       struct pcs_krpc *krpc = ctx->krpc;
+       __poll_t pollflags = 0;
+
+       poll_wait(file, &krpc->poll_wait, wait);
+
+       if (unlikely(ctx->gen != krpc->gen)) {
+               pollflags |= EPOLLERR;
+               return pollflags;
+       }
+
+       spin_lock(&krpc->lock);
+
+       if (krpc->state == PCS_KRPC_STATE_ABORTED)
+               pollflags |= EPOLLERR;
+       else if (krpc->state == PCS_KRPC_STATE_CONNECTED) {
+               pollflags |= EPOLLOUT;
+               if (!list_empty(&krpc->completion_queue))
+                       pollflags |= EPOLLIN;
+       }
+
+       spin_unlock(&krpc->lock);
+
+       return pollflags;
+}
+
+/*
+ * Close connection
+ */
+static int pcs_krpc_release(struct inode *inode, struct file *file)
+{
+       struct pcs_krpc_context *ctx = file->private_data;
+       struct pcs_krpc *krpc = ctx->krpc;
+
+       /* The user may have started a new connection before closing the old one */
+       if (ctx->gen == krpc->gen)
+               pcs_krpc_abort(krpc);
+
+       pcs_krpc_put(krpc);
+       kfree(ctx);
+
+       return 0;
+}
+
+static const struct file_operations pcs_krpc_fops = {
+       .owner = THIS_MODULE,
+       .release = pcs_krpc_release,
+       .poll   = pcs_krpc_poll,
+       .unlocked_ioctl = pcs_krpc_ioctl,
+       .llseek         = no_llseek,
+};
+
+struct pcs_krpc *pcs_krpc_lookup(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
+{
+       struct pcs_krpc *krpc;
+
+       hlist_for_each_entry(krpc, &krpcs->ht[pcs_krpc_hash(id)], hlist) {
+               if (memcmp(&krpc->id, id, sizeof(krpc->id)) == 0)
+                       return krpc;
+       }
+       return NULL;
+}
+
+struct pcs_krpc *pcs_krpc_get(struct pcs_krpc *krpc)
+{
+       refcount_inc(&krpc->refcnt);
+       return krpc;
+}
+
+void pcs_krpc_put(struct pcs_krpc *krpc)
+{
+       if (refcount_dec_and_test(&krpc->refcnt))
+               kfree(krpc);
+}
+
+/*
+ * Create a new pcs_krpc
+ */
+int pcs_krpc_create(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
+               PCS_NET_ADDR_T *addr, int cs_flags)
+{
+       struct pcs_krpc *krpc;
+
+       krpc = kzalloc(sizeof(struct pcs_krpc), GFP_NOIO);
+       if (!krpc)
+               return -ENOMEM;
+
+       INIT_HLIST_NODE(&krpc->hlist);
+       INIT_LIST_HEAD(&krpc->link);
+       spin_lock_init(&krpc->lock);
+       refcount_set(&krpc->refcnt, 1);
+       INIT_LIST_HEAD(&krpc->pending_queue);
+       INIT_LIST_HEAD(&krpc->dispose_queue);
+       INIT_LIST_HEAD(&krpc->completion_queue);
+       init_waitqueue_head(&krpc->poll_wait);
+
+       krpc->krpcs = krpcs;
+       krpc->id = *id;
+       krpc->gen = 0;
+       krpc->state = PCS_KRPC_STATE_UNCONN;
+
+       krpc->rpc = pcs_rpc_clnt_create(&cc_from_krpcset(krpcs)->eng, id, addr, cs_flags);
+       if (!krpc->rpc) {
+               kfree(krpc);
+               return -ENOMEM;
+       }
+
+       krpc->rpc->clnt_krpc = krpc;
+
+       spin_lock(&krpc->lock);
+       spin_lock(&krpcs->lock);
+       hlist_add_head(&krpc->hlist, &krpcs->ht[pcs_krpc_hash(&krpc->id)]);
+       list_add_tail(&krpc->link, &krpcs->list);
+       krpcs->nkrpc++;
+       spin_unlock(&krpcs->lock);
+       spin_unlock(&krpc->lock);
+
+       return 0;
+}
+
+int pcs_krpc_update_addr(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
+               PCS_NET_ADDR_T *addr, int flags)
+{
+       struct pcs_krpc *krpc;
+
+       krpc = pcs_krpc_lookup(krpcs, id);
+       if (!krpc)
+               return -ENXIO;
+
+       pcs_rpc_set_address(krpc->rpc, addr);
+
+       if (flags & CS_FL_LOCAL_SOCK)
+               krpc->rpc->flags |= PCS_RPC_F_LOCAL;
+       else
+               krpc->rpc->flags &= ~PCS_RPC_F_LOCAL;
+
+       return 0;
+}
+
+/*
+ * Connect to a pcs_krpc, return a valid fd on success.
+ */
+int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
+{
+       struct pcs_krpc *krpc;
+       int fd;
+       struct file *file;
+       struct pcs_krpc_context *ctx;
+
+       krpc = pcs_krpc_lookup(krpcs, id);
+       if (!krpc)
+               return -ENXIO;
+
+       if (krpc->state == PCS_KRPC_STATE_CONNECTED ||
+               krpc->state == PCS_KRPC_STATE_DESTROYED)
+               return -EPERM;
+
+       ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
+       if (!ctx)
+               return -ENOMEM;
+
+       fd = get_unused_fd_flags(O_CLOEXEC);
+       if (fd < 0) {
+               kfree(ctx);
+               return fd;
+       }
+
+       file = anon_inode_getfile("[pcs_krpc]", &pcs_krpc_fops, ctx, 0);
+       if (IS_ERR(file)) {
+               kfree(ctx);
+               put_unused_fd(fd);
+               fd = PTR_ERR(file);
+               return fd;
+       }
+
+       spin_lock(&krpc->lock);
+       ctx->gen = ++krpc->gen;
+       ctx->krpc = pcs_krpc_get(krpc);
+       /*
+        * The krpc should always be connected regardless of the state of
+        * the underlying RPC.
+        */
+       krpc->state = PCS_KRPC_STATE_CONNECTED;
+       spin_unlock(&krpc->lock);
+
+       /* publish the fd only after the context is fully initialized */
+       fd_install(fd, file);
+
+       return fd;
+}
+
+static void __pcs_krpc_destroy(struct pcs_krpc *krpc)
+{
+       spin_lock(&krpc->lock);
+
+       krpc->state = PCS_KRPC_STATE_DESTROYED;
+
+       /* Remove from krpc set */
+       spin_lock(&krpc->krpcs->lock);
+       hlist_del(&krpc->hlist);
+       list_del(&krpc->link);
+       krpc->krpcs->nkrpc--;
+       spin_unlock(&krpc->krpcs->lock);
+
+       spin_unlock(&krpc->lock);
+
+       if (krpc->rpc) {
+               krpc->rpc->clnt_krpc = NULL;
+               pcs_rpc_clnt_close(krpc->rpc);
+               krpc->rpc = NULL;
+       }
+       pcs_krpc_put(krpc);
+}
+
+int pcs_krpc_destroy(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
+{
+       struct pcs_krpc *krpc;
+
+       krpc = pcs_krpc_lookup(krpcs, id);
+       BUG_ON(!krpc);
+
+       /* Abort current connection */
+       pcs_krpc_abort(krpc);
+
+       __pcs_krpc_destroy(krpc);
+
+       return 0;
+}
+
+void pcs_krpcset_init(struct pcs_krpc_set *krpcs)
+{
+       unsigned int i;
+
+       for (i = 0; i < PCS_KRPC_HASH_SIZE; i++)
+               INIT_HLIST_HEAD(&krpcs->ht[i]);
+
+       INIT_LIST_HEAD(&krpcs->list);
+       krpcs->nkrpc = 0;
+
+       spin_lock_init(&krpcs->lock);
+}
+
+void pcs_krpcset_fini(struct pcs_krpc_set *krpcs)
+{
+       spin_lock(&krpcs->lock);
+       while (!list_empty(&krpcs->list)) {
+               struct pcs_krpc *krpc;
+
+               krpc = list_first_entry(&krpcs->list, struct pcs_krpc, link);
+               spin_unlock(&krpcs->lock);
+               pcs_krpc_abort(krpc);
+               __pcs_krpc_destroy(krpc);
+               spin_lock(&krpcs->lock);
+       }
+       spin_unlock(&krpcs->lock);
+
+       BUG_ON(!list_empty(&krpcs->list));
+       BUG_ON(krpcs->nkrpc != 0);
+}
+
+int __init pcs_krpc_init(void)
+{
+       krpc_req_cachep = kmem_cache_create("pcs_krpc_req",
+                                       sizeof(struct krpc_req), 0,
+                                       SLAB_RECLAIM_ACCOUNT|SLAB_ACCOUNT, NULL);
+
+       if (!krpc_req_cachep)
+               return -ENOMEM;
+
+       return 0;
+}
+
+void pcs_krpc_fini(void)
+{
+       kmem_cache_destroy(krpc_req_cachep);
+}
diff --git a/fs/fuse/kio/pcs/pcs_krpc.h b/fs/fuse/kio/pcs/pcs_krpc.h
new file mode 100644
index 000000000000..6c0ef20ebc99
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_krpc.h
@@ -0,0 +1,144 @@
+/*
+ *  Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved.
+ */
+
+#ifndef _PCS_KRPC_H_
+#define _PCS_KRPC_H_ 1
+#include <linux/types.h>
+#include <linux/bvec.h>
+
+#include "pcs_prot_types.h"
+#include "pcs_perfcounters.h"
+#include "pcs_rpc_clnt.h"
+#include "pcs_krpc_prot.h"
+#include "pcs_mr.h"
+
+struct krpc_chunk {
+       u64             addr;
+       u32             len;
+       u32             type;
+#define KRPC_CHUNK_TYPE_MR             0
+#define KRPC_CHUNK_TYPE_UMEM   1
+       union {
+               struct pcs_mr *mr;
+               struct pcs_umem *umem;
+       };
+};
+
+#define PCS_KRPC_HASH_SIZE     1024
+struct pcs_krpc_set {
+       /* Set of krpcs */
+       struct hlist_head               ht[PCS_KRPC_HASH_SIZE];
+       struct list_head                list;
+       unsigned int                    nkrpc;
+
+       spinlock_t                              lock;
+};
+
+enum {
+       PCS_KRPC_STATE_UNCONN,
+       PCS_KRPC_STATE_CONNECTED,
+       PCS_KRPC_STATE_ABORTED,
+       PCS_KRPC_STATE_DESTROYED,
+};
+
+struct pcs_krpc {
+       struct hlist_node               hlist;
+       struct list_head                link;
+       spinlock_t                              lock;
+       refcount_t                              refcnt;
+
+       struct pcs_rpc                  *rpc;
+
+       struct pcs_krpc_set             *krpcs;
+
+       PCS_NODE_ID_T                   id;
+
+       u32     state;
+       u32     gen;
+
+       struct list_head                pending_queue;
+       struct list_head                dispose_queue;
+       struct list_head                completion_queue;
+       int nr_completion;
+
+       /** Wait queue head for poll */
+       wait_queue_head_t               poll_wait;
+};
+
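+/*
+ * Per-fd context.  'gen' snapshots krpc->gen at connect time so that a
+ * stale fd left over from a previous connection can be detected and
+ * rejected after a reconnect.
+ */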
+struct pcs_krpc_context {
+       struct pcs_krpc *krpc;
+       u32 gen;
+};
+/*
+ * Completion message to be received by userspace
+ */
+struct krpc_completion {
+       struct list_head        link;           /* in krpc->completion_queue */
+
+       u64                     xid;
+       int                     result;
+
+       void            *private;
+       int                     data_len;
+       u8                      _data_buf[0];
+};
+
+#define KRPC_MAX_DATA_PAGES                    256
+#define NR_KRPC_DATA_CHUNKS_INLINE     4
+struct krpc_req {
+       struct list_head        link;
+       spinlock_t                      lock;
+       struct pcs_krpc         *krpc;
+
+#define KRPC_REQ_F_ALIGNMENT   PCS_KRPC_MSG_F_ALIGNMENT
+#define KRPC_REQ_F_RESP_BUFF   PCS_KRPC_MSG_F_RESP_BUFF        /* data buff is for read response */
+#define KRPC_REQ_F_ABORTED             0x10000
+       int flags;
+
+       struct pcs_msg          msg;
+
+       char                            *hdr_buf;
+       struct kvec                     hdr_kv;
+       struct krpc_chunk       hdr_chunk;
+
+       int                                     nr_data_chunks;
+       struct krpc_chunk       *data_chunks;
+       struct krpc_chunk       inline_data_chunks[NR_KRPC_DATA_CHUNKS_INLINE];
+
+       int data_len;
+       int nr_data_bvecs;
+       struct bio_vec data_bvecs[KRPC_MAX_DATA_PAGES];
+
+       struct krpc_completion completion;
+};
+
+static inline u32 pcs_krpc_msg_size(u32 size, u8 flags)
+{
+       if (flags & PCS_KRPC_MSG_F_ALIGNMENT)
+               size = ALIGN(size, PCS_KRPC_MSG_ALIGNMENT);
+
+       return size;
+}
+
+int pcs_krpc_init(void);
+void pcs_krpc_fini(void);
+
+void pcs_krpcset_init(struct pcs_krpc_set *krpcs);
+void pcs_krpcset_fini(struct pcs_krpc_set *krpcs);
+
+struct pcs_krpc *pcs_krpc_lookup(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id);
+int pcs_krpc_create(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
+               PCS_NET_ADDR_T *addr, int cs_flags);
+int pcs_krpc_update_addr(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
+               PCS_NET_ADDR_T *addr, int flags);
+int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id);
+int pcs_krpc_destroy(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id);
+
+void pcs_krpc_put(struct pcs_krpc *krpc);
+struct pcs_krpc *pcs_krpc_get(struct pcs_krpc *krpc);
+
+struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
+void krpc_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
+void krpc_handle_congestion(struct pcs_rpc *ep, struct pcs_msg *msg);
+#endif
diff --git a/fs/fuse/kio/pcs/pcs_krpc_prot.h b/fs/fuse/kio/pcs/pcs_krpc_prot.h
new file mode 100644
index 000000000000..4c5bbf4492d1
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_krpc_prot.h
@@ -0,0 +1,44 @@
+#ifndef _PCS_KRPC_PROT_H_
+#define _PCS_KRPC_PROT_H_
+#include <linux/ioctl.h>
+
+#include "pcs_prot_types.h"
+
+/* Device ioctls: */
+#define PCS_KRPC_IOC_MAGIC  255
+
+/* ioctl cmds supported by the '[pcs_krpc]' anonymous inode */
+#define PCS_KRPC_IOC_SEND_MSG  _IO(PCS_KRPC_IOC_MAGIC, 10)
+#define PCS_KRPC_IOC_RECV_MSG  _IO(PCS_KRPC_IOC_MAGIC, 11)
+#define PCS_KRPC_IOC_ABORT             _IO(PCS_KRPC_IOC_MAGIC, 12)
+
+struct pcs_krpc_buf_desc {
+       u64  addr;              /* buf address in userspace. */
+       u32  len;               /* size of the buf */
+       u32  mr_id;             /* mr id */
+};
+
+#define PCS_KRPC_MSG_ALIGNMENT         (512ULL)
+
+#define PCS_KRPC_MSG_F_ALIGNMENT       0x01
+#define PCS_KRPC_MSG_F_RESP_BUFF       0x02
+/*
+ * To avoid copying, a msg is sent to the kernel as an array of buffer
+ * descriptors.  Each descriptor points to a buf holding one chunk of the
+ * msg, and all chunk buffers come from registered MRs.  The data chunk
+ * descriptors are stored at the end of the header buf page, growing
+ * backwards.
+ */
+struct pcs_krpc_ioc_sendmsg {
+       u64             xid;                            /* context id */
+       u32             msg_size;                       /* total size of the msg */
+       u16             timeout;                        /* timeout */
+       u8              flags;                          /* PCS_KRPC_MSG_F_* flags */
+       u8              nr_data_chunks;         /* total number of data chunks */
+       struct pcs_krpc_buf_desc hdr_chunk;     /* the buf holding msg header */
+};
+
+struct pcs_krpc_ioc_recvmsg {
+       u64             xid;    /* context id */
+       struct pcs_krpc_buf_desc buf;   /* for cs congestion notification and rpc keep waiting msgs */
+};
+
+#endif
-- 
2.39.3 (Apple Git-146)
