This is still not a proper commit; I've only lightly tested it.

Replace the spinlock-protected linked list ready list with wfcqueue.

This improves performance under heavy workloads where multiple
threads call epoll_wait.

With my multi-threaded EPOLLONESHOT microbenchmark, performance is
nearly doubled:

$ eponeshotmt -t 4 -w 4 -f 10 -c 1000000

Before:
real    0m 9.58s
user    0m 1.22s
sys     0m 37.08s

After:
real    0m 5.00s
user    0m 1.07s
sys     0m 18.92s

ref: http://yhbt.net/eponeshotmt.c
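
For context, a hypothetical reduction of the benchmarked workload (the
real test is eponeshotmt.c at the URL above; its -t/-w/-f knobs are not
modeled here): several threads block in epoll_wait() on a shared epoll
descriptor and re-arm an EPOLLONESHOT item each time it fires.

#include <sys/epoll.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int epfd;

static void *worker(void *unused)
{
        struct epoll_event ev;

        for (;;) {
                if (epoll_wait(epfd, &ev, 1, -1) != 1)
                        continue;       /* EINTR */

                /* the real benchmark does a read() here before re-arming */
                ev.events = EPOLLIN | EPOLLONESHOT;
                if (epoll_ctl(epfd, EPOLL_CTL_MOD, ev.data.fd, &ev))
                        perror("EPOLL_CTL_MOD");
        }
        return NULL;
}

int main(void)
{
        struct epoll_event ev = { .events = EPOLLIN | EPOLLONESHOT };
        pthread_t thr[4];
        int pfd[2], i;

        epfd = epoll_create1(0);
        if (epfd < 0 || pipe(pfd) < 0)
                return 1;

        ev.data.fd = pfd[0];
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, pfd[0], &ev))
                return 1;

        for (i = 0; i < 4; i++)
                pthread_create(&thr[i], NULL, worker, NULL);

        /* keep the read side ready so the oneshot item keeps re-firing */
        for (;;)
                if (write(pfd[1], "x", 1) < 0)
                        break;

        return 0;
}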

Unfortunately, there are still regressions for the common,
single-threaded, level-triggered use case.

Things changed/removed:

* ep->ovflist - no longer needed; the main ready list can continue to
  be appended to while we iterate through the transaction list (see
  the sketch after this list).

* ep_scan_ready_list - there is no longer enough generic code shared
  between its users to warrant it.  ep_poll_readyevents_proc (used for
  poll) is read-only and uses __wfcq_for_each, while ep_send_events
  (used for epoll_wait) dequeues and needs __wfcq_for_each_safe.

* ep->lock renamed to ep->wqlock; this only protects waitqueues now
  (we use trylock to further avoid redundant wakeups)

* EPOLL_CTL_DEL/close() on a ready file will not immediately release
  epitem memory; epoll_wait() must be called, since there is no way to
  delete a ready item from wfcqueue in O(1) time.  In practice this
  should not be a problem: any valid app using epoll must call
  epoll_wait occasionally.  Unfreed epitems still count against
  max_user_watches to protect against local DoS.  This should be the
  only potentially noticeable change (in case there's an app that
  blindly adds/deletes items from the rbtree but never calls
  epoll_wait)
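
To make the ovflist removal concrete, here is a condensed,
illustrative sketch of the enqueue/splice/dequeue pattern this series
adopts, written against the wfcqueue API exactly as it is used in the
diff below (the proposed linux/wfcqueue.h, not a mainline header).
The helper names mark_and_enqueue() and consume_ready() are invented
for illustration; the real code lives in ep_poll_callback,
ep_enqueue_process and ep_send_events.

/* producer side: lockless, may be called from IRQ context via wakeups */
static void mark_and_enqueue(struct eventpoll *ep, struct epitem *epi)
{
        /* only the idle -> ready transition enqueues; later wakeups are no-ops */
        if (cmpxchg(&epi->state, EP_STATE_IDLE, EP_STATE_READY) == EP_STATE_IDLE)
                wfcq_enqueue(&ep->rdlhead, &ep->rdltail, &epi->rdllink);
}

/* consumer side: serialized by ep->mtx, so no ovflist is needed */
static void consume_ready(struct eventpoll *ep)
{
        struct wfcq_node *node, *n;

        lockdep_assert_held(&ep->mtx);

        /* steal everything queued so far; producers keep appending to rdl* */
        __wfcq_splice(&ep->txlhead, &ep->txltail, &ep->rdlhead, &ep->rdltail);

        __wfcq_for_each_safe(&ep->txlhead, &ep->txltail, node, n) {
                struct epitem *epi = container_of(node, struct epitem, rdllink);

                __wfcq_dequeue(&ep->txlhead, &ep->txltail);
                /* ... deliver the event to userspace here ... */
                epi->state = EP_STATE_IDLE;
        }
}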

Changes since v1:
* fixed memory leak with pre-existing zombies in ep_free
* updated to use the latest wfcqueue.h APIs
* switched to using __wfcq_splice and a global transaction list
  (this is like the old txlist in ep_scan_ready_list)
* redundant wakeups avoided in ep_notify_waiters:
  - only attempt a wakeup when an item is enqueued the first time
  - use spin_trylock_irqsave when attempting notification, since a
    failed lock means either another task is already issuing the
    wakeup, or ep_poll is already running and will recheck the ready
    list upon releasing wqlock anyway
* explicitly cache-aligned rdltail in SMP
* added ep_item_state for atomically reading epi->state with barrier
  (avoids WARN_ON in ep_send_events)
* reverted the epi->nwait removal; it was not necessary, and
  sizeof(epitem) is still <= 128 bytes on 64-bit machines

Changes since v2:
* epi->state is no longer atomic; we only cmpxchg in ep_poll_callback
  now and rely on implicit barriers in other places for reading
  (sketched below).
* the intermediate EP_STATE_DEQUEUE state is removed; this (with xchg)
  caused too much overhead in the ep_send_events loop and could not
  eliminate starvation dangers from improper EPOLLET usage (the
  original code had this problem too; the window is just a few cycles
  larger now).
* minor code cleanups
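
To make the barrier reliance above concrete, this is the pairing as it
appears in the diff below, condensed from ep_modify (the writer) and
ep_poll_callback (the reader):

        /* ep_modify(): publish the new event mask before polling the file */
        epi->event.events = event->events;
        smp_mb();
        revents = ep_item_poll(epi, &pt);

        /* ep_poll_callback(): pairs with the smp_mb() in ep_modify() */
        smp_rmb();
        if (!(epi->event.events & ~EP_PRIVATE_BITS))
                return 1;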

Lightly-tested-by: Eric Wong <normalper...@yhbt.net>
Cc: Davide Libenzi <davi...@xmailserver.org>
Cc: Al Viro <v...@zeniv.linux.org.uk>
Cc: Andrew Morton <a...@linux-foundation.org>
Cc: Mathieu Desnoyers <mathieu.desnoy...@efficios.com>
---

 fs/eventpoll.c | 615 ++++++++++++++++++++++++++-------------------------------
 1 file changed, 276 insertions(+), 339 deletions(-)

diff --git a/fs/eventpoll.c b/fs/eventpoll.c
index a81f0ea..e039555 100644
--- a/fs/eventpoll.c
+++ b/fs/eventpoll.c
@@ -40,6 +40,7 @@
 #include <linux/atomic.h>
 #include <linux/proc_fs.h>
 #include <linux/seq_file.h>
+#include <linux/wfcqueue.h>
 
 /*
  * LOCKING:
@@ -47,15 +48,13 @@
  *
  * 1) epmutex (mutex)
  * 2) ep->mtx (mutex)
- * 3) ep->lock (spinlock)
+ * 3) ep->wqlock (spinlock)
  *
- * The acquire order is the one listed above, from 1 to 3.
- * We need a spinlock (ep->lock) because we manipulate objects
- * from inside the poll callback, that might be triggered from
- * a wake_up() that in turn might be called from IRQ context.
- * So we can't sleep inside the poll callback and hence we need
- * a spinlock. During the event transfer loop (from kernel to
- * user space) we could end up sleeping due a copy_to_user(), so
+ * The acquire order is the one listed above, from 1 to 2.
+ *
+ * We only have a spinlock (ep->wqlock) to manipulate the waitqueues.
+ * During the event transfer loop (from kernel to user space)
+ * we could end up sleeping due to a copy_to_user(), so
  * we need a lock that will allow us to sleep. This lock is a
  * mutex (ep->mtx). It is acquired during the event transfer loop,
  * during epoll_ctl(EPOLL_CTL_DEL) and during eventpoll_release_file().
@@ -82,8 +81,8 @@
  * of epoll file descriptors, we use the current recursion depth as
  * the lockdep subkey.
  * It is possible to drop the "ep->mtx" and to use the global
- * mutex "epmutex" (together with "ep->lock") to have it working,
- * but having "ep->mtx" will make the interface more scalable.
+ * mutex "epmutex" to have it working,  but having "ep->mtx" will
+ * make the interface more scalable.
  * Events that require holding "epmutex" are very rare, while for
  * normal operations the epoll private "ep->mtx" will guarantee
  * a better scalability.
@@ -126,6 +125,21 @@ struct nested_calls {
 };
 
 /*
+ * epitem state transitions               callers
+ * ---------------------------------------------------------------------------
+ * EP_STATE_IDLE    -> EP_STATE_READY     ep_poll_callback, ep_enqueue_process
+ * EP_STATE_READY   -> EP_STATE_IDLE      ep_send_events
+ * EP_STATE_IDLE    -> kmem_cache_free    ep_remove
+ * EP_STATE_READY   -> EP_STATE_ZOMBIE    ep_remove
+ * EP_STATE_ZOMBIE  -> kmem_cache_free    ep_send_events
+ */
+enum epoll_item_state {
+       EP_STATE_ZOMBIE = -1,
+       EP_STATE_IDLE = 0,
+       EP_STATE_READY = 1
+};
+
+/*
  * Each file descriptor added to the eventpoll interface will
  * have an entry of this type linked to the "rbr" RB tree.
  * Avoid increasing the size of this struct, there can be many thousands
@@ -136,13 +150,7 @@ struct epitem {
        struct rb_node rbn;
 
        /* List header used to link this structure to the eventpoll ready list */
-       struct list_head rdllink;
-
-       /*
-        * Works together "struct eventpoll"->ovflist in keeping the
-        * single linked chain of items.
-        */
-       struct epitem *next;
+       struct wfcq_node rdllink;
 
        /* The file descriptor information this item refers to */
        struct epoll_filefd ffd;
@@ -150,6 +158,9 @@ struct epitem {
        /* Number of active wait queue attached to poll operations */
        int nwait;
 
+       /* state of this item */
+       enum epoll_item_state state;
+
        /* List containing poll wait queues */
        struct list_head pwqlist;
 
@@ -172,37 +183,38 @@ struct epitem {
  * interface.
  */
 struct eventpoll {
-       /* Protect the access to this structure */
-       spinlock_t lock;
-
        /*
         * This mutex is used to ensure that files are not removed
         * while epoll is using them. This is held during the event
         * collection loop, the file cleanup path, the epoll file exit
         * code and the ctl operations.
+        * This also protects rdlhead, txlhead, and txltail.
         */
        struct mutex mtx;
 
+       /* head of the enqueue ready list */
+       struct wfcq_head rdlhead;
+
+       /*
+        * transactional ready list, these are only accessed when ep->mtx
+        * is held.  We splice into these (from rdlhead) and work on this list
+        */
+       struct wfcq_head txlhead;
+       struct wfcq_tail txltail;
+
+       /* Protect the access to wait queues */
+       spinlock_t wqlock;
+
        /* Wait queue used by sys_epoll_wait() */
        wait_queue_head_t wq;
 
        /* Wait queue used by file->poll() */
        wait_queue_head_t poll_wait;
 
-       /* List of ready file descriptors */
-       struct list_head rdllist;
-
        /* RB tree root used to store monitored fd structs */
        struct rb_root rbr;
 
-       /*
-        * This is a single linked list that chains all the "struct epitem" that
-        * happened while transferring ready events to userspace w/out
-        * holding ->lock.
-        */
-       struct epitem *ovflist;
-
-       /* wakeup_source used when ep_scan_ready_list is running */
+       /* wakeup_source used when ep_send_events is running */
        struct wakeup_source *ws;
 
        /* The user that created the eventpoll descriptor */
@@ -213,6 +225,8 @@ struct eventpoll {
        /* used to optimize loop detection check */
        int visited;
        struct list_head visited_list_link;
+
+       struct wfcq_tail rdltail ____cacheline_aligned_in_smp;
 };
 
 /* Wait structure used by the poll hooks */
@@ -239,12 +253,6 @@ struct ep_pqueue {
        struct epitem *epi;
 };
 
-/* Used by the ep_send_events() function as callback private data */
-struct ep_send_events_data {
-       int maxevents;
-       struct epoll_event __user *events;
-};
-
 /*
  * Configuration options available inside /proc/sys/fs/epoll/
  */
@@ -347,6 +355,12 @@ static inline struct epitem *ep_item_from_epqueue(poll_table *p)
        return container_of(p, struct ep_pqueue, pt)->epi;
 }
 
+/* Get the "struct epitem" from a wfcq node */
+static inline struct epitem *ep_item_from_node(struct wfcq_node *p)
+{
+       return container_of(p, struct epitem, rdllink);
+}
+
 /* Tells if the epoll_ctl(2) operation needs an event copy from userspace */
 static inline int ep_op_has_event(int op)
 {
@@ -360,17 +374,30 @@ static void ep_nested_calls_init(struct nested_calls *ncalls)
        spin_lock_init(&ncalls->lock);
 }
 
+/* splice newly queued ready items onto our transaction list */
+static inline enum wfcq_ret ep_ready_prepare(struct eventpoll *ep)
+{
+       lockdep_assert_held(&ep->mtx);
+
+       return __wfcq_splice(&ep->txlhead, &ep->txltail,
+                               &ep->rdlhead, &ep->rdltail);
+}
+
 /**
  * ep_events_available - Checks if ready events might be available.
+ *                       This must be called with ep->mtx held
  *
  * @ep: Pointer to the eventpoll context.
  *
- * Returns: Returns a value different than zero if ready events are available,
- *          or zero otherwise.
+ * Returns: Returns true if ready events are available, or false otherwise.
  */
-static inline int ep_events_available(struct eventpoll *ep)
+static inline bool ep_events_available(struct eventpoll *ep)
 {
-       return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
+       enum wfcq_ret ret = ep_ready_prepare(ep);
+
+       if (ret == WFCQ_RET_SRC_EMPTY)
+               return ! wfcq_empty(&ep->txlhead, &ep->txltail);
+       return true;
 }
 
 /**
@@ -507,6 +534,34 @@ static void ep_poll_safewake(wait_queue_head_t *wq)
        put_cpu();
 }
 
+/* try to wake up any processes in epoll_(p)wait or poll() users with epoll */
+static void ep_notify_waiters(struct eventpoll *ep)
+{
+       unsigned long flags;
+       int pwake;
+
+       /*
+        * If we can't get the wqlock, this means either:
+        * ep_poll is working on it and will recheck the ready list when
+        * it releases wqlock anyways
+        * ...Or another task is running this function and another wakeup
+        * would be redundant
+        */
+       if (spin_trylock_irqsave(&ep->wqlock, flags)) {
+               /* Notify epoll_wait tasks  */
+               if (waitqueue_active(&ep->wq))
+                       wake_up_locked(&ep->wq);
+
+               pwake = waitqueue_active(&ep->poll_wait);
+
+               spin_unlock_irqrestore(&ep->wqlock, flags);
+
+               /* Notify the ->poll() wait list (may be outside lock) */
+               if (pwake)
+                       ep_poll_safewake(&ep->poll_wait);
+       }
+}
+
 static void ep_remove_wait_queue(struct eppoll_entry *pwq)
 {
        wait_queue_head_t *whead;
@@ -570,104 +625,11 @@ static inline void ep_pm_stay_awake_rcu(struct epitem *epi)
        rcu_read_unlock();
 }
 
-/**
- * ep_scan_ready_list - Scans the ready list in a way that makes possible for
- *                      the scan code, to call f_op->poll(). Also allows for
- *                      O(NumReady) performance.
- *
- * @ep: Pointer to the epoll private data structure.
- * @sproc: Pointer to the scan callback.
- * @priv: Private opaque data passed to the @sproc callback.
- * @depth: The current depth of recursive f_op->poll calls.
- *
- * Returns: The same integer error code returned by the @sproc callback.
- */
-static int ep_scan_ready_list(struct eventpoll *ep,
-                             int (*sproc)(struct eventpoll *,
-                                          struct list_head *, void *),
-                             void *priv,
-                             int depth)
+static noinline void ep_reap(struct eventpoll *ep, struct epitem *epi)
 {
-       int error, pwake = 0;
-       unsigned long flags;
-       struct epitem *epi, *nepi;
-       LIST_HEAD(txlist);
-
-       /*
-        * We need to lock this because we could be hit by
-        * eventpoll_release_file() and epoll_ctl().
-        */
-       mutex_lock_nested(&ep->mtx, depth);
-
-       /*
-        * Steal the ready list, and re-init the original one to the
-        * empty list. Also, set ep->ovflist to NULL so that events
-        * happening while looping w/out locks, are not lost. We cannot
-        * have the poll callback to queue directly on ep->rdllist,
-        * because we want the "sproc" callback to be able to do it
-        * in a lockless way.
-        */
-       spin_lock_irqsave(&ep->lock, flags);
-       list_splice_init(&ep->rdllist, &txlist);
-       ep->ovflist = NULL;
-       spin_unlock_irqrestore(&ep->lock, flags);
-
-       /*
-        * Now call the callback function.
-        */
-       error = (*sproc)(ep, &txlist, priv);
-
-       spin_lock_irqsave(&ep->lock, flags);
-       /*
-        * During the time we spent inside the "sproc" callback, some
-        * other events might have been queued by the poll callback.
-        * We re-insert them inside the main ready-list here.
-        */
-       for (nepi = ep->ovflist; (epi = nepi) != NULL;
-            nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
-               /*
-                * We need to check if the item is already in the list.
-                * During the "sproc" callback execution time, items are
-                * queued into ->ovflist but the "txlist" might already
-                * contain them, and the list_splice() below takes care of them.
-                */
-               if (!ep_is_linked(&epi->rdllink)) {
-                       list_add_tail(&epi->rdllink, &ep->rdllist);
-                       ep_pm_stay_awake(epi);
-               }
-       }
-       /*
-        * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
-        * releasing the lock, events will be queued in the normal way inside
-        * ep->rdllist.
-        */
-       ep->ovflist = EP_UNACTIVE_PTR;
-
-       /*
-        * Quickly re-inject items left on "txlist".
-        */
-       list_splice(&txlist, &ep->rdllist);
-       __pm_relax(ep->ws);
-
-       if (!list_empty(&ep->rdllist)) {
-               /*
-                * Wake up (if active) both the eventpoll wait list and
-                * the ->poll() wait list (delayed after we release the lock).
-                */
-               if (waitqueue_active(&ep->wq))
-                       wake_up_locked(&ep->wq);
-               if (waitqueue_active(&ep->poll_wait))
-                       pwake++;
-       }
-       spin_unlock_irqrestore(&ep->lock, flags);
-
-       mutex_unlock(&ep->mtx);
-
-       /* We have to call this outside the lock */
-       if (pwake)
-               ep_poll_safewake(&ep->poll_wait);
-
-       return error;
+       wakeup_source_unregister(ep_wakeup_source(epi));
+       kmem_cache_free(epi_cache, epi);
+       atomic_long_dec(&ep->user->epoll_watches);
 }
 
 /*
@@ -676,17 +638,9 @@ static int ep_scan_ready_list(struct eventpoll *ep,
  */
 static int ep_remove(struct eventpoll *ep, struct epitem *epi)
 {
-       unsigned long flags;
        struct file *file = epi->ffd.file;
 
-       /*
-        * Removes poll wait queue hooks. We _have_ to do this without holding
-        * the "ep->lock" otherwise a deadlock might occur. This because of the
-        * sequence of the lock acquisition. Here we do "ep->lock" then the wait
-        * queue head lock when unregistering the wait queue. The wakeup callback
-        * will run by holding the wait queue head lock and will call our callback
-        * that will try to get "ep->lock".
-        */
+       /* Removes poll wait queue hooks, epi->state cannot change after this */
        ep_unregister_pollwait(ep, epi);
 
        /* Remove the current item from the list of epoll hooks */
@@ -697,17 +651,12 @@ static int ep_remove(struct eventpoll *ep, struct epitem *epi)
 
        rb_erase(&epi->rbn, &ep->rbr);
 
-       spin_lock_irqsave(&ep->lock, flags);
-       if (ep_is_linked(&epi->rdllink))
-               list_del_init(&epi->rdllink);
-       spin_unlock_irqrestore(&ep->lock, flags);
-
-       wakeup_source_unregister(ep_wakeup_source(epi));
-
-       /* At this point it is safe to free the eventpoll item */
-       kmem_cache_free(epi_cache, epi);
-
-       atomic_long_dec(&ep->user->epoll_watches);
+       /* rely on ep_send_events to cleanup if we got enqueued */
+       if (epi->state == EP_STATE_READY)
+               epi->state = EP_STATE_ZOMBIE;
+       else
+               /* At this point it is safe to free the eventpoll item */
+               ep_reap(ep, epi);
 
        return 0;
 }
@@ -716,6 +665,7 @@ static void ep_free(struct eventpoll *ep)
 {
        struct rb_node *rbp;
        struct epitem *epi;
+       struct wfcq_node *node, *n;
 
        /* We need to release all tasks waiting for these file */
        if (waitqueue_active(&ep->poll_wait))
@@ -740,17 +690,27 @@ static void ep_free(struct eventpoll *ep)
                ep_unregister_pollwait(ep, epi);
        }
 
+       /* We only lock ep->mtx to prevent a lockdep warning */
+       mutex_lock(&ep->mtx);
+
+       /* reap zombies, these are no longer in ep->rbr */
+       ep_ready_prepare(ep);
+       __wfcq_for_each_safe(&ep->txlhead, &ep->txltail, node, n) {
+               epi = ep_item_from_node(node);
+               __wfcq_dequeue(&ep->txlhead, &ep->txltail);
+               if (epi->state == EP_STATE_ZOMBIE)
+                       ep_reap(ep, epi);
+       }
+
        /*
         * Walks through the whole tree by freeing each "struct epitem". At this
         * point we are sure no poll callbacks will be lingering around, and also by
         * holding "epmutex" we can be sure that no file cleanup code will hit
-        * us during this operation. So we can avoid the lock on "ep->lock".
-        * We do not need to lock ep->mtx, either, we only do it to prevent
-        * a lockdep warning.
+        * us during this operation.
         */
-       mutex_lock(&ep->mtx);
        while ((rbp = rb_first(&ep->rbr)) != NULL) {
                epi = rb_entry(rbp, struct epitem, rbn);
+               epi->state = EP_STATE_IDLE; /* prevent new zombies */
                ep_remove(ep, epi);
        }
        mutex_unlock(&ep->mtx);
@@ -779,34 +739,50 @@ static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
        return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
 }
 
-static int ep_read_events_proc(struct eventpoll *ep, struct list_head *head,
-                              void *priv)
+/* used by our own poll() implementation */
+static int ep_poll_readyevents_proc(void *priv, void *cookie, int call_nests)
 {
-       struct epitem *epi, *tmp;
+       struct eventpoll *ep = priv;
+       struct epitem *epi;
+       struct wfcq_node *node;
+       int ret = 0;
        poll_table pt;
 
        init_poll_funcptr(&pt, NULL);
 
-       list_for_each_entry_safe(epi, tmp, head, rdllink) {
-               if (ep_item_poll(epi, &pt))
-                       return POLLIN | POLLRDNORM;
-               else {
+       /*
+        * this iteration should be safe because:
+        * 1) ep->mtx is locked, preventing dequeue and epi invalidation
+        * 2) stops at the tail when we started iteration
+        */
+       mutex_lock_nested(&ep->mtx, call_nests + 1);
+       ep_ready_prepare(ep);
+       __wfcq_for_each(&ep->txlhead, &ep->txltail, node) {
+               epi = ep_item_from_node(node);
+
+               /* zombie item, let ep_send_events cleanup */
+               if (epi->state == EP_STATE_ZOMBIE)
+                       continue;
+
+               pt._key = epi->event.events;
+
+               if (ep_item_poll(epi, &pt)) {
+                       ret = POLLIN | POLLRDNORM;
+                       break;
+               } else {
                        /*
                         * Item has been dropped into the ready list by the poll
                         * callback, but it's not actually ready, as far as
-                        * caller requested events goes. We can remove it here.
+                        * caller requested events goes.
+                        * We cannot remove it here, ep_send_events will
+                        * recheck and remove if possible
                         */
                        __pm_relax(ep_wakeup_source(epi));
-                       list_del_init(&epi->rdllink);
                }
        }
+       mutex_unlock(&ep->mtx);
 
-       return 0;
-}
-
-static int ep_poll_readyevents_proc(void *priv, void *cookie, int call_nests)
-{
-       return ep_scan_ready_list(priv, ep_read_events_proc, NULL, call_nests + 1);
+       return ret;
 }
 
 static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)
@@ -913,13 +889,13 @@ static int ep_alloc(struct eventpoll **pep)
        if (unlikely(!ep))
                goto free_uid;
 
-       spin_lock_init(&ep->lock);
+       spin_lock_init(&ep->wqlock);
        mutex_init(&ep->mtx);
        init_waitqueue_head(&ep->wq);
        init_waitqueue_head(&ep->poll_wait);
-       INIT_LIST_HEAD(&ep->rdllist);
+       wfcq_init(&ep->rdlhead, &ep->rdltail);
+       wfcq_init(&ep->txlhead, &ep->txltail);
        ep->rbr = RB_ROOT;
-       ep->ovflist = EP_UNACTIVE_PTR;
        ep->user = user;
 
        *pep = ep;
@@ -960,6 +936,14 @@ static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd)
        return epir;
 }
 
+/* returns true if successfully marked ready, false if it was already ready */
+static inline bool ep_mark_ready(struct epitem *epi)
+{
+       /* We only want to go from idle -> ready */
+       return (cmpxchg(&epi->state, EP_STATE_IDLE, EP_STATE_READY)
+               == EP_STATE_IDLE);
+}
+
 /*
  * This is the callback that is passed to the wait queue wakeup
  * mechanism. It is called by the stored file descriptors when they
@@ -967,8 +951,6 @@ static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd)
  */
 static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
 {
-       int pwake = 0;
-       unsigned long flags;
        struct epitem *epi = ep_item_from_wait(wait);
        struct eventpoll *ep = epi->ep;
 
@@ -983,7 +965,8 @@ static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *k
                list_del_init(&wait->task_list);
        }
 
-       spin_lock_irqsave(&ep->lock, flags);
+       /* pairs with smp_mb in ep_modify */
+       smp_rmb();
 
        /*
         * If the event mask does not contain any poll(2) event, we consider the
@@ -992,7 +975,7 @@ static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *k
         * until the next EPOLL_CTL_MOD will be issued.
         */
        if (!(epi->event.events & ~EP_PRIVATE_BITS))
-               goto out_unlock;
+               return 1;
 
        /*
         * Check the events coming with the callback. At this stage, not
@@ -1001,52 +984,14 @@ static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *k
         * test for "key" != NULL before the event match test.
         */
        if (key && !((unsigned long) key & epi->event.events))
-               goto out_unlock;
-
-       /*
-        * If we are transferring events to userspace, we can hold no locks
-        * (because we're accessing user memory, and because of linux f_op->poll()
-        * semantics). All the events that happen during that period of time are
-        * chained in ep->ovflist and requeued later on.
-        */
-       if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
-               if (epi->next == EP_UNACTIVE_PTR) {
-                       epi->next = ep->ovflist;
-                       ep->ovflist = epi;
-                       if (epi->ws) {
-                               /*
-                                * Activate ep->ws since epi->ws may get
-                                * deactivated at any time.
-                                */
-                               __pm_stay_awake(ep->ws);
-                       }
+               return 1;
 
-               }
-               goto out_unlock;
-       }
-
-       /* If this file is already in the ready list we exit soon */
-       if (!ep_is_linked(&epi->rdllink)) {
-               list_add_tail(&epi->rdllink, &ep->rdllist);
+       if (ep_mark_ready(epi)) {
+               wfcq_enqueue(&ep->rdlhead, &ep->rdltail, &epi->rdllink);
                ep_pm_stay_awake_rcu(epi);
+               ep_notify_waiters(ep);
        }
 
-       /*
-        * Wake up ( if active ) both the eventpoll wait list and the ->poll()
-        * wait list.
-        */
-       if (waitqueue_active(&ep->wq))
-               wake_up_locked(&ep->wq);
-       if (waitqueue_active(&ep->poll_wait))
-               pwake++;
-
-out_unlock:
-       spin_unlock_irqrestore(&ep->lock, flags);
-
-       /* We have to call this outside the lock */
-       if (pwake)
-               ep_poll_safewake(&ep->poll_wait);
-
        return 1;
 }
 
@@ -1224,14 +1169,23 @@ static noinline void ep_destroy_wakeup_source(struct epitem *epi)
        wakeup_source_unregister(ws);
 }
 
+/* enqueue an epitem in process context via epoll_ctl */
+static void ep_enqueue_process(struct eventpoll *ep, struct epitem *epi)
+{
+       if (ep_mark_ready(epi)) {
+               wfcq_enqueue(&ep->rdlhead, &ep->rdltail, &epi->rdllink);
+               ep_pm_stay_awake(epi);
+               ep_notify_waiters(ep);
+       }
+}
+
 /*
  * Must be called with "mtx" held.
  */
 static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
                     struct file *tfile, int fd)
 {
-       int error, revents, pwake = 0;
-       unsigned long flags;
+       int error, revents;
        long user_watches;
        struct epitem *epi;
        struct ep_pqueue epq;
@@ -1243,14 +1197,14 @@ static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
                return -ENOMEM;
 
        /* Item initialization follow here ... */
-       INIT_LIST_HEAD(&epi->rdllink);
+       wfcq_node_init(&epi->rdllink);
        INIT_LIST_HEAD(&epi->fllink);
        INIT_LIST_HEAD(&epi->pwqlist);
        epi->ep = ep;
+       epi->state = EP_STATE_IDLE;
        ep_set_ffd(&epi->ffd, tfile, fd);
-       epi->event = *event;
        epi->nwait = 0;
-       epi->next = EP_UNACTIVE_PTR;
+       epi->event = *event;
        if (epi->event.events & EPOLLWAKEUP) {
                error = ep_create_wakeup_source(epi);
                if (error)
@@ -1297,28 +1251,11 @@ static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
        if (reverse_path_check())
                goto error_remove_epi;
 
-       /* We have to drop the new item inside our item list to keep track of it */
-       spin_lock_irqsave(&ep->lock, flags);
-
-       /* If the file is already "ready" we drop it inside the ready list */
-       if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
-               list_add_tail(&epi->rdllink, &ep->rdllist);
-               ep_pm_stay_awake(epi);
-
-               /* Notify waiting tasks that events are available */
-               if (waitqueue_active(&ep->wq))
-                       wake_up_locked(&ep->wq);
-               if (waitqueue_active(&ep->poll_wait))
-                       pwake++;
-       }
-
-       spin_unlock_irqrestore(&ep->lock, flags);
-
        atomic_long_inc(&ep->user->epoll_watches);
 
-       /* We have to call this outside the lock */
-       if (pwake)
-               ep_poll_safewake(&ep->poll_wait);
+       /* If the file is already "ready" we drop it inside the ready list */
+       if (revents & event->events)
+               ep_enqueue_process(ep, epi);
 
        return 0;
 
@@ -1331,18 +1268,26 @@ error_remove_epi:
        rb_erase(&epi->rbn, &ep->rbr);
 
 error_unregister:
+       /* epi->state cannot be set to EP_STATE_READY after this: */
        ep_unregister_pollwait(ep, epi);
 
        /*
+        * ep_poll_callback may have fired during ep_unregister_pollwait
+        * and set epi->state via cmpxchg; ensure we see it
+        */
+       smp_rmb();
+
+       /*
+        * Rely on ep_send_events to free epi if we got enqueued.
         * We need to do this because an event could have been arrived on some
-        * allocated wait queue. Note that we don't care about the ep->ovflist
-        * list, since that is used/cleaned only inside a section bound by "mtx".
-        * And ep_insert() is called with "mtx" held.
+        * allocated wait queue.
         */
-       spin_lock_irqsave(&ep->lock, flags);
-       if (ep_is_linked(&epi->rdllink))
-               list_del_init(&epi->rdllink);
-       spin_unlock_irqrestore(&ep->lock, flags);
+       if (epi->state == EP_STATE_READY) {
+               /* count the zombie against user watches to prevent OOM DoS */
+               atomic_long_inc(&ep->user->epoll_watches);
+               epi->state = EP_STATE_ZOMBIE;
+               return error;
+       }
 
        wakeup_source_unregister(ep_wakeup_source(epi));
 
@@ -1358,7 +1303,6 @@ error_create_wakeup_source:
  */
 static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event)
 {
-       int pwake = 0;
        unsigned int revents;
        poll_table pt;
 
@@ -1384,9 +1328,7 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_even
         * 1) Flush epi changes above to other CPUs.  This ensures
         *    we do not miss events from ep_poll_callback if an
         *    event occurs immediately after we call f_op->poll().
-        *    We need this because we did not take ep->lock while
-        *    changing epi above (but ep_poll_callback does take
-        *    ep->lock).
+        *    This pairs with the first smp_rmb in ep_poll_callback
         *
         * 2) We also need to ensure we do not miss _past_ events
         *    when calling f_op->poll().  This barrier also
@@ -1408,49 +1350,42 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_even
         * If the item is "hot" and it is not registered inside the ready
         * list, push it inside.
         */
-       if (revents & event->events) {
-               spin_lock_irq(&ep->lock);
-               if (!ep_is_linked(&epi->rdllink)) {
-                       list_add_tail(&epi->rdllink, &ep->rdllist);
-                       ep_pm_stay_awake(epi);
-
-                       /* Notify waiting tasks that events are available */
-                       if (waitqueue_active(&ep->wq))
-                               wake_up_locked(&ep->wq);
-                       if (waitqueue_active(&ep->poll_wait))
-                               pwake++;
-               }
-               spin_unlock_irq(&ep->lock);
-       }
-
-       /* We have to call this outside the lock */
-       if (pwake)
-               ep_poll_safewake(&ep->poll_wait);
+       if (revents & event->events)
+               ep_enqueue_process(ep, epi);
 
        return 0;
 }
 
-static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
-                              void *priv)
+static int ep_send_events(struct eventpoll *ep,
+                         struct epoll_event __user *uevent, int maxevents)
 {
-       struct ep_send_events_data *esed = priv;
-       int eventcnt;
+       int eventcnt = 0;
        unsigned int revents;
        struct epitem *epi;
-       struct epoll_event __user *uevent;
        struct wakeup_source *ws;
+       struct wfcq_node *node, *n;
+       enum epoll_item_state state;
        poll_table pt;
 
        init_poll_funcptr(&pt, NULL);
 
        /*
-        * We can loop without lock because we are passed a task private list.
-        * Items cannot vanish during the loop because ep_scan_ready_list() is
-        * holding "mtx" during this call.
+        * Items cannot vanish during the loop because we are holding
+        * "mtx" during this call.
         */
-       for (eventcnt = 0, uevent = esed->events;
-            !list_empty(head) && eventcnt < esed->maxevents;) {
-               epi = list_first_entry(head, struct epitem, rdllink);
+       __wfcq_for_each_safe(&ep->txlhead, &ep->txltail, node, n) {
+               epi = ep_item_from_node(node);
+               __wfcq_dequeue(&ep->txlhead, &ep->txltail);
+
+               /* no barrier needed, splicing txl should be enough */
+               state = epi->state;
+
+               if (state == EP_STATE_ZOMBIE) {
+                       ep_reap(ep, epi);
+                       continue;
+               }
+               WARN_ON(state != EP_STATE_READY);
+               wfcq_node_init(&epi->rdllink);
 
                /*
                 * Activate ep->ws before deactivating epi->ws to prevent
@@ -1459,7 +1394,7 @@ static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                 *
                 * This could be rearranged to delay the deactivation of epi->ws
                 * instead, but then epi->ws would temporarily be out of sync
-                * with ep_is_linked().
+                * with epi->state.
                 */
                ws = ep_wakeup_source(epi);
                if (ws) {
@@ -1468,25 +1403,29 @@ static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                        __pm_relax(ws);
                }
 
-               list_del_init(&epi->rdllink);
-
                revents = ep_item_poll(epi, &pt);
 
                /*
                 * If the event mask intersect the caller-requested one,
-                * deliver the event to userspace. Again, ep_scan_ready_list()
+                * deliver the event to userspace. Again, ep_poll()
                 * is holding "mtx", so no operations coming from userspace
                 * can change the item.
                 */
                if (revents) {
                        if (__put_user(revents, &uevent->events) ||
                            __put_user(epi->event.data, &uevent->data)) {
-                               list_add(&epi->rdllink, head);
+                               wfcq_enqueue(&ep->txlhead, &ep->txltail,
+                                                       &epi->rdllink);
                                ep_pm_stay_awake(epi);
-                               return eventcnt ? eventcnt : -EFAULT;
+                               if (!eventcnt)
+                                       eventcnt = -EFAULT;
+                               break;
                        }
-                       eventcnt++;
+
                        uevent++;
+                       if (++eventcnt == maxevents)
+                               n = NULL; /* stop iteration */
+
                        if (epi->event.events & EPOLLONESHOT)
                                epi->event.events &= EP_PRIVATE_BITS;
                        else if (!(epi->event.events & EPOLLET)) {
@@ -1495,32 +1434,25 @@ static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                                 * Trigger mode, we need to insert back inside
                                 * the ready list, so that the next call to
                                 * epoll_wait() will check again the events
-                                * availability. At this point, no one can insert
-                                * into ep->rdllist besides us. The epoll_ctl()
-                                * callers are locked out by
-                                * ep_scan_ready_list() holding "mtx" and the
-                                * poll callback will queue them in ep->ovflist.
+                                * availability.
                                 */
-                               list_add_tail(&epi->rdllink, &ep->rdllist);
+                               wfcq_enqueue(&ep->rdlhead, &ep->rdltail,
+                                                       &epi->rdllink);
                                ep_pm_stay_awake(epi);
+                               continue;
                        }
                }
+
+               /*
+                * reset item state for EPOLLONESHOT and EPOLLET
+                * no barrier here, rely on ep->mtx release for write barrier
+                */
+               epi->state = EP_STATE_IDLE;
        }
 
        return eventcnt;
 }
 
-static int ep_send_events(struct eventpoll *ep,
-                         struct epoll_event __user *events, int maxevents)
-{
-       struct ep_send_events_data esed;
-
-       esed.maxevents = maxevents;
-       esed.events = events;
-
-       return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0);
-}
-
 static inline struct timespec ep_set_mstimeout(long ms)
 {
        struct timespec now, ts = {
@@ -1552,11 +1484,12 @@ static inline struct timespec ep_set_mstimeout(long ms)
 static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
 {
-       int res = 0, eavail, timed_out = 0;
+       int res = 0;
        unsigned long flags;
        long slack = 0;
        wait_queue_t wait;
        ktime_t expires, *to = NULL;
+       bool eavail;
 
        if (timeout > 0) {
                struct timespec end_time = ep_set_mstimeout(timeout);
@@ -1564,27 +1497,23 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                slack = select_estimate_accuracy(&end_time);
                to = &expires;
                *to = timespec_to_ktime(end_time);
-       } else if (timeout == 0) {
-               /*
-                * Avoid the unnecessary trip to the wait queue loop, if the
-                * caller specified a non blocking operation.
-                */
-               timed_out = 1;
-               spin_lock_irqsave(&ep->lock, flags);
-               goto check_events;
        }
 
-fetch_events:
-       spin_lock_irqsave(&ep->lock, flags);
+       mutex_lock(&ep->mtx);
+       eavail = ep_events_available(ep);
+
+       if (!eavail && timeout) {
+wait_queue_loop:
 
-       if (!ep_events_available(ep)) {
                /*
                 * We don't have any available event to return to the caller.
                 * We need to sleep here, and we will be wake up by
                 * ep_poll_callback() when events will become available.
                 */
                init_waitqueue_entry(&wait, current);
+               spin_lock_irqsave(&ep->wqlock, flags);
                __add_wait_queue_exclusive(&ep->wq, &wait);
+               spin_unlock_irqrestore(&ep->wqlock, flags);
 
                for (;;) {
                        /*
@@ -1593,37 +1522,45 @@ fetch_events:
                         * to TASK_INTERRUPTIBLE before doing the checks.
                         */
                        set_current_state(TASK_INTERRUPTIBLE);
-                       if (ep_events_available(ep) || timed_out)
+                       eavail = ep_events_available(ep);
+                       if (eavail || !timeout)
                                break;
                        if (signal_pending(current)) {
                                res = -EINTR;
                                break;
                        }
+                       mutex_unlock(&ep->mtx);
 
-                       spin_unlock_irqrestore(&ep->lock, flags);
                        if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
-                               timed_out = 1;
+                               timeout = 0;
 
-                       spin_lock_irqsave(&ep->lock, flags);
+                       mutex_lock(&ep->mtx);
                }
+
+               spin_lock_irqsave(&ep->wqlock, flags);
                __remove_wait_queue(&ep->wq, &wait);
+               spin_unlock_irqrestore(&ep->wqlock, flags);
 
                set_current_state(TASK_RUNNING);
        }
-check_events:
-       /* Is it worth to try to dig for events ? */
-       eavail = ep_events_available(ep);
-
-       spin_unlock_irqrestore(&ep->lock, flags);
 
        /*
         * Try to transfer events to user space. In case we get 0 events and
         * there's still timeout left over, we go trying again in search of
         * more luck.
         */
-       if (!res && eavail &&
-           !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
-               goto fetch_events;
+       if (!res) {
+               res = ep_send_events(ep, events, maxevents);
+               if (!res && timeout)
+                       goto wait_queue_loop;
+       }
+
+       eavail = ep_events_available(ep);
+       mutex_unlock(&ep->mtx);
+
+       /* we may not have transferred everything, wake up others */
+       if (eavail)
+               ep_notify_waiters(ep);
 
        return res;
 }
-- 
Eric Wong