This is a missing element of the previous scalability patch. There we removed any limit on direct I/O submitted to the cluster, which is not the right thing to do.
The problem is not trivial. Experiments show we cannot take _any_ shared
spinlock on this path: even an empty lock/unlock added there halves
performance! So we have to come up with a scalable solution that uses no
locks. Several approaches were tried; the suggested one looks the best.
The idea is to create a hash table of counters and wait queues, with each
sending thread mapped to a hash bucket based on its pid. We limit the
number of pending requests in each bucket to max_background / HASH_SIZE.
It works surprisingly well, almost reaching the results obtained with an
unlimited queue. (An illustrative userspace sketch of the scheme is
appended after the diff.)

Flaw: with a small number of threads and large aio queues, or when cluster
latency is high, the effective per-thread limit can be too small. If this
turns out to be a problem, we will have to come up with another solution,
but it must still be free of global locks. I used to insist on this simple
axiom, though not very aggressively, because the observed bad effect was
small. Now we see a situation where it is really large, a 50% loss, and we
cannot unsee it.

Signed-off-by: Alexey Kuznetsov <kuz...@acronis.com>
---
 fs/fuse/dev.c    | 27 +++++++++++++++++++++++++--
 fs/fuse/fuse_i.h | 27 +++++++++++++++++++++++++++
 fs/fuse/inode.c  |  9 ++++++++-
 3 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 592cb3a..15fec87b 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -362,9 +362,17 @@ void __fuse_request_end(struct fuse_req *req, bool flush_bg)
 			flush_bg_queue_and_unlock(fc);
 		else
 			spin_unlock(&fc->bg_lock);
-	} else if (test_bit(FR_NO_ACCT, &req->flags))
+	} else if (test_bit(FR_NO_ACCT, &req->flags)) {
+		unsigned int bkt = req->qhash;
+
 		bg = true;
+		if (atomic_dec_return(&fc->qhash[bkt].num_reqs) < ((2*fc->max_background) / FUSE_QHASH_SIZE)) {
+			if (waitqueue_active(&fc->qhash[bkt].waitq))
+				wake_up(&fc->qhash[bkt].waitq);
+		}
+	}
+
 	if (test_bit(FR_ASYNC, &req->flags)) {
 		req->args->end(fm, req->args, req->out.h.error);
 		if (!bg)
@@ -613,12 +621,25 @@ static int fuse_request_queue_background(struct fuse_req *req)
 	}
 	__set_bit(FR_ISREPLY, &req->flags);
 
-	if (fc->kio.op && req->args->async && !nonblocking &&
+	if (fc->kio.op && req->args->async && !nonblocking && READ_ONCE(fc->connected) &&
 	    (!ff || !test_bit(FUSE_S_FAIL_IMMEDIATELY, &ff->ff_state))) {
 		int ret = fc->kio.op->req_classify(req, false, false);
 		if (likely(!ret)) {
+			unsigned int bkt = fuse_qhash_bucket();
 			__clear_bit(FR_BACKGROUND, &req->flags);
 			__set_bit(FR_NO_ACCT, &req->flags);
+			if (wait_event_killable_exclusive(fc->qhash[bkt].waitq,
+					(atomic_read(&fc->qhash[bkt].num_reqs) <
+					 ((2 * fc->max_background) / FUSE_QHASH_SIZE) ||
+					 !READ_ONCE(fc->connected) ||
+					 (ff && test_bit(FUSE_S_FAIL_IMMEDIATELY, &ff->ff_state)))))
+				return -EIO;
+			if (!READ_ONCE(fc->connected))
+				return -ENOTCONN;
+			if (ff && test_bit(FUSE_S_FAIL_IMMEDIATELY, &ff->ff_state))
+				return -EIO;
+			req->qhash = bkt;
+			atomic_inc(&fc->qhash[bkt].num_reqs);
 			fc->kio.op->req_send(req, true);
 			return 0;
 		} else if (ret < 0)
@@ -2323,6 +2344,8 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		end_polls(fc);
 		wake_up_all(&fc->blocked_waitq);
+		for (cpu = 0; cpu < FUSE_QHASH_SIZE; cpu++)
+			wake_up_all(&fc->qhash[cpu].waitq);
 		spin_unlock(&fc->lock);
 
 		end_requests(&to_end);
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index d9e27b3..487e112 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -435,6 +435,8 @@ struct fuse_req {
 	/* Request flags, updated with test/set/clear_bit() */
 	unsigned long flags;
 
+	unsigned int qhash;
+
 	/* The request input header */
 	struct {
 		struct fuse_in_header h;
@@ -640,6 +642,29 @@ struct fuse_kio_ops {
 int fuse_register_kio(struct fuse_kio_ops *ops);
 void fuse_unregister_kio(struct fuse_kio_ops *ops);
 
+#define FUSE_QHASH_SIZE 64
+
+#include <linux/jhash.h>
+
+struct fuse_qhash_queue
+{
+	atomic_t num_reqs;
+	wait_queue_head_t waitq;
+} ____cacheline_aligned_in_smp;
+
+#if 0
+static inline unsigned int fuse_qhash_bucket(struct fuse_args * args)
+{
+	unsigned long val = (unsigned long)args;
+	return jhash_2words(val & 0xFFFFFFFFU, val >> 32, 0) & (FUSE_QHASH_SIZE - 1);
+}
+#else
+static inline unsigned int fuse_qhash_bucket(void)
+{
+	return jhash_1word(current->pid, 0) & (FUSE_QHASH_SIZE - 1);
+}
+#endif
+
 /**
  * A Fuse connection.
  *
@@ -726,6 +751,8 @@ struct fuse_conn {
 	/** waitq for blocked connection */
 	wait_queue_head_t blocked_waitq;
 
+	struct fuse_qhash_queue qhash[FUSE_QHASH_SIZE];
+
 	/** Connection established, cleared on umount, connection
 	    abort and device release */
 	unsigned connected;
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index 1eb64b1..33cd8f9 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -498,7 +498,7 @@ int fuse_invalidate_files(struct fuse_conn *fc, u64 nodeid)
 	struct inode *inode;
 	struct fuse_inode *fi;
 	struct fuse_file *ff;
-	int err;
+	int err, i;
 
 	if (!fc->async_read) {
 		printk(KERN_ERR "Turn async_read ON to use "
@@ -523,6 +523,9 @@ int fuse_invalidate_files(struct fuse_conn *fc, u64 nodeid)
 	/* let them see FUSE_S_FAIL_IMMEDIATELY */
 	wake_up_all(&fc->blocked_waitq);
 
+	for (i = 0; i < FUSE_QHASH_SIZE; i++)
+		wake_up_all(&fc->qhash[i].waitq);
+
 	err = filemap_write_and_wait(inode->i_mapping);
 	if (!err || err == -EIO) { /* AS_EIO might trigger -EIO */
 		struct fuse_dev *fud;
@@ -975,6 +978,10 @@ int fuse_conn_init(struct fuse_conn *fc, struct fuse_mount *fm,
 	refcount_set(&fc->count, 1);
 	atomic_set(&fc->dev_count, 1);
 	init_waitqueue_head(&fc->blocked_waitq);
+	for (cpu = 0; cpu < FUSE_QHASH_SIZE; cpu++) {
+		atomic_set(&fc->qhash[cpu].num_reqs, 0);
+		init_waitqueue_head(&fc->qhash[cpu].waitq);
+	}
 	fuse_iqueue_init(&fc->main_iq, fiq_ops, fiq_priv);
 	fc->iqs = alloc_percpu(struct fuse_iqueue);
 	if (!fc->iqs)
--
1.8.3.1
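
For illustration only, a minimal userspace model of the per-bucket throttle
described in the commit message follows. The pthread/stdatomic mechanics, all
names and the max_background value are assumptions of the sketch, not part of
the patch; the kernel code uses per-bucket waitqueues with
wait_event_killable_exclusive() on the submit side and atomic_dec_return()
plus waitqueue_active()/wake_up() on completion.

/*
 * Userspace sketch of the pid-hashed throttle (illustrative only).  A sending
 * thread hashes its id into one of QHASH_SIZE buckets; a bucket admits at
 * most (2 * max_background) / QHASH_SIZE in-flight requests; completion
 * decrements the bucket counter and wakes at most one waiter of that bucket,
 * so there is no global lock or global counter on the hot path.
 */
#include <pthread.h>
#include <stdatomic.h>

#define QHASH_SIZE 64			/* must stay a power of two */

struct qhash_bucket {
	atomic_int	num_reqs;	/* in-flight requests mapped to this bucket */
	pthread_mutex_t	lock;		/* models the waitqueue's internal per-bucket lock */
	pthread_cond_t	waitq;		/* threads blocked on this bucket's limit */
};

static struct qhash_bucket qhash[QHASH_SIZE];
static int max_background = 256;	/* assumed tunable, like fc->max_background */

static void qhash_init(void)
{
	for (int i = 0; i < QHASH_SIZE; i++) {
		atomic_init(&qhash[i].num_reqs, 0);
		pthread_mutex_init(&qhash[i].lock, NULL);
		pthread_cond_init(&qhash[i].waitq, NULL);
	}
}

static unsigned int qhash_bucket(unsigned long tid)
{
	/* stand-in for jhash_1word(current->pid, 0) & (FUSE_QHASH_SIZE - 1) */
	return (unsigned int)(tid * 2654435761u) & (QHASH_SIZE - 1);
}

static int bucket_limit(void)
{
	return (2 * max_background) / QHASH_SIZE;
}

/* Submit side: block until this thread's bucket is below its limit. */
static void throttle_enter(unsigned int bkt)
{
	struct qhash_bucket *b = &qhash[bkt];

	pthread_mutex_lock(&b->lock);
	while (atomic_load(&b->num_reqs) >= bucket_limit())
		pthread_cond_wait(&b->waitq, &b->lock);
	atomic_fetch_add(&b->num_reqs, 1);
	pthread_mutex_unlock(&b->lock);
}

/* Completion side: drop the counter and wake one waiter of the same bucket. */
static void throttle_exit(unsigned int bkt)
{
	struct qhash_bucket *b = &qhash[bkt];

	if (atomic_fetch_sub(&b->num_reqs, 1) - 1 < bucket_limit()) {
		pthread_mutex_lock(&b->lock);
		pthread_cond_signal(&b->waitq);
		pthread_mutex_unlock(&b->lock);
	}
}

A sender would call qhash_init() once, then bracket each submission with
bkt = qhash_bucket(tid); throttle_enter(bkt); ... throttle_exit(bkt);, keeping
the bucket index with the request the way the patch stores it in req->qhash.
Waking a single waiter per completion mirrors the patch's exclusive wait; the
kernel version additionally skips the wakeup entirely when waitqueue_active()
reports no sleepers on that bucket.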