Previously pcs_cs was the only client of pcs_rpc, so the two were tightly
bound together. This has to change with the addition of pcs_krpc.
So now:
- pcs_rpc is created on the first creation of either a pcs_krpc or a pcs_cs,
  and destroyed once both its pcs_krpc and its pcs_cs have been destroyed.
- pcs_rpc can handle messages belonging to pcs_krpc and pcs_cs concurrently.

https://pmc.acronis.work/browse/VSTOR-82613

Signed-off-by: Liu Kui <kui....@virtuozzo.com>
---
 fs/fuse/Makefile               |   6 +-
 fs/fuse/kio/pcs/pcs_cs.c       | 120 ++++-----------------
 fs/fuse/kio/pcs/pcs_cs.h       |   5 +-
 fs/fuse/kio/pcs/pcs_req.h      |  28 ++++-
 fs/fuse/kio/pcs/pcs_rpc.c      |   3 +-
 fs/fuse/kio/pcs/pcs_rpc.h      |   4 +-
 fs/fuse/kio/pcs/pcs_rpc_clnt.c | 185 +++++++++++++++++++++++++++++++++
 fs/fuse/kio/pcs/pcs_rpc_clnt.h |  13 +++
 8 files changed, 254 insertions(+), 110 deletions(-)
 create mode 100644 fs/fuse/kio/pcs/pcs_rpc_clnt.c
 create mode 100644 fs/fuse/kio/pcs/pcs_rpc_clnt.h

diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile
index 740c805adc2a..18eaa35a234b 100644
--- a/fs/fuse/Makefile
+++ b/fs/fuse/Makefile
@@ -33,7 +33,11 @@ fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \
        kio/pcs/pcs_rdma_rw.o \
        kio/pcs/pcs_rdma_conn.o \
        kio/pcs/pcs_net_addr.o \
-       kio/pcs/pcs_cs_accel.o
+       kio/pcs/pcs_cs_accel.o \
+       kio/pcs/pcs_rpc_clnt.o \
+       kio/pcs/pcs_mr.o \
+       kio/pcs/pcs_krpc.o
+
 fuse_kio_pcs_trace-objs := kio/pcs/fuse_kio_pcs_trace.o
 
 virtiofs-y := virtio_fs.o
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 6cc97aa3f9d0..8deb9d77ecb0 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -25,10 +25,10 @@
 #include "log.h"
 #include "fuse_ktrace.h"
 #include "pcs_net_addr.h"
+#include "pcs_rpc_clnt.h"
 
 /* Lock order: cs->lock -> css->lock (lru, hash, bl_list) */
 
-
 struct pcs_rpc_params cn_rpc_params = {
        .alloc_hdr_size         = sizeof(struct pcs_rpc_hdr),
        .max_msg_size           = PCS_CS_MSG_MAX_SIZE,
@@ -39,22 +39,9 @@ struct pcs_rpc_params cn_rpc_params = {
        .flags                  = 0,
 };
 
-static void cs_aborting(struct pcs_rpc *ep, int error);
-static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
-static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg);
-static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct 
pcs_msg *msg);
-static void cs_connect(struct pcs_rpc *ep);
 static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose);
 static void pcs_cs_destroy(struct pcs_cs *cs);
 
-struct pcs_rpc_ops cn_rpc_ops = {
-       .demux_request          = cs_input,
-       .get_hdr                = cs_get_hdr,
-       .state_change           = cs_aborting,
-       .keep_waiting           = cs_keep_waiting,
-       .connect                = cs_connect,
-};
-
 static int pcs_cs_percpu_stat_alloc(struct pcs_cs *cs)
 {
        seqlock_init(&cs->stat.seqlock);
@@ -124,8 +111,7 @@ static void pcs_cs_percpu_stat_free(struct pcs_cs *cs)
        free_percpu(cs->stat.iolat);
 }
 
-struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css,
-                            struct pcs_cluster_core *cc)
+struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css)
 {
        struct pcs_cs *cs;
 
@@ -153,13 +139,6 @@ struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css,
            kfree(cs);
            return NULL;
        }
