On 5/3/20 6:24 AM, Roman Penyaev wrote:
> On 2020-05-02 00:09, Jason Baron wrote:
>> On 5/1/20 5:02 PM, Roman Penyaev wrote:
>>> Hi Jason,
>>>
>>> That is indeed a nice catch.
>>> Seems we need an smp_rmb() pair between list_empty_careful(&ep->rdllist) and
>>> READ_ONCE(ep->ovflist) for ep_events_available(), don't we?
>>>
>>
>> Hi Roman,
>>
>> Good point, even if we order those reads it's still racy, since the
>> read of the ready list could come after it's been cleared and the
>> read of the overflow list could again come after it's been cleared.
> 
> You mean the second chunk? True. Sigh.
> 
>> So I'm afraid we might need instead something like this to make
>> sure they are read together:
> 
> No, impossible, I can't believe that :) We can't give up.
> 
> All we need is to keep a mark that ep->rdllist is not empty,
> even if we've just spliced it.  ep_poll_callback() always takes
> the ->ovflist path if ->ovflist is not EP_UNACTIVE_PTR, but
> ep_events_available() does not need to observe ->ovflist at
> all, just ->rdllist.
> 
> Take a look at this, am I missing something? :
> 
> diff --git a/fs/eventpoll.c b/fs/eventpoll.c
> index aba03ee749f8..a8770f9a917e 100644
> --- a/fs/eventpoll.c
> +++ b/fs/eventpoll.c
> @@ -376,8 +376,7 @@ static void ep_nested_calls_init(struct nested_calls *ncalls)
>   */
>  static inline int ep_events_available(struct eventpoll *ep)
>  {
> -       return !list_empty_careful(&ep->rdllist) ||
> -               READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
> +       return !list_empty_careful(&ep->rdllist);
>  }
> 
>  #ifdef CONFIG_NET_RX_BUSY_POLL
> @@ -683,7 +682,8 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>  {
>         __poll_t res;
>         struct epitem *epi, *nepi;
> -       LIST_HEAD(txlist);
> +       LIST_HEAD(rdllist);
> +       LIST_HEAD(ovflist);
> 
>         lockdep_assert_irqs_enabled();
> 
> @@ -704,14 +704,22 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>          * in a lockless way.
>          */
>         write_lock_irq(&ep->lock);
> -       list_splice_init(&ep->rdllist, &txlist);
> +       /*
> +        * We do not call list_splice_init() because for lockless
> +        * ep_events_available() ->rdllist is still "not empty".
> +        * Otherwise the feature that there is something left in
> +        * the list can be lost which causes missed wakeup.
> +        */
> +       list_splice(&ep->rdllist, &rdllist);
> +       /*
> +        * If ->rdllist was empty we should pretend it was not,
> +        * because after the unlock ->ovflist comes into play,
> +        * which is invisible for lockless ep_events_available().
> +        */
> +       ep->rdllist.next = LIST_POISON1;
>         WRITE_ONCE(ep->ovflist, NULL);
>         write_unlock_irq(&ep->lock);
> 
>         /*
>          * Now call the callback function.
>          */
> -       res = (*sproc)(ep, &txlist, priv);
> +       res = (*sproc)(ep, &rdllist, priv);
> 
>         write_lock_irq(&ep->lock);
>         /*
> @@ -724,7 +732,7 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>                 /*
>                  * We need to check if the item is already in the list.
>                  * During the "sproc" callback execution time, items are
> -                * queued into ->ovflist but the "txlist" might already
> +                * queued into ->ovflist but the "rdllist" might already
>                  * contain them, and the list_splice() below takes care of them.
>                  */
>                 if (!ep_is_linked(epi)) {
> @@ -732,7 +740,7 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>                          * ->ovflist is LIFO, so we have to reverse it in order
>                          * to keep in FIFO.
>                          */
> -                       list_add(&epi->rdllink, &ep->rdllist);
> +                       list_add(&epi->rdllink, &ovflist);
>                         ep_pm_stay_awake(epi);
>                 }
>         }
> @@ -743,10 +751,11 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>          */
>         WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);
> 
> -       /*
> -        * Quickly re-inject items left on "txlist".
> -        */
> -       list_splice(&txlist, &ep->rdllist);
> +       /* Events from ->ovflist happened later, thus splice to the tail */
> +       list_splice_tail(&ovflist, &rdllist);
> +       /* Just replace list */
> +       list_replace(&rdllist, &ep->rdllist);
> +
>         __pm_relax(ep->ws);
>         write_unlock_irq(&ep->lock);
> 
> @@ -1763,13 +1772,13 @@ static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head
>                          * Trigger mode, we need to insert back inside
>                          * the ready list, so that the next call to
>                          * epoll_wait() will check again the events
> -                        * availability. At this point, no one can insert
> -                        * into ep->rdllist besides us. The epoll_ctl()
> -                        * callers are locked out by
> -                        * ep_scan_ready_list() holding "mtx" and the
> -                        * poll callback will queue them in ep->ovflist.
> +                        * availability. What we do here is simply
> +                        * return the epi to the same position where
> +                        * it was, the ep_scan_ready_list() will
> +                        * re-inject the leftovers to the ->rdllist
> +                        * under the proper lock.
>                          */
> -                       list_add_tail(&epi->rdllink, &ep->rdllist);
> +                       list_add_tail(&epi->rdllink, &tmp->rdllink);
>                         ep_pm_stay_awake(epi);
>                 }
>         }
> 
> 
> -- 
> Roman
> 


