On Tue, Jan 02, 2018 at 06:53:08PM +0200, Michael S. Tsirkin wrote:
> On Wed, Dec 27, 2017 at 07:50:25PM -0800, John Fastabend wrote:
> > When running consumer and/or producer operations and empty checks in
> > parallel its possible to have the empty check run past the end of the
> > array. The scenario occurs when an empty check is run while
> > __ptr_ring_discard_one() is in progress. Specifically after the
> > consumer_head is incremented but before (consumer_head >= ring_size)
> > check is made and the consumer head is zeroe'd.
> > 
> > To resolve this, without having to rework how consumer/producer ops
> > work on the array, simply add an extra dummy slot to the end of the
> > array. Even if we did a rework to avoid the extra slot it looks
> > like the normal case checks would suffer some so best to just
> > allocate an extra pointer.
> > 
> > Reported-by: Jakub Kicinski <jakub.kicin...@netronome.com>
> > Fixes: c5ad119fb6c09 ("net: sched: pfifo_fast use skb_array")
> > Signed-off-by: John Fastabend <john.fastab...@gmail.com>
> 
> 
> 
> 
> > ---
> >  include/linux/ptr_ring.h |    7 ++++++-
> >  1 file changed, 6 insertions(+), 1 deletion(-)
> > 
> > diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
> > index 6866df4..13fb06a 100644
> > --- a/include/linux/ptr_ring.h
> > +++ b/include/linux/ptr_ring.h
> > @@ -447,7 +447,12 @@ static inline int ptr_ring_consume_batched_bh(struct 
> > ptr_ring *r,
> >  
> >  static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t 
> > gfp)
> >  {
> > -   return kcalloc(size, sizeof(void *), gfp);
> > +   /* Allocate an extra dummy element at end of ring to avoid consumer head
> > +    * or produce head access past the end of the array. Possible when
> > +    * producer/consumer operations and __ptr_ring_peek operations run in
> > +    * parallel.
> > +    */
> > +   return kcalloc(size + 1, sizeof(void *), gfp);
> >  }
> >  
> >  static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
> 
> 
> Well the peek will return a false negative then, won't it?
> 
> So I kind of prefer just fixing the consumer.  The first step I think
> would look something like the below untested patch.  Pls take a look.  I
> suspect we'll need a memory barrier too.
> 
> I wonder though: are false positives or negatives ever a problem?
> 
> Would it be a big deal to just take a lock there, and
> avoid trying to support a lockless peek?
> 
> 
> It would definitely be more straight-forward to just
> remove the promise to support a lockless peek.
> 
> Thoughts?

In fact, the API says:

 * Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g.  __ptr_ring_empty.

So it looks like the API is actually misused here as callers
will dereferences the skb returned.


> -->
> 
> ptr_ring: keep consumer_head valid at all times
> 
> The comment near __ptr_ring_peek says: 
>  
>  * If ring is never resized, and if the pointer is merely 
>  * tested, there's no need to take the lock - see e.g.  __ptr_ring_empty. 
> 
> but this was in fact never possible.
> 
> Fixes: c5ad119fb6c09 ("net: sched: pfifo_fast use skb_array")
> Signed-off-by: Michael S. Tsirkin <m...@redhat.com>
> 
> ---
> 
> diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
> index 37b4bb2..802375f 100644
> --- a/include/linux/ptr_ring.h
> +++ b/include/linux/ptr_ring.h
> @@ -236,22 +236,28 @@ static inline void __ptr_ring_discard_one(struct 
> ptr_ring *r)
>       /* Fundamentally, what we want to do is update consumer
>        * index and zero out the entry so producer can reuse it.
>        * Doing it naively at each consume would be as simple as:
> -      *       r->queue[r->consumer++] = NULL;
> -      *       if (unlikely(r->consumer >= r->size))
> -      *               r->consumer = 0;
> +      *       consumer = r->consumer;
> +      *       r->queue[consumer++] = NULL;
> +      *       if (unlikely(consumer >= r->size))
> +      *               consumer = 0;
> +      *       r->consumer = consumer;
>        * but that is suboptimal when the ring is full as producer is writing
>        * out new entries in the same cache line.  Defer these updates until a
>        * batch of entries has been consumed.
>        */
> -     int head = r->consumer_head++;
> +     /* Note: we must keep consumer_head valid at all times for 
> __ptr_ring_peek
> +      * to work correctly.
> +      */
> +     int consumer_head = r->consumer_head;
> +     int head = consumer_head++;
>  
>       /* Once we have processed enough entries invalidate them in
>        * the ring all at once so producer can reuse their space in the ring.
>        * We also do this when we reach end of the ring - not mandatory
>        * but helps keep the implementation simple.
>        */
> -     if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
> -                  r->consumer_head >= r->size)) {
> +     if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
> +                  consumer_head >= r->size)) {
>               /* Zero out entries in the reverse order: this way we touch the
>                * cache line that producer might currently be reading the last;
>                * producer won't make progress and touch other cache lines
> @@ -259,12 +265,13 @@ static inline void __ptr_ring_discard_one(struct 
> ptr_ring *r)
>                */
>               while (likely(head >= r->consumer_tail))
>                       r->queue[head--] = NULL;
> -             r->consumer_tail = r->consumer_head;
> +             r->consumer_tail = consumer_head;
>       }
> -     if (unlikely(r->consumer_head >= r->size)) {
> -             r->consumer_head = 0;
> +     if (unlikely(consumer_head >= r->size)) {
> +             consumer_head = 0;
>               r->consumer_tail = 0;
>       }
> +     r->consumer_head = consumer_head;
>  }
>  
>  static inline void *__ptr_ring_consume(struct ptr_ring *r)

Reply via email to