Currently there are two connections for every RPC: one in userspace and one in the kernel. On a huge cluster this wastes a lot of resources on client hosts. It is therefore desirable to eliminate the userspace connections and let userspace RPC use the kernel RPC connection directly.
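For illustration, the intended userspace flow over the new interface (see the ioctl definitions added below) looks roughly as follows. This is a minimal sketch, not part of the patch: it assumes the pcs ioctl headers are exported to userspace, and kio_call() is a hypothetical helper that fills the request struct consumed by fuse_dev_ioctl() (fields name/cmd/data/len) and issues FUSE_IOC_KIO_CALL on the fuse device fd; error handling is omitted.

	#include <sys/ioctl.h>
	#include <poll.h>
	#include "pcs_ioctl.h"		/* PCS_IOC_KRPC_* (added by this patch) */
	#include "pcs_krpc_prot.h"	/* PCS_KRPC_IOC_* (added by this patch) */

	/* Open a krpc channel to the CS identified by id/addr; returns an fd.
	 * kio_call() is an assumed wrapper around FUSE_IOC_KIO_CALL. */
	static int krpc_open(int fuse_dev_fd, PCS_NODE_ID_T id, PCS_NET_ADDR_T addr)
	{
		struct pcs_ioc_krpc_create create = { .id = id, .addr = addr, .cs_flags = 0 };
		struct pcs_ioc_krpc_connect connect = { .id = id };

		/* Create the kernel-side pcs_krpc object... */
		if (kio_call(fuse_dev_fd, PCS_IOC_KRPC_CREATE, &create, sizeof(create)) < 0)
			return -1;

		/* ...then connect: on success an anonymous-inode fd is returned,
		 * which accepts PCS_KRPC_IOC_SEND_MSG/RECV_MSG/ABORT and poll(). */
		return kio_call(fuse_dev_fd, PCS_IOC_KRPC_CONNECT, &connect, sizeof(connect));
	}

A request is then submitted with PCS_KRPC_IOC_SEND_MSG on the returned fd, completion is waited for with poll() (EPOLLIN), and the completed message is fetched with PCS_KRPC_IOC_RECV_MSG.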
This patch makes the pcs_fuse_kio module provide this functionality to userspace. The major changes are:
- Introduce a new struct pcs_krpc, which links a userspace RPC connection directly to the corresponding kernel RPC. It also provides the execution context for forwarding RPC messages from/to the kernel RPC.
- Add several new cmds to the FUSE_IOC_KIO_CALL ioctl for pcs_krpc management.
- Change the kernel RPC ops to demux messages to either pcs_cs or pcs_krpc.

https://pmc.acronis.work/browse/VSTOR-82613

Signed-off-by: Liu Kui <kui....@virtuozzo.com>
---
 fs/fuse/Makefile                   |   6 +-
 fs/fuse/dev.c                      |  19 +-
 fs/fuse/fuse_i.h                   |   2 +-
 fs/fuse/kio/pcs/pcs_cluster_core.c |   4 +
 fs/fuse/kio/pcs/pcs_cs.c           | 120 +----
 fs/fuse/kio/pcs/pcs_cs.h           |   5 +-
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |  96 +++-
 fs/fuse/kio/pcs/pcs_ioctl.h        |  29 ++
 fs/fuse/kio/pcs/pcs_krpc.c         | 794 +++++++++++++++++++++++++++++
 fs/fuse/kio/pcs/pcs_krpc.h         | 140 +++++
 fs/fuse/kio/pcs/pcs_krpc_prot.h    |  44 ++
 fs/fuse/kio/pcs/pcs_mr.c           |   4 +-
 fs/fuse/kio/pcs/pcs_req.h          |  30 +-
 fs/fuse/kio/pcs/pcs_rpc.c          |   3 +-
 fs/fuse/kio/pcs/pcs_rpc.h          |   4 +-
 fs/fuse/kio/pcs/pcs_rpc_clnt.c     | 188 +++++++
 fs/fuse/kio/pcs/pcs_rpc_clnt.h     |  16 +
 17 files changed, 1386 insertions(+), 118 deletions(-)
 create mode 100644 fs/fuse/kio/pcs/pcs_krpc.c
 create mode 100644 fs/fuse/kio/pcs/pcs_krpc.h
 create mode 100644 fs/fuse/kio/pcs/pcs_krpc_prot.h
 create mode 100644 fs/fuse/kio/pcs/pcs_rpc_clnt.c
 create mode 100644 fs/fuse/kio/pcs/pcs_rpc_clnt.h
diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile index 740c805adc2a..18eaa35a234b 100644 --- a/fs/fuse/Makefile +++ b/fs/fuse/Makefile @@ -33,7 +33,11 @@ fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \ kio/pcs/pcs_rdma_rw.o \ kio/pcs/pcs_rdma_conn.o \ kio/pcs/pcs_net_addr.o \ - kio/pcs/pcs_cs_accel.o + kio/pcs/pcs_cs_accel.o \ + kio/pcs/pcs_rpc_clnt.o \ + kio/pcs/pcs_mr.o \ + kio/pcs/pcs_krpc.o + fuse_kio_pcs_trace-objs := kio/pcs/fuse_kio_pcs_trace.o virtiofs-y := virtio_fs.o diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c index e00f5b180e95..bf54adf2bca9 100644 --- a/fs/fuse/dev.c +++ b/fs/fuse/dev.c @@ -2657,11 +2657,20 @@ static long fuse_dev_ioctl(struct file *file, unsigned int cmd, if (copy_from_user(&req, (void __user *)arg, sizeof(req))) return -EFAULT; - op = fuse_kio_get(NULL, req.name); - if (op == NULL) - return -EINVAL; - res = op->ioctl(NULL, NULL, req.cmd, req.data, req.len); - fuse_kio_put(op); + + fud = fuse_get_dev(file); + if (fud) { + op = fud->fc->kio.op; + if (op == NULL) + return -EINVAL; + res = op->dev_ioctl(fud->fc, req.cmd, req.data, req.len); + } else { + op = fuse_kio_get(NULL, req.name); + if (op == NULL) + return -EINVAL; + res = op->ioctl(NULL, NULL, req.cmd, req.data, req.len); + fuse_kio_put(op); + } break; } default: diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h index 0c1bd5209dbc..090871a1e356 100644 --- a/fs/fuse/fuse_i.h +++ b/fs/fuse/fuse_i.h @@ -649,7 +649,7 @@ struct fuse_kio_ops { void (*inode_release)(struct fuse_inode *fi); void (*kill_requests)(struct fuse_conn *fc, struct inode *inode); int (*ioctl)(struct file *file, struct inode *inode, unsigned int cmd, unsigned long arg, int len); - + int (*dev_ioctl)(struct fuse_conn *fc, unsigned int cmd, unsigned long arg, int len); }; int fuse_register_kio(struct fuse_kio_ops *ops); void fuse_unregister_kio(struct fuse_kio_ops *ops); diff --git a/fs/fuse/kio/pcs/pcs_cluster_core.c b/fs/fuse/kio/pcs/pcs_cluster_core.c index 8c34c60eb79a..6df2ccee9a1f 100644 --- a/fs/fuse/kio/pcs/pcs_cluster_core.c +++ b/fs/fuse/kio/pcs/pcs_cluster_core.c @@ -162,6 +162,8 @@ int
pcs_cc_init(struct pcs_cluster_core *cc, struct workqueue_struct *wq, cc->nilbuffer_kv.iov_len = sizeof(cc->nilbuffer); pcs_csset_init(&cc->css); + pcs_mrset_init(&cc->mrs); + pcs_krpcset_init(&cc->krpcs); err = pcs_mapset_init(cc); if (err) @@ -210,6 +212,8 @@ int pcs_cc_init(struct pcs_cluster_core *cc, struct workqueue_struct *wq, void pcs_cc_fini(struct pcs_cluster_core *cc) { + pcs_krpcset_fini(&cc->krpcs); + pcs_mrset_fini(&cc->mrs); pcs_csset_fini(&cc->css); pcs_mapset_fini(&cc->maps); pcs_rpc_engine_fini(&cc->eng); diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c index 6cc97aa3f9d0..a48c3fbfa5b7 100644 --- a/fs/fuse/kio/pcs/pcs_cs.c +++ b/fs/fuse/kio/pcs/pcs_cs.c @@ -25,7 +25,6 @@ #include "log.h" #include "fuse_ktrace.h" #include "pcs_net_addr.h" - /* Lock order: cs->lock -> css->lock (lru, hash, bl_list) */ @@ -39,22 +38,9 @@ struct pcs_rpc_params cn_rpc_params = { .flags = 0, }; -static void cs_aborting(struct pcs_rpc *ep, int error); -static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h); -static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg); -static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg); -static void cs_connect(struct pcs_rpc *ep); static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose); static void pcs_cs_destroy(struct pcs_cs *cs); -struct pcs_rpc_ops cn_rpc_ops = { - .demux_request = cs_input, - .get_hdr = cs_get_hdr, - .state_change = cs_aborting, - .keep_waiting = cs_keep_waiting, - .connect = cs_connect, -}; - static int pcs_cs_percpu_stat_alloc(struct pcs_cs *cs) { seqlock_init(&cs->stat.seqlock); @@ -124,8 +110,7 @@ static void pcs_cs_percpu_stat_free(struct pcs_cs *cs) free_percpu(cs->stat.iolat); } -struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css, - struct pcs_cluster_core *cc) +struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css) { struct pcs_cs *cs; @@ -153,13 +138,6 @@ struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css, kfree(cs); return NULL; } - cs->rpc = pcs_rpc_create(&cc->eng, &cn_rpc_params, &cn_rpc_ops); - if (cs->rpc == NULL) { - pcs_cs_percpu_stat_free(cs); - kfree(cs); - return NULL; - } - cs->rpc->private = cs; INIT_LIST_HEAD(&cs->map_list); return cs; } @@ -222,6 +200,7 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P /* If rpc is connected, leave it connected until failure. 
* After current connect fails, reconnect will be done to new address */ + struct pcs_rpc *rpc = cs->rpc; if (addr) { if (addr->type != PCS_ADDRTYPE_NONE) { if (pcs_netaddr_cmp(&cs->addr, addr)) { @@ -231,7 +210,7 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P FUSE_KTRACE(cc_from_csset(csset)->fc, "Port change CS" NODE_FMT " seq=%d", NODE_ARGS(*id), cs->addr_serno); - pcs_rpc_set_address(cs->rpc, addr); + pcs_rpc_set_address(rpc, addr); if (!(flags & CS_FL_INACTIVE)) { pcs_map_notify_addr_change(cs); @@ -246,30 +225,31 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P } } if (flags & CS_FL_LOCAL_SOCK) - cs->rpc->flags |= PCS_RPC_F_LOCAL; + rpc->flags |= PCS_RPC_F_LOCAL; else - cs->rpc->flags &= ~PCS_RPC_F_LOCAL; + rpc->flags &= ~PCS_RPC_F_LOCAL; return cs; } BUG_ON(addr == NULL); - cs = pcs_cs_alloc(csset, cc_from_csset(csset)); + cs = pcs_cs_alloc(csset); if (!cs) return NULL; + cs->rpc = pcs_rpc_clnt_create(&cc_from_csset(csset)->eng, id, addr, flags); + + if (!cs->rpc) { + pcs_cs_percpu_stat_free(cs); + kfree(cs); + return NULL; + } + cs->rpc->clnt_cs = cs; + cs->id = *id; cs->addr = *addr; cs->addr_serno = 1; - pcs_rpc_set_peer_id(cs->rpc, id, PCS_NODE_ROLE_CS); - pcs_rpc_set_address(cs->rpc, addr); - - if (flags & CS_FL_LOCAL_SOCK) - cs->rpc->flags |= PCS_RPC_F_LOCAL; - else - cs->rpc->flags &= ~PCS_RPC_F_LOCAL; - spin_lock(&cs->lock); spin_lock(&csset->lock); if (__lookup_cs(csset, id)) { @@ -455,46 +435,7 @@ static void cs_get_read_response_iter(struct pcs_msg *msg, int offset, struct io } } -static void cs_connect(struct pcs_rpc *ep) -{ - void (*connect_start)(struct pcs_rpc *); - - if (ep->flags & PCS_RPC_F_LOCAL) { - char path[128]; - - snprintf(path, sizeof(path)-1, PCS_SHM_DIR "/%llu_" CLUSTER_ID_FMT, - (unsigned long long)ep->peer_id.val, CLUSTER_ID_ARGS(ep->eng->cluster_id)); - - if ((strlen(path) + 1) > sizeof(((struct sockaddr_un *) 0)->sun_path)) { - TRACE("Path to local socket is too long: %s", path); - - ep->flags &= ~PCS_RPC_F_LOCAL; - goto fail; - } - memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un)); - ep->sh.sun.sun_family = AF_UNIX; - ep->sh.sa_len = sizeof(struct sockaddr_un); - strcpy(ep->sh.sun.sun_path, path); - connect_start = pcs_sockconnect_start; - } else { - /* TODO: print sock addr using pcs_format_netaddr() */ - if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) { - TRACE("netaddr to sockaddr failed"); - goto fail; - } - connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ? 
- pcs_rdmaconnect_start : pcs_sockconnect_start; - } - ep->state = PCS_RPC_CONNECT; - connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */ - return; -fail: - pcs_rpc_report_error(ep, PCS_RPC_ERR_CONNECT_ERROR); - pcs_rpc_reset(ep); - return; -} - -static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h) +struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h) { struct pcs_msg *msg, *resp; struct pcs_rpc_hdr *req_h; @@ -887,7 +828,7 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq) do_cs_submit(cs, ireq); } -static void handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h) +void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h) { struct pcs_cs *who; @@ -945,10 +886,10 @@ static int may_reroute(struct pcs_cs_list *csl, PCS_NODE_ID_T cs_id) return legit; } -static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg) +void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg) { struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg_inline_head(msg); - struct pcs_cs *cs = ep->private; + struct pcs_cs *cs = ep->clnt_cs; struct pcs_cs *who; /* Some CS reported it cannot complete local IO in time, close congestion window */ @@ -1003,21 +944,6 @@ static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_ } -static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg) -{ - struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg->_inline_buffer; - - switch (h->type) { - case PCS_CS_CONG_NOTIFY: - handle_congestion(ep->private, h); - msg->done(msg); - return 0; - default: - FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Unsupported message type %u", h->type); - return PCS_ERR_PROTOCOL; - } -} - void pcs_cs_notify_error(struct pcs_cluster_core *cc, pcs_error_t *err) { struct list_head queue; @@ -1094,17 +1020,13 @@ static void pcs_cs_destroy(struct pcs_cs *cs) BUG_ON(cs->csa_ctx); if (cs->rpc) { - pcs_rpc_close(cs->rpc); + cs->rpc->clnt_cs = NULL; + pcs_rpc_clnt_close(cs->rpc); cs->rpc = NULL; } call_rcu(&cs->rcu, cs_destroy_rcu); } -void cs_aborting(struct pcs_rpc *ep, int error) -{ - pcs_rpc_reset(ep); -} - /* Latency is difficult value to use for any decisions. * It is sampled at random, we do not know what is happening while * we have no samples. 
For now we do the following: arriving samples diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h index 61be99a54157..62b88f612b54 100644 --- a/fs/fuse/kio/pcs/pcs_cs.h +++ b/fs/fuse/kio/pcs/pcs_cs.h @@ -159,8 +159,6 @@ unsigned int cs_get_avg_in_flight(struct pcs_cs *cs); void pcs_csset_init(struct pcs_cs_set *css); void pcs_csset_fini(struct pcs_cs_set *css); -struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css, struct pcs_cluster_core *cc); - void cs_log_io_times(struct pcs_int_request *ireq, struct pcs_msg *resp, unsigned int max_iolat); int pcs_cs_format_io_times(char *buf, int buflen, struct pcs_int_request *ireq, struct pcs_msg *resp); void cs_set_io_times_logger(void (*logger)(struct pcs_int_request *ireq, struct pcs_msg *resp, u32 max_iolat, void *ctx), void *ctx); @@ -219,4 +217,7 @@ int pcs_csa_csl_write_submit_single(struct pcs_int_request * ireq, int idx); void pcs_csa_relay_iotimes(struct pcs_int_request * ireq, struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id); void pcs_csa_cs_detach(struct pcs_cs * cs); +void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h); +struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h); +void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg); #endif /* _PCS_CS_H_ */ diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c index 566dcb5f2f4c..a71cb7a9a0b6 100644 --- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c +++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c @@ -37,6 +37,8 @@ #include "fuse_ktrace.h" #include "fuse_prometheus.h" #include "pcs_net_addr.h" +#include "pcs_mr.h" +#include "pcs_krpc.h" unsigned int pcs_loglevel = LOG_TRACE; module_param(pcs_loglevel, uint, 0644); @@ -71,6 +73,10 @@ unsigned int rdmaio_queue_depth = 8; module_param(rdmaio_queue_depth, uint, 0644); MODULE_PARM_DESC(rdmaio_queue_depth, "RDMA queue depth"); +bool pcs_krpc_support = true; +module_param(pcs_krpc_support, bool, 0444); +MODULE_PARM_DESC(pcs_krpc_support, "krpc support"); + void (*fuse_printk_plugin)(unsigned long, const char *, ...); EXPORT_SYMBOL(fuse_printk_plugin); @@ -189,7 +195,7 @@ static void process_pcs_init_reply(struct fuse_mount *fm, struct fuse_args *args fuse_ktrace_setup(fc); fc->ktrace_level = LOG_TRACE; - printk("FUSE: kio_pcs: cl: " CLUSTER_ID_FMT ", clientid: " NODE_FMT "\n", + printk("FUSE: kio_pcs: cl: " CLUSTER_ID_FMT ", clientid: " NODE_FMT, CLUSTER_ID_ARGS(info->cluster_id), NODE_ARGS(info->node_id)); spin_lock(&fc->lock); @@ -1864,6 +1870,84 @@ static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd, return res; } +static int kpcs_dev_ioctl(struct fuse_conn *fc, unsigned int cmd, unsigned long arg, int len) +{ + struct pcs_fuse_cluster *pfc = fc->kio.ctx; + struct pcs_cluster_core *cc = &pfc->cc; + int res; + + switch (cmd) { + case PCS_IOC_KRPC_CREATE: + { + struct pcs_ioc_krpc_create req; + + if (copy_from_user(&req, (void __user *)arg, sizeof(req))) + return -EFAULT; + + if (pcs_krpc_lookup(&cc->krpcs, &req.id)) + return -EEXIST; + + res = pcs_krpc_create(&cc->krpcs, &req.id, &req.addr, req.cs_flags); + break; + } + case PCS_IOC_KRPC_UPDATE_ADDR: + { + struct pcs_ioc_krpc_create req; + + if (copy_from_user(&req, (void __user *)arg, sizeof(req))) + return -EFAULT; + + res = pcs_krpc_update_addr(&cc->krpcs, &req.id, &req.addr, req.cs_flags); + break; + } + + case PCS_IOC_KRPC_CONNECT: + { + struct pcs_ioc_krpc_connect req; + + if (copy_from_user(&req, (void __user *)arg, sizeof(req))) + return -EFAULT; + + res = 
pcs_krpc_connect(&cc->krpcs, &req.id); + break; + } + case PCS_IOC_KRPC_DESTROY: + { + struct pcs_ioc_krpc_destroy req; + + if (copy_from_user(&req, (void __user *)arg, sizeof(req))) + return -EFAULT; + + res = pcs_krpc_destroy(&cc->krpcs, &req.id); + break; + } + case PCS_IOC_REG_MR: + { + struct pcs_ioc_reg_mr req; + + if (copy_from_user(&req, (void __user *)arg, sizeof(req))) + return -EFAULT; + + res = pcs_reg_mr(&cc->mrs, req.start, req.len); + break; + } + case PCS_IOC_DEREG_MR: + { + struct pcs_ioc_dereg_mr req; + + if (copy_from_user(&req, (void __user *)arg, sizeof(req))) + return -EFAULT; + + res = pcs_dereg_mr(&cc->mrs, req.id); + break; + } + default: + res = -ENOIOCTLCMD; + break; + } + return res; +} + static struct fuse_kio_ops kio_pcs_ops = { .name = "pcs", .owner = THIS_MODULE, @@ -1880,6 +1964,7 @@ static struct fuse_kio_ops kio_pcs_ops = { .inode_release = kpcs_inode_release, .kill_requests = kpcs_kill_requests, .ioctl = kpcs_ioctl, + .dev_ioctl = kpcs_dev_ioctl, }; @@ -1920,10 +2005,13 @@ static int __init kpcs_mod_init(void) if (pcs_csa_init()) goto free_cleanup_wq; + if (pcs_krpc_init()) + goto free_csa; + fast_path_version = PCS_FAST_PATH_VERSION.full; if (fuse_register_kio(&kio_pcs_ops)) - goto free_csa; + goto free_krpc; /* Clone relay_file_operations to set ownership */ ktrace_file_operations = relay_file_operations; @@ -1935,10 +2023,13 @@ static int __init kpcs_mod_init(void) if (IS_ERR(crc_tfm)) crc_tfm = NULL; + printk("%s fuse_c:%p ireq_c:%p pcs_wq:%p\n", __FUNCTION__, pcs_fuse_req_cachep, pcs_ireq_cachep, pcs_wq); return 0; +free_krpc: + pcs_krpc_fini(); free_csa: pcs_csa_fini(); free_cleanup_wq: @@ -1968,6 +2059,7 @@ static void __exit kpcs_mod_exit(void) kmem_cache_destroy(pcs_ireq_cachep); kmem_cache_destroy(pcs_fuse_req_cachep); pcs_csa_fini(); + pcs_krpc_fini(); } module_init(kpcs_mod_init); diff --git a/fs/fuse/kio/pcs/pcs_ioctl.h b/fs/fuse/kio/pcs/pcs_ioctl.h index 8e55be02c654..3c000373745b 100644 --- a/fs/fuse/kio/pcs/pcs_ioctl.h +++ b/fs/fuse/kio/pcs/pcs_ioctl.h @@ -123,4 +123,33 @@ struct pcs_csa_setmap #define PCS_CSA_IOC_SETMAP _IOR('V',38, struct pcs_csa_setmap) #define PCS_KIO_CALL_REG _IOR('V',39, struct fuse_pcs_ioc_register) +struct pcs_ioc_reg_mr { + u64 start; + u64 len; +}; +#define PCS_IOC_REG_MR _IOR('V', 40, struct pcs_ioc_reg_mr) + +struct pcs_ioc_dereg_mr { + u32 id; +}; +#define PCS_IOC_DEREG_MR _IOR('V', 41, struct pcs_ioc_dereg_mr) + +struct pcs_ioc_krpc_create { + PCS_NODE_ID_T id; + PCS_NET_ADDR_T addr; + int cs_flags; +}; +#define PCS_IOC_KRPC_CREATE _IOR('V', 42, struct pcs_ioc_krpc_create) +#define PCS_IOC_KRPC_UPDATE_ADDR _IOR('V', 43, struct pcs_ioc_krpc_create) + +struct pcs_ioc_krpc_connect { + PCS_NODE_ID_T id; +}; +#define PCS_IOC_KRPC_CONNECT _IOR('V', 44, struct pcs_ioc_krpc_connect) + +struct pcs_ioc_krpc_destroy { + PCS_NODE_ID_T id; +}; +#define PCS_IOC_KRPC_DESTROY _IOR('V', 45, struct pcs_ioc_krpc_destroy) + #endif /* _PCS_IOCTL_H_ */ diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c new file mode 100644 index 000000000000..0e49dc227dca --- /dev/null +++ b/fs/fuse/kio/pcs/pcs_krpc.c @@ -0,0 +1,794 @@ +/* + * fs/fuse/kio/pcs/pcs_krpc.c + * + * Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved. 
+ * + */ + +#include <linux/types.h> +#include <linux/list.h> +#include <linux/rbtree.h> +#include <linux/refcount.h> +#include <linux/file.h> +#include <linux/anon_inodes.h> + +#include "pcs_types.h" +#include "pcs_cluster.h" +#include "pcs_cs_prot.h" +#include "pcs_req.h" +#include "pcs_krpc.h" + +struct kmem_cache *krpc_req_cachep; + +static int krpc_completion_post(struct pcs_krpc *krpc, struct krpc_completion *comp) +__releases(krpc->lock) +{ + /* post complete only when connected */ + if (krpc->state != PCS_KRPC_STATE_CONNECTED) { + spin_unlock(&krpc->lock); + return -1; + } + list_add_tail(&comp->link, &krpc->completion_queue); + krpc->nr_completion++; + spin_unlock(&krpc->lock); + + wake_up_poll(&krpc->poll_wait, EPOLLIN); + return 0; +} + +static struct krpc_req *krpc_req_alloc(void) +{ + struct krpc_req *kreq; + + kreq = kmem_cache_alloc(krpc_req_cachep, GFP_NOIO); + if (!kreq) + return NULL; + + spin_lock_init(&kreq->lock); + kreq->flags = 0; + memset(&kreq->completion, 0, sizeof(struct krpc_completion)); + + return kreq; +} + +static void krpc_req_free(struct krpc_req *kreq) +{ + kmem_cache_free(krpc_req_cachep, kreq); +} + +static void krpc_completion_free(struct krpc_completion *comp) +{ + if (comp->private) + krpc_req_free((struct krpc_req *)comp->private); + else + kfree(comp); +} + +static void krpc_req_complete(struct krpc_req *kreq, int error) +{ + struct krpc_completion *comp = &kreq->completion; + struct pcs_krpc *krpc = kreq->krpc; + int i; + + BUG_ON(!comp->xid); + + comp->result = error; + + /* kunmap(kreq->hdr_buf); */ + pcs_mr_put(kreq->hdr_chunk.mr); + + for (i = 0; i < kreq->nr_data_chunks; i++) { + struct krpc_chunk *chunk; + + chunk = &kreq->data_chunks[i]; + if (chunk->type == KRPC_CHUNK_TYPE_UMEM) + pcs_umem_release(chunk->umem); + else + pcs_mr_put(chunk->mr); + } + + if (kreq->data_chunks != &kreq->inline_data_chunks[0]) + kfree(kreq->data_chunks); + + spin_lock(&krpc->lock); + list_del(&kreq->link); + + if (kreq->flags & KRPC_REQ_F_ABORTED) { + krpc_req_free(kreq); + spin_unlock(&krpc->lock); + } else if (krpc_completion_post(krpc, comp)) + krpc_req_free(kreq); + + pcs_krpc_put(krpc); +} + +static void krpc_msg_get_response_iter(struct pcs_msg *msg, int offset, + struct iov_iter *it, unsigned int direction) +{ + struct pcs_msg *req = msg->private; + struct krpc_req *kreq = req->private2; + + /* No data payload*/ + if (!(kreq->flags & KRPC_REQ_F_RESP_BUFF)) { + /* No data payload */ + BUG_ON(msg->size > PAGE_SIZE); + + kreq->hdr_kv.iov_base = (void *)kreq->hdr_buf; + kreq->hdr_kv.iov_len = msg->size; + + iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, msg->size); + iov_iter_advance(it, offset); + } else { + /* With Data payload */ + int hdr_size = sizeof(struct pcs_cs_iohdr); + + if (offset < hdr_size) { + kreq->hdr_kv.iov_base = (void *)kreq->hdr_buf; + kreq->hdr_kv.iov_len = hdr_size; + iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, hdr_size); + iov_iter_advance(it, offset); + } else { + //offset -= (unsigned int) sizeof(struct pcs_cs_iohdr); + BUG_ON((offset - hdr_size) > kreq->data_len); + BUG_ON(msg->size - hdr_size > kreq->data_len); + + iov_iter_bvec(it, direction, &kreq->data_bvecs[0], + kreq->nr_data_bvecs, kreq->data_len); + iov_iter_truncate(it, msg->size - hdr_size); + iov_iter_advance(it, offset - hdr_size); + } + } +} + +struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h) +{ + struct pcs_msg *msg, *resp; + struct pcs_rpc_hdr *req_h; + struct krpc_req *kreq; + + if (!RPC_IS_RESPONSE(h->type)) + return NULL; + + msg = 
pcs_rpc_lookup_xid(ep, &h->xid); + if (msg == NULL) + return NULL; + + req_h = (struct pcs_rpc_hdr *)msg_inline_head(msg); + if (req_h->type != (h->type & ~PCS_RPC_DIRECTION)) + return NULL; + + kreq = msg->private2; + + resp = pcs_rpc_alloc_input_msg(ep, sizeof(struct pcs_rpc_hdr)); + if (!resp) + return NULL; + + memcpy(resp->_inline_buffer, h, sizeof(struct pcs_rpc_hdr)); + memcpy(kreq->hdr_buf, h, sizeof(struct pcs_rpc_hdr)); + resp->size = h->len; + resp->private = msg; + resp->get_iter = krpc_msg_get_response_iter; + resp->done = rpc_work_input; + pcs_msg_del_calendar(msg); + + return resp; +} + +/* + * All request type message from cs are forwarded to userspace directly. The PCS KRPC + * layer cannot process these request on its own. + */ +static void krpc_handle_request(struct pcs_krpc *krpc, struct pcs_msg *msg) +{ + struct krpc_completion *comp; + + if (krpc->state != PCS_KRPC_STATE_CONNECTED) + return; + + comp = kzalloc(sizeof(*comp) + msg->size, GFP_NOIO); + if (!comp) { + /* Error ?*/ + return; + } + + memcpy(comp->_data_buf, msg->_inline_buffer, msg->size); + comp->data_len = msg->size; + + spin_lock(&krpc->lock); + if (krpc_completion_post(krpc, comp)) + kfree(comp); +} + +/* + * Let userspace to handle cs_congestion_notify request. + */ +void krpc_handle_congestion(struct pcs_rpc *ep, struct pcs_msg *msg) +{ + krpc_handle_request(ep->clnt_krpc, msg); +} + +/* + * Let userspace to handle the keep_waiting request. + */ +void krpc_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg) +{ + krpc_handle_request(ep->clnt_krpc, msg); +} + +unsigned int pcs_krpc_hash(PCS_NODE_ID_T *id) +{ + return *(unsigned int *)id % PCS_KRPC_HASH_SIZE; +} + +static void krpc_msg_get_data(struct pcs_msg *msg, int offset, + struct iov_iter *it, unsigned int direction) +{ + struct krpc_req *kreq = msg->private2; + int hdr_size = pcs_krpc_msg_size(kreq->hdr_chunk.len, kreq->flags); + int data_size = pcs_krpc_msg_size(kreq->data_len, kreq->flags); + struct pcs_krpc *krpc = kreq->krpc; + + if (offset < kreq->hdr_chunk.len) { + iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, kreq->hdr_chunk.len); + iov_iter_advance(it, offset); + return; + } + + if (offset < hdr_size) { + iov_iter_kvec(it, direction, &(cc_from_krpcset(krpc->krpcs))->nilbuffer_kv, + 1, hdr_size - offset); + return; + } + + if (offset < hdr_size + kreq->data_len) { + iov_iter_bvec(it, direction, &kreq->data_bvecs[0], + kreq->nr_data_bvecs, kreq->data_len); + iov_iter_advance(it, offset - hdr_size); + return; + } + + BUG_ON(offset >= hdr_size + data_size); + + iov_iter_kvec(it, direction, &(cc_from_krpcset(krpc->krpcs))->nilbuffer_kv, + 1, (hdr_size + data_size - offset)); +} + +static void pcs_krpc_response_done(struct pcs_msg *msg) +{ + struct krpc_req *kreq = msg->private2; + + if (msg->rpc) { + pcs_rpc_put(msg->rpc); + msg->rpc = NULL; + } + + krpc_req_complete(kreq, msg->error.value); +} + +static void pcs_krpc_msg_sent(struct pcs_msg *msg) +{ + msg->done = pcs_krpc_response_done; + if (pcs_if_error(&msg->error)) { + msg->done(msg); + return; + } + pcs_rpc_sent(msg); +} + +static int pcs_krpc_ioctl_recv_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_recvmsg *iocmsg) +{ + struct krpc_completion *comp; + int res = 0; + + spin_lock(&krpc->lock); + if (list_empty(&krpc->completion_queue)) { + spin_unlock(&krpc->lock); + return 0; + } + + comp = list_first_entry(&krpc->completion_queue, struct krpc_completion, link); + list_del(&comp->link); + krpc->nr_completion--; + spin_unlock(&krpc->lock); + + if (comp->result) { + 
res -= comp->result; + goto out; + } + + res = 1; + iocmsg->xid = comp->xid; + if (comp->xid == 0) { + BUG_ON(!comp->data_len); + BUG_ON(iocmsg->buf.len < comp->data_len); + if (copy_to_user((void __user *)iocmsg->buf.addr, comp->_data_buf, comp->data_len)) + res = -EFAULT; + } + +out: + krpc_completion_free(comp); + return res; +} + +static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_sendmsg *iocmsg) +{ + struct krpc_req *kreq; + struct pcs_msg *msg; + struct pcs_krpc_buf_desc *chunk_bd; + struct krpc_chunk *chunk; + int res, i; + struct bio_vec *bvec; + + kreq = krpc_req_alloc(); + if (!kreq) + return -ENOMEM; + + if (iocmsg->nr_data_chunks > NR_KRPC_DATA_CHUNKS_INLINE) { + kreq->data_chunks = kzalloc(iocmsg->nr_data_chunks * sizeof(struct krpc_chunk), GFP_NOIO); + if (!kreq->data_chunks) { + res = -ENOMEM; + goto err_free_kreq; + } + } else + kreq->data_chunks = &kreq->inline_data_chunks[0]; + + /* + * The header buf is exactly one page, used for staging the message header. It will + * also be used for receiving the header of the response message. + */ + BUG_ON(iocmsg->hdr_chunk.addr & (PAGE_SIZE - 1)); + + chunk_bd = &iocmsg->hdr_chunk; + chunk = &kreq->hdr_chunk; + + chunk->addr = chunk_bd->addr; + chunk->len = chunk_bd->len; + chunk->type = KRPC_CHUNK_TYPE_MR; + + chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, iocmsg->hdr_chunk.mr_id); + BUG_ON(!chunk->mr); + + kreq->hdr_buf = (char *) kmap(pcs_umem_page(chunk->mr->umem, chunk->addr)); + kreq->hdr_kv.iov_base = kreq->hdr_buf; + kreq->hdr_kv.iov_len = chunk->len; + + /* data chunk buf descriptors are placed at the end of the header buf, growing backwards */ + kreq->data_len = 0; + kreq->nr_data_chunks = 0; + kreq->nr_data_bvecs = 0; + + chunk_bd = (struct pcs_krpc_buf_desc *)(kreq->hdr_buf + PAGE_SIZE) - 1; + chunk = kreq->data_chunks; + bvec = &kreq->data_bvecs[0]; + + for (i = 0; i < iocmsg->nr_data_chunks; i++) { + struct page *page; + u64 addr; + int offset, len; + + chunk->addr = chunk_bd->addr; + chunk->len = chunk_bd->len; + if (chunk_bd->mr_id) { + chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, chunk_bd->mr_id); + chunk->type = KRPC_CHUNK_TYPE_MR; + if (!chunk->mr) { + res = -ENXIO; + goto err_free_data_chunk; + } + } else { + /* unregistered memory buf */ + chunk->umem = pcs_umem_get(chunk->addr, chunk->len); + if (IS_ERR(chunk->umem)) { + res = PTR_ERR(chunk->umem); + goto err_free_data_chunk; + } + + chunk->type = KRPC_CHUNK_TYPE_UMEM; + } + + /* build the bvecs for the data chunk */ + addr = chunk->addr; + len = chunk->len; + + while (len) { + /* data bvec array overflow? */ + BUG_ON(kreq->nr_data_bvecs >= KRPC_MAX_DATA_PAGES); + + if (chunk->type == KRPC_CHUNK_TYPE_MR) + page = pcs_umem_page(chunk->mr->umem, addr); + else + page = pcs_umem_page(chunk->umem, addr); + + BUG_ON(!page); + + offset = addr & (PAGE_SIZE - 1); + + bvec->bv_page = page; + bvec->bv_offset = offset; + + bvec->bv_len = len < (PAGE_SIZE - offset) ?
len : (PAGE_SIZE - offset); + + addr += bvec->bv_len; + len -= bvec->bv_len; + bvec++; + kreq->nr_data_bvecs++; + } + BUG_ON(len != 0); + + kreq->data_len += chunk->len; + chunk++; + chunk_bd--; + kreq->nr_data_chunks++; + } + + kreq->completion.xid = iocmsg->xid; + kreq->completion.private = kreq; + + kreq->flags = iocmsg->flags; + + msg = &kreq->msg; + msg->private = krpc; + msg->private2 = kreq; + + INIT_HLIST_NODE(&msg->kill_link); + pcs_clear_error(&msg->error); + + msg->size = iocmsg->msg_size; + msg->timeout = iocmsg->timeout; + + msg->rpc = NULL; + msg->done = pcs_krpc_msg_sent; + msg->get_iter = krpc_msg_get_data; + + spin_lock(&krpc->lock); + kreq->krpc = pcs_krpc_get(krpc); + list_add_tail(&kreq->link, &krpc->pending_queue); + spin_unlock(&krpc->lock); + /* DTRACE to be added */ + pcs_rpc_queue(krpc->rpc, msg); + + return 0; + +err_free_data_chunk: + for (i = 0; i < kreq->nr_data_chunks; i++) { + chunk = &kreq->data_chunks[i]; + if (chunk->type == KRPC_CHUNK_TYPE_UMEM) + pcs_umem_release(chunk->umem); + else + pcs_mr_put(chunk->mr); + } + pcs_mr_put(kreq->hdr_chunk.mr); + +err_free_kreq: + if (kreq->data_chunks != &kreq->inline_data_chunks[0]) + kfree(kreq->data_chunks); + krpc_req_free(kreq); + return res; +} + +static int pcs_krpc_abort(struct pcs_krpc *krpc) +{ + struct list_head dispose_list; + struct krpc_req *kreq; + struct krpc_completion *comp; + + INIT_LIST_HEAD(&dispose_list); + + spin_lock(&krpc->lock); + + if (krpc->state != PCS_KRPC_STATE_CONNECTED) { + spin_unlock(&krpc->lock); + return 0; + } + + krpc->state = PCS_KRPC_STATE_ABORTED; + + /* abort incompleted requests */ + list_splice_tail_init(&krpc->pending_queue, &dispose_list); + while (!list_empty(&dispose_list)) { + kreq = list_first_entry(&dispose_list, struct krpc_req, link); + kreq->flags |= KRPC_REQ_F_ABORTED; + } + + list_splice_tail_init(&krpc->completion_queue, &dispose_list); + krpc->nr_completion = 0; + + /* dispose all unprocessed completions */ + while (!list_empty(&dispose_list)) { + comp = list_first_entry(&dispose_list, struct krpc_completion, link); + list_del(&comp->link); + krpc_completion_free(comp); + } + + spin_unlock(&krpc->lock); + + return 0; +} + +static long pcs_krpc_ioctl(struct file *file, unsigned int cmd, unsigned long arg) +{ + struct pcs_krpc *krpc = file->private_data; + int res = 0; + + switch (cmd) { + case PCS_KRPC_IOC_SEND_MSG: { + struct pcs_krpc_ioc_sendmsg req; + + if (copy_from_user(&req, (void __user *)arg, sizeof(req))) + return -EFAULT; + + res = pcs_krpc_ioctl_send_msg(krpc, &req); + break; + } + case PCS_KRPC_IOC_RECV_MSG: { + struct pcs_krpc_ioc_recvmsg req; + + res = pcs_krpc_ioctl_recv_msg(krpc, &req); + + if (copy_to_user((void __user *)arg, &req, sizeof(req))) + return -EFAULT; + break; + } + case PCS_KRPC_IOC_ABORT: + res = pcs_krpc_abort(krpc); + break; + default: + res = -ENOTTY; + break; + } + + return res; +} + +static __poll_t pcs_krpc_poll(struct file *file, poll_table *wait) +{ + struct pcs_krpc *krpc = file->private_data; + __poll_t pollflags = 0; + + poll_wait(file, &krpc->poll_wait, wait); + + spin_lock(&krpc->lock); + /* EPOLLERR? 
*/ + if (krpc->state == PCS_KRPC_STATE_ABORTED) + pollflags |= EPOLLERR; + else if (krpc->state == PCS_KRPC_STATE_CONNECTED) { + pollflags |= EPOLLOUT; + if (!list_empty(&krpc->completion_queue)) + pollflags |= EPOLLIN; + } + + spin_unlock(&krpc->lock); + + return pollflags; +} + +/* + * Reset to initial state -- PCS_KRPC_STATE_UNCONN + */ +static int pcs_krpc_release(struct inode *inode, struct file *file) +{ + struct pcs_krpc *krpc = file->private_data; + + /* Just abort all pending requests and disconnect from the krpc */ + pcs_krpc_abort(krpc); + + spin_lock(&krpc->lock); + if (krpc->state == PCS_KRPC_STATE_ABORTED) + krpc->state = PCS_KRPC_STATE_UNCONN; + spin_unlock(&krpc->lock); + + return 0; +} + +static const struct file_operations pcs_krpc_fops = { + .owner = THIS_MODULE, + .release = pcs_krpc_release, + .poll = pcs_krpc_poll, + .unlocked_ioctl = pcs_krpc_ioctl, + .llseek = no_llseek, +}; + +struct pcs_krpc *pcs_krpc_lookup(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id) +{ + struct pcs_krpc *krpc; + + hlist_for_each_entry(krpc, &krpcs->ht[pcs_krpc_hash(id)], hlist) { + if (memcmp(&krpc->id, id, sizeof(krpc->id)) == 0) + return krpc; + } + return NULL; +} + +struct pcs_krpc *pcs_krpc_get(struct pcs_krpc *krpc) +{ + refcount_inc(&krpc->refcnt); + return krpc; +} + +void pcs_krpc_put(struct pcs_krpc *krpc) +{ + if (refcount_dec_and_test(&krpc->refcnt)) + kfree(krpc); +} + +/* + * Create a new pcs_krpc + */ +int pcs_krpc_create(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id, + PCS_NET_ADDR_T *addr, int cs_flags) +{ + struct pcs_krpc *krpc; + + krpc = kzalloc(sizeof(struct pcs_krpc), GFP_NOIO); + if (!krpc) + return -ENOMEM; + + INIT_HLIST_NODE(&krpc->hlist); + INIT_LIST_HEAD(&krpc->link); + spin_lock_init(&krpc->lock); + refcount_set(&krpc->refcnt, 1); + INIT_LIST_HEAD(&krpc->pending_queue); + INIT_LIST_HEAD(&krpc->completion_queue); + init_waitqueue_head(&krpc->poll_wait); + + krpc->krpcs = krpcs; + krpc->id = *id; + krpc->state = PCS_KRPC_STATE_UNCONN; + + krpc->rpc = pcs_rpc_clnt_create(&cc_from_krpcset(krpcs)->eng, id, addr, cs_flags); + if (!krpc->rpc) { + kfree(krpc); + return -ENOMEM; + } + + krpc->rpc->clnt_krpc = krpc; + + spin_lock(&krpc->lock); + spin_lock(&krpcs->lock); + hlist_add_head(&krpc->hlist, &krpcs->ht[pcs_krpc_hash(&krpc->id)]); + list_add_tail(&krpc->link, &krpcs->list); + krpcs->nkrpc++; + spin_unlock(&krpcs->lock); + spin_unlock(&krpc->lock); + + return 0; +} + +int pcs_krpc_update_addr(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id, + PCS_NET_ADDR_T *addr, int flags) +{ + struct pcs_krpc *krpc; + + krpc = pcs_krpc_lookup(krpcs, id); + if (!krpc) + return -ENXIO; + + pcs_rpc_set_address(krpc->rpc, addr); + + if (flags & CS_FL_LOCAL_SOCK) + krpc->rpc->flags |= PCS_RPC_F_LOCAL; + else + krpc->rpc->flags &= ~PCS_RPC_F_LOCAL; + + return 0; +} +/* + * Connect to a pcs_krpc, return a valid fd on success. 
+ */ +int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id) +{ + struct pcs_krpc *krpc; + int fd; + struct file *file; + + krpc = pcs_krpc_lookup(krpcs, id); + if (!krpc) + return -ENXIO; + + WARN_ON(krpc->state != PCS_KRPC_STATE_UNCONN); + if (krpc->state != PCS_KRPC_STATE_UNCONN) + return -EPERM; + + fd = get_unused_fd_flags(O_CLOEXEC); + if (fd < 0) + return fd; + + file = anon_inode_getfile("[pcs_krpc]", &pcs_krpc_fops, krpc, 0); + if (IS_ERR(file)) { + put_unused_fd(fd); + fd = PTR_ERR(file); + return fd; + } + + fd_install(fd, file); + + /* + * the krpc should always be connected regardless state of + * underlying RPC + */ + krpc->state = PCS_KRPC_STATE_CONNECTED; + return fd; +} + +static void __pcs_krpc_destroy(struct pcs_krpc *krpc) +{ + spin_lock(&krpc->lock); + + krpc->state = PCS_KRPC_STATE_DESTROYED; + + /*Remove from krpc set*/ + spin_lock(&krpc->krpcs->lock); + hlist_del(&krpc->hlist); + list_del(&krpc->link); + krpc->krpcs->nkrpc--; + spin_unlock(&krpc->krpcs->lock); + + spin_unlock(&krpc->lock); + + if (krpc->rpc) { + krpc->rpc->clnt_krpc = NULL; + pcs_rpc_clnt_close(krpc->rpc); + krpc->rpc = NULL; + } + pcs_krpc_put(krpc); +} + +int pcs_krpc_destroy(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id) +{ + struct pcs_krpc *krpc; + + krpc = pcs_krpc_lookup(krpcs, id); + BUG_ON(!krpc); + BUG_ON(krpc->state != PCS_KRPC_STATE_UNCONN); + + __pcs_krpc_destroy(krpc); + return 0; +} + +void pcs_krpcset_init(struct pcs_krpc_set *krpcs) +{ + unsigned int i; + + for (i = 0; i < PCS_KRPC_HASH_SIZE; i++) + INIT_HLIST_HEAD(&krpcs->ht[i]); + + INIT_LIST_HEAD(&krpcs->list); + krpcs->nkrpc = 0; + + spin_lock_init(&krpcs->lock); +} + +void pcs_krpcset_fini(struct pcs_krpc_set *krpcs) +{ + spin_lock(&krpcs->lock); + while (!list_empty(&krpcs->list)) { + struct pcs_krpc *krpc; + + krpc = list_first_entry(&krpcs->list, struct pcs_krpc, link); + spin_unlock(&krpcs->lock); + pcs_krpc_abort(krpc); + __pcs_krpc_destroy(krpc); + spin_lock(&krpcs->lock); + } + spin_unlock(&krpcs->lock); + + BUG_ON(!list_empty(&krpcs->list)); + BUG_ON(krpcs->nkrpc != 0); +} + +int __init pcs_krpc_init(void) +{ + krpc_req_cachep = kmem_cache_create("pcs_krpc_req", + sizeof(struct krpc_req), 0, + SLAB_RECLAIM_ACCOUNT|SLAB_ACCOUNT, NULL); + + if (!krpc_req_cachep) + return -ENOMEM; + + return 0; +} + +void pcs_krpc_fini(void) +{ + kmem_cache_destroy(krpc_req_cachep); +} diff --git a/fs/fuse/kio/pcs/pcs_krpc.h b/fs/fuse/kio/pcs/pcs_krpc.h new file mode 100644 index 000000000000..6a257ba80208 --- /dev/null +++ b/fs/fuse/kio/pcs/pcs_krpc.h @@ -0,0 +1,140 @@ +/* + * fs/fuse/kio/pcs/pcs_krpc.h + * + * Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved. 
+ * + */ +#ifndef _PCS_KRPC_H_ +#define _PCS_KRPC_H_ 1 +#include <linux/types.h> +#include <linux/bvec.h> + +#include "pcs_prot_types.h" +#include "pcs_perfcounters.h" +#include "pcs_rpc_clnt.h" +#include "pcs_krpc_prot.h" +#include "pcs_mr.h" + +struct krpc_chunk { + u64 addr; + u32 len; + u32 type; +#define KRPC_CHUNK_TYPE_MR 0 +#define KRPC_CHUNK_TYPE_UMEM 1 + union { + struct pcs_mr *mr; + struct pcs_umem *umem; + }; +}; + +#define PCS_KRPC_HASH_SIZE 1024 +struct pcs_krpc_set { + /* Set of krpcs */ + struct hlist_head ht[PCS_KRPC_HASH_SIZE]; + struct list_head list; + unsigned int nkrpc; + + spinlock_t lock; +}; + +enum { + PCS_KRPC_STATE_UNCONN, + PCS_KRPC_STATE_CONNECTED, + PCS_KRPC_STATE_ABORTED, + PCS_KRPC_STATE_DESTROYED, +}; + +struct pcs_krpc { + struct hlist_node hlist; + struct list_head link; + spinlock_t lock; + refcount_t refcnt; + + struct pcs_rpc *rpc; + + struct pcs_krpc_set *krpcs; + + PCS_NODE_ID_T id; + + int state; + + struct list_head pending_queue; + struct list_head completion_queue; + int nr_completion; + + /** Wait queue head for poll */ + wait_queue_head_t poll_wait; +}; + +/* + * Completion message to be received by userspace + */ +struct krpc_completion { + struct list_head link; /* in krpc->completion_queue */ + + u64 xid; + int result; + + void *private; + int data_len; + u8 _data_buf[0]; +}; + +#define KRPC_MAX_DATA_PAGES 256 +#define NR_KRPC_DATA_CHUNKS_INLINE 4 +struct krpc_req { + struct list_head link; + spinlock_t lock; + struct pcs_krpc *krpc; + +#define KRPC_REQ_F_ALIGNMENT PCS_KRPC_MSG_F_ALIGNMENT +#define KRPC_REQ_F_RESP_BUFF PCS_KRPC_MSG_F_RESP_BUFF /* data buff is for read response */ +#define KRPC_REQ_F_ABORTED 0x10000 + int flags; + + struct pcs_msg msg; + + char *hdr_buf; + struct kvec hdr_kv; + struct krpc_chunk hdr_chunk; + + int nr_data_chunks; + struct krpc_chunk *data_chunks; + struct krpc_chunk inline_data_chunks[NR_KRPC_DATA_CHUNKS_INLINE]; + + int data_len; + int nr_data_bvecs; + struct bio_vec data_bvecs[KRPC_MAX_DATA_PAGES]; + + struct krpc_completion completion; +}; + +static inline u32 pcs_krpc_msg_size(u32 size, u8 flags) +{ + if (flags & PCS_KRPC_MSG_F_ALIGNMENT) + size = ALIGN(size, PCS_KRPC_MSG_ALIGNMENT); + + return size; +} + +int pcs_krpc_init(void); +void pcs_krpc_fini(void); + +void pcs_krpcset_init(struct pcs_krpc_set *krpcs); +void pcs_krpcset_fini(struct pcs_krpc_set *krpcs); + +struct pcs_krpc *pcs_krpc_lookup(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id); +int pcs_krpc_create(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id, + PCS_NET_ADDR_T *addr, int cs_flags); +int pcs_krpc_update_addr(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id, + PCS_NET_ADDR_T *addr, int flags); +int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id); +int pcs_krpc_destroy(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id); + +void pcs_krpc_put(struct pcs_krpc *krpc); +struct pcs_krpc *pcs_krpc_get(struct pcs_krpc *krpc); + +struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h); +void krpc_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg); +void krpc_handle_congestion(struct pcs_rpc *ep, struct pcs_msg *msg); +#endif diff --git a/fs/fuse/kio/pcs/pcs_krpc_prot.h b/fs/fuse/kio/pcs/pcs_krpc_prot.h new file mode 100644 index 000000000000..525f4e04d2cc --- /dev/null +++ b/fs/fuse/kio/pcs/pcs_krpc_prot.h @@ -0,0 +1,44 @@ +#ifndef _PCS_KRPC_PROT_H_ +#define _PCS_KRPC_PROT_H_ +#include <linux/ioctl.h> + +#include "pcs_prot_types.h" + +/*Device ioctls: */ +#define PCS_KRPC_IOC_MAGIC 255 + +/* ioctl 
cmds supported by the '[pcs_krpc]' anonymous inode */ +#define PCS_KRPC_IOC_SEND_MSG _IO(PCS_KRPC_IOC_MAGIC, 10) +#define PCS_KRPC_IOC_RECV_MSG _IO(PCS_KRPC_IOC_MAGIC, 11) +#define PCS_KRPC_IOC_ABORT _IO(PCS_KRPC_IOC_MAGIC, 12) + +struct pcs_krpc_buf_desc { + u64 addr; /* buf address in userspace. */ + u32 len; /* size of the buf */ + u32 mr_id; /* mr id */ +}; + +#define PCS_KRPC_MSG_ALIGNMENT (512ULL) + +#define PCS_KRPC_MSG_F_ALIGNMENT 0x01 +#define PCS_KRPC_MSG_F_RESP_BUFF 0x02 +/* + * To avoid copying, a msg is sent to kernel in an array of buffer descriptor. + * Each buffer descriptor points to a buf contains a chunk of the msg. And all + * chunk buffers are from registered MRs + */ +struct pcs_krpc_ioc_sendmsg { + u64 xid; /* context id */ + u32 msg_size; /* total size of the msg */ + u16 timeout; /* timeout */ + u8 flags; /* alignment, */ + u8 nr_data_chunks; /* total number of data chunks */ + struct pcs_krpc_buf_desc hdr_chunk; /* the buf holding msg header */ +}; + +struct pcs_krpc_ioc_recvmsg { + u64 xid; /* context id */ + struct pcs_krpc_buf_desc buf; /* for cs congestion notification and rpc keep waiting msg.*/ +}; + +#endif diff --git a/fs/fuse/kio/pcs/pcs_mr.c b/fs/fuse/kio/pcs/pcs_mr.c index ad55b0cbad8b..c2a2c072ba9e 100644 --- a/fs/fuse/kio/pcs/pcs_mr.c +++ b/fs/fuse/kio/pcs/pcs_mr.c @@ -126,8 +126,10 @@ int pcs_reg_mr(struct pcs_mr_set *mrs, u64 start, u64 len) } umem = pcs_umem_get(start, len); - if (IS_ERR(umem)) + if (IS_ERR(umem)) { + atomic_dec(&mrs->mr_num); return PTR_ERR(umem); + } mr = kzalloc(sizeof(*mr), GFP_KERNEL); if (!mr) { diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h index cdf1c9754803..b88e56d2b9a2 100644 --- a/fs/fuse/kio/pcs/pcs_req.h +++ b/fs/fuse/kio/pcs/pcs_req.h @@ -18,6 +18,8 @@ #include "fuse_ktrace_prot.h" #include "fuse_stat.h" #include "../../fuse_i.h" +#include "pcs_mr.h" +#include "pcs_krpc.h" /////////////////////////// @@ -168,7 +170,7 @@ struct pcs_int_request union { struct { struct pcs_map_entry *map; - //// Temproraly disable flow + /* Temproraly disable flow */ struct pcs_flow_node *flow; u8 cmd; u8 role; @@ -253,7 +255,7 @@ struct pcs_int_request }; }; -// FROM pcs_cluste_core.h +/* FROM pcs_cluste_core.h */ struct pcs_clnt_config { @@ -284,9 +286,15 @@ struct pcs_cluster_core struct pcs_map_set maps; /* Global map data */ struct pcs_rpc_engine eng; /* RPC engine */ struct workqueue_struct *wq; -//// struct pcs_ratelimit rlim; /* Rate limiter */ -//// struct pcs_rng rng; +#if 0 + struct pcs_ratelimit rlim; /* Rate limiter */ + struct pcs_rng rng; +#endif /* <SKIP */ + + struct pcs_mr_set mrs; /* Table of all MRs*/ + struct pcs_krpc_set krpcs; /* Table of all KRPCs */ + struct pcs_fuse_stat stat; struct { @@ -337,10 +345,20 @@ static inline struct pcs_cluster_core *cc_from_maps(struct pcs_map_set *maps) return container_of(maps, struct pcs_cluster_core, maps); } +static inline struct pcs_cluster_core *cc_from_krpcset(struct pcs_krpc_set *krpcs) +{ + return container_of(krpcs, struct pcs_cluster_core, krpcs); +} + +static inline struct pcs_cluster_core *cc_from_krpc(struct pcs_krpc *krpc) +{ + return cc_from_krpcset(krpc->krpcs); +} + void pcs_cc_submit(struct pcs_cluster_core *cc, struct pcs_int_request* ireq); void pcs_cc_requeue(struct pcs_cluster_core *cc, struct list_head * q); void pcs_cc_update_storage_versions(struct pcs_cluster_core *cc, int version); -////// FROM pcs_cluster.h +/* FROM pcs_cluster.h */ static inline void pcs_sreq_attach(struct pcs_int_request * sreq, struct pcs_int_request * parent) { 
sreq->completion_data.parent = parent; @@ -421,4 +439,6 @@ typedef void (*kio_req_itr)(struct fuse_file *ff, struct fuse_req *req, void pcs_kio_req_list(struct fuse_conn *fc, kio_req_itr kreq_cb, void *ctx); extern struct crypto_shash *crc_tfm; + + #endif /* _PCS_REQ_H_ */ diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c index 3b763cea9c01..deebc1dddf1b 100644 --- a/fs/fuse/kio/pcs/pcs_rpc.c +++ b/fs/fuse/kio/pcs/pcs_rpc.c @@ -292,7 +292,8 @@ void pcs_rpc_attach_new_ep(struct pcs_rpc * ep, struct pcs_rpc_engine * eng) ep->peer_flags = 0; ep->peer_version = ~0U; ep->conn = NULL; - ep->private = NULL; + ep->clnt_cs = NULL; + ep->clnt_krpc = NULL; INIT_LIST_HEAD(&ep->pending_queue); INIT_LIST_HEAD(&ep->state_queue); INIT_LIST_HEAD(&ep->input_queue); diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h index ef4ab26b9d44..f7df01ef9a49 100644 --- a/fs/fuse/kio/pcs/pcs_rpc.h +++ b/fs/fuse/kio/pcs/pcs_rpc.h @@ -153,7 +153,9 @@ struct pcs_rpc struct hlist_head kill_calendar[RPC_MAX_CALENDAR]; struct llist_node cleanup_node; - struct pcs_cs * private; + struct pcs_cs *clnt_cs; + struct pcs_krpc *clnt_krpc; + int nr_clnts; }; struct pcs_rpc_engine diff --git a/fs/fuse/kio/pcs/pcs_rpc_clnt.c b/fs/fuse/kio/pcs/pcs_rpc_clnt.c new file mode 100644 index 000000000000..11b7c3175bf5 --- /dev/null +++ b/fs/fuse/kio/pcs/pcs_rpc_clnt.c @@ -0,0 +1,188 @@ +/* + * fs/fuse/kio/pcs/pcs_rpc_clnt.c + * + * Copyright (c) 2018-2024 Virtuozzo International GmbH. All rights reserved. + * + */ + +#include <linux/types.h> + +#include "pcs_types.h" +#include "pcs_rpc.h" +#include "pcs_cluster.h" +#include "pcs_sock_io.h" +#include "pcs_sock_conn.h" +#include "pcs_rdma_conn.h" +#include "pcs_net_addr.h" +#include "log.h" +#include "fuse_ktrace.h" +#include "pcs_cs.h" +#include "pcs_krpc.h" + +static int clnt_input(struct pcs_rpc *ep, struct pcs_msg *msg) +{ + struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg->_inline_buffer; + + switch (h->type) { + case PCS_CS_CONG_NOTIFY: + if (ep->clnt_cs) + cs_handle_congestion(ep->clnt_cs, h); + + if (ep->clnt_krpc) + krpc_handle_congestion(ep, msg); + return 0; + default: + FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Unsupported message type %u", h->type); + return PCS_ERR_PROTOCOL; + } +} + + +static struct pcs_msg *clnt_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h) +{ + if (h->xid.origin.val & PCS_NODE_ALT_MASK) + return cs_get_hdr(ep, h); + else + return krpc_get_hdr(ep, h); +} + +void clnt_aborting(struct pcs_rpc *ep, int error) +{ + pcs_rpc_reset(ep); +} + +static void clnt_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg) +{ + struct pcs_rpc_hdr *req_h = (struct pcs_rpc_hdr *)msg_inline_head(req); + + if (req_h->xid.origin.val & PCS_NODE_ALT_MASK) + cs_keep_waiting(ep, req, msg); + else + krpc_keep_waiting(ep, req, msg); +} + +static void clnt_connect(struct pcs_rpc *ep) +{ + void (*connect_start)(struct pcs_rpc *); + + if (ep->flags & PCS_RPC_F_LOCAL) { + char path[128]; + + snprintf(path, sizeof(path)-1, PCS_SHM_DIR "/%llu_" CLUSTER_ID_FMT, + (unsigned long long)ep->peer_id.val, CLUSTER_ID_ARGS(ep->eng->cluster_id)); + + if ((strlen(path) + 1) > sizeof(((struct sockaddr_un *) 0)->sun_path)) { + TRACE("Path to local socket is too long: %s", path); + + ep->flags &= ~PCS_RPC_F_LOCAL; + goto fail; + } + memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un)); + ep->sh.sun.sun_family = AF_UNIX; + ep->sh.sa_len = sizeof(struct sockaddr_un); + strcpy(ep->sh.sun.sun_path, path); + connect_start = pcs_sockconnect_start; + } 
else { + /* TODO: print sock addr using pcs_format_netaddr() */ + if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) { + TRACE("netaddr to sockaddr failed"); + goto fail; + } + connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ? + pcs_rdmaconnect_start : pcs_sockconnect_start; + } + ep->state = PCS_RPC_CONNECT; + connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */ + return; + +fail: + pcs_rpc_report_error(ep, PCS_RPC_ERR_CONNECT_ERROR); + pcs_rpc_reset(ep); +} + +struct pcs_rpc_params clnt_rpc_params = { + .alloc_hdr_size = sizeof(struct pcs_rpc_hdr), + .max_msg_size = PCS_CS_MSG_MAX_SIZE, + .holddown_timeout = HZ, + .connect_timeout = 5*HZ, + .response_timeout = 30*HZ, + .max_conn_retry = 3, + .flags = 0, +}; + +struct pcs_rpc_ops clnt_rpc_ops = { + .demux_request = clnt_input, + .get_hdr = clnt_get_hdr, + .state_change = clnt_aborting, + .connect = clnt_connect, + .keep_waiting = clnt_keep_waiting, +}; + + +struct pcs_rpc *pcs_rpc_clnt_create(struct pcs_rpc_engine *eng, PCS_NODE_ID_T *peer_id, + PCS_NET_ADDR_T *peer_addr, int flags) +{ + struct pcs_rpc *ep = NULL; + + /* + * This function is not expected to be called frequently, so a + * slow search is acceptable here. + */ + spin_lock(&eng->lock); + hlist_for_each_entry(ep, &eng->unhashed, link) { + if (memcmp(&ep->peer_id, peer_id, sizeof(ep->peer_id)) == 0) { + pcs_rpc_get(ep); + break; + } + } + spin_unlock(&eng->lock); + + if (ep) { + mutex_lock(&ep->mutex); + if (ep->state != PCS_RPC_DESTROY) + goto found; + + mutex_unlock(&ep->mutex); + } + + /* create a new pcs_rpc instance if the one found had been closed by its last owner */ + ep = pcs_rpc_alloc_ep(); + if (!ep) + return NULL; + + pcs_rpc_attach_new_ep(ep, eng); + pcs_rpc_configure_new_ep(ep, &clnt_rpc_params, &clnt_rpc_ops); + + pcs_rpc_set_peer_id(ep, peer_id, PCS_NODE_ROLE_CS); + pcs_rpc_set_address(ep, peer_addr); + + if (flags & CS_FL_LOCAL_SOCK) + ep->flags |= PCS_RPC_F_LOCAL; + else + ep->flags &= ~PCS_RPC_F_LOCAL; + + mutex_lock(&ep->mutex); +found: + ep->nr_clnts++; + mutex_unlock(&ep->mutex); + + return ep; +} + +void pcs_rpc_clnt_close(struct pcs_rpc *ep) +{ + mutex_lock(&ep->mutex); + BUG_ON(ep->flags & PCS_RPC_F_DEAD); + BUG_ON(ep->flags & PCS_RPC_F_PASSIVE); + + ep->nr_clnts--; + if (!ep->nr_clnts) { + /* close the rpc if we're the last rpc client */ + ep->flags |= PCS_RPC_F_DEAD; + rpc_abort(ep, 1, PCS_ERR_NET_ABORT); + ep->state = PCS_RPC_DESTROY; + } + mutex_unlock(&ep->mutex); + + pcs_rpc_put(ep); +} diff --git a/fs/fuse/kio/pcs/pcs_rpc_clnt.h b/fs/fuse/kio/pcs/pcs_rpc_clnt.h new file mode 100644 index 000000000000..7afe59e9c992 --- /dev/null +++ b/fs/fuse/kio/pcs/pcs_rpc_clnt.h @@ -0,0 +1,16 @@ +/* + * fs/fuse/kio/pcs/pcs_rpc_clnt.h + * + * Copyright (c) 2018-2024 Virtuozzo International GmbH. All rights reserved. + * + */ + +#ifndef _PCS_RPC_CLNT_H_ +#define _PCS_RPC_CLNT_H_ 1 +#include "pcs_rpc.h" + +struct pcs_rpc *pcs_rpc_clnt_create(struct pcs_rpc_engine *eng, + PCS_NODE_ID_T *peer_id, PCS_NET_ADDR_T *peer_addr, int flags); +void pcs_rpc_clnt_close(struct pcs_rpc *ep); + +#endif -- 2.39.3 (Apple Git-146)