-       cs->rpc = pcs_rpc_create(&cc->eng, &cn_rpc_params, &cn_rpc_ops);
-       if (cs->rpc == NULL) {
-               pcs_cs_percpu_stat_free(cs);
-               kfree(cs);
-               return NULL;
-       }
-       cs->rpc->private = cs;
        INIT_LIST_HEAD(&cs->map_list);
        return cs;
 }
@@ -222,6 +201,7 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, 
PCS_NODE_ID_T *id, P
                /* If rpc is connected, leave it connected until failure.
                 * After current connect fails, reconnect will be done to new 
address
                 */
+               struct pcs_rpc *rpc = cs->rpc;
                if (addr) {
                        if (addr->type != PCS_ADDRTYPE_NONE) {
                                if (pcs_netaddr_cmp(&cs->addr, addr)) {
@@ -231,7 +211,7 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, 
PCS_NODE_ID_T *id, P
                                        FUSE_KTRACE(cc_from_csset(csset)->fc,
                                                    "Port change CS" NODE_FMT " 
seq=%d",
                                                    NODE_ARGS(*id), 
cs->addr_serno);
-                                       pcs_rpc_set_address(cs->rpc, addr);
+                                       pcs_rpc_set_address(rpc, addr);
 
                                        if (!(flags & CS_FL_INACTIVE)) {
                                                pcs_map_notify_addr_change(cs);
@@ -246,30 +226,28 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set 
*csset, PCS_NODE_ID_T *id, P
                        }
                }
                if (flags & CS_FL_LOCAL_SOCK)
-                       cs->rpc->flags |= PCS_RPC_F_LOCAL;
+                       rpc->flags |= PCS_RPC_F_LOCAL;
                else
-                       cs->rpc->flags &= ~PCS_RPC_F_LOCAL;
+                       rpc->flags &= ~PCS_RPC_F_LOCAL;
                return cs;
        }
        BUG_ON(addr == NULL);
 
-       cs = pcs_cs_alloc(csset, cc_from_csset(csset));
+       cs = pcs_cs_alloc(csset);
        if (!cs)
                return NULL;
 
+       cs->rpc = pcs_rpc_clnt_create(&cc_from_csset(csset)->eng, id, addr, 
flags);
+       if (!cs->rpc) {
+               pcs_cs_percpu_stat_free(cs);
+               kfree(cs);
+               return NULL;
+       }
+       cs->rpc->clnt_cs = cs;
        cs->id = *id;
-
        cs->addr = *addr;
        cs->addr_serno = 1;
 
-       pcs_rpc_set_peer_id(cs->rpc, id, PCS_NODE_ROLE_CS);
-       pcs_rpc_set_address(cs->rpc, addr);
-
-       if (flags & CS_FL_LOCAL_SOCK)
-               cs->rpc->flags |= PCS_RPC_F_LOCAL;
-       else
-               cs->rpc->flags &= ~PCS_RPC_F_LOCAL;
-
        spin_lock(&cs->lock);
        spin_lock(&csset->lock);
        if (__lookup_cs(csset, id)) {
@@ -455,46 +433,7 @@ static void cs_get_read_response_iter(struct pcs_msg *msg, 
int offset, struct io
        }
 }
 
-static void cs_connect(struct pcs_rpc *ep)
-{
-       void (*connect_start)(struct pcs_rpc *);
-
-       if (ep->flags & PCS_RPC_F_LOCAL) {
-               char path[128];
-
-               snprintf(path, sizeof(path)-1, PCS_SHM_DIR "/%llu_" 
CLUSTER_ID_FMT,
-                        (unsigned long long)ep->peer_id.val, 
CLUSTER_ID_ARGS(ep->eng->cluster_id));
-
-               if ((strlen(path) + 1) > sizeof(((struct sockaddr_un *) 
0)->sun_path)) {
-                       TRACE("Path to local socket is too long: %s", path);
-
-                       ep->flags &= ~PCS_RPC_F_LOCAL;
-                       goto fail;
-               }
-               memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un));
-               ep->sh.sun.sun_family = AF_UNIX;
-               ep->sh.sa_len = sizeof(struct sockaddr_un);
-               strcpy(ep->sh.sun.sun_path, path);
-               connect_start = pcs_sockconnect_start;
-       } else {
-               /* TODO: print sock addr using pcs_format_netaddr() */
-               if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, 
&ep->sh.sa_len)) {
-                       TRACE("netaddr to sockaddr failed");
-                       goto fail;
-               }
-               connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ?
-                       pcs_rdmaconnect_start : pcs_sockconnect_start;
-       }
-       ep->state = PCS_RPC_CONNECT;
-       connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
-       return;
-fail:
-       pcs_rpc_report_error(ep, PCS_RPC_ERR_CONNECT_ERROR);
-       pcs_rpc_reset(ep);
-       return;
-}
-
-static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
+struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
 {
        struct pcs_msg *msg, *resp;
        struct pcs_rpc_hdr *req_h;
@@ -887,7 +826,7 @@ void pcs_cs_submit(struct pcs_cs *cs, struct 
pcs_int_request *ireq)
        do_cs_submit(cs, ireq);
 }
 
