Hi Honnappa,

> Add scatter gather APIs to avoid intermediate memcpy. Serial
> dequeue APIs are added to support access to ring elements
> before actual dequeue.
> 
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> Reviewed-by: Gavin Hu <gavin...@arm.com>
> Reviewed-by: Ola Liljedahl <ola.liljed...@arm.com>
> ---
>  lib/librte_ring/Makefile           |   1 +
>  lib/librte_ring/meson.build        |   1 +
>  lib/librte_ring/rte_ring_c11_mem.h |  98 +++++++
>  lib/librte_ring/rte_ring_elem_sg.h | 417 +++++++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_generic.h |  93 +++++++
>  5 files changed, 610 insertions(+)

As you already noticed, this patch overlaps quite a bit with another one:
http://patches.dpdk.org/patch/66006/

Though it seems there are a few significant differences in
our approaches (both API and implementation).
So we probably need to come up with some common
view first, before moving forward with some unified version.
To start a discussion, I produced some comments, please see below.

I don't see changes in rte_ring.h itself, but I suppose
that's just because it is an RFC and it would be added in later versions?
Another similar question: there seems to be only a _bulk_ (RTE_RING_QUEUE_FIXED) mode;
I suppose _burst_ will also be added in later versions?

> diff --git a/lib/librte_ring/rte_ring_elem_sg.h 
> b/lib/librte_ring/rte_ring_elem_sg.h
> new file mode 100644
> index 000000000..a73f4fbfe
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_elem_sg.h
> @@ -0,0 +1,417 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2020 Arm Limited
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_ELEM_SG_H_
> +#define _RTE_RING_ELEM_SG_H_
> +
> +/**
> + * @file
> + * RTE Ring with
> + * 1) user defined element size
> + * 2) scatter gather feature to copy objects to/from the ring
> + * 3) ability to reserve, consume/discard elements in the ring
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <string.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +
> +#include "rte_ring.h"
> +#include "rte_ring_elem.h"
> +
> +/* Between load and load. there might be cpu reorder in weak model
> + * (powerpc/arm).
> + * There are 2 choices for the users
> + * 1.use rmb() memory barrier
> + * 2.use one-direction load_acquire/store_release barrier,defined by
> + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> + * It depends on performance test results.
> + * By default, move common functions to rte_ring_generic.h
> + */
> +#ifdef RTE_USE_C11_MEM_MODEL
> +#include "rte_ring_c11_mem.h"
> +#else
> +#include "rte_ring_generic.h"
> +#endif
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
> +     uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +     uint32_t idx = head & r->mask;
> +     uint64_t *ring = (uint64_t *)&r[1];
> +
> +     *dst1 = ring + idx;
> +     *n1 = num;
> +
> +     if (idx + num > r->size) {
> +             *n1 = num - (r->size - idx - 1);
> +             *dst2 = ring;
> +     }
> +}
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
> +     uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +     uint32_t idx = head & r->mask;
> +     rte_int128_t *ring = (rte_int128_t *)&r[1];
> +
> +     *dst1 = ring + idx;
> +     *n1 = num;
> +
> +     if (idx + num > r->size) {
> +             *n1 = num - (r->size - idx - 1);
> +             *dst2 = ring;
> +     }
> +}
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
> +     uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +     if (esize == 8)
> +             return __rte_ring_get_elem_addr_64(r, head,
> +                                             num, dst1, n1, dst2);
> +     else if (esize == 16)
> +             return __rte_ring_get_elem_addr_128(r, head,
> +                                             num, dst1, n1, dst2);
> +     else {
> +             uint32_t idx, scale, nr_idx;
> +             uint32_t *ring = (uint32_t *)&r[1];
> +
> +             /* Normalize to uint32_t */
> +             scale = esize / sizeof(uint32_t);
> +             idx = head & r->mask;
> +             nr_idx = idx * scale;
> +
> +             *dst1 = ring + nr_idx;
> +             *n1 = num;
> +
> +             if (idx + num > r->size) {
> +                     *n1 = num - (r->size - idx - 1);
> +                     *dst2 = ring;
> +             }
> +     }
> +}
> +
> +/**
> + * @internal Reserve ring elements to enqueue several objects on the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of elements to reserve in the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible from ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer reserve
> + * @param old_head
> + *   Producer's head index before reservation.
> + * @param new_head
> + *   Producer's head index after reservation.
> + * @param free_space
> + *   returns the amount of space after the reserve operation has finished.
> + *   It is not updated if the number of reserved elements is zero.
> + * @param dst1
> + *   Pointer to location in the ring to copy the data.
> + * @param n1
> + *   Number of elements to copy at dst1
> + * @param dst2
> + *   In case of ring wrap around, this pointer provides the location to
> + *   copy the remaining elements. The number of elements to copy at this
> + *   location is equal to (number of elements reserved - n1)
> + * @return
> + *   Actual number of elements reserved.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,


I do understand the purpose of reserve, then either commit/abort for serial 
sync mode,
but what is the purpose of non-serial version of reserve/commit?
In serial  MP/MC case, after _reserve_(n) you always have to do
_commit_(n) - you can't reduce number of elements, or do _abort_.
Again you cannot avoid memcpy(n) here anyhow.
So what is the point of these functions for non-serial case? 

BTW, I think it would be good to have serial version of _enqueue_ too.

> +             unsigned int n, enum rte_ring_queue_behavior behavior,
> +             unsigned int is_sp, unsigned int *old_head,
> +             unsigned int *new_head, unsigned int *free_space,
> +             void **dst1, unsigned int *n1, void **dst2)

I do understand the intention to avoid memcpy(), but proposed API
seems overcomplicated, error prone, and not very convenient for the user.
I don't think that avoiding memcpy() will save us that many cycles here,
so probably better to keep API model a bit more regular:

n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num, &free_space);
...
/* performs actual memcpy(), m<=n */ 
rte_ring_mp_serial_enqueue_bulk_commit(ring, obj,  m);

