The commit is pushed to "branch-rh9-5.14.0-427.22.1.vz9.62.x-ovz" and will appear
at https://src.openvz.org/scm/ovz/vzkernel.git after rh9-5.14.0-427.22.1.vz9.62.3

------>
commit 176c96b0d670c3eff4505368a7f67857ea317e0b
Author: Liu Kui <kui....@virtuozzo.com>
Date:   Tue Jul 2 23:26:08 2024 +0800
    fs/fuse kio: adapt pcs_rpc to support pcs_krpc.

    Previously pcs_cs was the only client of pcs_rpc, so the two were tightly
    bound together. This has to change with the addition of pcs_krpc. Now:
    - pcs_rpc is created on the first creation of either pcs_krpc or pcs_cs,
      and destroyed on the last destruction of both.
    - pcs_rpc can handle messages belonging to pcs_krpc and pcs_cs
      concurrently.
    (Both points are illustrated in the sketches that follow the patch.)

    https://pmc.acronis.work/browse/VSTOR-82613

    Signed-off-by: Liu Kui <kui....@virtuozzo.com>

    ====== Patchset description:
    fs/fuse kio: introduce pcs_krpc for merging userspace RPC in
    vstorage-mount with kernel RPC

    Implement pcs_krpc in the kio module to support using kernel RPC
    directly from userspace.

    https://pmc.acronis.work/browse/VSTOR-82613

    Liu Kui (4):
      fs/fuse kio: implement memory region to support zero-copy between
        userspace and kernel.
      fs/fuse kio: implement pcs_krpc - export kernel RPC to userspace
      fs/fuse kio: adapt pcs_rpc to support pcs_krpc.
      fs/fuse kio: integrate pcs_krpc to kio module

    Feature: fuse: kRPC - single RPC for kernel and userspace
---
 fs/fuse/Makefile               |   6 +-
 fs/fuse/kio/pcs/pcs_cs.c       | 120 +++++--------------------
 fs/fuse/kio/pcs/pcs_cs.h       |   5 +-
 fs/fuse/kio/pcs/pcs_req.h      |  28 +++++--
 fs/fuse/kio/pcs/pcs_rpc.c      |   3 +-
 fs/fuse/kio/pcs/pcs_rpc.h      |   4 +-
 fs/fuse/kio/pcs/pcs_rpc_clnt.c | 185 +++++++++++++++++++++++++++++++++++++++++
 fs/fuse/kio/pcs/pcs_rpc_clnt.h |  13 +++
 8 files changed, 254 insertions(+), 110 deletions(-)

diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile
index 740c805adc2a..18eaa35a234b 100644
--- a/fs/fuse/Makefile
+++ b/fs/fuse/Makefile
@@ -33,7 +33,11 @@ fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \
         kio/pcs/pcs_rdma_rw.o \
         kio/pcs/pcs_rdma_conn.o \
         kio/pcs/pcs_net_addr.o \
-        kio/pcs/pcs_cs_accel.o
+        kio/pcs/pcs_cs_accel.o \
+        kio/pcs/pcs_rpc_clnt.o \
+        kio/pcs/pcs_mr.o \
+        kio/pcs/pcs_krpc.o
+
 fuse_kio_pcs_trace-objs := kio/pcs/fuse_kio_pcs_trace.o
 
 virtiofs-y := virtio_fs.o
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 6cc97aa3f9d0..8deb9d77ecb0 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -25,10 +25,10 @@
 #include "log.h"
 #include "fuse_ktrace.h"
 #include "pcs_net_addr.h"
+#include "pcs_rpc_clnt.h"
 
 /* Lock order: cs->lock -> css->lock (lru, hash, bl_list) */
-
 struct pcs_rpc_params cn_rpc_params = {
        .alloc_hdr_size = sizeof(struct pcs_rpc_hdr),
        .max_msg_size = PCS_CS_MSG_MAX_SIZE,
@@ -39,22 +39,9 @@ struct pcs_rpc_params cn_rpc_params = {
        .flags = 0,
 };
 
-static void cs_aborting(struct pcs_rpc *ep, int error);
-static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
-static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg);
-static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
-static void cs_connect(struct pcs_rpc *ep);
 static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose);
 static void pcs_cs_destroy(struct pcs_cs *cs);
 
-struct pcs_rpc_ops cn_rpc_ops = {
-       .demux_request = cs_input,
-       .get_hdr = cs_get_hdr,
-       .state_change = cs_aborting,
-       .keep_waiting = cs_keep_waiting,
-       .connect = cs_connect,
-};
-
 static int pcs_cs_percpu_stat_alloc(struct pcs_cs *cs)
 {
        seqlock_init(&cs->stat.seqlock);
@@ -124,8 +111,7 @@ static void pcs_cs_percpu_stat_free(struct pcs_cs *cs)
        free_percpu(cs->stat.iolat);
 }
 
-struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css,
-                           struct pcs_cluster_core *cc)
+struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css)
 {
        struct pcs_cs *cs;
 
@@ -153,13 +139,6 @@ struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css,
                kfree(cs);
                return NULL;
        }
-       cs->rpc = pcs_rpc_create(&cc->eng, &cn_rpc_params, &cn_rpc_ops);
-       if (cs->rpc == NULL) {
-               pcs_cs_percpu_stat_free(cs);
-               kfree(cs);
-               return NULL;
-       }
-       cs->rpc->private = cs;
        INIT_LIST_HEAD(&cs->map_list);
        return cs;
 }
@@ -222,6 +201,7 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
        /* If rpc is connected, leave it connected until failure.
         * After current connect fails, reconnect will be done to new address
         */
+       struct pcs_rpc *rpc = cs->rpc;
        if (addr) {
                if (addr->type != PCS_ADDRTYPE_NONE) {
                        if (pcs_netaddr_cmp(&cs->addr, addr)) {
@@ -231,7 +211,7 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
                                FUSE_KTRACE(cc_from_csset(csset)->fc, "Port change CS" NODE_FMT " seq=%d",
                                            NODE_ARGS(*id), cs->addr_serno);
 
-                               pcs_rpc_set_address(cs->rpc, addr);
+                               pcs_rpc_set_address(rpc, addr);
 
                                if (!(flags & CS_FL_INACTIVE)) {
                                        pcs_map_notify_addr_change(cs);
@@ -246,30 +226,28 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
                }
        }
        if (flags & CS_FL_LOCAL_SOCK)
-               cs->rpc->flags |= PCS_RPC_F_LOCAL;
+               rpc->flags |= PCS_RPC_F_LOCAL;
        else
-               cs->rpc->flags &= ~PCS_RPC_F_LOCAL;
+               rpc->flags &= ~PCS_RPC_F_LOCAL;
        return cs;
 }
 BUG_ON(addr == NULL);
 
-       cs = pcs_cs_alloc(csset, cc_from_csset(csset));
+       cs = pcs_cs_alloc(csset);
        if (!cs)
                return NULL;
 
+       cs->rpc = pcs_rpc_clnt_create(&cc_from_csset(csset)->eng, id, addr, flags);
+       if (!cs->rpc) {
+               pcs_cs_percpu_stat_free(cs);
+               kfree(cs);
+               return NULL;
+       }
+       cs->rpc->clnt_cs = cs;
+
        cs->id = *id;
-       cs->addr = *addr;
        cs->addr_serno = 1;
 
-       pcs_rpc_set_peer_id(cs->rpc, id, PCS_NODE_ROLE_CS);
-       pcs_rpc_set_address(cs->rpc, addr);
-
-       if (flags & CS_FL_LOCAL_SOCK)
-               cs->rpc->flags |= PCS_RPC_F_LOCAL;
-       else
-               cs->rpc->flags &= ~PCS_RPC_F_LOCAL;
-
        spin_lock(&cs->lock);
        spin_lock(&csset->lock);
        if (__lookup_cs(csset, id)) {
@@ -455,46 +433,7 @@ static void cs_get_read_response_iter(struct pcs_msg *msg, int offset, struct io
        }
 }
 
-static void cs_connect(struct pcs_rpc *ep)
-{
-       void (*connect_start)(struct pcs_rpc *);
-
-       if (ep->flags & PCS_RPC_F_LOCAL) {
-               char path[128];
-
-               snprintf(path, sizeof(path)-1, PCS_SHM_DIR "/%llu_" CLUSTER_ID_FMT,
-                        (unsigned long long)ep->peer_id.val, CLUSTER_ID_ARGS(ep->eng->cluster_id));
-
-               if ((strlen(path) + 1) > sizeof(((struct sockaddr_un *) 0)->sun_path)) {
-                       TRACE("Path to local socket is too long: %s", path);
-
-                       ep->flags &= ~PCS_RPC_F_LOCAL;
-                       goto fail;
-               }
-               memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un));
-               ep->sh.sun.sun_family = AF_UNIX;
-               ep->sh.sa_len = sizeof(struct sockaddr_un);
-               strcpy(ep->sh.sun.sun_path, path);
-               connect_start = pcs_sockconnect_start;
-       } else {
-               /* TODO: print sock addr using pcs_format_netaddr() */
-               if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
-                       TRACE("netaddr to sockaddr failed");
-                       goto fail;
-               }
-               connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ?
-                       pcs_rdmaconnect_start : pcs_sockconnect_start;
-       }
-       ep->state = PCS_RPC_CONNECT;
-       connect_start(ep);      /* TODO: rewrite to use pcs_netconnect callback */
-       return;
-fail:
-       pcs_rpc_report_error(ep, PCS_RPC_ERR_CONNECT_ERROR);
-       pcs_rpc_reset(ep);
-       return;
-}
-
-static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
+struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
 {
        struct pcs_msg *msg, *resp;
        struct pcs_rpc_hdr *req_h;
@@ -887,7 +826,7 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
                do_cs_submit(cs, ireq);
 }
 
-static void handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
+void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
 {
        struct pcs_cs *who;
 
@@ -945,10 +884,10 @@ static int may_reroute(struct pcs_cs_list *csl, PCS_NODE_ID_T cs_id)
        return legit;
 }
 
-static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
+void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
 {
        struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg_inline_head(msg);
-       struct pcs_cs *cs = ep->private;
+       struct pcs_cs *cs = ep->clnt_cs;
        struct pcs_cs *who;
 
        /* Some CS reported it cannot complete local IO in time, close congestion window */
@@ -1003,21 +942,6 @@ static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_
 
 }
 
-static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg)
-{
-       struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg->_inline_buffer;
-
-       switch (h->type) {
-       case PCS_CS_CONG_NOTIFY:
-               handle_congestion(ep->private, h);
-               msg->done(msg);
-               return 0;
-       default:
-               FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Unsupported message type %u", h->type);
-               return PCS_ERR_PROTOCOL;
-       }
-}
-
 void pcs_cs_notify_error(struct pcs_cluster_core *cc, pcs_error_t *err)
 {
        struct list_head queue;
@@ -1094,17 +1018,13 @@ static void pcs_cs_destroy(struct pcs_cs *cs)
        BUG_ON(cs->csa_ctx);
 
        if (cs->rpc) {
-               pcs_rpc_close(cs->rpc);
+               cs->rpc->clnt_cs = NULL;
+               pcs_rpc_clnt_close(cs->rpc);
                cs->rpc = NULL;
        }
        call_rcu(&cs->rcu, cs_destroy_rcu);
 }
 
-void cs_aborting(struct pcs_rpc *ep, int error)
-{
-       pcs_rpc_reset(ep);
-}
-
 /* Latency is difficult value to use for any decisions.
  * It is sampled at random, we do not know what is happening while
  * we have no samples. For now we do the following: arriving samples
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 61be99a54157..62b88f612b54 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -159,8 +159,6 @@ unsigned int cs_get_avg_in_flight(struct pcs_cs *cs);
 void pcs_csset_init(struct pcs_cs_set *css);
 void pcs_csset_fini(struct pcs_cs_set *css);
 
-struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css, struct pcs_cluster_core *cc);
-
 void cs_log_io_times(struct pcs_int_request *ireq, struct pcs_msg *resp, unsigned int max_iolat);
 int pcs_cs_format_io_times(char *buf, int buflen, struct pcs_int_request *ireq, struct pcs_msg *resp);
 void cs_set_io_times_logger(void (*logger)(struct pcs_int_request *ireq, struct pcs_msg *resp, u32 max_iolat, void *ctx), void *ctx);
@@ -219,4 +217,7 @@ int pcs_csa_csl_write_submit_single(struct pcs_int_request * ireq, int idx);
 void pcs_csa_relay_iotimes(struct pcs_int_request * ireq, struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id);
 void pcs_csa_cs_detach(struct pcs_cs * cs);
 
+void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h);
+struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
+void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
 #endif /* _PCS_CS_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index cdf1c9754803..ce3a976b5661 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -18,6 +18,8 @@
 #include "fuse_ktrace_prot.h"
 #include "fuse_stat.h"
 #include "../../fuse_i.h"
+#include "pcs_mr.h"
+#include "pcs_krpc.h"
 
 ///////////////////////////
 
@@ -168,7 +170,7 @@ struct pcs_int_request
        union {
                struct {
                        struct pcs_map_entry    *map;
-                       //// Temproraly disable flow
+                       /* Temproraly disable flow */
                        struct pcs_flow_node    *flow;
                        u8                      cmd;
                        u8                      role;
@@ -253,7 +255,7 @@ struct pcs_int_request
        };
 };
 
-// FROM pcs_cluste_core.h
+/* FROM pcs_cluste_core.h */
 
 struct pcs_clnt_config
 {
@@ -284,9 +286,15 @@ struct pcs_cluster_core
        struct pcs_map_set      maps;           /* Global map data */
        struct pcs_rpc_engine   eng;            /* RPC engine */
        struct workqueue_struct *wq;
-////   struct pcs_ratelimit    rlim;           /* Rate limiter */
-////   struct pcs_rng          rng;
+#if 0
+       struct pcs_ratelimit    rlim;           /* Rate limiter */
+       struct pcs_rng          rng;
+#endif
        /* <SKIP */
 
+       struct pcs_mr_set       mrs;            /* Table of all MRs*/
+       struct pcs_krpc_set     krpcs;          /* Table of all KRPCs */
+
        struct pcs_fuse_stat    stat;
 
        struct {
@@ -337,10 +345,20 @@ static inline struct pcs_cluster_core *cc_from_maps(struct pcs_map_set *maps)
        return container_of(maps, struct pcs_cluster_core, maps);
 }
 
+static inline struct pcs_cluster_core *cc_from_krpcset(struct pcs_krpc_set *krpcs)
+{
+       return container_of(krpcs, struct pcs_cluster_core, krpcs);
+}
+
+static inline struct pcs_cluster_core *cc_from_krpc(struct pcs_krpc *krpc)
+{
+       return cc_from_krpcset(krpc->krpcs);
+}
+
 void pcs_cc_submit(struct pcs_cluster_core *cc, struct pcs_int_request* ireq);
 void pcs_cc_requeue(struct pcs_cluster_core *cc, struct list_head * q);
 void pcs_cc_update_storage_versions(struct pcs_cluster_core *cc, int version);
 
-////// FROM pcs_cluster.h
+/* FROM pcs_cluster.h */
 
 static inline void pcs_sreq_attach(struct pcs_int_request * sreq, struct pcs_int_request * parent)
 {
        sreq->completion_data.parent = parent;
diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index 3b763cea9c01..deebc1dddf1b 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -292,7 +292,8 @@ void pcs_rpc_attach_new_ep(struct pcs_rpc * ep, struct pcs_rpc_engine * eng)
        ep->peer_flags = 0;
        ep->peer_version = ~0U;
        ep->conn = NULL;
-       ep->private = NULL;
+       ep->clnt_cs = NULL;
+       ep->clnt_krpc = NULL;
        INIT_LIST_HEAD(&ep->pending_queue);
        INIT_LIST_HEAD(&ep->state_queue);
        INIT_LIST_HEAD(&ep->input_queue);
diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h
index ef4ab26b9d44..fe30bede7efe 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.h
+++ b/fs/fuse/kio/pcs/pcs_rpc.h
@@ -153,7 +153,9 @@ struct pcs_rpc
        struct hlist_head       kill_calendar[RPC_MAX_CALENDAR];
        struct llist_node       cleanup_node;
 
-       struct pcs_cs           *private;
+       struct pcs_cs           *clnt_cs;
+       struct pcs_krpc         *clnt_krpc;
+       int                     nr_clnts;
 };
 
 struct pcs_rpc_engine
diff --git a/fs/fuse/kio/pcs/pcs_rpc_clnt.c b/fs/fuse/kio/pcs/pcs_rpc_clnt.c
new file mode 100644
index 000000000000..810aaaf0081d
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rpc_clnt.c
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2018-2024 Virtuozzo International GmbH. All rights reserved.
+ */
+
+#include <linux/types.h>
+
+#include "pcs_types.h"
+#include "pcs_rpc.h"
+#include "pcs_cluster.h"
+#include "pcs_sock_io.h"
+#include "pcs_sock_conn.h"
+#include "pcs_rdma_conn.h"
+#include "pcs_net_addr.h"
+#include "log.h"
+#include "fuse_ktrace.h"
+#include "pcs_cs.h"
+#include "pcs_krpc.h"
+
+static int clnt_input(struct pcs_rpc *ep, struct pcs_msg *msg)
+{
+       struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg->_inline_buffer;
+
+       switch (h->type) {
+       case PCS_CS_CONG_NOTIFY:
+               if (ep->clnt_cs)
+                       cs_handle_congestion(ep->clnt_cs, h);
+
+               if (ep->clnt_krpc)
+                       krpc_handle_congestion(ep, msg);
+               return 0;
+       default:
+               FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Unsupported message type %u", h->type);
+               return PCS_ERR_PROTOCOL;
+       }
+}
+
+
+static struct pcs_msg *clnt_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
+{
+       if (h->xid.origin.val & PCS_NODE_ALT_MASK)
+               return cs_get_hdr(ep, h);
+       else
+               return krpc_get_hdr(ep, h);
+}
+
+void clnt_aborting(struct pcs_rpc *ep, int error)
+{
+       pcs_rpc_reset(ep);
+}
+
+static void clnt_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
+{
+       struct pcs_rpc_hdr *req_h = (struct pcs_rpc_hdr *)msg_inline_head(req);
+
+       if (req_h->xid.origin.val & PCS_NODE_ALT_MASK)
+               cs_keep_waiting(ep, req, msg);
+       else
+               krpc_keep_waiting(ep, req, msg);
+}
+
+static void clnt_connect(struct pcs_rpc *ep)
+{
+       void (*connect_start)(struct pcs_rpc *);
+
+       if (ep->flags & PCS_RPC_F_LOCAL) {
+               char path[128];
+
+               snprintf(path, sizeof(path)-1, PCS_SHM_DIR "/%llu_" CLUSTER_ID_FMT,
+                        (unsigned long long)ep->peer_id.val, CLUSTER_ID_ARGS(ep->eng->cluster_id));
+
+               if ((strlen(path) + 1) > sizeof(((struct sockaddr_un *) 0)->sun_path)) {
+                       TRACE("Path to local socket is too long: %s", path);
+
+                       ep->flags &= ~PCS_RPC_F_LOCAL;
+                       goto fail;
+               }
+               memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un));
+               ep->sh.sun.sun_family = AF_UNIX;
+               ep->sh.sa_len = sizeof(struct sockaddr_un);
+               strcpy(ep->sh.sun.sun_path, path);
+               connect_start = pcs_sockconnect_start;
+       } else {
+               /* TODO: print sock addr using pcs_format_netaddr() */
+               if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
+                       TRACE("netaddr to sockaddr failed");
+                       goto fail;
+               }
+               connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ?
+                       pcs_rdmaconnect_start : pcs_sockconnect_start;
+       }
+       ep->state = PCS_RPC_CONNECT;
+       connect_start(ep);      /* TODO: rewrite to use pcs_netconnect callback */
+       return;
+
+fail:
+       pcs_rpc_report_error(ep, PCS_RPC_ERR_CONNECT_ERROR);
+       pcs_rpc_reset(ep);
+}
+
+struct pcs_rpc_params clnt_rpc_params = {
+       .alloc_hdr_size = sizeof(struct pcs_rpc_hdr),
+       .max_msg_size = PCS_CS_MSG_MAX_SIZE,
+       .holddown_timeout = HZ,
+       .connect_timeout = 5*HZ,
+       .response_timeout = 30*HZ,
+       .max_conn_retry = 3,
+       .flags = 0,
+};
+
+struct pcs_rpc_ops clnt_rpc_ops = {
+       .demux_request = clnt_input,
+       .get_hdr = clnt_get_hdr,
+       .state_change = clnt_aborting,
+       .connect = clnt_connect,
+       .keep_waiting = clnt_keep_waiting,
+};
+
+
+struct pcs_rpc *pcs_rpc_clnt_create(struct pcs_rpc_engine *eng, PCS_NODE_ID_T *peer_id,
+               PCS_NET_ADDR_T *peer_addr, int flags)
+{
+       struct pcs_rpc *ep = NULL;
+
+       /*
+        * It's not expected this function to be called frequently,
+        * slow version of search is acceptable here.
+        */
+       spin_lock(&eng->lock);
+       hlist_for_each_entry(ep, &eng->unhashed, link) {
+               if (memcmp(&ep->peer_id, peer_id, sizeof(ep->peer_id)) == 0) {
+                       pcs_rpc_get(ep);
+                       break;
+               }
+       }
+       spin_unlock(&eng->lock);
+
+       if (ep) {
+               mutex_lock(&ep->mutex);
+               if (ep->state != PCS_RPC_DESTROY)
+                       goto found;
+
+               mutex_unlock(&ep->mutex);
+       }
+
+       /* create a new pcs_rpc instance if found one had been closed by its last owner */
+       ep = pcs_rpc_alloc_ep();
+       if (!ep)
+               return NULL;
+
+       pcs_rpc_attach_new_ep(ep, eng);
+       pcs_rpc_configure_new_ep(ep, &clnt_rpc_params, &clnt_rpc_ops);
+
+       pcs_rpc_set_peer_id(ep, peer_id, PCS_NODE_ROLE_CS);
+       pcs_rpc_set_address(ep, peer_addr);
+
+       if (flags & CS_FL_LOCAL_SOCK)
+               ep->flags |= PCS_RPC_F_LOCAL;
+       else
+               ep->flags &= ~PCS_RPC_F_LOCAL;
+
+       mutex_lock(&ep->mutex);
+found:
+       ep->nr_clnts++;
+       mutex_unlock(&ep->mutex);
+
+       return ep;
+}
+
+void pcs_rpc_clnt_close(struct pcs_rpc *ep)
+{
+       mutex_lock(&ep->mutex);
+       BUG_ON(ep->flags & PCS_RPC_F_DEAD);
+       BUG_ON(ep->flags & PCS_RPC_F_PASSIVE);
+
+       ep->nr_clnts--;
+       if (!ep->nr_clnts) {
+               /* close the rpc if we're the last rpc client */
+               ep->flags |= PCS_RPC_F_DEAD;
+               rpc_abort(ep, 1, PCS_ERR_NET_ABORT);
+               ep->state = PCS_RPC_DESTROY;
+       }
+       mutex_unlock(&ep->mutex);
+
+       pcs_rpc_put(ep);
+}
diff --git a/fs/fuse/kio/pcs/pcs_rpc_clnt.h b/fs/fuse/kio/pcs/pcs_rpc_clnt.h
new file mode 100644
index 000000000000..e94cf96d7079
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rpc_clnt.h
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2018-2024 Virtuozzo International GmbH. All rights reserved.
+ */
+
+#ifndef _PCS_RPC_CLNT_H_
+#define _PCS_RPC_CLNT_H_ 1
+#include "pcs_rpc.h"
+
+struct pcs_rpc *pcs_rpc_clnt_create(struct pcs_rpc_engine *eng,
+               PCS_NODE_ID_T *peer_id, PCS_NET_ADDR_T *peer_addr, int flags);
+void pcs_rpc_clnt_close(struct pcs_rpc *ep);
+
+#endif
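
A minimal usage sketch of the shared-endpoint lifecycle described in the commit
message (illustration only, not part of the patch: example_shared_endpoint() is
a made-up caller, the engine and peer arguments are assumed to be already
initialized, and NULL/error handling is elided):

#include "pcs_rpc_clnt.h"

/* Two logical clients (a pcs_cs here and, after the follow-up
 * integration patch, a pcs_krpc) share one pcs_rpc endpoint. */
static void example_shared_endpoint(struct pcs_rpc_engine *eng,
                                    PCS_NODE_ID_T *id, PCS_NET_ADDR_T *addr)
{
       /* First create: no endpoint with this peer id exists yet, so a
        * new one is allocated and configured; ep->nr_clnts becomes 1. */
       struct pcs_rpc *a = pcs_rpc_clnt_create(eng, id, addr, 0);

       /* Second create with the same peer id: the search of the engine's
        * unhashed list yields the existing endpoint; nr_clnts becomes 2. */
       struct pcs_rpc *b = pcs_rpc_clnt_create(eng, id, addr, 0);

       /* a == b here: the first endpoint is still alive. */

       pcs_rpc_clnt_close(a);  /* nr_clnts drops to 1, connection stays up */
       pcs_rpc_clnt_close(b);  /* last client: the endpoint is marked
                                * PCS_RPC_F_DEAD, aborted and destroyed */
}

Note the PCS_RPC_DESTROY recheck under ep->mutex in pcs_rpc_clnt_create(): a
looked-up endpoint may have been marked dead by a concurrent last close, in
which case a fresh endpoint is allocated instead of reusing the dying one.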
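
The demux rule that lets one endpoint serve both clients concurrently can be
restated as a one-liner (sketch only; the helper name xid_from_cs() is made up,
but the test is the one used by clnt_get_hdr() and clnt_keep_waiting() above):

static inline int xid_from_cs(struct pcs_rpc_hdr *h)
{
       /* xids whose originator has PCS_NODE_ALT_MASK set belong to the
        * in-kernel pcs_cs path; all other traffic is pcs_krpc's and is
        * relayed to userspace. */
       return (h->xid.origin.val & PCS_NODE_ALT_MASK) != 0;
}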