On 5/3/20 6:24 AM, Roman Penyaev wrote:
> On 2020-05-02 00:09, Jason Baron wrote:
>> On 5/1/20 5:02 PM, Roman Penyaev wrote:
>>> Hi Jason,
>>>
>>> That is indeed a nice catch.
>>> Seems we need smp_rmb() pair between list_empty_careful(&rp->rdllist) and
>>> READ_ONCE(ep->ovflist) for ep_events_available(), do we?
>>>
>>
>> Hi Roman,
>>
>> Good point, even if we order those reads its still racy, since the
>> read of the ready list could come after its been cleared and the
>> read of the overflow could again come after its been cleared.
>
> You mean the second chunk? True. Sigh.
>
>> So I'm afraid we might need instead something like this to make
>> sure they are read together:
>
> No, impossible, I can't believe in that :) We can't give up.
>
> All we need is to keep a mark, that ep->rdllist is not empty,
> even we've just spliced it. ep_poll_callback() always takes
> the ->ovflist path, if ->ovflist is not EP_UNACTIVE_PTR, but
> ep_events_available() does not need to observe ->ovflist at
> all, just a ->rdllist.
>
> Take a look at that, do I miss something? :
>
> diff --git a/fs/eventpoll.c b/fs/eventpoll.c
> index aba03ee749f8..a8770f9a917e 100644
> --- a/fs/eventpoll.c
> +++ b/fs/eventpoll.c
> @@ -376,8 +376,7 @@ static void ep_nested_calls_init(struct nested_calls
> *ncalls)
> */
> static inline int ep_events_available(struct eventpoll *ep)
> {
> - return !list_empty_careful(&ep->rdllist) ||
> - READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
> + return !list_empty_careful(&ep->rdllist);
> }
>
> #ifdef CONFIG_NET_RX_BUSY_POLL
> @@ -683,7 +682,8 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
> {
> __poll_t res;
> struct epitem *epi, *nepi;
> - LIST_HEAD(txlist);
> + LIST_HEAD(rdllist);
> + LIST_HEAD(ovflist);
>
> lockdep_assert_irqs_enabled();
>
> @@ -704,14 +704,22 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
> * in a lockless way.
> */
> write_lock_irq(&ep->lock);
> - list_splice_init(&ep->rdllist, &txlist);
> + /*
> + * We do not call list_splice_init() because for lockless
> + * ep_events_available() ->rdllist is still "not empty".
> + * Otherwise the feature that there is something left in
> + * the list can be lost which causes missed wakeup.
> + */
> + list_splice(&ep->rdllist, &rdllist);
> + /*
> + * If ->rdllist was empty we should pretend it was not,
> + * because after the unlock ->ovflist comes into play,
> + * which is invisible for lockless ep_events_available().
> + */
> + ep->rdllist.next = LIST_POISON1;
> WRITE_ONCE(ep->ovflist, NULL);
> write_unlock_irq(&ep->lock);
>
> /*
> * Now call the callback function.
> */
> - res = (*sproc)(ep, &txlist, priv);
> + res = (*sproc)(ep, &rdllist, priv);
>
> write_lock_irq(&ep->lock);
> /*
> @@ -724,7 +732,7 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
> /*
> * We need to check if the item is already in the list.
> * During the "sproc" callback execution time, items are
> - * queued into ->ovflist but the "txlist" might already
> + * queued into ->ovflist but the "rdllist" might already
> * contain them, and the list_splice() below takes care of
> them.
> */
> if (!ep_is_linked(epi)) {
> @@ -732,7 +740,7 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
> * ->ovflist is LIFO, so we have to reverse it in
> order
> * to keep in FIFO.
> */
> - list_add(&epi->rdllink, &ep->rdllist);
> + list_add(&epi->rdllink, &ovflist);
> ep_pm_stay_awake(epi);
> }
> }
> @@ -743,10 +751,11 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
> */
> WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);
>
> - /*
> - * Quickly re-inject items left on "txlist".
> - */
> - list_splice(&txlist, &ep->rdllist);
> + /* Events from ->ovflist happened later, thus splice to the tail */
> + list_splice_tail(&ovflist, &rdllist);
> + /* Just replace list */
> + list_replace(&rdllist, &ep->rdllist);
> +
> __pm_relax(ep->ws);
> write_unlock_irq(&ep->lock);
>
> @@ -1763,13 +1772,13 @@ static __poll_t ep_send_events_proc(struct eventpoll
> *ep, struct list_head *head
> * Trigger mode, we need to insert back inside
> * the ready list, so that the next call to
> * epoll_wait() will check again the events
> - * availability. At this point, no one can insert
> - * into ep->rdllist besides us. The epoll_ctl()
> - * callers are locked out by
> - * ep_scan_ready_list() holding "mtx" and the
> - * poll callback will queue them in ep->ovflist.
> + * availability. What we do here is simply
> + * return the epi to the same position where
> + * it was, the ep_scan_ready_list() will
> + * re-inject the leftovers to the ->rdllist
> + * under the proper lock.
> */
> - list_add_tail(&epi->rdllink, &ep->rdllist);
> + list_add_tail(&epi->rdllink, &tmp->rdllink);
> ep_pm_stay_awake(epi);
> }
> }
>
>
> --
> Roman
>
Hi Roman,
I think this misses an important case - the initial ep_poll_callback()
may queue to the overflow list. In this case ep_poll has no visibility
into the event since its only checking ep->rdllist.
Here's a different approach using seqcount that avoids expanding ep->lock
region.
Thanks,
-Jason
--- a/fs/eventpoll.c
+++ b/fs/eventpoll.c
@@ -221,6 +221,8 @@ struct eventpoll {
struct list_head visited_list_link;
int visited;
+ seqcount_t seq;
+
#ifdef CONFIG_NET_RX_BUSY_POLL
/* used to track busy poll napi_id */
unsigned int napi_id;
@@ -704,8 +706,10 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
* in a lockless way.
*/
write_lock_irq(&ep->lock);
+ write_seqcount_begin(&ep->seq);
list_splice_init(&ep->rdllist, &txlist);
WRITE_ONCE(ep->ovflist, NULL);
+ write_seqcount_end(&ep->seq);
write_unlock_irq(&ep->lock);
/*
@@ -741,12 +745,14 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
* releasing the lock, events will be queued in the normal way inside
* ep->rdllist.
*/
+ write_seqcount_begin(&ep->seq);
WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);
/*
* Quickly re-inject items left on "txlist".
*/
list_splice(&txlist, &ep->rdllist);
+ write_seqcount_end(&ep->seq);
__pm_relax(ep->ws);
write_unlock_irq(&ep->lock);
@@ -1025,6 +1031,7 @@ static int ep_alloc(struct eventpoll **pep)
ep->rbr = RB_ROOT_CACHED;
ep->ovflist = EP_UNACTIVE_PTR;
ep->user = user;
+ seqcount_init(&ep->seq);
*pep = ep;
@@ -1824,6 +1831,7 @@ static int ep_poll(struct eventpoll *ep, struct
epoll_event __user *events,
u64 slack = 0;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;
+ unsigned int seq;
lockdep_assert_irqs_enabled();
@@ -1900,7 +1908,10 @@ static int ep_poll(struct eventpoll *ep, struct
epoll_event __user *events,
break;
}
- eavail = ep_events_available(ep);
+ do {
+ seq = read_seqcount_begin(&ep->seq);
+ eavail = ep_events_available(ep);
+ } while (read_seqcount_retry(&ep->seq, seq));
if (eavail)
break;
if (signal_pending(current)) {