/* performs actual memcpy for num elems */ 
n = rte_ring_mp_serial_dequeue_bulk_reserve(ring, obj, num, &free_space);
....
/* m<=n */
rte_ring_mp_serial_dequeue_bulk_commit(ring, obj,  m);

Plus, we can have usual enqueue/dequeue API for serial sync mode:
rte_ring_serial_(enqueue/dequeue)_(bulk/burst)

> +{
> +     uint32_t free_entries;
> +
> +     n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> +                     old_head, new_head, &free_entries);
> +
> +     if (n == 0)
> +             goto end;

Here and in other similar places, why not just 'return 0;'?

> +
> +     __rte_ring_get_elem_addr(r, *old_head, esize, n, dst1, n1, dst2);
> +
> +     if (free_space != NULL)
> +             *free_space = free_entries - n;
> +
> +end:
> +     return n;
> +}
> +
> +/**
> + * @internal Consume previously reserved ring elements (for enqueue)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Producer's head index before reservation.
> + * @param new_head
> + *   Producer's head index after reservation.
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head update
> + */
> +static __rte_always_inline void
> +__rte_ring_do_enqueue_elem_commit(struct rte_ring *r,
> +             unsigned int old_head, unsigned int new_head,
> +             unsigned int is_sp)
> +{
> +     update_tail(&r->prod, old_head, new_head, is_sp, 1);
> +}
> +
> +/**
> + * Reserve one element for enqueuing one object on a ring
> + * (multi-producers safe). Application must call
> + * 'rte_ring_mp_enqueue_elem_commit' to complete the enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param old_head
> + *   Producer's head index before reservation. The same should be passed to
> + *   'rte_ring_mp_enqueue_elem_commit' function.
> + * @param new_head
> + *   Producer's head index after reservation. The same should be passed to
> + *   'rte_ring_mp_enqueue_elem_commit' function.
> + * @param free_space
> + *   Returns the amount of space after the reservation operation has 
> finished.
> + *   It is not updated if the number of reserved elements is zero.
> + * @param dst
> + *   Pointer to location in the ring to copy the data.
> + * @return
> + *   - 0: Success; objects enqueued.
> + *   - -ENOBUFS: Not enough room in the ring to reserve; no element is 
> reserved.
> + */
> +static __rte_always_inline int
> +rte_ring_mp_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
> +             unsigned int *old_head, unsigned int *new_head,
> +             unsigned int *free_space, void **dst)
> +{
> +     unsigned int n;
> +
> +     return __rte_ring_do_enqueue_elem_reserve(r, esize, 1,
> +                     RTE_RING_QUEUE_FIXED, 0, old_head, new_head,
> +                     free_space, dst, &n, NULL) ? 0 : -ENOBUFS;
> +}
> +
> +/**
> + * Consume previously reserved elements (for enqueue) in a ring
> + * (multi-producers safe). This API completes the enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Producer's head index before reservation. This value was returned
> + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> + * @param new_head
> + *   Producer's head index after reservation. This value was returned
> + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> + */
> +static __rte_always_inline void
> +rte_ring_mp_enqueue_elem_commit(struct rte_ring *r, unsigned int old_head,
> +             unsigned int new_head)
> +{
> +     __rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0);
> +}
> +
> +/**
> + * @internal Reserve elements to dequeue several objects on the ring.
> + * This function blocks if there are elements reserved already.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to reserve in the ring
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Reserve fixed number of elements in a ring
> + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible in a ring
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + * @param old_head
> + *   Consumer's head index before reservation.
> + * @param new_head
> + *   Consumer's head index after reservation.
> + * @param available
> + *   returns the number of remaining ring elements after the reservation
> + *   It is not updated if the number of reserved elements is zero.
> + * @param src1
> + *   Pointer to location in the ring to copy the data from.
> + * @param n1
> + *   Number of elements to copy from src1
> + * @param src2
> + *   In case of wrap around in the ring, this pointer provides the location
> + *   to copy the remaining elements from. The number of elements to copy from
> + *   this pointer is equal to (number of elements reserved - n1)
> + * @return
> + *   Actual number of elements reserved.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue_elem_reserve_serial(struct rte_ring *r,
> +             unsigned int esize, unsigned int n,
> +             enum rte_ring_queue_behavior behavior, unsigned int is_sc,
> +             unsigned int *old_head, unsigned int *new_head,
> +             unsigned int *available, void **src1, unsigned int *n1,
> +             void **src2)
> +{
> +     uint32_t entries;
> +
> +     n = __rte_ring_move_cons_head_serial(r, is_sc, n, behavior,
> +                     old_head, new_head, &entries);
> +
> +     if (n == 0)
> +             goto end;
> +
> +     __rte_ring_get_elem_addr(r, *old_head, esize, n, src1, n1, src2);
> +
> +     if (available != NULL)
> +             *available = entries - n;
> +
> +end:
> +     return n;
> +}
> +
> +/**
> + * @internal Consume previously reserved ring elements (for dequeue)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Consumer's head index before reservation.
> + * @param new_head
> + *   Consumer's head index after reservation.
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + */
> +static __rte_always_inline void
> +__rte_ring_do_dequeue_elem_commit(struct rte_ring *r,
> +             unsigned int old_head, unsigned int new_head,
> +             unsigned int is_sc)
> +{

I think letting the user specify old_head/new_head manually is a bit
dangerous and error-prone.
It seems better to have just _commit(ring, num) - see above.
That way the user doesn't have to calculate the new head manually,
plus we can have a check that ring.tail - ring.head >= num.

> +     update_tail(&r->cons, old_head, new_head, is_sc, 1);

I think update_tail() is not enough here.
In some cases we also need to update ring.head:
let's say the user reserved 2 elems, but then decided to commit only one.
So I think we need a special new function instead of update_tail() here.
BTW, in HTS I use atomic 64-bit read/write to get/set both head and tail in one 
go.
This is not really required - two 32bit ops would work too, I think.
As usual, both ways have some pros and cons:
using 64bit ops might be faster on 64-bit target, plus it is less error prone
(no need to think about head/tail read/write ordering, etc.),
though for 32-bit target it would mean some extra overhead. 

> +}
> +
> +/**
> + * Reserve one element on a ring for dequeue. This function blocks if there
> + * are elements reserved already. Application must call
> + * 'rte_ring_do_dequeue_elem_commit' or
> + * `rte_ring_do_dequeue_elem_revert_serial' to complete the dequeue 
> operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param old_head
> + *   Consumer's head index before reservation. The same should be passed to
> + *   'rte_ring_dequeue_elem_commit' function.
> + * @param new_head
> + *   Consumer's head index after reservation. The same should be passed to
> + *   'rte_ring_dequeue_elem_commit' function.
> + * @param available
> + *   returns the number of remaining ring elements after the reservation
> + *   It is not updated if the number of reserved elements is zero.
> + * @param src
> + *   Pointer to location in the ring to copy the data from.
> + * @return
> + *   - 0: Success; elements reserved
> + *   - -ENOBUFS: Not enough room in the ring; no element is reserved.
> + */
> +static __rte_always_inline int
> +rte_ring_dequeue_elem_reserve_serial(struct rte_ring *r, unsigned int esize,
> +             unsigned int *old_head, unsigned int *new_head,
> +             unsigned int *available, void **src)
> +{
> +     unsigned int n;
> +
> +     return __rte_ring_do_dequeue_elem_reserve_serial(r, esize, 1,
> +                     RTE_RING_QUEUE_FIXED, r->cons.single, old_head,
> +                     new_head, available, src, &n, NULL) ? 0 : -ENOBUFS;
> +}
> +
> +/**
> + * Consume previously reserved elements (for dequeue) in a ring
> + * (multi-consumer safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Consumer's head index before reservation. This value was returned
> + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> + * @param new_head
> + *   Consumer's head index after reservation. This value was returned
> + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> + */
> +static __rte_always_inline void
> +rte_ring_dequeue_elem_commit(struct rte_ring *r, unsigned int old_head,
> +             unsigned int new_head)
> +{
> +     __rte_ring_do_dequeue_elem_commit(r, old_head, new_head,
> +                                             r->cons.single);
> +}
> +
> +/**
> + * Discard previously reserved elements (for dequeue) in a ring.
> + *
> + * @warning
> + * This API can be called only if the ring elements were reserved
> + * using 'rte_ring_dequeue_xxx_elem_reserve_serial' APIs.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + */
> +static __rte_always_inline void
> +rte_ring_dequeue_elem_revert_serial(struct rte_ring *r)
> +{
> +     __rte_ring_revert_head(&r->cons);
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RING_ELEM_SG_H_ */
> diff --git a/lib/librte_ring/rte_ring_generic.h 
> b/lib/librte_ring/rte_ring_generic.h
> index 953cdbbd5..8d7a7ffcc 100644
> --- a/lib/librte_ring/rte_ring_generic.h
> +++ b/lib/librte_ring/rte_ring_generic.h
> @@ -170,4 +170,97 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned 
> int is_sc,
>       return n;
>  }
> 
> +/**
> + * @internal This function updates the consumer head if there are no
> + * prior reserved elements on the ring.
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sc
> + *   Indicates whether multi-consumer path is needed or not
> + * @param n
> + *   The number of elements we will want to dequeue, i.e. how far should the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where dequeue starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where dequeue finishes
> + * @param entries
> + *   Returns the number of entries in the ring BEFORE head was moved
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_cons_head_serial(struct rte_ring *r, unsigned int is_sc,
> +             unsigned int n, enum rte_ring_queue_behavior behavior,
> +             uint32_t *old_head, uint32_t *new_head,
> +             uint32_t *entries)
> +{
> +     unsigned int max = n;
> +     int success;
> +
> +     /* move cons.head atomically */
> +     do {
> +             /* Restore n as it may change every loop */
> +             n = max;
> +
> +             *old_head = r->cons.head;
> +
> +             /* add rmb barrier to avoid load/load reorder in weak
> +              * memory model. It is noop on x86
> +              */
> +             rte_smp_rmb();
> +
> +             /* Ensure that cons.tail and cons.head are the same */
> +             if (*old_head != r->cons.tail) {
> +                     rte_pause();
> +
> +                     success = 0;
> +                     continue;
> +             }
> +
> +             /* The subtraction is done between two unsigned 32bits value
> +              * (the result is always modulo 32 bits even if we have
> +              * cons_head > prod_tail). So 'entries' is always between 0
> +              * and size(ring)-1.
> +              */
> +             *entries = (r->prod.tail - *old_head);
> +
> +             /* Set the actual entries for dequeue */
> +             if (n > *entries)
> +                     n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> +             if (unlikely(n == 0))
> +                     return 0;
> +
> +             *new_head = *old_head + n;
> +             if (is_sc) {
> +                     r->cons.head = *new_head;
> +                     rte_smp_rmb();
> +                     success = 1;

I don't think we need to worry about SC case in this function.
For sc(/sp)_serial (if we need such mode) - we probably can use normal 
move_(cons/prod)_head().

> +             } else {
> +                     success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> +                                     *new_head);
> +             }
> +     } while (unlikely(success == 0));
> +     return n;
> +}
> +
> +/**
> + * @internal This function updates the head to match the tail
> + *
> + * @param ht
> + *   A pointer to the ring's head-tail structure
> + */
> +static __rte_always_inline void
> +__rte_ring_revert_head(struct rte_ring_headtail *ht)
> +{
> +     /* Discard the reserved ring elements. */
> +     ht->head = ht->tail;
> +}
> +
>  #endif /* _RTE_RING_GENERIC_H_ */
> --
> 2.17.1

Reply via email to