Currently there are two connections for every RPC: one in userspace and one in the kernel. On client hosts of a large cluster this wastes a significant amount of resources. It is therefore desirable to eliminate the userspace connection and let userspace drive the kernel RPC directly.
The newly implemented pcs_krpc serves as the glue layer between userspace RPC and kernel RPC. It provides execution context for forwarding messages from/to userspace RPC to/from kernel RPC. It also providee APIs for userspace to send/receive RPC messages. https://pmc.acronis.work/browse/VSTOR-82613 Signed-off-by: Liu Kui <kui....@virtuozzo.com> --- fs/fuse/kio/pcs/pcs_krpc.c | 815 ++++++++++++++++++++++++++++++++ fs/fuse/kio/pcs/pcs_krpc.h | 144 ++++++ fs/fuse/kio/pcs/pcs_krpc_prot.h | 44 ++ 3 files changed, 1003 insertions(+) create mode 100644 fs/fuse/kio/pcs/pcs_krpc.c create mode 100644 fs/fuse/kio/pcs/pcs_krpc.h create mode 100644 fs/fuse/kio/pcs/pcs_krpc_prot.h diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c new file mode 100644 index 000000000000..3169c54a2a8b --- /dev/null +++ b/fs/fuse/kio/pcs/pcs_krpc.c @@ -0,0 +1,815 @@ +/* + * Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved. + */ + +#include <linux/types.h> +#include <linux/list.h> +#include <linux/rbtree.h> +#include <linux/refcount.h> +#include <linux/file.h> +#include <linux/anon_inodes.h> + +#include "pcs_types.h" +#include "pcs_cluster.h" +#include "pcs_cs_prot.h" +#include "pcs_req.h" +#include "pcs_krpc.h" + +struct kmem_cache *krpc_req_cachep; + +static int krpc_completion_post(struct pcs_krpc *krpc, struct krpc_completion *comp) +__releases(krpc->lock) +{ + /* post complete only when connected */ + if (krpc->state != PCS_KRPC_STATE_CONNECTED) { + spin_unlock(&krpc->lock); + return -1; + } + list_add_tail(&comp->link, &krpc->completion_queue); + krpc->nr_completion++; + spin_unlock(&krpc->lock); + + wake_up_poll(&krpc->poll_wait, EPOLLIN); + return 0; +} + +static struct krpc_req *krpc_req_alloc(void) +{ + struct krpc_req *kreq; + + kreq = kmem_cache_alloc(krpc_req_cachep, GFP_NOIO); + if (!kreq) + return NULL; + + spin_lock_init(&kreq->lock); + kreq->flags = 0; + memset(&kreq->completion, 0, sizeof(struct krpc_completion)); + + return kreq; +} + +static void krpc_req_free(struct krpc_req *kreq) +{ + kmem_cache_free(krpc_req_cachep, kreq); +} + +static void krpc_completion_free(struct krpc_completion *comp) +{ + if (comp->private) + krpc_req_free((struct krpc_req *)comp->private); + else + kfree(comp); +} + +static void krpc_req_complete(struct krpc_req *kreq, int error) +{ + struct krpc_completion *comp = &kreq->completion; + struct pcs_krpc *krpc = kreq->krpc; + int i; + + BUG_ON(!comp->xid); + + comp->result = error; + + pcs_mr_put(kreq->hdr_chunk.mr); + + for (i = 0; i < kreq->nr_data_chunks; i++) { + struct krpc_chunk *chunk; + + chunk = &kreq->data_chunks[i]; + if (chunk->type == KRPC_CHUNK_TYPE_UMEM) + pcs_umem_release(chunk->umem); + else + pcs_mr_put(chunk->mr); + } + + if (kreq->data_chunks != &kreq->inline_data_chunks[0]) + kfree(kreq->data_chunks); + + spin_lock(&krpc->lock); + list_del(&kreq->link); + + if (kreq->flags & KRPC_REQ_F_ABORTED) { + krpc_req_free(kreq); + spin_unlock(&krpc->lock); + } else if (krpc_completion_post(krpc, comp)) { + krpc_req_free(kreq); + } + + pcs_krpc_put(krpc); +} + +static void krpc_msg_get_response_iter(struct pcs_msg *msg, int offset, + struct iov_iter *it, unsigned int direction) +{ + struct pcs_msg *req = msg->private; + struct krpc_req *kreq = req->private2; + + if (!(kreq->flags & KRPC_REQ_F_RESP_BUFF)) { + /* No data payload */ + BUG_ON(msg->size > PAGE_SIZE); + + kreq->hdr_kv.iov_base = (void *)kreq->hdr_buf; + kreq->hdr_kv.iov_len = msg->size; + + iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, msg->size); + 
iov_iter_advance(it, offset); + } else { + /* With data payload */ + int hdr_size = sizeof(struct pcs_cs_iohdr); + + if (offset < hdr_size) { + kreq->hdr_kv.iov_base = (void *)kreq->hdr_buf; + kreq->hdr_kv.iov_len = hdr_size; + iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, hdr_size); + iov_iter_advance(it, offset); + } else { + BUG_ON((offset - hdr_size) > kreq->data_len); + BUG_ON(msg->size - hdr_size > kreq->data_len); + + iov_iter_bvec(it, direction, &kreq->data_bvecs[0], + kreq->nr_data_bvecs, kreq->data_len); + iov_iter_truncate(it, msg->size - hdr_size); + iov_iter_advance(it, offset - hdr_size); + } + } +} + +struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h) +{ + struct pcs_msg *msg, *resp; + struct pcs_rpc_hdr *req_h; + struct krpc_req *kreq; + + if (!RPC_IS_RESPONSE(h->type)) + return NULL; + + msg = pcs_rpc_lookup_xid(ep, &h->xid); + if (msg == NULL) + return NULL; + + req_h = (struct pcs_rpc_hdr *)msg_inline_head(msg); + if (req_h->type != (h->type & ~PCS_RPC_DIRECTION)) + return NULL; + + kreq = msg->private2; + + resp = pcs_rpc_alloc_input_msg(ep, sizeof(struct pcs_rpc_hdr)); + if (!resp) + return NULL; + + memcpy(resp->_inline_buffer, h, sizeof(struct pcs_rpc_hdr)); + memcpy(kreq->hdr_buf, h, sizeof(struct pcs_rpc_hdr)); + resp->size = h->len; + resp->private = msg; + resp->get_iter = krpc_msg_get_response_iter; + resp->done = rpc_work_input; + pcs_msg_del_calendar(msg); + + return resp; +} + +/* + * All request type message from cs are forwarded to userspace directly. The PCS KRPC + * layer cannot process these request on its own. + */ +static void krpc_handle_request(struct pcs_krpc *krpc, struct pcs_msg *msg) +{ + struct krpc_completion *comp; + + if (krpc->state != PCS_KRPC_STATE_CONNECTED) + return; + + comp = kzalloc(sizeof(*comp) + msg->size, GFP_NOIO); + if (!comp) + return; + + memcpy(comp->_data_buf, msg->_inline_buffer, msg->size); + comp->data_len = msg->size; + + spin_lock(&krpc->lock); + if (krpc_completion_post(krpc, comp)) + kfree(comp); +} + +/* + * Let userspace to handle cs_congestion_notify request. + */ +void krpc_handle_congestion(struct pcs_rpc *ep, struct pcs_msg *msg) +{ + krpc_handle_request(ep->clnt_krpc, msg); +} + +/* + * Let userspace to handle the keep_waiting request. 
+ */ +void krpc_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg) +{ + krpc_handle_request(ep->clnt_krpc, msg); +} + +unsigned int pcs_krpc_hash(PCS_NODE_ID_T *id) +{ + return *(unsigned int *)id % PCS_KRPC_HASH_SIZE; +} + +static void krpc_msg_get_data(struct pcs_msg *msg, int offset, + struct iov_iter *it, unsigned int direction) +{ + struct krpc_req *kreq = msg->private2; + int hdr_size = pcs_krpc_msg_size(kreq->hdr_chunk.len, kreq->flags); + int data_size = pcs_krpc_msg_size(kreq->data_len, kreq->flags); + struct pcs_krpc *krpc = kreq->krpc; + + if (offset < kreq->hdr_chunk.len) { + iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, kreq->hdr_chunk.len); + iov_iter_advance(it, offset); + return; + } + + if (offset < hdr_size) { + iov_iter_kvec(it, direction, &(cc_from_krpcset(krpc->krpcs))->nilbuffer_kv, + 1, hdr_size - offset); + return; + } + + if (offset < hdr_size + kreq->data_len) { + iov_iter_bvec(it, direction, &kreq->data_bvecs[0], + kreq->nr_data_bvecs, kreq->data_len); + iov_iter_advance(it, offset - hdr_size); + return; + } + + BUG_ON(offset >= hdr_size + data_size); + + iov_iter_kvec(it, direction, &(cc_from_krpcset(krpc->krpcs))->nilbuffer_kv, + 1, (hdr_size + data_size - offset)); +} + +static void pcs_krpc_response_done(struct pcs_msg *msg) +{ + struct krpc_req *kreq = msg->private2; + + if (msg->rpc) { + pcs_rpc_put(msg->rpc); + msg->rpc = NULL; + } + + krpc_req_complete(kreq, msg->error.value); +} + +static void pcs_krpc_msg_sent(struct pcs_msg *msg) +{ + msg->done = pcs_krpc_response_done; + if (pcs_if_error(&msg->error)) { + msg->done(msg); + return; + } + pcs_rpc_sent(msg); +} + +static int pcs_krpc_ioctl_recv_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_recvmsg *iocmsg) +{ + struct krpc_completion *comp; + int res = 0; + + spin_lock(&krpc->lock); + if (list_empty(&krpc->completion_queue)) { + spin_unlock(&krpc->lock); + return 0; + } + + comp = list_first_entry(&krpc->completion_queue, struct krpc_completion, link); + list_del(&comp->link); + krpc->nr_completion--; + spin_unlock(&krpc->lock); + + if (comp->result) { + res -= comp->result; + goto out; + } + + res = 1; + iocmsg->xid = comp->xid; + if (comp->xid == 0) { + BUG_ON(!comp->data_len); + BUG_ON(iocmsg->buf.len < comp->data_len); + if (copy_to_user((void __user *)iocmsg->buf.addr, comp->_data_buf, comp->data_len)) + res = -EFAULT; + } + +out: + krpc_completion_free(comp); + return res; +} + +static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_sendmsg *iocmsg) +{ + struct krpc_req *kreq; + struct pcs_msg *msg; + struct pcs_krpc_buf_desc *chunk_bd; + struct krpc_chunk *chunk; + int res, i; + struct bio_vec *bvec; + + kreq = krpc_req_alloc(); + if (!kreq) + return -ENOMEM; + + if (iocmsg->nr_data_chunks > NR_KRPC_DATA_CHUNKS_INLINE) { + kreq->data_chunks = kzalloc(iocmsg->nr_data_chunks, GFP_NOIO); + if (!kreq->data_chunks) { + res = -ENOMEM; + goto err_free_kreq; + } + } else { + kreq->data_chunks = &kreq->inline_data_chunks[0]; + } + + /* + * Header buff is exactly one page, used for staging message header. Will be used for + * receiving header of response message as well. 
+ */ + BUG_ON(iocmsg->hdr_chunk.addr & (PAGE_SIZE - 1)); + + chunk_bd = &iocmsg->hdr_chunk; + chunk = &kreq->hdr_chunk; + + chunk->addr = chunk_bd->addr; + chunk->len = chunk_bd->len; + chunk->type = KRPC_CHUNK_TYPE_MR; + + chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, iocmsg->hdr_chunk.mr_id); + BUG_ON(!chunk->mr); + + kreq->hdr_buf = (char *) kmap(pcs_umem_page(chunk->mr->umem, chunk->addr)); + kreq->hdr_kv.iov_base = kreq->hdr_buf; + kreq->hdr_kv.iov_len = chunk->len; + + /* data chunk buf descriptors are placed at end of header buf, grow backwards */ + kreq->data_len = 0; + kreq->nr_data_chunks = 0; + kreq->nr_data_bvecs = 0; + + chunk_bd = (struct pcs_krpc_buf_desc *)(kreq->hdr_buf + PAGE_SIZE) - 1; + chunk = kreq->data_chunks; + bvec = &kreq->data_bvecs[0]; + + for (i = 0; i < iocmsg->nr_data_chunks; i++) { + struct page *page; + u64 addr; + int offset, len; + + chunk->addr = chunk_bd->addr; + chunk->len = chunk_bd->len; + if (chunk_bd->mr_id) { + chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, chunk_bd->mr_id); + chunk->type = KRPC_CHUNK_TYPE_MR; + if (!chunk->mr) { + res = -ENXIO; + goto err_free_data_chunk; + } + } else { + /* unregistered memory buf */ + chunk->umem = pcs_umem_get(chunk->addr, chunk->len); + if (IS_ERR(chunk->umem)) { + res = PTR_ERR(chunk->umem); + goto err_free_data_chunk; + } + + chunk->type = KRPC_CHUNK_TYPE_UMEM; + } + + /* build the bvecs for the data chunk*/ + addr = chunk->addr; + len = chunk->len; + + while (len) { + /* data bvec array overflow? */ + BUG_ON(kreq->nr_data_bvecs >= KRPC_MAX_DATA_PAGES); + + if (chunk->type == KRPC_CHUNK_TYPE_MR) + page = pcs_umem_page(chunk->mr->umem, addr); + else + page = pcs_umem_page(chunk->umem, addr); + + BUG_ON(!page); + + offset = addr & (PAGE_SIZE - 1); + + bvec->bv_page = page; + bvec->bv_offset = offset; + + bvec->bv_len = len < (PAGE_SIZE - offset) ? 
len : (PAGE_SIZE - offset); + + addr += bvec->bv_len; + len -= bvec->bv_len; + bvec++; + kreq->nr_data_bvecs++; + } + BUG_ON(len != 0); + + kreq->data_len += chunk->len; + chunk++; + chunk_bd--; + kreq->nr_data_chunks++; + } + + kreq->completion.xid = iocmsg->xid; + kreq->completion.private = kreq; + + kreq->flags = iocmsg->flags; + + msg = &kreq->msg; + msg->private = krpc; + msg->private2 = kreq; + + INIT_HLIST_NODE(&msg->kill_link); + pcs_clear_error(&msg->error); + + msg->size = iocmsg->msg_size; + msg->timeout = iocmsg->timeout; + + msg->rpc = NULL; + msg->done = pcs_krpc_msg_sent; + msg->get_iter = krpc_msg_get_data; + + spin_lock(&krpc->lock); + kreq->krpc = pcs_krpc_get(krpc); + list_add_tail(&kreq->link, &krpc->pending_queue); + spin_unlock(&krpc->lock); + /* DTRACE to be added */ + pcs_rpc_queue(krpc->rpc, msg); + + return 0; + +err_free_data_chunk: + for (i = 0; i < kreq->nr_data_chunks; i++) { + chunk = &kreq->data_chunks[i]; + if (chunk->type == KRPC_CHUNK_TYPE_UMEM) + pcs_umem_release(chunk->umem); + else + pcs_mr_put(chunk->mr); + } + pcs_mr_put(kreq->hdr_chunk.mr); + + if (kreq->data_chunks != &kreq->inline_data_chunks[0]) + kfree(kreq->data_chunks); + +err_free_kreq: + krpc_req_free(kreq); + return res; +} + +static int pcs_krpc_abort(struct pcs_krpc *krpc) +{ + struct list_head dispose_list; + struct krpc_req *kreq; + struct krpc_completion *comp; + + INIT_LIST_HEAD(&dispose_list); + + spin_lock(&krpc->lock); + + if (krpc->state != PCS_KRPC_STATE_CONNECTED) { + spin_unlock(&krpc->lock); + return 0; + } + + krpc->state = PCS_KRPC_STATE_ABORTED; + + /* abort incompleted requests */ + list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue); + list_for_each_entry(kreq, &krpc->dispose_queue, link) + kreq->flags |= KRPC_REQ_F_ABORTED; + + list_splice_tail_init(&krpc->completion_queue, &dispose_list); + krpc->nr_completion = 0; + + spin_unlock(&krpc->lock); + + /* dispose all unprocessed completions */ + while (!list_empty(&dispose_list)) { + comp = list_first_entry(&dispose_list, struct krpc_completion, link); + list_del(&comp->link); + krpc_completion_free(comp); + } + + return 0; +} + +static long pcs_krpc_ioctl(struct file *file, unsigned int cmd, unsigned long arg) +{ + struct pcs_krpc_context *ctx = file->private_data; + struct pcs_krpc *krpc = ctx->krpc; + int res = 0; + + if (unlikely(ctx->gen != krpc->gen)) + return -EFAULT; + + switch (cmd) { + case PCS_KRPC_IOC_SEND_MSG: { + struct pcs_krpc_ioc_sendmsg req; + + if (copy_from_user(&req, (void __user *)arg, sizeof(req))) + return -EFAULT; + + res = pcs_krpc_ioctl_send_msg(krpc, &req); + break; + } + case PCS_KRPC_IOC_RECV_MSG: { + struct pcs_krpc_ioc_recvmsg req; + + res = pcs_krpc_ioctl_recv_msg(krpc, &req); + + if (copy_to_user((void __user *)arg, &req, sizeof(req))) + return -EFAULT; + break; + } + case PCS_KRPC_IOC_ABORT: + res = pcs_krpc_abort(krpc); + break; + default: + res = -ENOTTY; + break; + } + + return res; +} + +static __poll_t pcs_krpc_poll(struct file *file, poll_table *wait) +{ + struct pcs_krpc_context *ctx = file->private_data; + struct pcs_krpc *krpc = ctx->krpc; + __poll_t pollflags = 0; + + poll_wait(file, &krpc->poll_wait, wait); + + if (unlikely(ctx->gen != krpc->gen)) { + pollflags |= EPOLLERR; + return pollflags; + } + + spin_lock(&krpc->lock); + + if (krpc->state == PCS_KRPC_STATE_ABORTED) + pollflags |= EPOLLERR; + else if (krpc->state == PCS_KRPC_STATE_CONNECTED) { + pollflags |= EPOLLOUT; + if (!list_empty(&krpc->completion_queue)) + pollflags |= EPOLLIN; + } + + 
spin_unlock(&krpc->lock); + + return pollflags; +} + +/* + * Close connection + */ +static int pcs_krpc_release(struct inode *inode, struct file *file) +{ + struct pcs_krpc_context *ctx = file->private_data; + struct pcs_krpc *krpc = ctx->krpc; + + /* User may have started a new connection before close the old one */ + if (ctx->gen == krpc->gen) + pcs_krpc_abort(krpc); + + pcs_krpc_put(krpc); + kfree(ctx); + + return 0; +} + +static const struct file_operations pcs_krpc_fops = { + .owner = THIS_MODULE, + .release = pcs_krpc_release, + .poll = pcs_krpc_poll, + .unlocked_ioctl = pcs_krpc_ioctl, + .llseek = no_llseek, +}; + +struct pcs_krpc *pcs_krpc_lookup(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id) +{ + struct pcs_krpc *krpc; + + hlist_for_each_entry(krpc, &krpcs->ht[pcs_krpc_hash(id)], hlist) { + if (memcmp(&krpc->id, id, sizeof(krpc->id)) == 0) + return krpc; + } + return NULL; +} + +struct pcs_krpc *pcs_krpc_get(struct pcs_krpc *krpc) +{ + refcount_inc(&krpc->refcnt); + return krpc; +} + +void pcs_krpc_put(struct pcs_krpc *krpc) +{ + if (refcount_dec_and_test(&krpc->refcnt)) + kfree(krpc); +} + +/* + * Create a new pcs_krpc + */ +int pcs_krpc_create(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id, + PCS_NET_ADDR_T *addr, int cs_flags) +{ + struct pcs_krpc *krpc; + + krpc = kzalloc(sizeof(struct pcs_krpc), GFP_NOIO); + if (!krpc) + return -ENOMEM; + + INIT_HLIST_NODE(&krpc->hlist); + INIT_LIST_HEAD(&krpc->link); + spin_lock_init(&krpc->lock); + refcount_set(&krpc->refcnt, 1); + INIT_LIST_HEAD(&krpc->pending_queue); + INIT_LIST_HEAD(&krpc->dispose_queue); + INIT_LIST_HEAD(&krpc->completion_queue); + init_waitqueue_head(&krpc->poll_wait); + + krpc->krpcs = krpcs; + krpc->id = *id; + krpc->gen = 0; + krpc->state = PCS_KRPC_STATE_UNCONN; + + krpc->rpc = pcs_rpc_clnt_create(&cc_from_krpcset(krpcs)->eng, id, addr, cs_flags); + if (!krpc->rpc) { + kfree(krpc); + return -ENOMEM; + } + + krpc->rpc->clnt_krpc = krpc; + + spin_lock(&krpc->lock); + spin_lock(&krpcs->lock); + hlist_add_head(&krpc->hlist, &krpcs->ht[pcs_krpc_hash(&krpc->id)]); + list_add_tail(&krpc->link, &krpcs->list); + krpcs->nkrpc++; + spin_unlock(&krpcs->lock); + spin_unlock(&krpc->lock); + + return 0; +} + +int pcs_krpc_update_addr(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id, + PCS_NET_ADDR_T *addr, int flags) +{ + struct pcs_krpc *krpc; + + krpc = pcs_krpc_lookup(krpcs, id); + if (!krpc) + return -ENXIO; + + pcs_rpc_set_address(krpc->rpc, addr); + + if (flags & CS_FL_LOCAL_SOCK) + krpc->rpc->flags |= PCS_RPC_F_LOCAL; + else + krpc->rpc->flags &= ~PCS_RPC_F_LOCAL; + + return 0; +} +/* + * Connect to a pcs_krpc, return a valid fd on success. 
+ */ +int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id) +{ + struct pcs_krpc *krpc; + int fd; + struct file *file; + struct pcs_krpc_context *ctx; + + krpc = pcs_krpc_lookup(krpcs, id); + if (!krpc) + return -ENXIO; + + if (krpc->state == PCS_KRPC_STATE_CONNECTED || + krpc->state == PCS_KRPC_STATE_DESTROYED) + return -EPERM; + + ctx = kmalloc(sizeof(*ctx), GFP_KERNEL); + if (!ctx) + return -ENOMEM; + + fd = get_unused_fd_flags(O_CLOEXEC); + if (fd < 0) { + kfree(ctx); + return fd; + } + + file = anon_inode_getfile("[pcs_krpc]", &pcs_krpc_fops, ctx, 0); + if (IS_ERR(file)) { + kfree(ctx); + put_unused_fd(fd); + fd = PTR_ERR(file); + return fd; + } + + fd_install(fd, file); + + spin_lock(&krpc->lock); + ctx->gen = ++krpc->gen; + ctx->krpc = pcs_krpc_get(krpc); + /* + * the krpc should always be connected regardless state of + * underlying RPC + */ + krpc->state = PCS_KRPC_STATE_CONNECTED; + spin_unlock(&krpc->lock); + + return fd; +} + +static void __pcs_krpc_destroy(struct pcs_krpc *krpc) +{ + spin_lock(&krpc->lock); + + krpc->state = PCS_KRPC_STATE_DESTROYED; + + /*Remove from krpc set*/ + spin_lock(&krpc->krpcs->lock); + hlist_del(&krpc->hlist); + list_del(&krpc->link); + krpc->krpcs->nkrpc--; + spin_unlock(&krpc->krpcs->lock); + + spin_unlock(&krpc->lock); + + if (krpc->rpc) { + krpc->rpc->clnt_krpc = NULL; + pcs_rpc_clnt_close(krpc->rpc); + krpc->rpc = NULL; + } + pcs_krpc_put(krpc); +} + +int pcs_krpc_destroy(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id) +{ + struct pcs_krpc *krpc; + + krpc = pcs_krpc_lookup(krpcs, id); + BUG_ON(!krpc); + + /* Abort current connection */ + pcs_krpc_abort(krpc); + + __pcs_krpc_destroy(krpc); + + return 0; +} + +void pcs_krpcset_init(struct pcs_krpc_set *krpcs) +{ + unsigned int i; + + for (i = 0; i < PCS_KRPC_HASH_SIZE; i++) + INIT_HLIST_HEAD(&krpcs->ht[i]); + + INIT_LIST_HEAD(&krpcs->list); + krpcs->nkrpc = 0; + + spin_lock_init(&krpcs->lock); +} + +void pcs_krpcset_fini(struct pcs_krpc_set *krpcs) +{ + spin_lock(&krpcs->lock); + while (!list_empty(&krpcs->list)) { + struct pcs_krpc *krpc; + + krpc = list_first_entry(&krpcs->list, struct pcs_krpc, link); + spin_unlock(&krpcs->lock); + pcs_krpc_abort(krpc); + __pcs_krpc_destroy(krpc); + spin_lock(&krpcs->lock); + } + spin_unlock(&krpcs->lock); + + BUG_ON(!list_empty(&krpcs->list)); + BUG_ON(krpcs->nkrpc != 0); +} + +int __init pcs_krpc_init(void) +{ + krpc_req_cachep = kmem_cache_create("pcs_krpc_req", + sizeof(struct krpc_req), 0, + SLAB_RECLAIM_ACCOUNT|SLAB_ACCOUNT, NULL); + + if (!krpc_req_cachep) + return -ENOMEM; + + return 0; +} + +void pcs_krpc_fini(void) +{ + kmem_cache_destroy(krpc_req_cachep); +} diff --git a/fs/fuse/kio/pcs/pcs_krpc.h b/fs/fuse/kio/pcs/pcs_krpc.h new file mode 100644 index 000000000000..6c0ef20ebc99 --- /dev/null +++ b/fs/fuse/kio/pcs/pcs_krpc.h @@ -0,0 +1,144 @@ +/* + * Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved. 
+ */ + +#ifndef _PCS_KRPC_H_ +#define _PCS_KRPC_H_ 1 +#include <linux/types.h> +#include <linux/bvec.h> + +#include "pcs_prot_types.h" +#include "pcs_perfcounters.h" +#include "pcs_rpc_clnt.h" +#include "pcs_krpc_prot.h" +#include "pcs_mr.h" + +struct krpc_chunk { + u64 addr; + u32 len; + u32 type; +#define KRPC_CHUNK_TYPE_MR 0 +#define KRPC_CHUNK_TYPE_UMEM 1 + union { + struct pcs_mr *mr; + struct pcs_umem *umem; + }; +}; + +#define PCS_KRPC_HASH_SIZE 1024 +struct pcs_krpc_set { + /* Set of krpcs */ + struct hlist_head ht[PCS_KRPC_HASH_SIZE]; + struct list_head list; + unsigned int nkrpc; + + spinlock_t lock; +}; + +enum { + PCS_KRPC_STATE_UNCONN, + PCS_KRPC_STATE_CONNECTED, + PCS_KRPC_STATE_ABORTED, + PCS_KRPC_STATE_DESTROYED, +}; + +struct pcs_krpc { + struct hlist_node hlist; + struct list_head link; + spinlock_t lock; + refcount_t refcnt; + + struct pcs_rpc *rpc; + + struct pcs_krpc_set *krpcs; + + PCS_NODE_ID_T id; + + u32 state; + u32 gen; + + struct list_head pending_queue; + struct list_head dispose_queue; + struct list_head completion_queue; + int nr_completion; + + /** Wait queue head for poll */ + wait_queue_head_t poll_wait; +}; + +struct pcs_krpc_context { + struct pcs_krpc *krpc; + u32 gen; +}; +/* + * Completion message to be received by userspace + */ +struct krpc_completion { + struct list_head link; /* in krpc->completion_queue */ + + u64 xid; + int result; + + void *private; + int data_len; + u8 _data_buf[0]; +}; + +#define KRPC_MAX_DATA_PAGES 256 +#define NR_KRPC_DATA_CHUNKS_INLINE 4 +struct krpc_req { + struct list_head link; + spinlock_t lock; + struct pcs_krpc *krpc; + +#define KRPC_REQ_F_ALIGNMENT PCS_KRPC_MSG_F_ALIGNMENT +#define KRPC_REQ_F_RESP_BUFF PCS_KRPC_MSG_F_RESP_BUFF /* data buff is for read response */ +#define KRPC_REQ_F_ABORTED 0x10000 + int flags; + + struct pcs_msg msg; + + char *hdr_buf; + struct kvec hdr_kv; + struct krpc_chunk hdr_chunk; + + int nr_data_chunks; + struct krpc_chunk *data_chunks; + struct krpc_chunk inline_data_chunks[NR_KRPC_DATA_CHUNKS_INLINE]; + + int data_len; + int nr_data_bvecs; + struct bio_vec data_bvecs[KRPC_MAX_DATA_PAGES]; + + struct krpc_completion completion; +}; + +static inline u32 pcs_krpc_msg_size(u32 size, u8 flags) +{ + if (flags & PCS_KRPC_MSG_F_ALIGNMENT) + size = ALIGN(size, PCS_KRPC_MSG_ALIGNMENT); + + return size; +} + +int pcs_krpc_init(void); +void pcs_krpc_fini(void); + +void pcs_krpcset_init(struct pcs_krpc_set *krpcs); +void pcs_krpcset_fini(struct pcs_krpc_set *krpcs); + +struct pcs_krpc *pcs_krpc_lookup(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id); +int pcs_krpc_create(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id, + PCS_NET_ADDR_T *addr, int cs_flags); +int pcs_krpc_update_addr(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id, + PCS_NET_ADDR_T *addr, int flags); +int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id); +int pcs_krpc_destroy(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id); + +void pcs_krpc_put(struct pcs_krpc *krpc); +struct pcs_krpc *pcs_krpc_get(struct pcs_krpc *krpc); + +struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h); +void krpc_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg); +void krpc_handle_congestion(struct pcs_rpc *ep, struct pcs_msg *msg); +#endif diff --git a/fs/fuse/kio/pcs/pcs_krpc_prot.h b/fs/fuse/kio/pcs/pcs_krpc_prot.h new file mode 100644 index 000000000000..4c5bbf4492d1 --- /dev/null +++ b/fs/fuse/kio/pcs/pcs_krpc_prot.h @@ -0,0 +1,44 @@ +#ifndef _PCS_KRPC_PROT_H_ +#define _PCS_KRPC_PROT_H_ +#include 
<linux/ioctl.h>
+
+#include "pcs_prot_types.h"
+
+/* Device ioctls: */
+#define PCS_KRPC_IOC_MAGIC 255
+
+/* ioctl cmds supported by the '[pcs_krpc]' anonymous inode */
+#define PCS_KRPC_IOC_SEND_MSG	_IO(PCS_KRPC_IOC_MAGIC, 10)
+#define PCS_KRPC_IOC_RECV_MSG	_IO(PCS_KRPC_IOC_MAGIC, 11)
+#define PCS_KRPC_IOC_ABORT	_IO(PCS_KRPC_IOC_MAGIC, 12)
+
+struct pcs_krpc_buf_desc {
+	u64 addr;	/* buf address in userspace */
+	u32 len;	/* size of the buf */
+	u32 mr_id;	/* mr id */
+};
+
+#define PCS_KRPC_MSG_ALIGNMENT	(512ULL)
+
+#define PCS_KRPC_MSG_F_ALIGNMENT	0x01
+#define PCS_KRPC_MSG_F_RESP_BUFF	0x02
+/*
+ * To avoid copying, a msg is sent to the kernel as an array of buffer
+ * descriptors. Each buffer descriptor points to a buffer holding one chunk
+ * of the msg, and all chunk buffers come from registered MRs.
+ */
+struct pcs_krpc_ioc_sendmsg {
+	u64 xid;		/* context id */
+	u32 msg_size;		/* total size of the msg */
+	u16 timeout;		/* timeout */
+	u8 flags;		/* PCS_KRPC_MSG_F_* flags */
+	u8 nr_data_chunks;	/* total number of data chunks */
+	struct pcs_krpc_buf_desc hdr_chunk;	/* the buf holding the msg header */
+};
+
+struct pcs_krpc_ioc_recvmsg {
+	u64 xid;	/* context id */
+	struct pcs_krpc_buf_desc buf;	/* for cs congestion notification and rpc keep-waiting msgs */
+};
+
+#endif
--
2.39.3 (Apple Git-146)
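To make the userspace-facing flow easier to review, here is a minimal usage sketch (illustrative only, not part of the patch). It assumes a userspace-visible copy of pcs_krpc_prot.h, that the pcs_krpc fd has already been obtained through the connect path that ends in pcs_krpc_connect(), and that the one-page, page-aligned header buffer was registered through the separate MR interface so that hdr_mr_id is valid. The function name krpc_send_and_wait(), the 4 KiB PAGE_SIZE_U, the notify_buf size and the timeout value are made up for the example; the placement of data-chunk descriptors at the end of the header page, growing backwards, follows what pcs_krpc_ioctl_send_msg() expects.

/*
 * Hypothetical userspace sketch: submit one RPC message on a pcs_krpc fd and
 * wait for its completion.  Structures and ioctl numbers come from
 * pcs_krpc_prot.h; everything else here is illustrative.
 */
#include <stdint.h>
#include <string.h>
#include <poll.h>
#include <sys/ioctl.h>

#include "pcs_krpc_prot.h"	/* assumed: a userspace copy of this header */

#define PAGE_SIZE_U 4096	/* assumed page size */

static int krpc_send_and_wait(int fd, uint64_t xid,
			      void *hdr_buf, uint32_t hdr_len, uint32_t hdr_mr_id,
			      void *data, uint32_t data_len)
{
	struct pcs_krpc_ioc_sendmsg snd;
	struct pcs_krpc_ioc_recvmsg rcv;
	struct pcs_krpc_buf_desc *bd;
	struct pollfd pfd = { .fd = fd, .events = POLLIN };
	char notify_buf[512];	/* receives unsolicited congestion/keep-waiting payloads */
	int ret;

	/*
	 * Data-chunk descriptors live at the tail of the (registered,
	 * page-aligned) header page and grow backwards; the kernel walks
	 * them the same way in pcs_krpc_ioctl_send_msg().
	 */
	bd = (struct pcs_krpc_buf_desc *)((char *)hdr_buf + PAGE_SIZE_U) - 1;
	bd->addr = (uint64_t)(uintptr_t)data;
	bd->len = data_len;
	bd->mr_id = 0;		/* 0: unregistered buffer, the kernel pins it */

	memset(&snd, 0, sizeof(snd));
	snd.xid = xid;
	snd.msg_size = hdr_len + data_len;
	snd.timeout = 30;	/* illustrative value */
	snd.flags = 0;
	snd.nr_data_chunks = 1;
	snd.hdr_chunk.addr = (uint64_t)(uintptr_t)hdr_buf;
	snd.hdr_chunk.len = hdr_len;
	snd.hdr_chunk.mr_id = hdr_mr_id;

	ret = ioctl(fd, PCS_KRPC_IOC_SEND_MSG, &snd);
	if (ret < 0)
		return ret;

	/* POLLIN means at least one completion is queued. */
	ret = poll(&pfd, 1, -1);
	if (ret < 0)
		return ret;

	memset(&rcv, 0, sizeof(rcv));
	rcv.buf.addr = (uint64_t)(uintptr_t)notify_buf;
	rcv.buf.len = sizeof(notify_buf);
	ret = ioctl(fd, PCS_KRPC_IOC_RECV_MSG, &rcv);
	if (ret <= 0)
		return ret;	/* 0: nothing queued, <0: ioctl failure */

	/*
	 * xid == 0 marks an unsolicited request (congestion notify or keep
	 * waiting) copied into rcv.buf; otherwise rcv.xid identifies a
	 * message sent earlier.
	 */
	return rcv.xid == xid ? 0 : 1;
}

Carrying the chunk descriptors in the tail of the already-registered header page means a single-chunk request needs only that one pinned page plus the payload itself, with no separate descriptor array copied from userspace.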