-static void handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
+void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
 {
        struct pcs_cs *who;
 
@@ -945,10 +884,10 @@ static int may_reroute(struct pcs_cs_list *csl, 
PCS_NODE_ID_T cs_id)
        return legit;
 }
 
-static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct 
pcs_msg *msg)
+void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg 
*msg)
 {
        struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg_inline_head(msg);
-       struct pcs_cs *cs = ep->private;
+       struct pcs_cs *cs = ep->clnt_cs;
        struct pcs_cs *who;
 
        /* Some CS reported it cannot complete local IO in time, close 
congestion window */
@@ -1003,21 +942,6 @@ static void cs_keep_waiting(struct pcs_rpc *ep, struct 
pcs_msg *req, struct pcs_
 
 }
 
-static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg)
-{
-       struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg->_inline_buffer;
-
-       switch (h->type) {
-       case PCS_CS_CONG_NOTIFY:
-               handle_congestion(ep->private, h);
-               msg->done(msg);
-               return 0;
-       default:
-               FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Unsupported 
message type %u", h->type);
-               return PCS_ERR_PROTOCOL;
-       }
-}
-
 void pcs_cs_notify_error(struct pcs_cluster_core *cc, pcs_error_t *err)
 {
        struct list_head queue;
@@ -1094,17 +1018,13 @@ static void pcs_cs_destroy(struct pcs_cs *cs)
        BUG_ON(cs->csa_ctx);
 
        if (cs->rpc) {
-               pcs_rpc_close(cs->rpc);
+               cs->rpc->clnt_cs = NULL;
+               pcs_rpc_clnt_close(cs->rpc);
                cs->rpc = NULL;
        }
        call_rcu(&cs->rcu, cs_destroy_rcu);
 }
 
-void cs_aborting(struct pcs_rpc *ep, int error)
-{
-       pcs_rpc_reset(ep);
-}
-
 /* Latency is difficult value to use for any decisions.
  * It is sampled at random, we do not know what is happening while
  * we have no samples. For now we do the following: arriving samples
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 61be99a54157..62b88f612b54 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -159,8 +159,6 @@ unsigned int cs_get_avg_in_flight(struct pcs_cs *cs);
 void pcs_csset_init(struct pcs_cs_set *css);
 void pcs_csset_fini(struct pcs_cs_set *css);
 
-struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css, struct pcs_cluster_core 
*cc);
-
 void cs_log_io_times(struct pcs_int_request *ireq, struct pcs_msg *resp, 
unsigned int max_iolat);
 int pcs_cs_format_io_times(char *buf, int buflen, struct pcs_int_request 
*ireq, struct pcs_msg *resp);
 void cs_set_io_times_logger(void (*logger)(struct pcs_int_request *ireq, 
struct pcs_msg *resp, u32 max_iolat, void *ctx), void *ctx);
@@ -219,4 +217,7 @@ int pcs_csa_csl_write_submit_single(struct pcs_int_request 
* ireq, int idx);
 void pcs_csa_relay_iotimes(struct pcs_int_request * ireq,  struct pcs_cs_iohdr 
* h, PCS_NODE_ID_T cs_id);
 void pcs_csa_cs_detach(struct pcs_cs * cs);
 
+void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h);
+struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
+void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg 
*msg);
 #endif /* _PCS_CS_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index cdf1c9754803..ce3a976b5661 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -18,6 +18,8 @@
 #include "fuse_ktrace_prot.h"
 #include "fuse_stat.h"
 #include "../../fuse_i.h"
+#include "pcs_mr.h"
+#include "pcs_krpc.h"
 
 ///////////////////////////
 
@@ -168,7 +170,7 @@ struct pcs_int_request
        union {
                struct {
                        struct pcs_map_entry    *map;
-                       //// Temproraly disable flow
+                       /* Temporarily disable flow */
                        struct pcs_flow_node    *flow;
                        u8                      cmd;
                        u8                      role;
