On Tue, Jan 02, 2018 at 06:53:08PM +0200, Michael S. Tsirkin wrote:
> On Wed, Dec 27, 2017 at 07:50:25PM -0800, John Fastabend wrote:
> > When running consumer and/or producer operations and empty checks in
> > parallel it's possible to have the empty check run past the end of the
> > array. The scenario occurs when an empty check is run while
> > __ptr_ring_discard_one() is in progress. Specifically after the
> > consumer_head is incremented but before (consumer_head >= ring_size)
> > check is made and the consumer head is zeroed.
> >
> > To resolve this, without having to rework how consumer/producer ops
> > work on the array, simply add an extra dummy slot to the end of the
> > array. Even if we did a rework to avoid the extra slot it looks
> > like the normal case checks would suffer some so best to just
> > allocate an extra pointer.
> >
> > Reported-by: Jakub Kicinski <jakub.kicin...@netronome.com>
> > Fixes: c5ad119fb6c09 ("net: sched: pfifo_fast use skb_array")
> > Signed-off-by: John Fastabend <john.fastab...@gmail.com>
> >
> >
> > ---
> >  include/linux/ptr_ring.h | 7 ++++++-
> >  1 file changed, 6 insertions(+), 1 deletion(-)
> >
> > diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
> > index 6866df4..13fb06a 100644
> > --- a/include/linux/ptr_ring.h
> > +++ b/include/linux/ptr_ring.h
> > @@ -447,7 +447,12 @@ static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
> >
> >  static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
> >  {
> > -	return kcalloc(size, sizeof(void *), gfp);
> > +	/* Allocate an extra dummy element at end of ring to avoid consumer head
> > +	 * or produce head access past the end of the array. Possible when
> > +	 * producer/consumer operations and __ptr_ring_peek operations run in
> > +	 * parallel.
> > +	 */
> > +	return kcalloc(size + 1, sizeof(void *), gfp);
> >  }
> >
> >  static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
> >
> Well the peek will return a false negative then, won't it?
>
> So I kind of prefer just fixing the consumer. The first step I think
> would look something like the below untested patch. Pls take a look. I
> suspect we'll need a memory barrier too.
>
> I wonder though: are false positives or negatives ever a problem?
>
> Would it be a big deal to just take a lock there, and
> avoid trying to support a lockless peek?
>
>
> It would definitely be more straight-forward to just
> remove the promise to support a lockless peek.
>
> Thoughts?

In fact, the API says:

 * Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.

So it looks like the API is actually misused here, as callers
dereference the skb that is returned.

> -->
>
> ptr_ring: keep consumer_head valid at all times
>
> The comment near __ptr_ring_peek says:
>
>  * If ring is never resized, and if the pointer is merely
>  * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
>
> but this was in fact never possible.
>
> Fixes: c5ad119fb6c09 ("net: sched: pfifo_fast use skb_array")
> Signed-off-by: Michael S. Tsirkin <m...@redhat.com>
>
> ---
>
> diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
> index 37b4bb2..802375f 100644
> --- a/include/linux/ptr_ring.h
> +++ b/include/linux/ptr_ring.h
> @@ -236,22 +236,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
>  	/* Fundamentally, what we want to do is update consumer
>  	 * index and zero out the entry so producer can reuse it.
>  	 * Doing it naively at each consume would be as simple as:
> -	 *       r->queue[r->consumer++] = NULL;
> -	 *       if (unlikely(r->consumer >= r->size))
> -	 *               r->consumer = 0;
> +	 *       consumer = r->consumer;
> +	 *       r->queue[consumer++] = NULL;
> +	 *       if (unlikely(consumer >= r->size))
> +	 *               consumer = 0;
> +	 *       r->consumer = consumer;
>  	 * but that is suboptimal when the ring is full as producer is writing
>  	 * out new entries in the same cache line. Defer these updates until a
>  	 * batch of entries has been consumed.
>  	 */
> -	int head = r->consumer_head++;
> +	/* Note: we must keep consumer_head valid at all times for __ptr_ring_peek
> +	 * to work correctly.
> +	 */
> +	int consumer_head = r->consumer_head;
> +	int head = consumer_head++;
>
>  	/* Once we have processed enough entries invalidate them in
>  	 * the ring all at once so producer can reuse their space in the ring.
>  	 * We also do this when we reach end of the ring - not mandatory
>  	 * but helps keep the implementation simple.
>  	 */
> -	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
> -		     r->consumer_head >= r->size)) {
> +	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
> +		     consumer_head >= r->size)) {
>  		/* Zero out entries in the reverse order: this way we touch the
>  		 * cache line that producer might currently be reading the last;
>  		 * producer won't make progress and touch other cache lines
> @@ -259,12 +265,13 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
>  		 */
>  		while (likely(head >= r->consumer_tail))
>  			r->queue[head--] = NULL;
> -		r->consumer_tail = r->consumer_head;
> +		r->consumer_tail = consumer_head;
>  	}
> -	if (unlikely(r->consumer_head >= r->size)) {
> -		r->consumer_head = 0;
> +	if (unlikely(consumer_head >= r->size)) {
> +		consumer_head = 0;
>  		r->consumer_tail = 0;
>  	}
> +	r->consumer_head = consumer_head;
>  }
>
>  static inline void *__ptr_ring_consume(struct ptr_ring *r)
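
To make the window concrete, here is a rough single-threaded user-space
model of the two shapes of __ptr_ring_discard_one() quoted above. It is
only a sketch, not kernel code: struct model_ring, publish_head(),
max_seen_head and max_head_after() are names made up for illustration,
and the model says nothing about memory ordering or barriers. All it
does is record the largest value ever stored to consumer_head, which is
what a lockless reader could observe and use as an index.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of the consumer side of a ptr_ring. Field names
 * mirror the patch; max_seen_head exists only for this illustration. */
struct model_ring {
	int size;
	int batch;
	int consumer_head;
	int consumer_tail;
	int max_seen_head;
	void **queue;
};

/* Every store to consumer_head goes through here, so the model records
 * each value a concurrent lockless reader could have observed. */
static void publish_head(struct model_ring *r, int head)
{
	r->consumer_head = head;
	if (head > r->max_seen_head)
		r->max_seen_head = head;
}

/* Pre-patch shape: increment the shared field first, wrap it later. */
static void discard_one_old(struct model_ring *r)
{
	int head = r->consumer_head;

	publish_head(r, head + 1);	/* can briefly equal r->size */
	if (r->consumer_head - r->consumer_tail >= r->batch ||
	    r->consumer_head >= r->size) {
		while (head >= r->consumer_tail)
			r->queue[head--] = NULL;
		r->consumer_tail = r->consumer_head;
	}
	if (r->consumer_head >= r->size) {
		publish_head(r, 0);
		r->consumer_tail = 0;
	}
}

/* Patched shape: work on a local copy, store a valid index once. */
static void discard_one_new(struct model_ring *r)
{
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	if (consumer_head - r->consumer_tail >= r->batch ||
	    consumer_head >= r->size) {
		while (head >= r->consumer_tail)
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (consumer_head >= r->size) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	publish_head(r, consumer_head);
}

/* Run `steps` discards on a fresh ring of `size` slots (batch of 2) and
 * return the largest consumer_head value that was ever stored. */
static int max_head_after(void (*discard)(struct model_ring *),
			  int size, int steps)
{
	void *slots[16] = { NULL };
	struct model_ring r = { size, 2, 0, 0, 0, slots };

	assert(size > 0 && size <= 16);
	for (int i = 0; i < steps; i++)
		discard(&r);
	return r.max_seen_head;
}
```

With a 4-slot ring, max_head_after(discard_one_old, 4, 8) reaches 4,
which is one past the last valid index, while
max_head_after(discard_one_new, 4, 8) never exceeds 3. That is the
out-of-bounds index the extra dummy slot papers over and that the
local-copy version avoids storing in the first place.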