Ack On Thu, Aug 1, 2024 at 12:12 AM Liu Kui <kui....@virtuozzo.com> wrote: > > When userspace issues krpc abort ioctl cmd, we need to make sure > that all pending msgs must have been cancelled properly, that kernel > can no longer access memory buffers from userspace, before we return > to userspace. However if buffers are being used for IO, the msg > cannot be cancelled, then we must wait until IO completes. > > Signed-off-by: Liu Kui <kui....@virtuozzo.com> > --- > fs/fuse/kio/pcs/pcs_krpc.c | 74 +++++++++++++++++++++++++++-------- > fs/fuse/kio/pcs/pcs_rpc.c | 46 ++++++++++++++++++++-- > fs/fuse/kio/pcs/pcs_rpc.h | 1 + > fs/fuse/kio/pcs/pcs_sock_io.h | 3 +- > 4 files changed, 103 insertions(+), 21 deletions(-) > > diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c > index e2391571561a..f62bf6ef11aa 100644 > --- a/fs/fuse/kio/pcs/pcs_krpc.c > +++ b/fs/fuse/kio/pcs/pcs_krpc.c > @@ -4,10 +4,10 @@ > > #include <linux/types.h> > #include <linux/list.h> > -#include <linux/rbtree.h> > #include <linux/refcount.h> > #include <linux/file.h> > #include <linux/anon_inodes.h> > +#include <linux/delay.h> > > #include "pcs_types.h" > #include "pcs_cluster.h" > @@ -89,9 +89,9 @@ static void krpc_req_complete(struct krpc_req *kreq, int > error) > spin_lock(&krpc->lock); > list_del(&kreq->link); > > - if (kreq->flags & KRPC_REQ_F_ABORTED) { > - krpc_req_free(kreq); > + if (unlikely(kreq->flags & KRPC_REQ_F_ABORTED)) { > spin_unlock(&krpc->lock); > + krpc_req_free(kreq); > } else if (krpc_completion_post(krpc, comp)) { > krpc_req_free(kreq); > } > @@ -166,6 +166,7 @@ struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct > pcs_rpc_hdr *h) > resp->get_iter = krpc_msg_get_response_iter; > resp->done = rpc_work_input; > pcs_msg_del_calendar(msg); > + msg->stage = PCS_MSG_STAGE_RECV; > > return resp; > } > @@ -464,11 +465,11 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc > *krpc, struct pcs_krpc_ioc_se > > static int pcs_krpc_abort(struct pcs_krpc *krpc) > { 
> - struct list_head dispose_list; > - struct krpc_req *kreq; > + struct krpc_req *kreq, *tmp; > struct krpc_completion *comp; > - > - INIT_LIST_HEAD(&dispose_list); > + struct pcs_rpc *ep = krpc->rpc; > + struct pcs_msg *msg; > + int timeout = 1000; /* 10 ms */ > > spin_lock(&krpc->lock); > > @@ -479,23 +480,64 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc) > > krpc->state = PCS_KRPC_STATE_ABORTED; > > + /* dispose all unprocessed completions */ > + while (!list_empty(&krpc->completion_queue)) { > + comp = list_first_entry(&krpc->completion_queue, struct > krpc_completion, link); > + list_del(&comp->link); > + krpc_completion_free(comp); > + } > + krpc->nr_completion = 0; > + > /* abort incompleted requests */ > list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue); > - list_for_each_entry(kreq, &krpc->dispose_queue, link) > + spin_unlock(&krpc->lock); > + > + /* nothing to be done */ > + if (list_empty(&krpc->dispose_queue)) > + return 0; > + > + /* abort incomplete requests */ > + mutex_lock(&ep->mutex); > + list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) { > kreq->flags |= KRPC_REQ_F_ABORTED; > + msg = &kreq->msg; > + /* if msg is cancelled, kreq will be removed from the queue */ > + pcs_rpc_cancel_msg(msg); > + } > > - list_splice_tail_init(&krpc->completion_queue, &dispose_list); > - krpc->nr_completion = 0; > + /* > + * The krpc->dispose_queue should be empty if there are no requests in > + * busy state. Otherwise wait until all busy requests complete. This > + * should be an extremely rare case, therefore sleep is acceptable > here. > + * > + * We cannot keep references to busy requests while waiting, because > + * busy requests could have been freed. 
> + */ > + while (!list_empty(&krpc->dispose_queue)) { > + kreq = list_first_entry(&krpc->dispose_queue, struct > krpc_req, link); > + msg = &kreq->msg; > + > + /* no longer busy and cancelled */ > + if (!pcs_rpc_cancel_msg(msg)) > + continue; > + > + /* seems something wrong happened to hardware, abort the rpc */ > + if (timeout == 0) { > + rpc_abort(ep, 0, PCS_ERR_NET_ABORT); > + break; > + } > + mutex_unlock(&ep->mutex); > > - spin_unlock(&krpc->lock); > + /* sleep 10 us */ > + udelay(10); > + timeout--; > > - /* dispose all unprocessed completions */ > - while (!list_empty(&dispose_list)) { > - comp = list_first_entry(&dispose_list, struct > krpc_completion, link); > - list_del(&comp->link); > - krpc_completion_free(comp); > + /* check again */ > + mutex_lock(&ep->mutex); > } > > + mutex_unlock(&ep->mutex); > + > return 0; > } > > diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c > index f990f0f30609..80bbffcb88a8 100644 > --- a/fs/fuse/kio/pcs/pcs_rpc.c > +++ b/fs/fuse/kio/pcs/pcs_rpc.c > @@ -561,6 +561,44 @@ void pcs_rpc_cancel_request(struct pcs_msg * msg) > msg->done(msg); > } > > +int pcs_rpc_cancel_msg(struct pcs_msg *msg) > +{ > + struct pcs_rpc *ep = msg->rpc; > + > + BUG_ON(!ep); > + BUG_ON(!mutex_is_locked(&ep->mutex)); > + > + switch (msg->stage) { > + case PCS_MSG_STAGE_SEND: > + if (msg->netio->tops->cancel_msg(msg)) > + /* > + * Request is under network IO right now, cannot be > cancelled as its > + * buffer could be in use. > + */ > + return -EBUSY; > + break; > + case PCS_MSG_STAGE_RECV: > + /* > + * Response is under network IO right now, the request > message cannot > + * be cancelled as its buffer could be in use. 
> + */ > + return -EBUSY; > + default: > + break; > + } > + > + /* msg could be in ep->input_queue*/ > + spin_lock(&ep->q_lock); > + list_del(&msg->list); > + spin_unlock(&ep->q_lock); > + > + pcs_msg_del_calendar(msg); > + msg->stage = PCS_MSG_STAGE_NONE; > + msg->done(msg); > + > + return 0; > +} > + > void rpc_work_input(struct pcs_msg * msg) > { > struct pcs_rpc * ep = msg->rpc; > @@ -900,14 +938,14 @@ static void rpc_queue_work(struct work_struct *w) > int repeat; > > again: > - spin_lock(&ep->q_lock); > - list_splice_tail_init(&ep->input_queue, &input_q); > - spin_unlock(&ep->q_lock); > - > mutex_lock(&ep->mutex); > > TRACE("Handle queues\n"); > > + spin_lock(&ep->q_lock); > + list_splice_tail_init(&ep->input_queue, &input_q); > + spin_unlock(&ep->q_lock); > + > /* Process messages which are already in the sock queue */ > if (ep->state == PCS_RPC_WORK) { > struct pcs_netio *netio = (struct pcs_netio *)ep->conn; > diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h > index fe30bede7efe..9a651a812cf7 100644 > --- a/fs/fuse/kio/pcs/pcs_rpc.h > +++ b/fs/fuse/kio/pcs/pcs_rpc.h > @@ -324,5 +324,6 @@ static inline struct pcs_rpc *pcs_rpc_from_work(struct > work_struct *wr) > const char* pcs_rpc_state_name(unsigned state); > > void pcs_rpc_report_error(struct pcs_rpc *ep, unsigned int err); > +int pcs_rpc_cancel_msg(struct pcs_msg *msg); > > #endif /* _PCS_RPC_H_ */ > diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h > index 274eebd94bfc..872faffefe01 100644 > --- a/fs/fuse/kio/pcs/pcs_sock_io.h > +++ b/fs/fuse/kio/pcs/pcs_sock_io.h > @@ -89,7 +89,8 @@ enum > PCS_MSG_STAGE_SEND = 2, /* Message queued on socket queue */ > PCS_MSG_STAGE_SENT = 3, /* Message is sent */ > PCS_MSG_STAGE_WAIT = 4, /* Message is waiting for respnose */ > - PCS_MSG_STAGE_DONE = 5, /* Response received */ > + PCS_MSG_STAGE_RECV = 5, /* Response is being received */ > + PCS_MSG_STAGE_DONE = 6, /* Response received */ > }; > > enum > -- > 2.39.3 (Apple 
Git-146)
_______________________________________________ Devel mailing list Devel@openvz.org https://lists.openvz.org/mailman/listinfo/devel