> diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h > new file mode 100644 > index 000000000..fc7fe127c > --- /dev/null > +++ b/lib/librte_ring/rte_ring_elem.h > @@ -0,0 +1,1002 @@ > +/* SPDX-License-Identifier: BSD-3-Clause > + * > + * Copyright (c) 2019 Arm Limited > + * Copyright (c) 2010-2017 Intel Corporation > + * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org > + * All rights reserved. > + * Derived from FreeBSD's bufring.h > + * Used as BSD-3 Licensed with permission from Kip Macy. > + */ > + > +#ifndef _RTE_RING_ELEM_H_ > +#define _RTE_RING_ELEM_H_ > + > +/** > + * @file > + * RTE Ring with user defined element size > + */ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#include <stdio.h> > +#include <stdint.h> > +#include <sys/queue.h> > +#include <errno.h> > +#include <rte_common.h> > +#include <rte_config.h> > +#include <rte_memory.h> > +#include <rte_lcore.h> > +#include <rte_atomic.h> > +#include <rte_branch_prediction.h> > +#include <rte_memzone.h> > +#include <rte_pause.h> > + > +#include "rte_ring.h" > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Calculate the memory size needed for a ring with given element size > + * > + * This function returns the number of bytes needed for a ring, given > + * the number of elements in it and the size of the element. This value > + * is the sum of the size of the structure rte_ring and the size of the > + * memory needed for storing the elements. The value is aligned to a cache > + * line size. > + * > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * @param count > + * The number of elements in the ring (must be a power of 2). > + * @return > + * - The memory size needed for the ring on success. > + * - -EINVAL - esize is not a multiple of 4 or count provided is not a > + * power of 2. 
> + */ > +__rte_experimental > +ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Create a new ring named *name* that stores elements with given size. > + * > + * This function uses ``memzone_reserve()`` to allocate memory. Then it > + * calls rte_ring_init() to initialize an empty ring. > + * > + * The new ring size is set to *count*, which must be a power of > + * two. Water marking is disabled by default. The real usable ring size > + * is *count-1* instead of *count* to differentiate a full ring from an > + * empty ring. > + * > + * The ring is added in RTE_TAILQ_RING list. > + * > + * @param name > + * The name of the ring. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * @param count > + * The number of elements in the ring (must be a power of 2). > + * @param socket_id > + * The *socket_id* argument is the socket identifier in case of > + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA > + * constraint for the reserved zone. > + * @param flags > + * An OR of the following: > + * - RING_F_SP_ENQ: If this flag is set, the default behavior when > + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` > + * is "single-producer". Otherwise, it is "multi-producers". > + * - RING_F_SC_DEQ: If this flag is set, the default behavior when > + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` > + * is "single-consumer". Otherwise, it is "multi-consumers". > + * @return > + * On success, the pointer to the newly allocated ring. NULL on error with > + * rte_errno set appropriately. Possible errno values include: > + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure > + * - E_RTE_SECONDARY - function was called from a secondary process instance > + * - EINVAL - esize is not a multiple of 4 or count provided is not a > + * power of 2. 
> + * - ENOSPC - the maximum number of memzones has already been allocated > + * - EEXIST - a memzone with the same name already exists > + * - ENOMEM - no appropriate memory area found in which to create memzone > + */ > +__rte_experimental > +struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize, > + unsigned int count, int socket_id, unsigned int flags); > + > +static __rte_always_inline void > +enqueue_elems_32(struct rte_ring *r, uint32_t idx, > + const void *obj_table, uint32_t n) > +{ > + unsigned int i; > + const uint32_t size = r->size; > + uint32_t *ring = (uint32_t *)&r[1]; > + const uint32_t *obj = (const uint32_t *)obj_table; > + if (likely(idx + n < size)) { > + for (i = 0; i < (n & ~0x7); i += 8, idx += 8) { > + ring[idx] = obj[i]; > + ring[idx + 1] = obj[i + 1]; > + ring[idx + 2] = obj[i + 2]; > + ring[idx + 3] = obj[i + 3]; > + ring[idx + 4] = obj[i + 4]; > + ring[idx + 5] = obj[i + 5]; > + ring[idx + 6] = obj[i + 6]; > + ring[idx + 7] = obj[i + 7]; > + } > + switch (n & 0x7) { > + case 7: > + ring[idx++] = obj[i++]; /* fallthrough */ > + case 6: > + ring[idx++] = obj[i++]; /* fallthrough */ > + case 5: > + ring[idx++] = obj[i++]; /* fallthrough */ > + case 4: > + ring[idx++] = obj[i++]; /* fallthrough */ > + case 3: > + ring[idx++] = obj[i++]; /* fallthrough */ > + case 2: > + ring[idx++] = obj[i++]; /* fallthrough */ > + case 1: > + ring[idx++] = obj[i++]; /* fallthrough */ > + } > + } else { > + for (i = 0; idx < size; i++, idx++) > + ring[idx] = obj[i]; > + /* Start at the beginning */ > + for (idx = 0; i < n; i++, idx++) > + ring[idx] = obj[i]; > + } > +} > + > +static __rte_always_inline void > +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head, > + const void *obj_table, uint32_t n) > +{ > + unsigned int i; > + const uint32_t size = r->size; > + uint32_t idx = prod_head & r->mask; > + uint64_t *ring = (uint64_t *)&r[1]; > + const uint64_t *obj = (const uint64_t *)obj_table; > + if (likely(idx + n < size)) { > + for 
(i = 0; i < (n & ~0x3); i += 4, idx += 4) { > + ring[idx] = obj[i]; > + ring[idx + 1] = obj[i + 1]; > + ring[idx + 2] = obj[i + 2]; > + ring[idx + 3] = obj[i + 3]; > + } > + switch (n & 0x3) { > + case 3: > + ring[idx++] = obj[i++]; /* fallthrough */ > + case 2: > + ring[idx++] = obj[i++]; /* fallthrough */ > + case 1: > + ring[idx++] = obj[i++]; > + } > + } else { > + for (i = 0; idx < size; i++, idx++) > + ring[idx] = obj[i]; > + /* Start at the beginning */ > + for (idx = 0; i < n; i++, idx++) > + ring[idx] = obj[i]; > + } > +} > + > +static __rte_always_inline void > +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head, > + const void *obj_table, uint32_t n) > +{ > + unsigned int i; > + const uint32_t size = r->size; > + uint32_t idx = prod_head & r->mask; > + __uint128_t *ring = (__uint128_t *)&r[1]; > + const __uint128_t *obj = (const __uint128_t *)obj_table; > + if (likely(idx + n < size)) { > + for (i = 0; i < (n & ~0x1); i += 2, idx += 2) { > + ring[idx] = obj[i]; > + ring[idx + 1] = obj[i + 1];
AFAIK, copying through __uint128_t pointers implies that obj_table is 16B aligned. Would that always be the case? > + } > + switch (n & 0x1) { > + case 1: > + ring[idx++] = obj[i++]; > + } > + } else { > + for (i = 0; idx < size; i++, idx++) > + ring[idx] = obj[i]; > + /* Start at the beginning */ > + for (idx = 0; i < n; i++, idx++) > + ring[idx] = obj[i]; > + } > +} > + > +/* the actual enqueue of elements on the ring. > + * Placed here since identical code needed in both > + * single and multi producer enqueue functions. > + */ > +static __rte_always_inline void > +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table, > + uint32_t esize, uint32_t num) > +{ > + uint32_t idx, nr_idx, nr_num; > + > + /* 8B and 16B copies implemented individually to retain > + * the current performance. > + */ > + if (esize == 8) > + enqueue_elems_64(r, prod_head, obj_table, num); > + else if (esize == 16) > + enqueue_elems_128(r, prod_head, obj_table, num); > + else { > + /* Normalize to uint32_t */ > + uint32_t scale = esize / sizeof(uint32_t); > + nr_num = num * scale; > + idx = prod_head & r->mask; > + nr_idx = idx * scale; > + enqueue_elems_32(r, nr_idx, obj_table, nr_num); > + } > +} > +