On Wed, 2016-05-18 at 19:26 +0300, Michael S. Tsirkin wrote: > On Wed, May 18, 2016 at 10:13:59AM +0200, Jesper Dangaard Brouer wrote: > > I agree. It is sad to see everybody is implementing the same thing, > > open coding an array/circular based ring buffer. This kind of code is > > hard to maintain and get right with barriers etc. We can achieve the > > same performance with a generic implementation, by inlining the help > > function calls. > > So my testing seems to show that at least for the common usecase > in networking, which isn't lockless, circular buffer > with indices does not perform that well, because > each index access causes a cache line to bounce between > CPUs, and index access causes stalls due to the dependency.
Yes. > > By comparison, an array of pointers where NULL means invalid > and !NULL means valid, can be updated without messing up barriers > at all and does not have this issue. Right but then you need appropriate barriers. > > You also mentioned cache pressure caused by using large queues, and I > think it's a significant issue. tun has a queue of 1000 entries by > default and that's 8K already. > > So, I had an idea: with an array of pointers we could actually use > only part of the ring as long as it stays mostly empty. > We do want to fill at least two cache lines to prevent producer > and consumer from writing over the same cache line all the time. > This is SKB_ARRAY_MIN_SIZE logic below. > > Pls take a look at the implementation below. It's a straight port from virtio > unit test, so should work fine, except the SKB_ARRAY_MIN_SIZE hack that > I added. Today I run out of time for testing this. Posting for early > flames/feedback. > > It's using skb pointers but we switching to void * would be easy at cost > of type safety, though it appears that people want lockless push > etc so I'm not sure of the value. > > ---> > skb_array: array based FIFO for skbs > > A simple array based FIFO of pointers. > Intended for net stack so uses skbs for type > safety, but we can replace with with void * > if others find it useful outside of net stack. > > Signed-off-by: Michael S. Tsirkin <m...@redhat.com> > > --- > > diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h > new file mode 100644 > index 0000000..a67cc8b > --- /dev/null > +++ b/include/linux/skb_array.h > @@ -0,0 +1,116 @@ > +/* > + * See Documentation/skbular-buffers.txt for more information. > + */ > + > +#ifndef _LINUX_SKB_ARRAY_H > +#define _LINUX_SKB_ARRAY_H 1 > + > +#include <linux/spinlock.h> > +#include <linux/cache.h> > +#include <linux/types.h> > +#include <linux/compiler.h> > +#include <linux/cache.h> > +#include <linux/slab.h> > +#include <asm/errno.h> > + > +struct sk_buff; > + > +struct skb_array { > + int producer ____cacheline_aligned_in_smp; > + spinlock_t producer_lock; > + int consumer ____cacheline_aligned_in_smp; > + spinlock_t consumer_lock; > + /* Shared consumer/producer data */ > + int size ____cacheline_aligned_in_smp; /* max entries in queue */ > + struct sk_buff **queue; > +}; > + > +#define SKB_ARRAY_MIN_SIZE (2 * (0x1 << cache_line_size()) / \ > + sizeof (struct sk_buff *)) > + > +static inline int __skb_array_produce(struct skb_array *a, > + struct sk_buff *skb) > +{ > + /* Try to start from beginning: good for cache utilization as we'll > + * keep reusing the same cache line. > + * Produce at least SKB_ARRAY_MIN_SIZE entries before trying to do this, > + * to reduce bouncing cache lines between them. > + */ > + if (a->producer >= SKB_ARRAY_MIN_SIZE && !a->queue[0]) a->queue[0] might be set by consumer, you probably need a barrier. > + a->producer = 0; > + if (a->queue[a->producer]) > + return -ENOSPC; > + a->queue[a->producer] = skb; > + if (unlikely(++a->producer > a->size)) > + a->producer = 0; > + return 0; > +} > + > +static inline int skb_array_produce_bh(struct skb_array *a, > + struct sk_buff *skb) > +{ > + int ret; > + > + spin_lock_bh(&a->producer_lock); > + ret = __skb_array_produce(a, skb); > + spin_unlock_bh(&a->producer_lock); > + > + return ret; > +} > + > +static inline struct sk_buff *__skb_array_peek(struct skb_array *a) > +{ > + if (a->queue[a->consumer]) > + return a->queue[a->consumer]; > + > + /* Check whether producer started at the beginning. */ > + if (unlikely(a->consumer >= SKB_ARRAY_MIN_SIZE && a->queue[0])) { > + a->consumer = 0; > + return a->queue[0]; > + } > + > + return NULL; > +} > + > +static inline void __skb_array_consume(struct skb_array *a) > +{ > + a->queue[a->consumer++] = NULL; > + if (unlikely(++a->consumer > a->size)) a->consumer is incremented twice ? > + a->consumer = 0; > +} > +