Hi Roman,

I think this misses an important case - the initial ep_poll_callback()
may queue to the overflow list. In this case ep_poll() has no visibility
into the event since it's only checking ep->rdllist.

Here's a different approach using a seqcount that avoids expanding the
ep->lock region.

Thanks,

-Jason


--- a/fs/eventpoll.c
+++ b/fs/eventpoll.c
@@ -221,6 +221,8 @@ struct eventpoll {
        struct list_head visited_list_link;
        int visited;

+       seqcount_t seq;
+
 #ifdef CONFIG_NET_RX_BUSY_POLL
        /* used to track busy poll napi_id */
        unsigned int napi_id;
@@ -704,8 +706,10 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
         * in a lockless way.
         */
        write_lock_irq(&ep->lock);
+       write_seqcount_begin(&ep->seq);
        list_splice_init(&ep->rdllist, &txlist);
        WRITE_ONCE(ep->ovflist, NULL);
+       write_seqcount_end(&ep->seq);
        write_unlock_irq(&ep->lock);

        /*
@@ -741,12 +745,14 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
         * releasing the lock, events will be queued in the normal way inside
         * ep->rdllist.
         */
+       write_seqcount_begin(&ep->seq);
        WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);

        /*
         * Quickly re-inject items left on "txlist".
         */
        list_splice(&txlist, &ep->rdllist);
+       write_seqcount_end(&ep->seq);
        __pm_relax(ep->ws);
        write_unlock_irq(&ep->lock);

@@ -1025,6 +1031,7 @@ static int ep_alloc(struct eventpoll **pep)
        ep->rbr = RB_ROOT_CACHED;
        ep->ovflist = EP_UNACTIVE_PTR;
        ep->user = user;
+       seqcount_init(&ep->seq);

        *pep = ep;

@@ -1824,6 +1831,7 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
        u64 slack = 0;
        wait_queue_entry_t wait;
        ktime_t expires, *to = NULL;
+       unsigned int seq;

        lockdep_assert_irqs_enabled();

@@ -1900,7 +1908,10 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                        break;
                }

-               eavail = ep_events_available(ep);
+               do {
+                       seq = read_seqcount_begin(&ep->seq);
+                       eavail = ep_events_available(ep);
+               } while (read_seqcount_retry(&ep->seq, seq));
                if (eavail)
                        break;
                if (signal_pending(current)) {

Reply via email to