The commit is pushed to "branch-rh9-5.14.0-427.26.1.vz9.66.x-ovz" and will appear at g...@bitbucket.org:openvz/vzkernel.git after rh9-5.14.0-427.26.1.vz9.66.1 ------> commit a3c3993861a22c251aa40fc7b39aab19ddb2d1a7 Author: Liu Kui <kui....@virtuozzo.com> Date: Wed Jul 31 22:51:31 2024 +0700
fs/fuse kio: fixed krpc abort, implement proper request msg cancellation When userspace issues the krpc abort ioctl cmd, we need to make sure that all pending msgs have been cancelled properly, so that the kernel can no longer access memory buffers from userspace, before we return to userspace. However, if a msg's buffers are being used for IO, the msg cannot be cancelled, and we must wait until the IO completes. Signed-off-by: Liu Kui <kui....@virtuozzo.com> Acked-by: Alexey Kuznetsov <kuz...@virtuozzo.com> Feature: fuse: kRPC - single RPC for kernel and userspace --- fs/fuse/kio/pcs/pcs_krpc.c | 74 +++++++++++++++++++++++++++++++++---------- fs/fuse/kio/pcs/pcs_rpc.c | 46 ++++++++++++++++++++++++--- fs/fuse/kio/pcs/pcs_rpc.h | 1 + fs/fuse/kio/pcs/pcs_sock_io.h | 3 +- 4 files changed, 103 insertions(+), 21 deletions(-) diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c index 3169c54a2a8b..c08cdb07337b 100644 --- a/fs/fuse/kio/pcs/pcs_krpc.c +++ b/fs/fuse/kio/pcs/pcs_krpc.c @@ -4,10 +4,10 @@ #include <linux/types.h> #include <linux/list.h> -#include <linux/rbtree.h> #include <linux/refcount.h> #include <linux/file.h> #include <linux/anon_inodes.h> +#include <linux/delay.h> #include "pcs_types.h" #include "pcs_cluster.h" @@ -89,9 +89,9 @@ static void krpc_req_complete(struct krpc_req *kreq, int error) spin_lock(&krpc->lock); list_del(&kreq->link); - if (kreq->flags & KRPC_REQ_F_ABORTED) { - krpc_req_free(kreq); + if (unlikely(kreq->flags & KRPC_REQ_F_ABORTED)) { spin_unlock(&krpc->lock); + krpc_req_free(kreq); } else if (krpc_completion_post(krpc, comp)) { krpc_req_free(kreq); } @@ -165,6 +165,7 @@ struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h) resp->get_iter = krpc_msg_get_response_iter; resp->done = rpc_work_input; pcs_msg_del_calendar(msg); + msg->stage = PCS_MSG_STAGE_RECV; return resp; } @@ -463,11 +464,11 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se static int pcs_krpc_abort(struct pcs_krpc *krpc) 
{ - struct list_head dispose_list; - struct krpc_req *kreq; + struct krpc_req *kreq, *tmp; struct krpc_completion *comp; - - INIT_LIST_HEAD(&dispose_list); + struct pcs_rpc *ep = krpc->rpc; + struct pcs_msg *msg; + int timeout = 1000; /* 10 ms */ spin_lock(&krpc->lock); @@ -478,23 +479,64 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc) krpc->state = PCS_KRPC_STATE_ABORTED; + /* dispose all unprocessed completions */ + while (!list_empty(&krpc->completion_queue)) { + comp = list_first_entry(&krpc->completion_queue, struct krpc_completion, link); + list_del(&comp->link); + krpc_completion_free(comp); + } + krpc->nr_completion = 0; + /* abort incompleted requests */ list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue); - list_for_each_entry(kreq, &krpc->dispose_queue, link) + spin_unlock(&krpc->lock); + + /* nothing to be done */ + if (list_empty(&krpc->dispose_queue)) + return 0; + + /* abort incomplete requests */ + mutex_lock(&ep->mutex); + list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) { kreq->flags |= KRPC_REQ_F_ABORTED; + msg = &kreq->msg; + /* if msg is cancelled, kreq will be removed from the queue */ + pcs_rpc_cancel_msg(msg); + } - list_splice_tail_init(&krpc->completion_queue, &dispose_list); - krpc->nr_completion = 0; + /* + * The krpc->dispose_queue should be empty if there are no requests in + * busy state. Otherwise wait until all busy requests complete. This + * should be an extremely rare case, therefore sleep is acceptable here. + * + * We cannot keep references to busy requests while waiting, because + * busy requests could have been freed. 
+ */ + while (!list_empty(&krpc->dispose_queue)) { + kreq = list_first_entry(&krpc->dispose_queue, struct krpc_req, link); + msg = &kreq->msg; + + /* no longer busy and cancelled */ + if (!pcs_rpc_cancel_msg(msg)) + continue; + + /* seems something wrong happened to hardware, abort the rpc */ + if (timeout == 0) { + rpc_abort(ep, 0, PCS_ERR_NET_ABORT); + break; + } + mutex_unlock(&ep->mutex); - spin_unlock(&krpc->lock); + /* sleep 10 us */ + udelay(10); + timeout--; - /* dispose all unprocessed completions */ - while (!list_empty(&dispose_list)) { - comp = list_first_entry(&dispose_list, struct krpc_completion, link); - list_del(&comp->link); - krpc_completion_free(comp); + /* check again */ + mutex_lock(&ep->mutex); } + mutex_unlock(&ep->mutex); + return 0; } diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c index deebc1dddf1b..6b54a7bc7596 100644 --- a/fs/fuse/kio/pcs/pcs_rpc.c +++ b/fs/fuse/kio/pcs/pcs_rpc.c @@ -554,6 +554,44 @@ void pcs_rpc_cancel_request(struct pcs_msg * msg) msg->done(msg); } +int pcs_rpc_cancel_msg(struct pcs_msg *msg) +{ + struct pcs_rpc *ep = msg->rpc; + + BUG_ON(!ep); + BUG_ON(!mutex_is_locked(&ep->mutex)); + + switch (msg->stage) { + case PCS_MSG_STAGE_SEND: + if (msg->netio->tops->cancel_msg(msg)) + /* + * Request is under network IO right now, cannot be cancelled as its + * buffer could be in use. + */ + return -EBUSY; + break; + case PCS_MSG_STAGE_RECV: + /* + * Response is under network IO right now, the request message cannot + * be cancelled as its buffer could be in use. 
+ */ + return -EBUSY; + default: + break; + } + + /* msg could be in ep->input_queue*/ + spin_lock(&ep->q_lock); + list_del(&msg->list); + spin_unlock(&ep->q_lock); + + pcs_msg_del_calendar(msg); + msg->stage = PCS_MSG_STAGE_NONE; + msg->done(msg); + + return 0; +} + void rpc_work_input(struct pcs_msg * msg) { struct pcs_rpc * ep = msg->rpc; @@ -893,14 +931,14 @@ static void rpc_queue_work(struct work_struct *w) int repeat; again: - spin_lock(&ep->q_lock); - list_splice_tail_init(&ep->input_queue, &input_q); - spin_unlock(&ep->q_lock); - mutex_lock(&ep->mutex); TRACE("Handle queues\n"); + spin_lock(&ep->q_lock); + list_splice_tail_init(&ep->input_queue, &input_q); + spin_unlock(&ep->q_lock); + /* Process messages which are already in the sock queue */ if (ep->state == PCS_RPC_WORK) { struct pcs_netio *netio = (struct pcs_netio *)ep->conn; diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h index fe30bede7efe..9a651a812cf7 100644 --- a/fs/fuse/kio/pcs/pcs_rpc.h +++ b/fs/fuse/kio/pcs/pcs_rpc.h @@ -324,5 +324,6 @@ static inline struct pcs_rpc *pcs_rpc_from_work(struct work_struct *wr) const char* pcs_rpc_state_name(unsigned state); void pcs_rpc_report_error(struct pcs_rpc *ep, unsigned int err); +int pcs_rpc_cancel_msg(struct pcs_msg *msg); #endif /* _PCS_RPC_H_ */ diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h index 274eebd94bfc..872faffefe01 100644 --- a/fs/fuse/kio/pcs/pcs_sock_io.h +++ b/fs/fuse/kio/pcs/pcs_sock_io.h @@ -89,7 +89,8 @@ enum PCS_MSG_STAGE_SEND = 2, /* Message queued on socket queue */ PCS_MSG_STAGE_SENT = 3, /* Message is sent */ PCS_MSG_STAGE_WAIT = 4, /* Message is waiting for respnose */ - PCS_MSG_STAGE_DONE = 5, /* Response received */ + PCS_MSG_STAGE_RECV = 5, /* Response is being received */ + PCS_MSG_STAGE_DONE = 6, /* Response received */ }; enum _______________________________________________ Devel mailing list Devel@openvz.org https://lists.openvz.org/mailman/listinfo/devel