Hi Honnappa,
> Add scatter gather APIs to avoid intermediate memcpy. Serial > dequeue APIs are added to support access to ring elements > before actual dequeue. > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com> > Reviewed-by: Gavin Hu <gavin...@arm.com> > Reviewed-by: Ola Liljedahl <ola.liljed...@arm.com> > --- > lib/librte_ring/Makefile | 1 + > lib/librte_ring/meson.build | 1 + > lib/librte_ring/rte_ring_c11_mem.h | 98 +++++++ > lib/librte_ring/rte_ring_elem_sg.h | 417 +++++++++++++++++++++++++++++ > lib/librte_ring/rte_ring_generic.h | 93 +++++++ > 5 files changed, 610 insertions(+) As was already noticed by you this patch overlaps quite a bit with another one: http://patches.dpdk.org/patch/66006/ Though it seems there are few significant differences in our approaches (both API and implementation). So we probably need to come-up with some common view first, before moving forward with some unified version. To start a discussion, I produced some comments, pls see below. I don't see changes in rte_ring.h itself, but I suppose that's just because it is an RFC and it would be added in later versions? Another similar question there seems only _bulk_ (RTE_RING_QUEUE_FIXED) mode, I suppose _burst_ will also be added in later versions? > diff --git a/lib/librte_ring/rte_ring_elem_sg.h > b/lib/librte_ring/rte_ring_elem_sg.h > new file mode 100644 > index 000000000..a73f4fbfe > --- /dev/null > +++ b/lib/librte_ring/rte_ring_elem_sg.h > @@ -0,0 +1,417 @@ > +/* SPDX-License-Identifier: BSD-3-Clause > + * > + * Copyright (c) 2020 Arm Limited > + * Copyright (c) 2010-2017 Intel Corporation > + * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org > + * All rights reserved. > + * Derived from FreeBSD's bufring.h > + * Used as BSD-3 Licensed with permission from Kip Macy. > + */ > + > +#ifndef _RTE_RING_ELEM_SG_H_ > +#define _RTE_RING_ELEM_SG_H_ > + > +/** > + * @file > + * RTE Ring with > + * 1) user defined element size > + * 2) scatter gather feature to copy objects to/from the ring > + * 3) ability to reserve, consume/discard elements in the ring > + */ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#include <stdio.h> > +#include <stdint.h> > +#include <string.h> > +#include <sys/queue.h> > +#include <errno.h> > +#include <rte_common.h> > +#include <rte_config.h> > +#include <rte_memory.h> > +#include <rte_lcore.h> > +#include <rte_atomic.h> > +#include <rte_branch_prediction.h> > +#include <rte_memzone.h> > +#include <rte_pause.h> > + > +#include "rte_ring.h" > +#include "rte_ring_elem.h" > + > +/* Between load and load. there might be cpu reorder in weak model > + * (powerpc/arm). > + * There are 2 choices for the users > + * 1.use rmb() memory barrier > + * 2.use one-direction load_acquire/store_release barrier,defined by > + * CONFIG_RTE_USE_C11_MEM_MODEL=y > + * It depends on performance test results. 
> + * By default, move common functions to rte_ring_generic.h > + */ > +#ifdef RTE_USE_C11_MEM_MODEL > +#include "rte_ring_c11_mem.h" > +#else > +#include "rte_ring_generic.h" > +#endif > + > +static __rte_always_inline void > +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head, > + uint32_t num, void **dst1, uint32_t *n1, void **dst2) > +{ > + uint32_t idx = head & r->mask; > + uint64_t *ring = (uint64_t *)&r[1]; > + > + *dst1 = ring + idx; > + *n1 = num; > + > + if (idx + num > r->size) { > + *n1 = num - (r->size - idx - 1); > + *dst2 = ring; > + } > +} > + > +static __rte_always_inline void > +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head, > + uint32_t num, void **dst1, uint32_t *n1, void **dst2) > +{ > + uint32_t idx = head & r->mask; > + rte_int128_t *ring = (rte_int128_t *)&r[1]; > + > + *dst1 = ring + idx; > + *n1 = num; > + > + if (idx + num > r->size) { > + *n1 = num - (r->size - idx - 1); > + *dst2 = ring; > + } > +} > + > +static __rte_always_inline void > +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head, > + uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2) > +{ > + if (esize == 8) > + return __rte_ring_get_elem_addr_64(r, head, > + num, dst1, n1, dst2); > + else if (esize == 16) > + return __rte_ring_get_elem_addr_128(r, head, > + num, dst1, n1, dst2); > + else { > + uint32_t idx, scale, nr_idx; > + uint32_t *ring = (uint32_t *)&r[1]; > + > + /* Normalize to uint32_t */ > + scale = esize / sizeof(uint32_t); > + idx = head & r->mask; > + nr_idx = idx * scale; > + > + *dst1 = ring + nr_idx; > + *n1 = num; > + > + if (idx + num > r->size) { > + *n1 = num - (r->size - idx - 1); > + *dst2 = ring; > + } > + } > +} > + > +/** > + * @internal Reserve ring elements to enqueue several objects on the ring > + * > + * @param r > + * A pointer to the ring structure. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * This must be the same value used while creating the ring. Otherwise > + * the results are undefined. > + * @param n > + * The number of elements to reserve in the ring. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Reserve a fixed number of elements from a ring > + * RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible from ring > + * @param is_sp > + * Indicates whether to use single producer or multi-producer reserve > + * @param old_head > + * Producer's head index before reservation. > + * @param new_head > + * Producer's head index after reservation. > + * @param free_space > + * returns the amount of space after the reserve operation has finished. > + * It is not updated if the number of reserved elements is zero. > + * @param dst1 > + * Pointer to location in the ring to copy the data. > + * @param n1 > + * Number of elements to copy at dst1 > + * @param dst2 > + * In case of ring wrap around, this pointer provides the location to > + * copy the remaining elements. The number of elements to copy at this > + * location is equal to (number of elements reserved - n1) > + * @return > + * Actual number of elements reserved. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize, I do understand the purpose of reserve, then either commit/abort for serial sync mode, but what is the purpose of non-serial version of reserve/commit? 
In serial MP/MC case, after _reserve_(n) you always have to do _commit_(n) - you can't reduce number of elements, or do _abort_. Again you cannot avoid memcpy(n) here anyhow. So what is the point of these functions for non-serial case? BTW, I think it would be good to have serial version of _enqueue_ too. > + unsigned int n, enum rte_ring_queue_behavior behavior, > + unsigned int is_sp, unsigned int *old_head, > + unsigned int *new_head, unsigned int *free_space, > + void **dst1, unsigned int *n1, void **dst2) I do understand the intention to avoid memcpy(), but proposed API seems overcomplicated, error prone, and not very convenient for the user. I don't think that avoiding memcpy() will save us that many cycles here, so probably better to keep API model a bit more regular: n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num, &free_space); ... /* performs actual memcpy(), m<=n */ rte_ring_mp_serial_enqueue_bulk_commit(ring, obj, m); /* performs actual memcpy for num elems */ n = rte_ring_mp_serial_dequeue_bulk_reserve(ring, obj, num, &free_space); .... /* m<=n */ rte_ring_mp_serial_dequeue_bulk_commit(ring, obj, m); Plus, we can have usual enqueue/dequeue API for serial sync mode: rte_ring_serial_(enqueue/dequeue)_(bulk/burst) > +{ > + uint32_t free_entries; > + > + n = __rte_ring_move_prod_head(r, is_sp, n, behavior, > + old_head, new_head, &free_entries); > + > + if (n == 0) > + goto end; Here and in other similar places, why not just 'return 0;'? > + > + __rte_ring_get_elem_addr(r, *old_head, esize, n, dst1, n1, dst2); > + > + if (free_space != NULL) > + *free_space = free_entries - n; > + > +end: > + return n; > +} > + > +/** > + * @internal Consume previously reserved ring elements (for enqueue) > + * > + * @param r > + * A pointer to the ring structure. > + * @param old_head > + * Producer's head index before reservation. > + * @param new_head > + * Producer's head index after reservation. > + * @param is_sp > + * Indicates whether to use single producer or multi-producer head update > + */ > +static __rte_always_inline void > +__rte_ring_do_enqueue_elem_commit(struct rte_ring *r, > + unsigned int old_head, unsigned int new_head, > + unsigned int is_sp) > +{ > + update_tail(&r->prod, old_head, new_head, is_sp, 1); > +} > + > +/** > + * Reserve one element for enqueuing one object on a ring > + * (multi-producers safe). Application must call > + * 'rte_ring_mp_enqueue_elem_commit' to complete the enqueue operation. > + * > + * @param r > + * A pointer to the ring structure. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * This must be the same value used while creating the ring. Otherwise > + * the results are undefined. > + * @param old_head > + * Producer's head index before reservation. The same should be passed to > + * 'rte_ring_mp_enqueue_elem_commit' function. > + * @param new_head > + * Producer's head index after reservation. The same should be passed to > + * 'rte_ring_mp_enqueue_elem_commit' function. > + * @param free_space > + * Returns the amount of space after the reservation operation has > finished. > + * It is not updated if the number of reserved elements is zero. > + * @param dst > + * Pointer to location in the ring to copy the data. > + * @return > + * - 0: Success; objects enqueued. > + * - -ENOBUFS: Not enough room in the ring to reserve; no element is > reserved. 
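
To make the reserve/commit model above a bit more concrete, here is roughly
how I would expect an application to use it. This is just a sketch for the
discussion - none of the rte_ring_mp_serial_* functions below exist, the
names and prototypes are placeholders only:

/* Hypothetical usage of the proposed serial reserve/commit API
 * (placeholder names, nothing like this exists in the tree yet).
 */
static void
consume_partial(struct rte_ring *ring)
{
	uint64_t objs[32];	/* ring created with esize == 8 */
	unsigned int n, m, avail;

	/* try to reserve 32 elements; the copy out of the ring happens here */
	n = rte_ring_mp_serial_dequeue_bulk_reserve(ring, objs, 32, &avail);
	if (n == 0)
		return;

	/* application logic decides how many elements it can really use */
	m = n / 2;

	/* commit only what was consumed; the remaining n - m elements
	 * stay in the ring for a later dequeue
	 */
	rte_ring_mp_serial_dequeue_bulk_commit(ring, objs, m);
}

The important point is that the user never has to deal with head/tail values
directly, and the ring stays in a consistent state between reserve and commit.
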
> + */ > +static __rte_always_inline int > +rte_ring_mp_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize, > + unsigned int *old_head, unsigned int *new_head, > + unsigned int *free_space, void **dst) > +{ > + unsigned int n; > + > + return __rte_ring_do_enqueue_elem_reserve(r, esize, 1, > + RTE_RING_QUEUE_FIXED, 0, old_head, new_head, > + free_space, dst, &n, NULL) ? 0 : -ENOBUFS; > +} > + > +/** > + * Consume previously reserved elements (for enqueue) in a ring > + * (multi-producers safe). This API completes the enqueue operation. > + * > + * @param r > + * A pointer to the ring structure. > + * @param old_head > + * Producer's head index before reservation. This value was returned > + * when the API 'rte_ring_mp_enqueue_elem_reserve' was called. > + * @param new_head > + * Producer's head index after reservation. This value was returned > + * when the API 'rte_ring_mp_enqueue_elem_reserve' was called. > + */ > +static __rte_always_inline void > +rte_ring_mp_enqueue_elem_commit(struct rte_ring *r, unsigned int old_head, > + unsigned int new_head) > +{ > + __rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0); > +} > + > +/** > + * @internal Reserve elements to dequeue several objects on the ring. > + * This function blocks if there are elements reserved already. > + * > + * @param r > + * A pointer to the ring structure. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * This must be the same value used while creating the ring. Otherwise > + * the results are undefined. > + * @param n > + * The number of objects to reserve in the ring > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Reserve fixed number of elements in a ring > + * RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible in a ring > + * @param is_sc > + * Indicates whether to use single consumer or multi-consumer head update > + * @param old_head > + * Consumer's head index before reservation. > + * @param new_head > + * Consumer's head index after reservation. > + * @param available > + * returns the number of remaining ring elements after the reservation > + * It is not updated if the number of reserved elements is zero. > + * @param src1 > + * Pointer to location in the ring to copy the data from. > + * @param n1 > + * Number of elements to copy from src1 > + * @param src2 > + * In case of wrap around in the ring, this pointer provides the location > + * to copy the remaining elements from. The number of elements to copy from > + * this pointer is equal to (number of elements reserved - n1) > + * @return > + * Actual number of elements reserved. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_dequeue_elem_reserve_serial(struct rte_ring *r, > + unsigned int esize, unsigned int n, > + enum rte_ring_queue_behavior behavior, unsigned int is_sc, > + unsigned int *old_head, unsigned int *new_head, > + unsigned int *available, void **src1, unsigned int *n1, > + void **src2) > +{ > + uint32_t entries; > + > + n = __rte_ring_move_cons_head_serial(r, is_sc, n, behavior, > + old_head, new_head, &entries); > + > + if (n == 0) > + goto end; > + > + __rte_ring_get_elem_addr(r, *old_head, esize, n, src1, n1, src2); > + > + if (available != NULL) > + *available = entries - n; > + > +end: > + return n; > +} > + > +/** > + * @internal Consume previously reserved ring elements (for dequeue) > + * > + * @param r > + * A pointer to the ring structure. 
> + * @param old_head
> + *   Consumer's head index before reservation.
> + * @param new_head
> + *   Consumer's head index after reservation.
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + */
> +static __rte_always_inline void
> +__rte_ring_do_dequeue_elem_commit(struct rte_ring *r,
> +		unsigned int old_head, unsigned int new_head,
> +		unsigned int is_sc)
> +{

I think it is a bit dangerous and error-prone to let the user specify
old_head/new_head manually.
Seems better to have just _commit(ring, num) - see above.
That way the user doesn't have to calculate the new head manually, plus we
can have a check that ring.tail - ring.head >= num.

> +	update_tail(&r->cons, old_head, new_head, is_sc, 1);

I think update_tail() is not enough here.
In some cases we need to update ring.head as well: let's say the user
reserved 2 elements, but then decided to commit only one.
So I think we need a special new function instead of update_tail() here.

BTW, in HTS I use an atomic 64-bit read/write to get/set both head and tail
in one go. This is not really required - two 32-bit ops would work too,
I think. As usual, both ways have some pros and cons: using 64-bit ops might
be faster on a 64-bit target, plus it is less error prone (no need to think
about head/tail read/write ordering, etc.), though for a 32-bit target it
would mean some extra overhead.

> +}
> +
> +/**
> + * Reserve one element on a ring for dequeue. This function blocks if there
> + * are elements reserved already. Application must call
> + * 'rte_ring_do_dequeue_elem_commit' or
> + * `rte_ring_do_dequeue_elem_revert_serial' to complete the dequeue
> + * operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param old_head
> + *   Consumer's head index before reservation. The same should be passed to
> + *   'rte_ring_dequeue_elem_commit' function.
> + * @param new_head
> + *   Consumer's head index after reservation. The same should be passed to
> + *   'rte_ring_dequeue_elem_commit' function.
> + * @param available
> + *   returns the number of remaining ring elements after the reservation
> + *   It is not updated if the number of reserved elements is zero.
> + * @param src
> + *   Pointer to location in the ring to copy the data from.
> + * @return
> + *   - 0: Success; elements reserved
> + *   - -ENOBUFS: Not enough room in the ring; no element is reserved.
> + */
> +static __rte_always_inline int
> +rte_ring_dequeue_elem_reserve_serial(struct rte_ring *r, unsigned int esize,
> +		unsigned int *old_head, unsigned int *new_head,
> +		unsigned int *available, void **src)
> +{
> +	unsigned int n;
> +
> +	return __rte_ring_do_dequeue_elem_reserve_serial(r, esize, 1,
> +			RTE_RING_QUEUE_FIXED, r->cons.single, old_head,
> +			new_head, available, src, &n, NULL) ? 0 : -ENOBUFS;
> +}
> +
> +/**
> + * Consume previously reserved elements (for dequeue) in a ring
> + * (multi-consumer safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Consumer's head index before reservation. This value was returned
> + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> + * @param new_head
> + *   Consumer's head index after reservation. This value was returned
> + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> + */ > +static __rte_always_inline void > +rte_ring_dequeue_elem_commit(struct rte_ring *r, unsigned int old_head, > + unsigned int new_head) > +{ > + __rte_ring_do_dequeue_elem_commit(r, old_head, new_head, > + r->cons.single); > +} > + > +/** > + * Discard previously reserved elements (for dequeue) in a ring. > + * > + * @warning > + * This API can be called only if the ring elements were reserved > + * using 'rte_ring_dequeue_xxx_elem_reserve_serial' APIs. > + * > + * @param r > + * A pointer to the ring structure. > + */ > +static __rte_always_inline void > +rte_ring_dequeue_elem_revert_serial(struct rte_ring *r) > +{ > + __rte_ring_revert_head(&r->cons); > +} > + > +#ifdef __cplusplus > +} > +#endif > + > +#endif /* _RTE_RING_ELEM_SG_H_ */ > diff --git a/lib/librte_ring/rte_ring_generic.h > b/lib/librte_ring/rte_ring_generic.h > index 953cdbbd5..8d7a7ffcc 100644 > --- a/lib/librte_ring/rte_ring_generic.h > +++ b/lib/librte_ring/rte_ring_generic.h > @@ -170,4 +170,97 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned > int is_sc, > return n; > } > > +/** > + * @internal This function updates the consumer head if there are no > + * prior reserved elements on the ring. > + * > + * @param r > + * A pointer to the ring structure > + * @param is_sc > + * Indicates whether multi-consumer path is needed or not > + * @param n > + * The number of elements we will want to dequeue, i.e. how far should the > + * head be moved > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring > + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring > + * @param old_head > + * Returns head value as it was before the move, i.e. where dequeue starts > + * @param new_head > + * Returns the current/new head value i.e. where dequeue finishes > + * @param entries > + * Returns the number of entries in the ring BEFORE head was moved > + * @return > + * - Actual number of objects dequeued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_move_cons_head_serial(struct rte_ring *r, unsigned int is_sc, > + unsigned int n, enum rte_ring_queue_behavior behavior, > + uint32_t *old_head, uint32_t *new_head, > + uint32_t *entries) > +{ > + unsigned int max = n; > + int success; > + > + /* move cons.head atomically */ > + do { > + /* Restore n as it may change every loop */ > + n = max; > + > + *old_head = r->cons.head; > + > + /* add rmb barrier to avoid load/load reorder in weak > + * memory model. It is noop on x86 > + */ > + rte_smp_rmb(); > + > + /* Ensure that cons.tail and cons.head are the same */ > + if (*old_head != r->cons.tail) { > + rte_pause(); > + > + success = 0; > + continue; > + } > + > + /* The subtraction is done between two unsigned 32bits value > + * (the result is always modulo 32 bits even if we have > + * cons_head > prod_tail). So 'entries' is always between 0 > + * and size(ring)-1. > + */ > + *entries = (r->prod.tail - *old_head); > + > + /* Set the actual entries for dequeue */ > + if (n > *entries) > + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; > + > + if (unlikely(n == 0)) > + return 0; > + > + *new_head = *old_head + n; > + if (is_sc) { > + r->cons.head = *new_head; > + rte_smp_rmb(); > + success = 1; I don't think we need to worry about SC case in this function. For sc(/sp)_serial (if we need such mode) - we probably can use normal move_(cons/prod)_head(). 
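
Coming back to the commit step discussed above: below is a rough, completely
untested sketch of the kind of special helper I have in mind for the serial
dequeue case (function name is made up). The application passes only the
number of elements it actually consumed, and the helper fixes up both head
and tail, including the sanity check I mentioned:

/* Rough sketch (not tested) of a serial dequeue commit that allows
 * committing fewer elements than were reserved. 'num' is the number of
 * elements the application actually consumed.
 */
static __rte_always_inline void
__rte_ring_dequeue_serial_commit(struct rte_ring *r, unsigned int num)
{
	uint32_t tail = r->cons.tail;

	/* in serial mode head - tail is exactly the reserved count */
	RTE_ASSERT(r->cons.head - tail >= num);

	/* pull the head back if the user commits less than was reserved */
	r->cons.head = tail + num;

	/* make sure all reads from the ring complete before the slots
	 * are released to producers (same as update_tail() for dequeue)
	 */
	rte_smp_rmb();
	r->cons.tail = tail + num;
}
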
> + } else { > + success = rte_atomic32_cmpset(&r->cons.head, *old_head, > + *new_head); > + } > + } while (unlikely(success == 0)); > + return n; > +} > + > +/** > + * @internal This function updates the head to match the tail > + * > + * @param ht > + * A pointer to the ring's head-tail structure > + */ > +static __rte_always_inline void > +__rte_ring_revert_head(struct rte_ring_headtail *ht) > +{ > + /* Discard the reserved ring elements. */ > + ht->head = ht->tail; > +} > + > #endif /* _RTE_RING_GENERIC_H_ */ > -- > 2.17.1
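
One more thing, to illustrate the 64-bit head/tail remark above: in the HTS
patch head and tail share one 8B word, roughly as below (simplified here from
memory, see the patch itself for the real code), so both can be read or
updated consistently with a single atomic operation:

/* Approximate illustration of the HTS head/tail layout. */
union ring_hts_pos {
	uint64_t raw __rte_aligned(8);
	struct {
		uint32_t head;
		uint32_t tail;
	} pos;
};

/* snapshot head and tail together with one 64-bit acquire load */
static inline void
hts_get(const union ring_hts_pos *ht, union ring_hts_pos *snap)
{
	snap->raw = __atomic_load_n(&ht->raw, __ATOMIC_ACQUIRE);
}

/* publish new head and tail together with one 64-bit release store */
static inline void
hts_set(union ring_hts_pos *ht, uint32_t head, uint32_t tail)
{
	union ring_hts_pos v;

	v.pos.head = head;
	v.pos.tail = tail;
	__atomic_store_n(&ht->raw, v.raw, __ATOMIC_RELEASE);
}
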