Hi Honnappa, > > > > > > > > Current APIs assume ring elements to be pointers. However, in many > > > > use cases, the size can be different. Add new APIs to support > > > > configurable ring element sizes. > > > > > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com> > > > > Reviewed-by: Dharmik Thakkar <dharmik.thak...@arm.com> > > > > Reviewed-by: Gavin Hu <gavin...@arm.com> > > > > Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com> > > > > --- > > > > lib/librte_ring/Makefile | 3 +- > > > > lib/librte_ring/meson.build | 3 + > > > > lib/librte_ring/rte_ring.c | 45 +- > > > > lib/librte_ring/rte_ring.h | 1 + > > > > lib/librte_ring/rte_ring_elem.h | 946 +++++++++++++++++++++++++++ > > > > lib/librte_ring/rte_ring_version.map | 2 + > > > > 6 files changed, 991 insertions(+), 9 deletions(-) create mode > > > > 100644 lib/librte_ring/rte_ring_elem.h > > > > > > > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile > > > > index 21a36770d..515a967bb 100644 > > > > --- a/lib/librte_ring/Makefile > > > > +++ b/lib/librte_ring/Makefile > > > > @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk # library name > > > > LIB = librte_ring.a > > > > > > > > -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 > > > > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 - > > > > DALLOW_EXPERIMENTAL_API > > > > LDLIBS += -lrte_eal > > > > > > > > EXPORT_MAP := rte_ring_version.map > > > > @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c > > > > > > > > # install includes > > > > SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ > > > > + rte_ring_elem.h \ > > > > rte_ring_generic.h \ > > > > rte_ring_c11_mem.h > > > > > > > > diff --git a/lib/librte_ring/meson.build > > > > b/lib/librte_ring/meson.build index ab8b0b469..74219840a 100644 > > > > --- a/lib/librte_ring/meson.build > > > > +++ b/lib/librte_ring/meson.build > > > > @@ -6,3 +6,6 @@ sources = files('rte_ring.c') headers = > > > > files('rte_ring.h', > > > > 'rte_ring_c11_mem.h', > > 
> > 'rte_ring_generic.h') > > > > + > > > > +# rte_ring_create_elem and rte_ring_get_memsize_elem are > > > > +experimental allow_experimental_apis = true > > > > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c > > > > index d9b308036..6fed3648b 100644 > > > > --- a/lib/librte_ring/rte_ring.c > > > > +++ b/lib/librte_ring/rte_ring.c > > > > @@ -33,6 +33,7 @@ > > > > #include <rte_tailq.h> > > > > > > > > #include "rte_ring.h" > > > > +#include "rte_ring_elem.h" > > > > > > > > TAILQ_HEAD(rte_ring_list, rte_tailq_entry); > > > > > > > > @@ -46,23 +47,42 @@ EAL_REGISTER_TAILQ(rte_ring_tailq) > > > > > > > > /* return the size of memory occupied by a ring */ ssize_t - > > > > rte_ring_get_memsize(unsigned count) > > > > +rte_ring_get_memsize_elem(unsigned count, unsigned esize) > > > > { > > > > ssize_t sz; > > > > > > > > + /* Supported esize values are 4/8/16. > > > > + * Others can be added on need basis. > > > > + */ > > > > + if ((esize != 4) && (esize != 8) && (esize != 16)) { > > > > + RTE_LOG(ERR, RING, > > > > + "Unsupported esize value. 
Supported values are > > > > 4, 8 > > > > and 16\n"); > > > > + > > > > + return -EINVAL; > > > > + } > > > > + > > > > /* count must be a power of 2 */ > > > > if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) { > > > > RTE_LOG(ERR, RING, > > > > - "Requested size is invalid, must be power of 2, > > > > and " > > > > - "do not exceed the size limit %u\n", > > > > RTE_RING_SZ_MASK); > > > > + "Requested number of elements is invalid, must > > > > be " > > > > + "power of 2, and do not exceed the limit %u\n", > > > > + RTE_RING_SZ_MASK); > > > > + > > > > return -EINVAL; > > > > } > > > > > > > > - sz = sizeof(struct rte_ring) + count * sizeof(void *); > > > > + sz = sizeof(struct rte_ring) + count * esize; > > > > sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); > > > > return sz; > > > > } > > > > > > > > +/* return the size of memory occupied by a ring */ ssize_t > > > > +rte_ring_get_memsize(unsigned count) { > > > > + return rte_ring_get_memsize_elem(count, sizeof(void *)); } > > > > + > > > > void > > > > rte_ring_reset(struct rte_ring *r) > > > > { > > > > @@ -114,10 +134,10 @@ rte_ring_init(struct rte_ring *r, const char > > > > *name, unsigned count, > > > > return 0; > > > > } > > > > > > > > -/* create the ring */ > > > > +/* create the ring for a given element size */ > > > > struct rte_ring * > > > > -rte_ring_create(const char *name, unsigned count, int socket_id, > > > > - unsigned flags) > > > > +rte_ring_create_elem(const char *name, unsigned count, unsigned esize, > > > > + int socket_id, unsigned flags) > > > > { > > > > char mz_name[RTE_MEMZONE_NAMESIZE]; > > > > struct rte_ring *r; > > > > @@ -135,7 +155,7 @@ rte_ring_create(const char *name, unsigned > > > > count, int socket_id, > > > > if (flags & RING_F_EXACT_SZ) > > > > count = rte_align32pow2(count + 1); > > > > > > > > - ring_size = rte_ring_get_memsize(count); > > > > + ring_size = rte_ring_get_memsize_elem(count, esize); > > > > if (ring_size < 0) { > > > > rte_errno = ring_size; > > > > 
return NULL; > > > > @@ -182,6 +202,15 @@ rte_ring_create(const char *name, unsigned > > > > count, int socket_id, > > > > return r; > > > > } > > > > > > > > +/* create the ring */ > > > > +struct rte_ring * > > > > +rte_ring_create(const char *name, unsigned count, int socket_id, > > > > + unsigned flags) > > > > +{ > > > > + return rte_ring_create_elem(name, count, sizeof(void *), > > > > socket_id, > > > > + flags); > > > > +} > > > > + > > > > /* free the ring */ > > > > void > > > > rte_ring_free(struct rte_ring *r) > > > > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h > > > > index > > > > 2a9f768a1..18fc5d845 100644 > > > > --- a/lib/librte_ring/rte_ring.h > > > > +++ b/lib/librte_ring/rte_ring.h > > > > @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char > > > > *name, unsigned count, > > > > */ > > > > struct rte_ring *rte_ring_create(const char *name, unsigned count, > > > > int socket_id, unsigned flags); > > > > + > > > > /** > > > > * De-allocate all memory used by the ring. > > > > * > > > > diff --git a/lib/librte_ring/rte_ring_elem.h > > > > b/lib/librte_ring/rte_ring_elem.h new file mode 100644 index > > > > 000000000..860f059ad > > > > --- /dev/null > > > > +++ b/lib/librte_ring/rte_ring_elem.h > > > > @@ -0,0 +1,946 @@ > > > > +/* SPDX-License-Identifier: BSD-3-Clause > > > > + * > > > > + * Copyright (c) 2019 Arm Limited > > > > + * Copyright (c) 2010-2017 Intel Corporation > > > > + * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org > > > > + * All rights reserved. > > > > + * Derived from FreeBSD's bufring.h > > > > + * Used as BSD-3 Licensed with permission from Kip Macy. 
> > > > + */ > > > > + > > > > +#ifndef _RTE_RING_ELEM_H_ > > > > +#define _RTE_RING_ELEM_H_ > > > > + > > > > +/** > > > > + * @file > > > > + * RTE Ring with flexible element size */ > > > > + > > > > +#ifdef __cplusplus > > > > +extern "C" { > > > > +#endif > > > > + > > > > +#include <stdio.h> > > > > +#include <stdint.h> > > > > +#include <sys/queue.h> > > > > +#include <errno.h> > > > > +#include <rte_common.h> > > > > +#include <rte_config.h> > > > > +#include <rte_memory.h> > > > > +#include <rte_lcore.h> > > > > +#include <rte_atomic.h> > > > > +#include <rte_branch_prediction.h> > > > > +#include <rte_memzone.h> > > > > +#include <rte_pause.h> > > > > + > > > > +#include "rte_ring.h" > > > > + > > > > +/** > > > > + * @warning > > > > + * @b EXPERIMENTAL: this API may change without prior notice > > > > + * > > > > + * Calculate the memory size needed for a ring with given element > > > > +size > > > > + * > > > > + * This function returns the number of bytes needed for a ring, > > > > +given > > > > + * the number of elements in it and the size of the element. This > > > > +value > > > > + * is the sum of the size of the structure rte_ring and the size of > > > > +the > > > > + * memory needed for storing the elements. The value is aligned to > > > > +a cache > > > > + * line size. > > > > + * > > > > + * @param count > > > > + * The number of elements in the ring (must be a power of 2). > > > > + * @param esize > > > > + * The size of ring element, in bytes. It must be a multiple of 4. > > > > + * Currently, sizes 4, 8 and 16 are supported. > > > > + * @return > > > > + * - The memory size needed for the ring on success. > > > > + * - -EINVAL if count is not a power of 2. 
> > > > + */ > > > > +__rte_experimental > > > > +ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize); > > > > + > > > > +/** > > > > + * @warning > > > > + * @b EXPERIMENTAL: this API may change without prior notice > > > > + * > > > > + * Create a new ring named *name* that stores elements with given size. > > > > + * > > > > + * This function uses ``memzone_reserve()`` to allocate memory. > > > > +Then it > > > > + * calls rte_ring_init() to initialize an empty ring. > > > > + * > > > > + * The new ring size is set to *count*, which must be a power of > > > > + * two. Water marking is disabled by default. The real usable ring > > > > +size > > > > + * is *count-1* instead of *count* to differentiate a free ring > > > > +from an > > > > + * empty ring. > > > > + * > > > > + * The ring is added in RTE_TAILQ_RING list. > > > > + * > > > > + * @param name > > > > + * The name of the ring. > > > > + * @param count > > > > + * The number of elements in the ring (must be a power of 2). > > > > + * @param esize > > > > + * The size of ring element, in bytes. It must be a multiple of 4. > > > > + * Currently, sizes 4, 8 and 16 are supported. > > > > + * @param socket_id > > > > + * The *socket_id* argument is the socket identifier in case of > > > > + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA > > > > + * constraint for the reserved zone. > > > > + * @param flags > > > > + * An OR of the following: > > > > + * - RING_F_SP_ENQ: If this flag is set, the default behavior when > > > > + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` > > > > + * is "single-producer". Otherwise, it is "multi-producers". > > > > + * - RING_F_SC_DEQ: If this flag is set, the default behavior when > > > > + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` > > > > + * is "single-consumer". Otherwise, it is "multi-consumers". > > > > + * @return > > > > + * On success, the pointer to the new allocated ring. 
NULL on error > > > > with > > > > + * rte_errno set appropriately. Possible errno values include: > > > > + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config > > > > structure > > > > + * - E_RTE_SECONDARY - function was called from a secondary process > > > > instance > > > > + * - EINVAL - count provided is not a power of 2 > > > > + * - ENOSPC - the maximum number of memzones has already been > > > > allocated > > > > + * - EEXIST - a memzone with the same name already exists > > > > + * - ENOMEM - no appropriate memory area found in which to create > > > > memzone > > > > + */ > > > > +__rte_experimental > > > > +struct rte_ring *rte_ring_create_elem(const char *name, unsigned count, > > > > + unsigned esize, int socket_id, unsigned > > > > flags); > > > > + > > > > +/* the actual enqueue of pointers on the ring. > > > > + * Placed here since identical code needed in both > > > > + * single and multi producer enqueue functions. > > > > + */ > > > > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table, > > > > +esize, n) > > > > do { \ > > > > + if (esize == 4) \ > > > > + ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, > > > > n); \ > > > > + else if (esize == 8) \ > > > > + ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, > > > > n); \ > > > > + else if (esize == 16) \ > > > > + ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, > > > > n); > > \ } > > > > while > > > > +(0) > > > > + > > > > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \ > > > > + unsigned int i; \ > > > > + const uint32_t size = (r)->size; \ > > > > + uint32_t idx = prod_head & (r)->mask; \ > > > > + uint32_t *ring = (uint32_t *)ring_start; \ > > > > + uint32_t *obj = (uint32_t *)obj_table; \ > > > > + if (likely(idx + n < size)) { \ > > > > + for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx > > > > += 8) { \ > > > > + ring[idx] = obj[i]; \ > > > > + ring[idx + 1] = obj[i + 1]; \ > > > > + ring[idx + 2] = obj[i + 2]; \ > > 
> > + ring[idx + 3] = obj[i + 3]; \ > > > > + ring[idx + 4] = obj[i + 4]; \ > > > > + ring[idx + 5] = obj[i + 5]; \ > > > > + ring[idx + 6] = obj[i + 6]; \ > > > > + ring[idx + 7] = obj[i + 7]; \ > > > > + } \ > > > > + switch (n & 0x7) { \ > > > > + case 7: \ > > > > + ring[idx++] = obj[i++]; /* fallthrough */ \ > > > > + case 6: \ > > > > + ring[idx++] = obj[i++]; /* fallthrough */ \ > > > > + case 5: \ > > > > + ring[idx++] = obj[i++]; /* fallthrough */ \ > > > > + case 4: \ > > > > + ring[idx++] = obj[i++]; /* fallthrough */ \ > > > > + case 3: \ > > > > + ring[idx++] = obj[i++]; /* fallthrough */ \ > > > > + case 2: \ > > > > + ring[idx++] = obj[i++]; /* fallthrough */ \ > > > > + case 1: \ > > > > + ring[idx++] = obj[i++]; /* fallthrough */ \ > > > > + } \ > > > > + } else { \ > > > > + for (i = 0; idx < size; i++, idx++)\ > > > > + ring[idx] = obj[i]; \ > > > > + for (idx = 0; i < n; i++, idx++) \ > > > > + ring[idx] = obj[i]; \ > > > > + } \ > > > > +} while (0) > > > > + > > > > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \ > > > > + unsigned int i; \ > > > > + const uint32_t size = (r)->size; \ > > > > + uint32_t idx = prod_head & (r)->mask; \ > > > > + uint64_t *ring = (uint64_t *)ring_start; \ > > > > + uint64_t *obj = (uint64_t *)obj_table; \ > > > > + if (likely(idx + n < size)) { \ > > > > + for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx > > > > += 4) { \ > > > > + ring[idx] = obj[i]; \ > > > > + ring[idx + 1] = obj[i + 1]; \ > > > > + ring[idx + 2] = obj[i + 2]; \ > > > > + ring[idx + 3] = obj[i + 3]; \ > > > > + } \ > > > > + switch (n & 0x3) { \ > > > > + case 3: \ > > > > + ring[idx++] = obj[i++]; /* fallthrough */ \ > > > > + case 2: \ > > > > + ring[idx++] = obj[i++]; /* fallthrough */ \ > > > > + case 1: \ > > > > + ring[idx++] = obj[i++]; \ > > > > + } \ > > > > + } else { \ > > > > + for (i = 0; idx < size; i++, idx++)\ > > > > + ring[idx] = obj[i]; \ > > > > + for (idx = 0; i < n; i++, idx++) \ > > > > + 
ring[idx] = obj[i]; \ > > > > + } \ > > > > +} while (0) > > > > + > > > > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do > > { \ > > > > + unsigned int i; \ > > > > + const uint32_t size = (r)->size; \ > > > > + uint32_t idx = prod_head & (r)->mask; \ > > > > + __uint128_t *ring = (__uint128_t *)ring_start; \ > > > > + __uint128_t *obj = (__uint128_t *)obj_table; \ > > > > + if (likely(idx + n < size)) { \ > > > > + for (i = 0; i < (n >> 1); i += 2, idx += 2) { \ > > > > + ring[idx] = obj[i]; \ > > > > + ring[idx + 1] = obj[i + 1]; \ > > > > + } \ > > > > + switch (n & 0x1) { \ > > > > + case 1: \ > > > > + ring[idx++] = obj[i++]; \ > > > > + } \ > > > > + } else { \ > > > > + for (i = 0; idx < size; i++, idx++)\ > > > > + ring[idx] = obj[i]; \ > > > > + for (idx = 0; i < n; i++, idx++) \ > > > > + ring[idx] = obj[i]; \ > > > > + } \ > > > > +} while (0) > > > > + > > > > +/* the actual copy of pointers on the ring to obj_table. > > > > + * Placed here since identical code needed in both > > > > + * single and multi consumer dequeue functions. 
> > > > + */ > > > > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table, > > > > +esize, n) > > > > do { \ > > > > + if (esize == 4) \ > > > > + DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, > > > > n); \ > > > > + else if (esize == 8) \ > > > > + DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, > > > > n); \ > > > > + else if (esize == 16) \ > > > > + DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, > > > > n); > > \ } > > > > while > > > > +(0) > > > > + > > > > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \ > > > > + unsigned int i; \ > > > > + uint32_t idx = cons_head & (r)->mask; \ > > > > + const uint32_t size = (r)->size; \ > > > > + uint32_t *ring = (uint32_t *)ring_start; \ > > > > + uint32_t *obj = (uint32_t *)obj_table; \ > > > > + if (likely(idx + n < size)) { \ > > > > + for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += > > > > 8) {\ > > > > + obj[i] = ring[idx]; \ > > > > + obj[i + 1] = ring[idx + 1]; \ > > > > + obj[i + 2] = ring[idx + 2]; \ > > > > + obj[i + 3] = ring[idx + 3]; \ > > > > + obj[i + 4] = ring[idx + 4]; \ > > > > + obj[i + 5] = ring[idx + 5]; \ > > > > + obj[i + 6] = ring[idx + 6]; \ > > > > + obj[i + 7] = ring[idx + 7]; \ > > > > + } \ > > > > + switch (n & 0x7) { \ > > > > + case 7: \ > > > > + obj[i++] = ring[idx++]; /* fallthrough */ \ > > > > + case 6: \ > > > > + obj[i++] = ring[idx++]; /* fallthrough */ \ > > > > + case 5: \ > > > > + obj[i++] = ring[idx++]; /* fallthrough */ \ > > > > + case 4: \ > > > > + obj[i++] = ring[idx++]; /* fallthrough */ \ > > > > + case 3: \ > > > > + obj[i++] = ring[idx++]; /* fallthrough */ \ > > > > + case 2: \ > > > > + obj[i++] = ring[idx++]; /* fallthrough */ \ > > > > + case 1: \ > > > > + obj[i++] = ring[idx++]; /* fallthrough */ \ > > > > + } \ > > > > + } else { \ > > > > + for (i = 0; idx < size; i++, idx++) \ > > > > + obj[i] = ring[idx]; \ > > > > + for (idx = 0; i < n; i++, idx++) \ > > > > + obj[i] = ring[idx]; \ > > > > + } \ > 
> > > +} while (0) > > > > + > > > > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \ > > > > + unsigned int i; \ > > > > + uint32_t idx = cons_head & (r)->mask; \ > > > > + const uint32_t size = (r)->size; \ > > > > + uint64_t *ring = (uint64_t *)ring_start; \ > > > > + uint64_t *obj = (uint64_t *)obj_table; \ > > > > + if (likely(idx + n < size)) { \ > > > > + for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += > > > > 4) {\ > > > > + obj[i] = ring[idx]; \ > > > > + obj[i + 1] = ring[idx + 1]; \ > > > > + obj[i + 2] = ring[idx + 2]; \ > > > > + obj[i + 3] = ring[idx + 3]; \ > > > > + } \ > > > > + switch (n & 0x3) { \ > > > > + case 3: \ > > > > + obj[i++] = ring[idx++]; /* fallthrough */ \ > > > > + case 2: \ > > > > + obj[i++] = ring[idx++]; /* fallthrough */ \ > > > > + case 1: \ > > > > + obj[i++] = ring[idx++]; \ > > > > + } \ > > > > + } else { \ > > > > + for (i = 0; idx < size; i++, idx++) \ > > > > + obj[i] = ring[idx]; \ > > > > + for (idx = 0; i < n; i++, idx++) \ > > > > + obj[i] = ring[idx]; \ > > > > + } \ > > > > +} while (0) > > > > + > > > > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do > > { \ > > > > + unsigned int i; \ > > > > + uint32_t idx = cons_head & (r)->mask; \ > > > > + const uint32_t size = (r)->size; \ > > > > + __uint128_t *ring = (__uint128_t *)ring_start; \ > > > > + __uint128_t *obj = (__uint128_t *)obj_table; \ > > > > + if (likely(idx + n < size)) { \ > > > > + for (i = 0; i < (n >> 1); i += 2, idx += 2) { \ > > > > + obj[i] = ring[idx]; \ > > > > + obj[i + 1] = ring[idx + 1]; \ > > > > + } \ > > > > + switch (n & 0x1) { \ > > > > + case 1: \ > > > > + obj[i++] = ring[idx++]; /* fallthrough */ \ > > > > + } \ > > > > + } else { \ > > > > + for (i = 0; idx < size; i++, idx++) \ > > > > + obj[i] = ring[idx]; \ > > > > + for (idx = 0; i < n; i++, idx++) \ > > > > + obj[i] = ring[idx]; \ > > > > + } \ > > > > +} while (0) > > > > + > > > > +/* Between load and load. 
there might be cpu reorder in weak model > > > > + * (powerpc/arm). > > > > + * There are 2 choices for the users > > > > + * 1.use rmb() memory barrier > > > > + * 2.use one-direction load_acquire/store_release barrier,defined > > > > +by > > > > + * CONFIG_RTE_USE_C11_MEM_MODEL=y > > > > + * It depends on performance test results. > > > > + * By default, move common functions to rte_ring_generic.h */ > > > > +#ifdef RTE_USE_C11_MEM_MODEL #include "rte_ring_c11_mem.h" > > > > +#else > > > > +#include "rte_ring_generic.h" > > > > +#endif > > > > + > > > > +/** > > > > + * @internal Enqueue several objects on the ring > > > > + * > > > > + * @param r > > > > + * A pointer to the ring structure. > > > > + * @param obj_table > > > > + * A pointer to a table of void * pointers (objects). > > > > + * @param esize > > > > + * The size of ring element, in bytes. It must be a multiple of 4. > > > > + * Currently, sizes 4, 8 and 16 are supported. This should be the > > > > same > > > > + * as passed while creating the ring, otherwise the results are > > > > undefined. > > > > + * @param n > > > > + * The number of objects to add in the ring from the obj_table. > > > > + * @param behavior > > > > + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a > > ring > > > > + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible > > from > > > > ring > > > > + * @param is_sp > > > > + * Indicates whether to use single producer or multi-producer head > > update > > > > + * @param free_space > > > > + * returns the amount of space after the enqueue operation has > > finished > > > > + * @return > > > > + * Actual number of objects enqueued. > > > > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
> > > > + */ > > > > +static __rte_always_inline unsigned int > > > > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table, > > > > + unsigned int esize, unsigned int n, > > > > + enum rte_ring_queue_behavior behavior, unsigned int > > > > is_sp, > > > > + unsigned int *free_space) > > > I like the idea to add esize as an argument to the public API, so the > > compiler > > can do its job optimizing calls with constant esize. > > Though I am not very happy with the rest of implementation: > > 1. It doesn't really provide configurable elem size - only 4/8/16B elems are > > supported. > Agree. I was thinking other sizes can be added on a need basis. > However, I am wondering if we should just provide for 4B and then the users > can use bulk operations to construct whatever they need?
I suppose it could be plan B... if there is no agreement on the generic case. And for 4B elems, I guess you do have a particular use-case? > It > would mean extra work for the users. > > > 2. A lot of code duplication with these 3 copies of ENQUEUE/DEQUEUE > > macros. > > > > Looking at ENQUEUE/DEQUEUE macros, I can see that main loop always does > > 32B copy per iteration. > Yes, I tried to keep it the same as the existing one (originally, I guess the > intention was to allow for 256b vector instructions to be > generated) > > > So wonder can we make a generic function that would do 32B copy per > > iteration in a main loop, and copy tail by 4B chunks? > > That would avoid copy duplication and will allow user to have any elem size > > (multiple of 4B) he wants. > > Something like that (note didn't test it, just a rough idea): > > > > static inline void > > copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num, uint32_t > > esize) { > > uint32_t i, sz; > > > > sz = (num * esize) / sizeof(uint32_t); > If 'num' is a compile time constant, 'sz' will be a compile time constant. > Otherwise, this will result in a multiplication operation. Not always. If esize is compile time constant, then for esize as power of 2 (4,8,16,...), it would be just one shift. For other constant values it could be a 'mul' or in many cases just 2 shifts plus 'add' (if compiler is smart enough). I.e., let's say for a 24B elem it would be either num * 6 or (num << 2) + (num << 1). I suppose for non-power of 2 elems it might be ok to get such small perf hit. >I have tried > to avoid the multiplication operation and try to use shift and mask > operations (just like how the rest of the ring code does).
> > > > > for (i = 0; i < (sz & ~7); i += 8) > > memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t)); > I had used memcpy to start with (for the entire copy operation), performance > is not the same for 64b elements when compared with the > existing ring APIs (some cases more and some cases less). I remember that from one of your previous mails, that's why here I suggest using memcpy() with a fixed size in a loop. That way for each iteration the compiler will replace memcpy() with instructions to copy 32B in a way it thinks is optimal (same as for original macro, I think). > > IMO, we have to keep the performance of the 64b and 128b the same as what we > get with the existing ring and event-ring APIs. That would > allow us to replace them with these new APIs. I suggest that we keep the > macros in this patch for 64b and 128b. I still think we probably can achieve that without duplicating macros, while still supporting arbitrary elem size. See above. > For the rest of the sizes, we could put a for loop around 32b macro (this > would allow for all sizes as well).
> > > > > switch (sz & 7) { > > case 7: du32[sz - 7] = su32[sz - 7]; /* fallthrough */ > > case 6: du32[sz - 6] = su32[sz - 6]; /* fallthrough */ > > case 5: du32[sz - 5] = su32[sz - 5]; /* fallthrough */ > > case 4: du32[sz - 4] = su32[sz - 4]; /* fallthrough */ > > case 3: du32[sz - 3] = su32[sz - 3]; /* fallthrough */ > > case 2: du32[sz - 2] = su32[sz - 2]; /* fallthrough */ > > case 1: du32[sz - 1] = su32[sz - 1]; /* fallthrough */ > > } > > } > > > > static inline void > > enqueue_elems(struct rte_ring *r, void *ring_start, uint32_t prod_head, > > void *obj_table, uint32_t num, uint32_t esize) { > > uint32_t idx, n; > > uint32_t *du32; > > > > const uint32_t size = r->size; > > > > idx = prod_head & (r)->mask; > > > > du32 = ring_start + idx * sizeof(uint32_t); > > > > if (idx + num < size) > > copy_elems(du32, obj_table, num, esize); > > else { > > n = size - idx; > > copy_elems(du32, obj_table, n, esize); > > copy_elems(ring_start, obj_table + n * sizeof(uint32_t), > > num - n, esize); > > } > > } > > > > And then, in that function, instead of ENQUEUE_PTRS_ELEM(), just: > > > > enqueue_elems(r, &r[1], prod_head, obj_table, n, esize); > > > > > > > > +{ > > > > + uint32_t prod_head, prod_next; > > > > + uint32_t free_entries; > > > > + > > > > + n = __rte_ring_move_prod_head(r, is_sp, n, behavior, > > > > + &prod_head, &prod_next, &free_entries); > > > > + if (n == 0) > > > > + goto end; > > > > + > > > > + ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n); > > > > + > > > > + update_tail(&r->prod, prod_head, prod_next, is_sp, 1); > > > > +end: > > > > + if (free_space != NULL) > > > > + *free_space = free_entries - n; > > > > + return n; > > > > +} > > > > +