> > > > > > Current APIs assume ring elements to be pointers. However, in
> > > > > > many use cases, the size can be different. Add new APIs to
> > > > > > support configurable ring element sizes.
> > > > > >
> > > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> > > > > > Reviewed-by: Dharmik Thakkar <dharmik.thak...@arm.com>
> > > > > > Reviewed-by: Gavin Hu <gavin...@arm.com>
> > > > > > Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
> > > > > > ---
> > > > > >  lib/librte_ring/Makefile             |   3 +-
> > > > > >  lib/librte_ring/meson.build          |   3 +
> > > > > >  lib/librte_ring/rte_ring.c           |  45 +-
> > > > > >  lib/librte_ring/rte_ring.h           |   1 +
> > > > > >  lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
> > > > > >  lib/librte_ring/rte_ring_version.map |   2 +
> > > > > >  6 files changed, 991 insertions(+), 9 deletions(-)
> > > > > >  create mode 100644 lib/librte_ring/rte_ring_elem.h
> > > > > >
> > > > > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > > > > > index 21a36770d..515a967bb 100644
> > > > > > --- a/lib/librte_ring/Makefile
> > > > > > +++ b/lib/librte_ring/Makefile
> > <snip>
> > > > > > +
> > > > > > +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> > > > > > +allow_experimental_apis = true
> > > > > > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> > > > > > index d9b308036..6fed3648b 100644
> > > > > > --- a/lib/librte_ring/rte_ring.c
> > > > > > +++ b/lib/librte_ring/rte_ring.c
> > > > > > @@ -33,6 +33,7 @@
> > > > > >  #include <rte_tailq.h>
> > > > > >
> > > > > >  #include "rte_ring.h"
> > > > > > +#include "rte_ring_elem.h"
> > > > > >
> > <snip>
> > > > > > diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> > > > > > new file mode 100644
> > > > > > index 000000000..860f059ad
> > > > > > --- /dev/null
> > > > > > +++ b/lib/librte_ring/rte_ring_elem.h
> > > > > > @@ -0,0 +1,946 @@
> > > > > > +/* SPDX-License-Identifier: BSD-3-Clause
> > > > > > + *
> > > > > > + * Copyright (c) 2019 Arm Limited
> > > > > > + * Copyright (c) 2010-2017 Intel Corporation
> > > > > > + * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org
> > > > > > + * All rights reserved.
> > > > > > + * Derived from FreeBSD's bufring.h
> > > > > > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > > > > > + */
> > > > > > +
> > > > > > +#ifndef _RTE_RING_ELEM_H_
> > > > > > +#define _RTE_RING_ELEM_H_
> > > > > > +
> > <snip>
> > > > > > +
> > > > > > +/* the actual enqueue of pointers on the ring.
> > > > > > + * Placed here since identical code needed in both
> > > > > > + * single and multi producer enqueue functions.
> > > > > > + */
> > > > > > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table, esize, n) do { \
> > > > > > +        if (esize == 4) \
> > > > > > +                ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
> > > > > > +        else if (esize == 8) \
> > > > > > +                ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
> > > > > > +        else if (esize == 16) \
> > > > > > +                ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n); \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
> > > > > > +        unsigned int i; \
> > > > > > +        const uint32_t size = (r)->size; \
> > > > > > +        uint32_t idx = prod_head & (r)->mask; \
> > > > > > +        uint32_t *ring = (uint32_t *)ring_start; \
> > > > > > +        uint32_t *obj = (uint32_t *)obj_table; \
> > > > > > +        if (likely(idx + n < size)) { \
> > > > > > +                for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) { \
> > > > > > +                        ring[idx] = obj[i]; \
> > > > > > +                        ring[idx + 1] = obj[i + 1]; \
> > > > > > +                        ring[idx + 2] = obj[i + 2]; \
> > > > > > +                        ring[idx + 3] = obj[i + 3]; \
> > > > > > +                        ring[idx + 4] = obj[i + 4]; \
> > > > > > +                        ring[idx + 5] = obj[i + 5]; \
> > > > > > +                        ring[idx + 6] = obj[i + 6]; \
> > > > > > +                        ring[idx + 7] = obj[i + 7]; \
> > > > > > +                } \
> > > > > > +                switch (n & 0x7) { \
> > > > > > +                case 7: \
> > > > > > +                        ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +                case 6: \
> > > > > > +                        ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +                case 5: \
> > > > > > +                        ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +                case 4: \
> > > > > > +                        ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +                case 3: \
> > > > > > +                        ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +                case 2: \
> > > > > > +                        ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +                case 1: \
> > > > > > +                        ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +                } \
> > > > > > +        } else { \
> > > > > > +                for (i = 0; idx < size; i++, idx++) \
> > > > > > +                        ring[idx] = obj[i]; \
> > > > > > +                for (idx = 0; i < n; i++, idx++) \
> > > > > > +                        ring[idx] = obj[i]; \
> > > > > > +        } \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
> > > > > > +        unsigned int i; \
> > > > > > +        const uint32_t size = (r)->size; \
> > > > > > +        uint32_t idx = prod_head & (r)->mask; \
> > > > > > +        uint64_t *ring = (uint64_t *)ring_start; \
> > > > > > +        uint64_t *obj = (uint64_t *)obj_table; \
> > > > > > +        if (likely(idx + n < size)) { \
> > > > > > +                for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) { \
> > > > > > +                        ring[idx] = obj[i]; \
> > > > > > +                        ring[idx + 1] = obj[i + 1]; \
> > > > > > +                        ring[idx + 2] = obj[i + 2]; \
> > > > > > +                        ring[idx + 3] = obj[i + 3]; \
> > > > > > +                } \
> > > > > > +                switch (n & 0x3) { \
> > > > > > +                case 3: \
> > > > > > +                        ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +                case 2: \
> > > > > > +                        ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +                case 1: \
> > > > > > +                        ring[idx++] = obj[i++]; \
> > > > > > +                } \
> > > > > > +        } else { \
> > > > > > +                for (i = 0; idx < size; i++, idx++) \
> > > > > > +                        ring[idx] = obj[i]; \
> > > > > > +                for (idx = 0; i < n; i++, idx++) \
> > > > > > +                        ring[idx] = obj[i]; \
> > > > > > +        } \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do { \
> > > > > > +        unsigned int i; \
> > > > > > +        const uint32_t size = (r)->size; \
> > > > > > +        uint32_t idx = prod_head & (r)->mask; \
> > > > > > +        __uint128_t *ring = (__uint128_t *)ring_start; \
> > > > > > +        __uint128_t *obj = (__uint128_t *)obj_table; \
> > > > > > +        if (likely(idx + n < size)) { \
> > > > > > +                for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > > > > +                        ring[idx] = obj[i]; \
> > > > > > +                        ring[idx + 1] = obj[i + 1]; \
> > > > > > +                } \
> > > > > > +                switch (n & 0x1) { \
> > > > > > +                case 1: \
> > > > > > +                        ring[idx++] = obj[i++]; \
> > > > > > +                } \
> > > > > > +        } else { \
> > > > > > +                for (i = 0; idx < size; i++, idx++) \
> > > > > > +                        ring[idx] = obj[i]; \
> > > > > > +                for (idx = 0; i < n; i++, idx++) \
> > > > > > +                        ring[idx] = obj[i]; \
> > > > > > +        } \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +/* the actual copy of pointers on the ring to obj_table.
> > > > > > + * Placed here since identical code needed in both
> > > > > > + * single and multi consumer dequeue functions.
> > > > > > + */
> > > > > > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table, esize, n) do { \
> > > > > > +        if (esize == 4) \
> > > > > > +                DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
> > > > > > +        else if (esize == 8) \
> > > > > > +                DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
> > > > > > +        else if (esize == 16) \
> > > > > > +                DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n); \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
> > > > > > +        unsigned int i; \
> > > > > > +        uint32_t idx = cons_head & (r)->mask; \
> > > > > > +        const uint32_t size = (r)->size; \
> > > > > > +        uint32_t *ring = (uint32_t *)ring_start; \
> > > > > > +        uint32_t *obj = (uint32_t *)obj_table; \
> > > > > > +        if (likely(idx + n < size)) { \
> > > > > > +                for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) { \
> > > > > > +                        obj[i] = ring[idx]; \
> > > > > > +                        obj[i + 1] = ring[idx + 1]; \
> > > > > > +                        obj[i + 2] = ring[idx + 2]; \
> > > > > > +                        obj[i + 3] = ring[idx + 3]; \
> > > > > > +                        obj[i + 4] = ring[idx + 4]; \
> > > > > > +                        obj[i + 5] = ring[idx + 5]; \
> > > > > > +                        obj[i + 6] = ring[idx + 6]; \
> > > > > > +                        obj[i + 7] = ring[idx + 7]; \
> > > > > > +                } \
> > > > > > +                switch (n & 0x7) { \
> > > > > > +                case 7: \
> > > > > > +                        obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +                case 6: \
> > > > > > +                        obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +                case 5: \
> > > > > > +                        obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +                case 4: \
> > > > > > +                        obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +                case 3: \
> > > > > > +                        obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +                case 2: \
> > > > > > +                        obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +                case 1: \
> > > > > > +                        obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +                } \
> > > > > > +        } else { \
> > > > > > +                for (i = 0; idx < size; i++, idx++) \
> > > > > > +                        obj[i] = ring[idx]; \
> > > > > > +                for (idx = 0; i < n; i++, idx++) \
> > > > > > +                        obj[i] = ring[idx]; \
> > > > > > +        } \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
> > > > > > +        unsigned int i; \
> > > > > > +        uint32_t idx = cons_head & (r)->mask; \
> > > > > > +        const uint32_t size = (r)->size; \
> > > > > > +        uint64_t *ring = (uint64_t *)ring_start; \
> > > > > > +        uint64_t *obj = (uint64_t *)obj_table; \
> > > > > > +        if (likely(idx + n < size)) { \
> > > > > > +                for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) { \
> > > > > > +                        obj[i] = ring[idx]; \
> > > > > > +                        obj[i + 1] = ring[idx + 1]; \
> > > > > > +                        obj[i + 2] = ring[idx + 2]; \
> > > > > > +                        obj[i + 3] = ring[idx + 3]; \
> > > > > > +                } \
> > > > > > +                switch (n & 0x3) { \
> > > > > > +                case 3: \
> > > > > > +                        obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +                case 2: \
> > > > > > +                        obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +                case 1: \
> > > > > > +                        obj[i++] = ring[idx++]; \
> > > > > > +                } \
> > > > > > +        } else { \
> > > > > > +                for (i = 0; idx < size; i++, idx++) \
> > > > > > +                        obj[i] = ring[idx]; \
> > > > > > +                for (idx = 0; i < n; i++, idx++) \
> > > > > > +                        obj[i] = ring[idx]; \
> > > > > > +        } \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do { \
> > > > > > +        unsigned int i; \
> > > > > > +        uint32_t idx = cons_head & (r)->mask; \
> > > > > > +        const uint32_t size = (r)->size; \
> > > > > > +        __uint128_t *ring = (__uint128_t *)ring_start; \
> > > > > > +        __uint128_t *obj = (__uint128_t *)obj_table; \
> > > > > > +        if (likely(idx + n < size)) { \
> > > > > > +                for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > > > > +                        obj[i] = ring[idx]; \
> > > > > > +                        obj[i + 1] = ring[idx + 1]; \
> > > > > > +                } \
> > > > > > +                switch (n & 0x1) { \
> > > > > > +                case 1: \
> > > > > > +                        obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +                } \
> > > > > > +        } else { \
> > > > > > +                for (i = 0; idx < size; i++, idx++) \
> > > > > > +                        obj[i] = ring[idx]; \
> > > > > > +                for (idx = 0; i < n; i++, idx++) \
> > > > > > +                        obj[i] = ring[idx]; \
> > > > > > +        } \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +/* Between load and load. there might be cpu reorder in weak model
> > > > > > + * (powerpc/arm).
> > > > > > + * There are 2 choices for the users
> > > > > > + * 1.use rmb() memory barrier
> > > > > > + * 2.use one-direction load_acquire/store_release barrier,defined by
> > > > > > + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> > > > > > + * It depends on performance test results.
> > > > > > + * By default, move common functions to rte_ring_generic.h
> > > > > > + */
> > > > > > +#ifdef RTE_USE_C11_MEM_MODEL
> > > > > > +#include "rte_ring_c11_mem.h"
> > > > > > +#else
> > > > > > +#include "rte_ring_generic.h"
> > > > > > +#endif
> > > > > > +
> > > > > > +/**
> > > > > > + * @internal Enqueue several objects on the ring
> > > > > > + *
> > > > > > + * @param r
> > > > > > + *   A pointer to the ring structure.
> > > > > > + * @param obj_table
> > > > > > + *   A pointer to a table of void * pointers (objects).
> > > > > > + * @param esize
> > > > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > > > + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> > > > > > + *   as passed while creating the ring, otherwise the results are undefined.
> > > > > > + * @param n
> > > > > > + *   The number of objects to add in the ring from the obj_table.
> > > > > > + * @param behavior
> > > > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> > > > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> > > > > > + * @param is_sp
> > > > > > + *   Indicates whether to use single producer or multi-producer head update
> > > > > > + * @param free_space
> > > > > > + *   returns the amount of space after the enqueue operation has finished
> > > > > > + * @return
> > > > > > + *   Actual number of objects enqueued.
> > > > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > > > + */
> > > > > > +static __rte_always_inline unsigned int
> > > > > > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
> > > > > > +                unsigned int esize, unsigned int n,
> > > > > > +                enum rte_ring_queue_behavior behavior, unsigned int is_sp,
> > > > > > +                unsigned int *free_space)
> > > >
> > > > I like the idea to add esize as an argument to the public API, so the
> > > > compiler can do its job optimizing calls with constant esize.
> > > > Though I am not very happy with the rest of the implementation:
> > > > 1. It doesn't really provide configurable elem size - only 4/8/16B
> > > > elems are supported.
> > > Agree. I was thinking other sizes can be added on a need basis.
> > > However, I am wondering if we should just provide for 4B and then the
> > > users can use bulk operations to construct whatever they need?
> >
> > I suppose it could be plan B... if there is no agreement on the generic
> > case.
> > And for 4B elems, I guess you do have a particular use-case?
> Yes
>
> > > It would mean extra work for the users.
> > >
> > > > 2. A lot of code duplication with these 3 copies of the ENQUEUE/DEQUEUE
> > > > macros.
> > > >
> > > > Looking at the ENQUEUE/DEQUEUE macros, I can see that the main loop
> > > > always does a 32B copy per iteration.
> > > Yes, I tried to keep it the same as the existing one (originally, I
> > > guess, the intention was to allow 256b vector instructions to be
> > > generated)
> > >
> > > > So I wonder: can we make a generic function that would do a 32B copy
> > > > per iteration in a main loop, and copy the tail in 4B chunks?
> > > > That would avoid the copy duplication and would allow the user to have
> > > > any elem size (multiple of 4B) he wants.
> > > > Something like that (note I didn't test it, just a rough idea):
> > > >
> > > > static inline void
> > > > copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num,
> > > >                 uint32_t esize)
> > > > {
> > > >         uint32_t i, sz;
> > > >
> > > >         sz = (num * esize) / sizeof(uint32_t);
> > > If 'num' is a compile time constant, 'sz' will be a compile time
> > > constant. Otherwise, this will result in a multiplication operation.
> >
> > Not always.
> > If esize is a compile time constant, then for esize as a power of 2
> > (4, 8, 16, ...) it would be just one shift.
> > For other constant values it could be a 'mul', or in many cases just 2
> > shifts plus an 'add' (if the compiler is smart enough).
> > I.e. say for a 24B elem it would be either num * 6 or
> > (num << 2) + (num << 1).
> With num * 15 it has to be (num << 3) + (num << 2) + (num << 1) + num.
> Not sure if the compiler will do this.

For 15, it can be just (num << 4) - num; see the illustration below.

> > I suppose for non-power-of-2 elems it might be ok to get such a small
> > perf hit.
> Agree, should be ok not to focus on this right now.
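To make that concrete, here is a compilable illustration (mine, not code from the patch) of the constant-multiplier shortcuts discussed above:

#include <stdint.h>

/* sz for a 24B elem: (num * 24) / 4 == num * 6 == (num << 2) + (num << 1) */
static inline uint32_t
mul6(uint32_t num)
{
        return (num << 2) + (num << 1);
}

/* num * 15 with no 'mul': 15 == 16 - 1, so (num << 4) - num.
 * Compilers commonly apply this kind of strength reduction themselves
 * when the multiplier is a compile-time constant.
 */
static inline uint32_t
mul15(uint32_t num)
{
        return (num << 4) - num;
}

Either way, the worst case for a non-power-of-2 esize should be a couple of extra shifts and adds.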
> > > I have tried to avoid the multiplication operation and tried to use
> > > shift and mask operations (just like the rest of the ring code does).
> > > >
> > > >         for (i = 0; i < (sz & ~7); i += 8)
> > > >                 memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));
> > > I had used memcpy to start with (for the entire copy operation);
> > > performance is not the same for 64b elements when compared with the
> > > existing ring APIs (some cases more and some cases less).
> >
> > I remember that from one of your previous mails; that's why here I suggest
> > using memcpy() with a fixed size in a loop.
> > That way, for each iteration, the compiler will replace memcpy() with
> > instructions to copy 32B in whatever way it thinks is optimal (same as
> > for the original macro, I think).
> I tried this. On x86 (Xeon(R) Gold 6132 CPU @ 2.60GHz), the results are as
> follows. The numbers in brackets are with the code on master.
> gcc (Ubuntu 7.4.0-1ubuntu1~18.04.1) 7.4.0
>
> RTE>>ring_perf_elem_autotest
> ### Testing single element and burst enq/deq ###
> SP/SC single enq/dequeue: 5
> MP/MC single enq/dequeue: 40 (35)
> SP/SC burst enq/dequeue (size: 8): 2
> MP/MC burst enq/dequeue (size: 8): 6
> SP/SC burst enq/dequeue (size: 32): 1 (2)
> MP/MC burst enq/dequeue (size: 32): 2
>
> ### Testing empty dequeue ###
> SC empty dequeue: 2.11
> MC empty dequeue: 1.41 (2.11)
>
> ### Testing using a single lcore ###
> SP/SC bulk enq/dequeue (size: 8): 2.15 (2.86)
> MP/MC bulk enq/dequeue (size: 8): 6.35 (6.91)
> SP/SC bulk enq/dequeue (size: 32): 1.35 (2.06)
> MP/MC bulk enq/dequeue (size: 32): 2.38 (2.95)
>
> ### Testing using two physical cores ###
> SP/SC bulk enq/dequeue (size: 8): 73.81 (15.33)
> MP/MC bulk enq/dequeue (size: 8): 75.10 (71.27)
> SP/SC bulk enq/dequeue (size: 32): 21.14 (9.58)
> MP/MC bulk enq/dequeue (size: 32): 25.74 (20.91)
>
> ### Testing using two NUMA nodes ###
> SP/SC bulk enq/dequeue (size: 8): 164.32 (50.66)
> MP/MC bulk enq/dequeue (size: 8): 176.02 (173.43)
> SP/SC bulk enq/dequeue (size: 32): 50.78 (23)
> MP/MC bulk enq/dequeue (size: 32): 63.17 (46.74)
>
> On one of the Arm platforms:
> MP/MC bulk enq/dequeue (size: 32): 0.37 (0.33) (~12% hit, the rest are ok)

So it shows better numbers for one core, but worse on two, right?

> On another Arm platform, all numbers are the same or slightly better.
>
> I can post the patch with this change if you want to run some benchmarks
> on your platform.

Sure, please do. I'll try to run it on my boxes.

> I have not used the same code you have suggested; instead, I have used the
> same logic in a single macro with memcpy.
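For reference, here is the rough copy_elems() idea from above fleshed out into a compilable form. It is still untested, and the 4B tail loop is only my sketch of the part snipped from the quote, not necessarily what a final patch would use:

#include <stdint.h>
#include <string.h>

/* Untested sketch: the main loop copies 32B per iteration via a
 * fixed-size memcpy() (which the compiler expands inline), and the
 * remainder is copied one 4B slot at a time. 'num' is the element
 * count, 'esize' the element size in bytes (a multiple of 4).
 * Wrap-around at the ring boundary would still be handled by the
 * caller, as the existing macros do in their 'else' branch.
 */
static inline void
copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num,
                uint32_t esize)
{
        uint32_t i, sz;

        sz = (num * esize) / sizeof(uint32_t);

        for (i = 0; i < (sz & ~(uint32_t)7); i += 8)
                memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));

        for (; i < sz; i++) /* 4B tail */
                du32[i] = su32[i];
}

An enqueue path would then call this once for the contiguous span and once more after the wrap, with esize constant at each call site so the division folds away.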