@@ -253,7 +255,7 @@ struct pcs_int_request
        };
 };
 
-// FROM pcs_cluste_core.h
+/* FROM pcs_cluster_core.h */
 
 struct pcs_clnt_config
 {
@@ -284,9 +286,15 @@ struct pcs_cluster_core
        struct pcs_map_set      maps;           /* Global map data */
        struct pcs_rpc_engine   eng;            /* RPC engine */
        struct workqueue_struct *wq;
-////   struct pcs_ratelimit    rlim;           /* Rate limiter */
-////   struct pcs_rng          rng;
+#if 0
+       struct pcs_ratelimit    rlim;           /* Rate limiter */
+       struct pcs_rng          rng;
+#endif
        /* <SKIP */
+
+       struct pcs_mr_set       mrs;            /* Table of all MRs */
+       struct pcs_krpc_set     krpcs;          /* Table of all KRPCs */
+
        struct pcs_fuse_stat    stat;
 
        struct {
@@ -337,10 +345,20 @@ static inline struct pcs_cluster_core 
*cc_from_maps(struct pcs_map_set *maps)
        return container_of(maps, struct pcs_cluster_core, maps);
 }
 
+static inline struct pcs_cluster_core *cc_from_krpcset(struct pcs_krpc_set 
*krpcs)
+{
+       return container_of(krpcs, struct pcs_cluster_core, krpcs);
+}
+
+static inline struct pcs_cluster_core *cc_from_krpc(struct pcs_krpc *krpc)
+{
+       return cc_from_krpcset(krpc->krpcs);
+}
+
 void pcs_cc_submit(struct pcs_cluster_core *cc, struct pcs_int_request* ireq);
 void pcs_cc_requeue(struct pcs_cluster_core *cc, struct list_head * q);
 void pcs_cc_update_storage_versions(struct pcs_cluster_core *cc, int version);
-////// FROM pcs_cluster.h
+/* FROM pcs_cluster.h */
 static inline void pcs_sreq_attach(struct pcs_int_request * sreq, struct 
pcs_int_request * parent)
 {
        sreq->completion_data.parent = parent;
diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index 3b763cea9c01..deebc1dddf1b 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -292,7 +292,8 @@ void pcs_rpc_attach_new_ep(struct pcs_rpc * ep, struct 
pcs_rpc_engine * eng)
        ep->peer_flags = 0;
        ep->peer_version = ~0U;
        ep->conn = NULL;
-       ep->private = NULL;
+       ep->clnt_cs = NULL;
+       ep->clnt_krpc = NULL;
        INIT_LIST_HEAD(&ep->pending_queue);
        INIT_LIST_HEAD(&ep->state_queue);
        INIT_LIST_HEAD(&ep->input_queue);
diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h
index ef4ab26b9d44..fe30bede7efe 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.h
+++ b/fs/fuse/kio/pcs/pcs_rpc.h
@@ -153,7 +153,9 @@ struct pcs_rpc
        struct hlist_head       kill_calendar[RPC_MAX_CALENDAR];
        struct llist_node       cleanup_node;
 
-       struct pcs_cs *         private;
+       struct pcs_cs           *clnt_cs;
+       struct pcs_krpc         *clnt_krpc;
+       int nr_clnts;
 };
 
 struct pcs_rpc_engine
diff --git a/fs/fuse/kio/pcs/pcs_rpc_clnt.c b/fs/fuse/kio/pcs/pcs_rpc_clnt.c
new file mode 100644
index 000000000000..810aaaf0081d
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rpc_clnt.c
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2018-2024 Virtuozzo International GmbH. All rights reserved.
+ */
+
+#include <linux/types.h>
+
+#include "pcs_types.h"
+#include "pcs_rpc.h"
+#include "pcs_cluster.h"
+#include "pcs_sock_io.h"
+#include "pcs_sock_conn.h"
+#include "pcs_rdma_conn.h"
+#include "pcs_net_addr.h"
+#include "log.h"
+#include "fuse_ktrace.h"
+#include "pcs_cs.h"
+#include "pcs_krpc.h"
+
+static int clnt_input(struct pcs_rpc *ep, struct pcs_msg *msg)
+{
+       struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg->_inline_buffer;
+
+       switch (h->type) {
+       case PCS_CS_CONG_NOTIFY:
+               if (ep->clnt_cs)
+                       cs_handle_congestion(ep->clnt_cs, h);
+
+               if (ep->clnt_krpc)
+                       krpc_handle_congestion(ep, msg);
+               return 0;
+       default:
+               FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Unsupported 
message type %u", h->type);
+               return PCS_ERR_PROTOCOL;
+       }
+}
+
+
+static struct pcs_msg *clnt_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
+{
+       if (h->xid.origin.val & PCS_NODE_ALT_MASK)
+               return cs_get_hdr(ep, h);
+       else
+               return krpc_get_hdr(ep, h);
+}
+
+void clnt_aborting(struct pcs_rpc *ep, int error)
+{
+       pcs_rpc_reset(ep);
+}
+
+static void clnt_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct 
pcs_msg *msg)
+{
+       struct pcs_rpc_hdr *req_h = (struct pcs_rpc_hdr *)msg_inline_head(req);
+
+       if (req_h->xid.origin.val & PCS_NODE_ALT_MASK)
+               cs_keep_waiting(ep, req, msg);
+       else
+               krpc_keep_waiting(ep, req, msg);
+}
+
+static void clnt_connect(struct pcs_rpc *ep)
+{
+       void (*connect_start)(struct pcs_rpc *);
+
+       if (ep->flags & PCS_RPC_F_LOCAL) {
+               char path[128];
+
+               snprintf(path, sizeof(path)-1, PCS_SHM_DIR "/%llu_" 
CLUSTER_ID_FMT,
+                        (unsigned long long)ep->peer_id.val, 
CLUSTER_ID_ARGS(ep->eng->cluster_id));
+
+               if ((strlen(path) + 1) > sizeof(((struct sockaddr_un *) 
0)->sun_path)) {
+                       TRACE("Path to local socket is too long: %s", path);
+
+                       ep->flags &= ~PCS_RPC_F_LOCAL;
+                       goto fail;
+               }
+               memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un));
+               ep->sh.sun.sun_family = AF_UNIX;
+               ep->sh.sa_len = sizeof(struct sockaddr_un);
+               strcpy(ep->sh.sun.sun_path, path);
+               connect_start = pcs_sockconnect_start;
+       } else {
+               /* TODO: print sock addr using pcs_format_netaddr() */
+               if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, 
&ep->sh.sa_len)) {
+                       TRACE("netaddr to sockaddr failed");
+                       goto fail;
+               }
+               connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ?
+                       pcs_rdmaconnect_start : pcs_sockconnect_start;
+       }
+       ep->state = PCS_RPC_CONNECT;
+       connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
+       return;
+
+fail:
+       pcs_rpc_report_error(ep, PCS_RPC_ERR_CONNECT_ERROR);
+       pcs_rpc_reset(ep);
+}
+
+struct pcs_rpc_params clnt_rpc_params = {
+       .alloc_hdr_size         = sizeof(struct pcs_rpc_hdr),
+       .max_msg_size           = PCS_CS_MSG_MAX_SIZE,
+       .holddown_timeout       = HZ,
+       .connect_timeout        = 5*HZ,
+       .response_timeout       = 30*HZ,
+       .max_conn_retry         = 3,
+       .flags                  = 0,
+};
+
+struct pcs_rpc_ops clnt_rpc_ops = {
+       .demux_request          = clnt_input,
+       .get_hdr                        = clnt_get_hdr,
+       .state_change           = clnt_aborting,
+       .connect                        = clnt_connect,
+       .keep_waiting           = clnt_keep_waiting,
+};
+
+
+struct pcs_rpc *pcs_rpc_clnt_create(struct pcs_rpc_engine *eng, PCS_NODE_ID_T 
*peer_id,
+                               PCS_NET_ADDR_T *peer_addr, int flags)
+{
+       struct pcs_rpc *ep = NULL;
+
+       /*
+        * This function is not expected to be called frequently,
+        * so a slow search is acceptable here.
+        */
+       spin_lock(&eng->lock);
+       hlist_for_each_entry(ep, &eng->unhashed, link) {
+               if (memcmp(&ep->peer_id, peer_id, sizeof(ep->peer_id)) == 0) {
+                       pcs_rpc_get(ep);
+                       break;
+               }
+       }
+       spin_unlock(&eng->lock);
+
+       if (ep) {
+               mutex_lock(&ep->mutex);
+               if (ep->state != PCS_RPC_DESTROY)
+                       goto found;
+
+               mutex_unlock(&ep->mutex);
+       }
+
+       /* create a new pcs_rpc instance if none was found, or if the found one was closed by its 
last owner */
+       ep = pcs_rpc_alloc_ep();
+       if (!ep)
+               return NULL;
+
+       pcs_rpc_attach_new_ep(ep, eng);
+       pcs_rpc_configure_new_ep(ep, &clnt_rpc_params, &clnt_rpc_ops);
+
+       pcs_rpc_set_peer_id(ep, peer_id, PCS_NODE_ROLE_CS);
+       pcs_rpc_set_address(ep, peer_addr);
+
+       if (flags & CS_FL_LOCAL_SOCK)
+               ep->flags |= PCS_RPC_F_LOCAL;
+       else
+               ep->flags &= ~PCS_RPC_F_LOCAL;
+
+       mutex_lock(&ep->mutex);
+found:
+       ep->nr_clnts++;
+       mutex_unlock(&ep->mutex);
+
+       return ep;
+}
+
+void pcs_rpc_clnt_close(struct pcs_rpc *ep)
+{
+       mutex_lock(&ep->mutex);
+       BUG_ON(ep->flags & PCS_RPC_F_DEAD);
+       BUG_ON(ep->flags & PCS_RPC_F_PASSIVE);
+
+       ep->nr_clnts--;
+       if (!ep->nr_clnts) {
+               /* close the rpc if we're the last rpc client */
+               ep->flags |= PCS_RPC_F_DEAD;
+               rpc_abort(ep, 1, PCS_ERR_NET_ABORT);
+               ep->state = PCS_RPC_DESTROY;
+       }
+       mutex_unlock(&ep->mutex);
+
+       pcs_rpc_put(ep);
+}
diff --git a/fs/fuse/kio/pcs/pcs_rpc_clnt.h b/fs/fuse/kio/pcs/pcs_rpc_clnt.h
new file mode 100644
index 000000000000..e94cf96d7079
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rpc_clnt.h
@@ -0,0 +1,13 @@
+/*
+ *  Copyright (c) 2018-2024 Virtuozzo International GmbH. All rights reserved.
+ */
+
+#ifndef _PCS_RPC_CLNT_H_
+#define _PCS_RPC_CLNT_H_ 1
+#include "pcs_rpc.h"
+
+struct pcs_rpc *pcs_rpc_clnt_create(struct pcs_rpc_engine *eng,
+               PCS_NODE_ID_T *peer_id, PCS_NET_ADDR_T *peer_addr, int flags);
+void pcs_rpc_clnt_close(struct pcs_rpc *ep);
+
+#endif
-- 
2.39.3 (Apple Git-146)

_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to