> >
> > Current APIs assume ring elements to be pointers. However, in many use
> > cases, the size can be different. Add new APIs to support configurable
> > ring element sizes.
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> > Reviewed-by: Dharmik Thakkar <dharmik.thak...@arm.com>
> > Reviewed-by: Gavin Hu <gavin...@arm.com>
> > Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
> > ---
> >  lib/librte_ring/Makefile             |   3 +-
> >  lib/librte_ring/meson.build          |   3 +
> >  lib/librte_ring/rte_ring.c           |  45 +-
> >  lib/librte_ring/rte_ring.h           |   1 +
> >  lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
> >  lib/librte_ring/rte_ring_version.map |   2 +
> >  6 files changed, 991 insertions(+), 9 deletions(-)
> >  create mode 100644 lib/librte_ring/rte_ring_elem.h
> >
> > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > index 21a36770d..515a967bb 100644
> > --- a/lib/librte_ring/Makefile
> > +++ b/lib/librte_ring/Makefile
> > @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
> >  # library name
> >  LIB = librte_ring.a
> >
> > -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -DALLOW_EXPERIMENTAL_API
> >  LDLIBS += -lrte_eal
> >
> >  EXPORT_MAP := rte_ring_version.map
> > @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> >
> >  # install includes
> >  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> > +					rte_ring_elem.h \
> >  					rte_ring_generic.h \
> >  					rte_ring_c11_mem.h
> >
> > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
> > index ab8b0b469..74219840a 100644
> > --- a/lib/librte_ring/meson.build
> > +++ b/lib/librte_ring/meson.build
> > @@ -6,3 +6,6 @@ sources = files('rte_ring.c')
> >  headers = files('rte_ring.h',
> >  		'rte_ring_c11_mem.h',
> >  		'rte_ring_generic.h')
> > +
> > +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> > +allow_experimental_apis = true
> > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> > index d9b308036..6fed3648b 100644
> > --- a/lib/librte_ring/rte_ring.c
> > +++ b/lib/librte_ring/rte_ring.c
> > @@ -33,6 +33,7 @@
> >  #include <rte_tailq.h>
> >
> >  #include "rte_ring.h"
> > +#include "rte_ring_elem.h"
> >
> >  TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
> >
> > @@ -46,23 +47,42 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
> >
> >  /* return the size of memory occupied by a ring */
> >  ssize_t
> > -rte_ring_get_memsize(unsigned count)
> > +rte_ring_get_memsize_elem(unsigned count, unsigned esize)
> >  {
> >  	ssize_t sz;
> >
> > +	/* Supported esize values are 4/8/16.
> > +	 * Others can be added on need basis.
> > +	 */
> > +	if ((esize != 4) && (esize != 8) && (esize != 16)) {
> > +		RTE_LOG(ERR, RING,
> > +			"Unsupported esize value. Supported values are 4, 8 and 16\n");
> > +
> > +		return -EINVAL;
> > +	}
> > +
> >  	/* count must be a power of 2 */
> >  	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
> >  		RTE_LOG(ERR, RING,
> > -			"Requested size is invalid, must be power of 2, and "
> > -			"do not exceed the size limit %u\n", RTE_RING_SZ_MASK);
> > +			"Requested number of elements is invalid, must be "
> > +			"power of 2, and do not exceed the limit %u\n",
> > +			RTE_RING_SZ_MASK);
> > +
> >  		return -EINVAL;
> >  	}
> >
> > -	sz = sizeof(struct rte_ring) + count * sizeof(void *);
> > +	sz = sizeof(struct rte_ring) + count * esize;
> >  	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> >  	return sz;
> >  }
> >
> > +/* return the size of memory occupied by a ring */
> > +ssize_t
> > +rte_ring_get_memsize(unsigned count)
> > +{
> > +	return rte_ring_get_memsize_elem(count, sizeof(void *));
> > +}
> > +
> >  void
> >  rte_ring_reset(struct rte_ring *r)
> >  {
> > @@ -114,10 +134,10 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> >  	return 0;
> >  }
> >
> > -/* create the ring */
> > +/* create the ring for a given element size */
> >  struct rte_ring *
> > -rte_ring_create(const char *name, unsigned count, int socket_id,
> > -		unsigned flags)
> > +rte_ring_create_elem(const char *name, unsigned count, unsigned esize,
> > +		int socket_id, unsigned flags)
> >  {
> >  	char mz_name[RTE_MEMZONE_NAMESIZE];
> >  	struct rte_ring *r;
> > @@ -135,7 +155,7 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
> >  	if (flags & RING_F_EXACT_SZ)
> >  		count = rte_align32pow2(count + 1);
> >
> > -	ring_size = rte_ring_get_memsize(count);
> > +	ring_size = rte_ring_get_memsize_elem(count, esize);
> >  	if (ring_size < 0) {
> >  		rte_errno = ring_size;
> >  		return NULL;
> > @@ -182,6 +202,15 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
> >  	return r;
> >  }
> >
> > +/* create the ring */
> > +struct rte_ring *
> > +rte_ring_create(const char *name, unsigned count, int socket_id,
> > +		unsigned flags)
> > +{
> > +	return rte_ring_create_elem(name, count, sizeof(void *), socket_id,
> > +		flags);
> > +}
> > +
> >  /* free the ring */
> >  void
> >  rte_ring_free(struct rte_ring *r)
> > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > index 2a9f768a1..18fc5d845 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> >   */
> >  struct rte_ring *rte_ring_create(const char *name, unsigned count,
> >  				 int socket_id, unsigned flags);
> > +
> >  /**
> >   * De-allocate all memory used by the ring.
> >   *
> > diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> > new file mode 100644
> > index 000000000..860f059ad
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_elem.h
> > @@ -0,0 +1,946 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2019 Arm Limited
> > + * Copyright (c) 2010-2017 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_ELEM_H_
> > +#define _RTE_RING_ELEM_H_
> > +
> > +/**
> > + * @file
> > + * RTE Ring with flexible element size
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <stdio.h>
> > +#include <stdint.h>
> > +#include <sys/queue.h>
> > +#include <errno.h>
> > +#include <rte_common.h>
> > +#include <rte_config.h>
> > +#include <rte_memory.h>
> > +#include <rte_lcore.h>
> > +#include <rte_atomic.h>
> > +#include <rte_branch_prediction.h>
> > +#include <rte_memzone.h>
> > +#include <rte_pause.h>
> > +
> > +#include "rte_ring.h"
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Calculate the memory size needed for a ring with given element size
> > + *
> > + * This function returns the number of bytes needed for a ring, given
> > + * the number of elements in it and the size of the element. This value
> > + * is the sum of the size of the structure rte_ring and the size of the
> > + * memory needed for storing the elements. The value is aligned to a cache
> > + * line size.
> > + *
> > + * @param count
> > + *   The number of elements in the ring (must be a power of 2).
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   Currently, sizes 4, 8 and 16 are supported.
> > + * @return
> > + *   - The memory size needed for the ring on success.
> > + *   - -EINVAL if count is not a power of 2.
> > + */
> > +__rte_experimental
> > +ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Create a new ring named *name* that stores elements with given size.
> > + *
> > + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> > + * calls rte_ring_init() to initialize an empty ring.
> > + *
> > + * The new ring size is set to *count*, which must be a power of
> > + * two. Water marking is disabled by default. The real usable ring size
> > + * is *count-1* instead of *count* to differentiate a free ring from an
> > + * empty ring.
> > + *
> > + * The ring is added in RTE_TAILQ_RING list.
> > + *
> > + * @param name
> > + *   The name of the ring.
> > + * @param count
> > + *   The number of elements in the ring (must be a power of 2).
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   Currently, sizes 4, 8 and 16 are supported.
> > + * @param socket_id
> > + *   The *socket_id* argument is the socket identifier in case of
> > + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> > + *   constraint for the reserved zone.
> > + * @param flags
> > + *   An OR of the following:
> > + *   - RING_F_SP_ENQ: If this flag is set, the default behavior when
> > + *     using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> > + *     is "single-producer". Otherwise, it is "multi-producers".
> > + *   - RING_F_SC_DEQ: If this flag is set, the default behavior when
> > + *     using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> > + *     is "single-consumer". Otherwise, it is "multi-consumers".
> > + * @return
> > + *   On success, the pointer to the new allocated ring. NULL on error with
> > + *   rte_errno set appropriately. Possible errno values include:
> > + *   - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
> > + *   - E_RTE_SECONDARY - function was called from a secondary process instance
> > + *   - EINVAL - count provided is not a power of 2
> > + *   - ENOSPC - the maximum number of memzones has already been allocated
> > + *   - EEXIST - a memzone with the same name already exists
> > + *   - ENOMEM - no appropriate memory area found in which to create memzone
> > + */
> > +__rte_experimental
> > +struct rte_ring *rte_ring_create_elem(const char *name, unsigned count,
> > +			unsigned esize, int socket_id, unsigned flags);
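
A quick aside for readers: a minimal usage sketch of the two new experimental
calls. This is only an illustration, not part of the patch; the ring name and
the 16B pkt_info struct are invented here:

/* a 16B element type - one of the esize values this patch supports */
struct pkt_info {
	uint64_t addr;
	uint32_t len;
	uint32_t flags;
};

/* memory footprint of a ring holding 1024 such elements */
ssize_t sz = rte_ring_get_memsize_elem(1024, sizeof(struct pkt_info));

/* create a single-producer/single-consumer ring of those elements */
struct rte_ring *r = rte_ring_create_elem("pkt_info_ring", 1024,
		sizeof(struct pkt_info), rte_socket_id(),
		RING_F_SP_ENQ | RING_F_SC_DEQ);
if (r == NULL)
	rte_exit(EXIT_FAILURE, "cannot create ring: %d\n", rte_errno);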
> > +
> > +/* the actual enqueue of pointers on the ring.
> > + * Placed here since identical code needed in both
> > + * single and multi producer enqueue functions.
> > + */
> > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table, esize, n) do { \
> > +	if (esize == 4) \
> > +		ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
> > +	else if (esize == 8) \
> > +		ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
> > +	else if (esize == 16) \
> > +		ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n); \
> > +} while (0)
> > +
> > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	const uint32_t size = (r)->size; \
> > +	uint32_t idx = prod_head & (r)->mask; \
> > +	uint32_t *ring = (uint32_t *)ring_start; \
> > +	uint32_t *obj = (uint32_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \
> > +			ring[idx] = obj[i]; \
> > +			ring[idx + 1] = obj[i + 1]; \
> > +			ring[idx + 2] = obj[i + 2]; \
> > +			ring[idx + 3] = obj[i + 3]; \
> > +			ring[idx + 4] = obj[i + 4]; \
> > +			ring[idx + 5] = obj[i + 5]; \
> > +			ring[idx + 6] = obj[i + 6]; \
> > +			ring[idx + 7] = obj[i + 7]; \
> > +		} \
> > +		switch (n & 0x7) { \
> > +		case 7: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 6: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 5: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 4: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 3: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 2: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 1: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++) \
> > +			ring[idx] = obj[i]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			ring[idx] = obj[i]; \
> > +	} \
> > +} while (0)
> > +
> > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	const uint32_t size = (r)->size; \
> > +	uint32_t idx = prod_head & (r)->mask; \
> > +	uint64_t *ring = (uint64_t *)ring_start; \
> > +	uint64_t *obj = (uint64_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> > +			ring[idx] = obj[i]; \
> > +			ring[idx + 1] = obj[i + 1]; \
> > +			ring[idx + 2] = obj[i + 2]; \
> > +			ring[idx + 3] = obj[i + 3]; \
> > +		} \
> > +		switch (n & 0x3) { \
> > +		case 3: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 2: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 1: \
> > +			ring[idx++] = obj[i++]; \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++) \
> > +			ring[idx] = obj[i]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			ring[idx] = obj[i]; \
> > +	} \
> > +} while (0)
> > +
> > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	const uint32_t size = (r)->size; \
> > +	uint32_t idx = prod_head & (r)->mask; \
> > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n & ((~(unsigned)0x1))); i += 2, idx += 2) { \
> > +			ring[idx] = obj[i]; \
> > +			ring[idx + 1] = obj[i + 1]; \
> > +		} \
> > +		switch (n & 0x1) { \
> > +		case 1: \
> > +			ring[idx++] = obj[i++]; \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++) \
> > +			ring[idx] = obj[i]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			ring[idx] = obj[i]; \
> > +	} \
> > +} while (0)
> > +
> > +/* the actual copy of pointers on the ring to obj_table.
> > + * Placed here since identical code needed in both
> > + * single and multi consumer dequeue functions.
> > + */
> > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table, esize, n) do { \
> > +	if (esize == 4) \
> > +		DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
> > +	else if (esize == 8) \
> > +		DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
> > +	else if (esize == 16) \
> > +		DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n); \
> > +} while (0)
> > +
> > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	uint32_t idx = cons_head & (r)->mask; \
> > +	const uint32_t size = (r)->size; \
> > +	uint32_t *ring = (uint32_t *)ring_start; \
> > +	uint32_t *obj = (uint32_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) { \
> > +			obj[i] = ring[idx]; \
> > +			obj[i + 1] = ring[idx + 1]; \
> > +			obj[i + 2] = ring[idx + 2]; \
> > +			obj[i + 3] = ring[idx + 3]; \
> > +			obj[i + 4] = ring[idx + 4]; \
> > +			obj[i + 5] = ring[idx + 5]; \
> > +			obj[i + 6] = ring[idx + 6]; \
> > +			obj[i + 7] = ring[idx + 7]; \
> > +		} \
> > +		switch (n & 0x7) { \
> > +		case 7: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 6: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 5: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 4: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 3: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 2: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 1: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +	} \
> > +} while (0)
> > +
> > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	uint32_t idx = cons_head & (r)->mask; \
> > +	const uint32_t size = (r)->size; \
> > +	uint64_t *ring = (uint64_t *)ring_start; \
> > +	uint64_t *obj = (uint64_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) { \
> > +			obj[i] = ring[idx]; \
> > +			obj[i + 1] = ring[idx + 1]; \
> > +			obj[i + 2] = ring[idx + 2]; \
> > +			obj[i + 3] = ring[idx + 3]; \
> > +		} \
> > +		switch (n & 0x3) { \
> > +		case 3: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 2: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 1: \
> > +			obj[i++] = ring[idx++]; \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +	} \
> > +} while (0)
> > +
> > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	uint32_t idx = cons_head & (r)->mask; \
> > +	const uint32_t size = (r)->size; \
> > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n & (~(unsigned)0x1)); i += 2, idx += 2) { \
> > +			obj[i] = ring[idx]; \
> > +			obj[i + 1] = ring[idx + 1]; \
> > +		} \
> > +		switch (n & 0x1) { \
> > +		case 1: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +	} \
> > +} while (0)
> > +
> > +/* Between load and load, there might be cpu reorder in weak model
> > + * (powerpc/arm).
> > + * There are 2 choices for the users
> > + * 1. use rmb() memory barrier
> > + * 2. use one-direction load_acquire/store_release barrier, defined by
> > + *    CONFIG_RTE_USE_C11_MEM_MODEL=y
> > + * It depends on performance test results.
> > + * By default, move common functions to rte_ring_generic.h
> > + */
> > +#ifdef RTE_USE_C11_MEM_MODEL
> > +#include "rte_ring_c11_mem.h"
> > +#else
> > +#include "rte_ring_generic.h"
> > +#endif
> > +
> > +/**
> > + * @internal Enqueue several objects on the ring
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> > + *   as passed while creating the ring, otherwise the results are undefined.
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer head update
> > + * @param free_space
> > + *   Returns the amount of space after the enqueue operation has finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
> > +		unsigned int esize, unsigned int n,
> > +		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
> > +		unsigned int *free_space)

I like the idea of adding esize as an argument to the public API, so the
compiler can do its job optimizing calls with constant esize.
Though I am not very happy with the rest of the implementation:
1. It doesn't really provide a configurable elem size - only 4/8/16B elems
   are supported.
2. A lot of code duplication with these 3 copies of ENQUEUE/DEQUEUE macros.

Looking at the ENQUEUE/DEQUEUE macros, I can see that the main loop always
does a 32B copy per iteration. So I wonder, can we make a generic function
that does a 32B copy per iteration in the main loop, and copies the tail in
4B chunks? That would avoid code duplication and would allow the user to
have any elem size (multiple of 4B) he wants.
Something like this (note: I didn't test it, just a rough idea):

static inline void
copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num,
	uint32_t esize)
{
	uint32_t i, sz;

	/* total copy size in 4B words */
	sz = (num * esize) / sizeof(uint32_t);

	for (i = 0; i < (sz & ~7); i += 8)
		memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));

	switch (sz & 7) {
	case 7: du32[sz - 7] = su32[sz - 7]; /* fallthrough */
	case 6: du32[sz - 6] = su32[sz - 6]; /* fallthrough */
	case 5: du32[sz - 5] = su32[sz - 5]; /* fallthrough */
	case 4: du32[sz - 4] = su32[sz - 4]; /* fallthrough */
	case 3: du32[sz - 3] = su32[sz - 3]; /* fallthrough */
	case 2: du32[sz - 2] = su32[sz - 2]; /* fallthrough */
	case 1: du32[sz - 1] = su32[sz - 1]; /* fallthrough */
	}
}

static inline void
enqueue_elems(struct rte_ring *r, void *ring_start, uint32_t prod_head,
	const void *obj_table, uint32_t num, uint32_t esize)
{
	uint32_t idx, n;
	uint32_t *du32;
	const uint32_t size = r->size;

	idx = prod_head & r->mask;
	/* scale the element index to a 4B-word offset */
	du32 = (uint32_t *)ring_start + idx * (esize / sizeof(uint32_t));

	if (idx + num < size)
		copy_elems(du32, obj_table, num, esize);
	else {
		n = size - idx;
		copy_elems(du32, obj_table, n, esize);
		copy_elems(ring_start,
			(const uint32_t *)obj_table +
				n * (esize / sizeof(uint32_t)),
			num - n, esize);
	}
}

And then, in that function, instead of ENQUEUE_PTRS_ELEM(), just:

enqueue_elems(r, &r[1], prod_head, obj_table, n, esize);

(A matching dequeue-side sketch follows after the quoted code below.)

> > +{
> > +	uint32_t prod_head, prod_next;
> > +	uint32_t free_entries;
> > +
> > +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> > +			&prod_head, &prod_next, &free_entries);
> > +	if (n == 0)
> > +		goto end;
> > +
> > +	ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n);
> > +
> > +	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> > +end:
> > +	if (free_space != NULL)
> > +		*free_space = free_entries - n;
> > +	return n;
> > +}
> > +
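The dequeue side could be reworked the same way. A matching rough sketch
(again untested; it reuses the hypothetical copy_elems() above and simply
mirrors enqueue_elems()):

static inline void
dequeue_elems(struct rte_ring *r, void *ring_start, uint32_t cons_head,
	void *obj_table, uint32_t num, uint32_t esize)
{
	uint32_t idx, n;
	const uint32_t *su32;
	const uint32_t size = r->size;

	idx = cons_head & r->mask;
	/* scale the element index to a 4B-word offset */
	su32 = (const uint32_t *)ring_start + idx * (esize / sizeof(uint32_t));

	if (idx + num < size)
		copy_elems(obj_table, su32, num, esize);
	else {
		n = size - idx;
		copy_elems(obj_table, su32, n, esize);
		copy_elems((uint32_t *)obj_table +
				n * (esize / sizeof(uint32_t)),
			ring_start, num - n, esize);
	}
}

The dequeue path would then call, instead of DEQUEUE_PTRS_ELEM():

dequeue_elems(r, &r[1], cons_head, obj_table, n, esize);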