On Wed, May 18, 2016 at 09:41:03AM -0700, Eric Dumazet wrote:
> On Wed, 2016-05-18 at 19:26 +0300, Michael S. Tsirkin wrote:
> > On Wed, May 18, 2016 at 10:13:59AM +0200, Jesper Dangaard Brouer wrote:
> > > I agree. It is sad to see everybody implementing the same thing,
> > > open coding an array/circular based ring buffer. This kind of code is
> > > hard to maintain and get right with barriers etc. We can achieve the
> > > same performance with a generic implementation, by inlining the helper
> > > function calls.
> > 
> > So my testing seems to show that, at least for the common use case
> > in networking, which isn't lockless, a circular buffer
> > with indices does not perform that well, because
> > each index access causes a cache line to bounce between
> > CPUs, and index access causes stalls due to the dependency.
> 
> Yes.
> 
> > By comparison, an array of pointers, where NULL means invalid
> > and !NULL means valid, can be updated without messing up barriers
> > at all and does not have this issue.
> 
> Right, but then you need appropriate barriers.
> 
> > You also mentioned cache pressure caused by using large queues, and I
> > think it's a significant issue. tun has a queue of 1000 entries by
> > default and that's 8K already.
> > 
> > So, I had an idea: with an array of pointers we could actually use
> > only part of the ring as long as it stays mostly empty.
> > We do want to fill at least two cache lines to prevent producer
> > and consumer from writing over the same cache line all the time.
> > This is the SKB_ARRAY_MIN_SIZE logic below.
> > 
> > Pls take a look at the implementation below. It's a straight port from
> > the virtio unit test, so it should work fine, except the SKB_ARRAY_MIN_SIZE
> > hack that I added. Today I ran out of time for testing this. Posting for
> > early flames/feedback.
> > 
> > It's using skb pointers, but switching to void * would be easy at the cost
> > of type safety, though it appears that people want lockless push
> > etc., so I'm not sure of the value.
> > 
> > --->
> > skb_array: array based FIFO for skbs
> > 
> > A simple array based FIFO of pointers.
> > Intended for the net stack, so it uses skbs for type
> > safety, but we can replace that with void *
> > if others find it useful outside of the net stack.
> > 
> > Signed-off-by: Michael S. Tsirkin <m...@redhat.com>
> > 
> > ---
> > 
> > diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> > new file mode 100644
> > index 0000000..a67cc8b
> > --- /dev/null
> > +++ b/include/linux/skb_array.h
> > @@ -0,0 +1,116 @@
> > +/*
> > + * See Documentation/skbular-buffers.txt for more information.
> > + */
> > +
> > +#ifndef _LINUX_SKB_ARRAY_H
> > +#define _LINUX_SKB_ARRAY_H 1
> > +
> > +#include <linux/spinlock.h>
> > +#include <linux/cache.h>
> > +#include <linux/types.h>
> > +#include <linux/compiler.h>
> > +#include <linux/cache.h>
> > +#include <linux/slab.h>
> > +#include <asm/errno.h>
> > +
> > +struct sk_buff;
> > +
> > +struct skb_array {
> > +	int producer ____cacheline_aligned_in_smp;
> > +	spinlock_t producer_lock;
> > +	int consumer ____cacheline_aligned_in_smp;
> > +	spinlock_t consumer_lock;
> > +	/* Shared consumer/producer data */
> > +	int size ____cacheline_aligned_in_smp; /* max entries in queue */
> > +	struct sk_buff **queue;
> > +};
> > +
> > +#define SKB_ARRAY_MIN_SIZE (2 * (0x1 << cache_line_size()) / \
> > +			    sizeof (struct sk_buff *))
> > +
> > +static inline int __skb_array_produce(struct skb_array *a,
> > +				      struct sk_buff *skb)
> > +{
> > +	/* Try to start from beginning: good for cache utilization as we'll
> > +	 * keep reusing the same cache line.
> > +	 * Produce at least SKB_ARRAY_MIN_SIZE entries before trying to do this,
> > +	 * to reduce bouncing cache lines between them.
> > +	 */
> > +	if (a->producer >= SKB_ARRAY_MIN_SIZE && !a->queue[0])
> 
> a->queue[0] might be set by consumer, you probably need a barrier.
I think not - we write to the same place below, and two accesses to the same
address are never reordered.

> > +		a->producer = 0;
> > +	if (a->queue[a->producer])
> > +		return -ENOSPC;
> > +	a->queue[a->producer] = skb;
> > +	if (unlikely(++a->producer > a->size))
> > +		a->producer = 0;
> > +	return 0;
> > +}
> > +
> > +static inline int skb_array_produce_bh(struct skb_array *a,
> > +				       struct sk_buff *skb)
> > +{
> > +	int ret;
> > +
> > +	spin_lock_bh(&a->producer_lock);
> > +	ret = __skb_array_produce(a, skb);
> > +	spin_unlock_bh(&a->producer_lock);
> > +
> > +	return ret;
> > +}
> > +
> > +static inline struct sk_buff *__skb_array_peek(struct skb_array *a)
> > +{
> > +	if (a->queue[a->consumer])
> > +		return a->queue[a->consumer];
> > +
> > +	/* Check whether producer started at the beginning. */
> > +	if (unlikely(a->consumer >= SKB_ARRAY_MIN_SIZE && a->queue[0])) {
> > +		a->consumer = 0;
> > +		return a->queue[0];
> > +	}
> > +
> > +	return NULL;
> > +}
> > +
> > +static inline void __skb_array_consume(struct skb_array *a)
> > +{
> > +	a->queue[a->consumer++] = NULL;
> > +	if (unlikely(++a->consumer > a->size))
> 
> a->consumer is incremented twice ?

Oops.

> > +		a->consumer = 0;
> > +}
> > +
> >
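
To make the barrier point above concrete, here is a minimal, untested sketch
(not part of the posted patch) of where release/acquire ordering could go so
that the skb contents are guaranteed visible before the consumer sees a
non-NULL slot. smp_store_release(), smp_load_acquire() and READ_ONCE() are
existing kernel primitives; the helper names and the >= wrap check (which
assumes the queue holds exactly a->size slots) are illustrative only.

	/* Illustrative producer: publish the skb with a release store so its
	 * fields are visible before the non-NULL pointer is observed.
	 */
	static inline int __skb_array_produce_release(struct skb_array *a,
						      struct sk_buff *skb)
	{
		if (READ_ONCE(a->queue[a->producer]))
			return -ENOSPC;		/* slot not yet consumed */
		smp_store_release(&a->queue[a->producer], skb);
		if (unlikely(++a->producer >= a->size))
			a->producer = 0;
		return 0;
	}

	/* Illustrative consumer: the acquire load pairs with the release above. */
	static inline struct sk_buff *__skb_array_peek_acquire(struct skb_array *a)
	{
		return smp_load_acquire(&a->queue[a->consumer]);
	}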
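
For the double increment in __skb_array_consume, a corrected version would
advance the consumer index only once, along these lines (again just a sketch,
with the same assumption about the >= wrap point):

	/* Illustrative fix: clear the slot, then advance the consumer index once. */
	static inline void __skb_array_consume_fixed(struct skb_array *a)
	{
		a->queue[a->consumer] = NULL;
		if (unlikely(++a->consumer >= a->size))
			a->consumer = 0;
	}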
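
Finally, a short usage sketch in case it helps review. It assumes the array
has already been allocated with a->size zeroed slots (the init helper is not
in the hunk above), and the consumer side is open coded here under
consumer_lock since no consume wrapper is shown in this excerpt.

	/* Producer side, e.g. from the xmit path. */
	static int queue_skb(struct skb_array *a, struct sk_buff *skb)
	{
		return skb_array_produce_bh(a, skb);	/* -ENOSPC when full */
	}

	/* Consumer side: peek, then mark the slot free, under consumer_lock. */
	static struct sk_buff *dequeue_skb(struct skb_array *a)
	{
		struct sk_buff *skb;

		spin_lock_bh(&a->consumer_lock);
		skb = __skb_array_peek(a);
		if (skb)
			__skb_array_consume(a);
		spin_unlock_bh(&a->consumer_lock);

		return skb;
	}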