> > > Add a single thread safe and multi-thread unsafe ring data structure.
> > > This library provides a simple and efficient alternative to
> > > multi-thread safe ring when multi-thread safety is not required.
> >
> > Just a thought: do we really need whole new library for that?
> > From what I understand all we need right now just one extra function:
> > rte_ring_mt_unsafe_prod_deque(...)
> > Sorry for ugly name :)
> > To dequeue N elems from prod.tail.
> > Or you think there would be some extra advantages in ST version of the ring:
> > extra usages, better performance, etc.?
> There are multiple implementations of the ST ring being used in other parts 
> of DPDK. Mattias Ronnblom pointed out some (distributed
> scheduler, eth RX adapter, cmdline) [1] existing ones which will be replaced 
> by this one.
> This implementation will not use atomic instructions, head and tail indices 
> will be in the same cache line and it will be a double ended
> queue. So, I am expecting better perf and more use cases (some might not be 
> applicable currently).

Yep, I do understand that we can skip sync logic for ST case.
Ok, if we do have multiple use-cases it might be plausible to have a separate 
API for it.
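
To make the points above concrete (no atomics, both indices kept
together, operations at either end), here is a minimal sketch of what
such a ring could look like. It is not the code from the patch: the
demo_* names, the fixed 8-slot size and the choice to store masked
indices are assumptions made for illustration only.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define DEMO_RING_SIZE 8u /* assumed size; must be a power of two */

/* Hypothetical layout: plain (non-atomic) indices kept side by side. */
struct demo_st_ring {
	uint32_t head;     /* next free slot at the head end */
	uint32_t tail;     /* oldest element (FIFO dequeue end) */
	uint32_t mask;     /* size - 1 */
	uint32_t capacity; /* usable slots: size - 1 */
	void *slots[DEMO_RING_SIZE];
};

static void demo_init(struct demo_st_ring *r)
{
	/* The masking arithmetic below relies on a power-of-two size. */
	assert((DEMO_RING_SIZE & (DEMO_RING_SIZE - 1)) == 0);
	memset(r, 0, sizeof(*r));
	r->mask = DEMO_RING_SIZE - 1;
	r->capacity = DEMO_RING_SIZE - 1;
}

static unsigned int demo_count(const struct demo_st_ring *r)
{
	return (r->head - r->tail) & r->mask;
}

/* Enqueue at the head: store, then move head forwards. */
static int demo_enqueue(struct demo_st_ring *r, void *obj)
{
	if (demo_count(r) == r->capacity)
		return -1; /* full */
	r->slots[r->head] = obj;
	r->head = (r->head + 1) & r->mask;
	return 0;
}

/* Enqueue at the tail: move tail backwards, then store. */
static int demo_enqueue_at_tail(struct demo_st_ring *r, void *obj)
{
	if (demo_count(r) == r->capacity)
		return -1; /* full */
	r->tail = (r->tail - 1) & r->mask;
	r->slots[r->tail] = obj;
	return 0;
}

/* Dequeue from the tail (FIFO order): load, then move tail forwards. */
static int demo_dequeue(struct demo_st_ring *r, void **obj_p)
{
	if (r->head == r->tail)
		return -1; /* empty */
	*obj_p = r->slots[r->tail];
	r->tail = (r->tail + 1) & r->mask;
	return 0;
}

/* Dequeue from the head (LIFO order): move head backwards, then load. */
static int demo_dequeue_at_head(struct demo_st_ring *r, void **obj_p)
{
	if (r->head == r->tail)
		return -1; /* empty */
	r->head = (r->head - 1) & r->mask;
	*obj_p = r->slots[r->head];
	return 0;
}
```

With a layout like this, every operation is a couple of loads and
stores on the same structure, which is where the expected perf gain
over the MT-safe ring would come from. One slot is left unused so
that head == tail always means "empty".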

> 
> [1] https://mails.dpdk.org/archives/dev/2023-August/275003.html
> 
> >
> > >
> > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> > > ---
> > > v1:
> > > > 1) The code is very preliminary and is not even compiled
> > > 2) This is intended to show the APIs and some thoughts on
> > > implementation
> > > 3) More APIs and the rest of the implementation will come in subsequent
> > >    versions
> > >
> > > >  lib/st_ring/rte_st_ring.h | 567 ++++++++++++++++++++++++++++++++++++++
> > >  1 file changed, 567 insertions(+)
> > >  create mode 100644 lib/st_ring/rte_st_ring.h
> > >
> > > > diff --git a/lib/st_ring/rte_st_ring.h b/lib/st_ring/rte_st_ring.h
> > > > new file mode 100644
> > > > index 0000000000..8cb8832591
> > > --- /dev/null
> > > +++ b/lib/st_ring/rte_st_ring.h
> > > @@ -0,0 +1,567 @@
> > > +/* SPDX-License-Identifier: BSD-3-Clause
> > > + * Copyright(c) 2023 Arm Limited
> > > + */
> > > +
> > > +#ifndef _RTE_ST_RING_H_
> > > +#define _RTE_ST_RING_H_
> > > +
> > > +/**
> > > + * @file
> > > > + * RTE Single Thread Ring (ST Ring)
> > > + *
> > > + * The ST Ring is a fixed-size queue intended to be accessed
> > > + * by one thread at a time. It does not provide concurrent access to
> > > > + * multiple threads. If there are multiple threads accessing the ST ring,
> > > + * then the threads have to use locks to protect the ring from
> > > + * getting corrupted.
> > > + *
> > > + * - FIFO (First In First Out)
> > > + * - Maximum size is fixed; the pointers are stored in a table.
> > > + * - Consumer and producer part of same thread.
> > > + * - Multi-thread producers and consumers need locking.
> > > + * - Single/Bulk/burst dequeue at Tail or Head
> > > + * - Single/Bulk/burst enqueue at Head or Tail
> > > + *
> > > + */
> > > +
> > > +#ifdef __cplusplus
> > > +extern "C" {
> > > +#endif
> > > +
> > > +#include <rte_st_ring_core.h>
> > > +#include <rte_st_ring_elem.h>
> > > +
> > > +/**
> > > + * Calculate the memory size needed for a ST ring
> > > + *
> > > > + * This function returns the number of bytes needed for a ST ring, given
> > > + * the number of elements in it. This value is the sum of the size of
> > > + * the structure rte_st_ring and the size of the memory needed by the
> > > + * elements. The value is aligned to a cache line size.
> > > + *
> > > + * @param count
> > > + *   The number of elements in the ring (must be a power of 2).
> > > + * @return
> > > + *   - The memory size needed for the ST ring on success.
> > > + *   - -EINVAL if count is not a power of 2.
> > > + */
> > > +ssize_t rte_st_ring_get_memsize(unsigned int count);
> > > +
> > > +/**
> > > + * Initialize a ST ring structure.
> > > + *
> > > > + * Initialize a ST ring structure in memory pointed to by "r". The size of
> > > > + * the memory area must be large enough to store the ring structure and the
> > > > + * object table. It is advised to use rte_st_ring_get_memsize() to get the
> > > > + * appropriate size.
> > > + *
> > > + * The ST ring size is set to *count*, which must be a power of two.
> > > + * The real usable ring size is *count-1* instead of *count* to
> > > + * differentiate a full ring from an empty ring.
> > > + *
> > > > + * The ring is not added in RTE_TAILQ_ST_RING global list. Indeed, the
> > > + * memory given by the caller may not be shareable among dpdk
> > > + * processes.
> > > + *
> > > + * @param r
> > > + *   The pointer to the ring structure followed by the elements table.
> > > + * @param name
> > > + *   The name of the ring.
> > > + * @param count
> > > + *   The number of elements in the ring (must be a power of 2,
> > > + *   unless RTE_ST_RING_F_EXACT_SZ is set in flags).
> > > + * @param flags
> > > + *   An OR of the following:
> > > + *   - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold
> > > + *     exactly the requested number of entries, and the requested size
> > > + *     will be rounded up to the next power of two, but the usable space
> > > > + *     will be exactly that requested. Worst case, if a power-of-2 size is
> > > > + *     requested, half the ring space will be wasted.
> > > > + *     Without this flag set, the ring size requested must be a power of 2,
> > > > + *     and the usable space will be that size - 1.
> > > + * @return
> > > + *   0 on success, or a negative value on error.
> > > + */
> > > +int rte_st_ring_init(struct rte_st_ring *r, const char *name,
> > > + unsigned int count, unsigned int flags);
> > > +
> > > +/**
> > > + * Create a new ST ring named *name* in memory.
> > > + *
> > > > + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> > > > + * calls rte_st_ring_init() to initialize an empty ring.
> > > + *
> > > + * The new ring size is set to *count*, which must be a power of two.
> > > + * The real usable ring size is *count-1* instead of *count* to
> > > + * differentiate a full ring from an empty ring.
> > > + *
> > > + * The ring is added in RTE_TAILQ_ST_RING list.
> > > + *
> > > + * @param name
> > > + *   The name of the ring.
> > > + * @param count
> > > + *   The size of the ring (must be a power of 2,
> > > + *   unless RTE_ST_RING_F_EXACT_SZ is set in flags).
> > > + * @param socket_id
> > > + *   The *socket_id* argument is the socket identifier in case of
> > > + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> > > + *   constraint for the reserved zone.
> > > + * @param flags
> > > > + *   - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold
> > > > + *     exactly the requested number of entries, and the requested size
> > > > + *     will be rounded up to the next power of two, but the usable space
> > > > + *     will be exactly that requested. Worst case, if a power-of-2 size is
> > > > + *     requested, half the ring space will be wasted.
> > > > + *     Without this flag set, the ring size requested must be a power of 2,
> > > > + *     and the usable space will be that size - 1.
> > > + * @return
> > > > + *   On success, the pointer to the new allocated ring. NULL on error with
> > > > + *    rte_errno set appropriately. Possible errno values include:
> > > > + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> > > > + *      structure
> > > > + *    - EINVAL - count provided is not a power of 2
> > > > + *    - ENOSPC - the maximum number of memzones has already been allocated
> > > > + *    - EEXIST - a memzone with the same name already exists
> > > > + *    - ENOMEM - no appropriate memory area found in which to create
> > > > + *      memzone
> > > + */
> > > > +struct rte_st_ring *rte_st_ring_create(const char *name, unsigned int count,
> > > +                          int socket_id, unsigned int flags);
> > > +
> > > +/**
> > > + * De-allocate all memory used by the ring.
> > > + *
> > > + * @param r
> > > + *   Ring to free.
> > > + *   If NULL then, the function does nothing.
> > > + */
> > > +void rte_st_ring_free(struct rte_st_ring *r);
> > > +
> > > +/**
> > > + * Dump the status of the ring to a file.
> > > + *
> > > + * @param f
> > > + *   A pointer to a file for output
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + */
> > > +void rte_st_ring_dump(FILE *f, const struct rte_st_ring *r);
> > > +
> > > +/**
> > > + * Enqueue fixed number of objects on a ST ring.
> > > + *
> > > + * This function copies the objects at the head of the ring and
> > > + * moves the head index.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > + *   A pointer to a table of void * pointers (objects).
> > > + * @param n
> > > + *   The number of objects to add in the ring from the obj_table.
> > > + * @param free_space
> > > + *   if non-NULL, returns the amount of space in the ring after the
> > > + *   enqueue operation has finished.
> > > + * @return
> > > + *   The number of objects enqueued, either 0 or n
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +rte_st_ring_enqueue_bulk(struct rte_st_ring *r, void * const *obj_table,
> > > +               unsigned int n, unsigned int *free_space) {
> > > + return rte_st_ring_enqueue_bulk_elem(r, obj_table, sizeof(void *),
> > > +                 n, free_space);
> > > +}
> > > +
> > > +/**
> > > > + * Enqueue up to a maximum number of objects on a ST ring.
> > > + *
> > > + * This function copies the objects at the head of the ring and
> > > + * moves the head index.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > + *   A pointer to a table of void * pointers (objects).
> > > + * @param n
> > > + *   The number of objects to add in the ring from the obj_table.
> > > + * @param free_space
> > > + *   if non-NULL, returns the amount of space in the ring after the
> > > + *   enqueue operation has finished.
> > > + * @return
> > > + *   - n: Actual number of objects enqueued.
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +rte_st_ring_enqueue_burst(struct rte_st_ring *r, void * const *obj_table,
> > > +               unsigned int n, unsigned int *free_space) {
> > > + return rte_st_ring_enqueue_burst_elem(r, obj_table, sizeof(void *),
> > > +                 n, free_space);
> > > +}
> > > +
> > > +/**
> > > + * Enqueue one object on a ST ring.
> > > + *
> > > + * This function copies one object at the head of the ring and
> > > + * moves the head index.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj
> > > + *   A pointer to the object to be added.
> > > + * @return
> > > + *   - 0: Success; objects enqueued.
> > > > + *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is
> > > > + *     enqueued.
> > > + */
> > > +static __rte_always_inline int
> > > +rte_st_ring_enqueue(struct rte_st_ring *r, void *obj) {
> > > + return rte_st_ring_enqueue_elem(r, &obj, sizeof(void *)); }
> > > +
> > > +/**
> > > + * Enqueue fixed number of objects on a ST ring at the tail.
> > > + *
> > > + * This function copies the objects at the tail of the ring and
> > > + * moves the tail index (backwards).
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > + *   A pointer to a table of void * pointers (objects).
> > > + * @param n
> > > + *   The number of objects to add in the ring from the obj_table.
> > > + * @param free_space
> > > + *   if non-NULL, returns the amount of space in the ring after the
> > > + *   enqueue operation has finished.
> > > + * @return
> > > + *   The number of objects enqueued, either 0 or n
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +rte_st_ring_enqueue_at_tail_bulk(struct rte_st_ring *r,
> > > +                          void * const *obj_table, unsigned int n,
> > > +                          unsigned int *free_space)
> > > +{
> > > + return rte_st_ring_enqueue_at_tail_bulk_elem(r, obj_table,
> > > +                 sizeof(void *), n, free_space);
> > > +}
> > > +
> > > +/**
> > > > + * Enqueue up to a maximum number of objects on a ST ring at the tail.
> > > + *
> > > + * This function copies the objects at the tail of the ring and
> > > + * moves the tail index (backwards).
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > + *   A pointer to a table of void * pointers (objects).
> > > + * @param n
> > > + *   The number of objects to add in the ring from the obj_table.
> > > + * @param free_space
> > > + *   if non-NULL, returns the amount of space in the ring after the
> > > + *   enqueue operation has finished.
> > > + * @return
> > > + *   - n: Actual number of objects enqueued.
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +rte_st_ring_enqueue_at_tail_burst(struct rte_st_ring *r,
> > > +                           void * const *obj_table, unsigned int n,
> > > +                           unsigned int *free_space)
> > > +{
> > > + return rte_st_ring_enqueue_at_tail_burst_elem(r, obj_table,
> > > +                 sizeof(void *), n, free_space);
> > > +}
> > > +
> > > +/**
> > > + * Enqueue one object on a ST ring at tail.
> > > + *
> > > + * This function copies one object at the tail of the ring and
> > > + * moves the tail index (backwards).
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj
> > > + *   A pointer to the object to be added.
> > > + * @return
> > > + *   - 0: Success; objects enqueued.
> > > > + *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is
> > > > + *     enqueued.
> > > + */
> > > +static __rte_always_inline int
> > > +rte_st_ring_enqueue_at_tail(struct rte_st_ring *r, void *obj) {
> > > + return rte_st_ring_enqueue_at_tail_elem(r, &obj, sizeof(void *)); }
> > > +
> > > +/**
> > > + * Dequeue a fixed number of objects from a ST ring.
> > > + *
> > > + * This function copies the objects from the tail of the ring and
> > > + * moves the tail index.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > > + *   A pointer to a table of void * pointers (objects) that will be filled.
> > > + * @param n
> > > + *   The number of objects to dequeue from the ring to the obj_table.
> > > + * @param available
> > > + *   If non-NULL, returns the number of remaining ring entries after the
> > > + *   dequeue has finished.
> > > + * @return
> > > + *   The number of objects dequeued, either 0 or n
> > > + */
> > > +static __rte_always_inline unsigned int
> > > > +rte_st_ring_dequeue_bulk(struct rte_st_ring *r, void **obj_table,
> > > > +         unsigned int n, unsigned int *available)
> > > +{
> > > + return rte_st_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
> > > +                 n, available);
> > > +}
> > > +
> > > +/**
> > > > + * Dequeue up to a maximum number of objects from a ST ring.
> > > + *
> > > + * This function copies the objects from the tail of the ring and
> > > + * moves the tail index.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > > + *   A pointer to a table of void * pointers (objects) that will be filled.
> > > + * @param n
> > > + *   The number of objects to dequeue from the ring to the obj_table.
> > > + * @param available
> > > + *   If non-NULL, returns the number of remaining ring entries after the
> > > + *   dequeue has finished.
> > > + * @return
> > > + *   - Number of objects dequeued
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +rte_st_ring_dequeue_burst(struct rte_st_ring *r, void **obj_table,
> > > +         unsigned int n, unsigned int *available) {
> > > + return rte_st_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
> > > +                 n, available);
> > > +}
> > > +
> > > +/**
> > > + * Dequeue one object from a ST ring.
> > > + *
> > > + * This function copies one object from the tail of the ring and
> > > + * moves the tail index.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_p
> > > + *   A pointer to a void * pointer (object) that will be filled.
> > > + * @return
> > > + *   - 0: Success, objects dequeued.
> > > + *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
> > > + *     dequeued.
> > > + */
> > > +static __rte_always_inline int
> > > +rte_st_ring_dequeue(struct rte_st_ring *r, void **obj_p) {
> > > + return rte_st_ring_dequeue_elem(r, obj_p, sizeof(void *)); }
> > > +
> > > +/**
> > > + * Dequeue a fixed number of objects from a ST ring from the head.
> > > + *
> > > + * This function copies the objects from the head of the ring and
> > > + * moves the head index (backwards).
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > > + *   A pointer to a table of void * pointers (objects) that will be filled.
> > > + * @param n
> > > + *   The number of objects to dequeue from the ring to the obj_table.
> > > + * @param available
> > > + *   If non-NULL, returns the number of remaining ring entries after the
> > > + *   dequeue has finished.
> > > + * @return
> > > + *   The number of objects dequeued, either 0 or n
> > > + */
> > > +static __rte_always_inline unsigned int
> > > > +rte_st_ring_dequeue_at_head_bulk(struct rte_st_ring *r, void **obj_table,
> > > > +         unsigned int n, unsigned int *available)
> > > +{
> > > + return rte_st_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
> > > +                 n, available);
> > > +}
> > > +
> > > +/**
> > > > + * Dequeue up to a maximum number of objects from a ST ring from the head.
> > > + *
> > > + * This function copies the objects from the head of the ring and
> > > + * moves the head index (backwards).
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > > + *   A pointer to a table of void * pointers (objects) that will be filled.
> > > + * @param n
> > > + *   The number of objects to dequeue from the ring to the obj_table.
> > > + * @param available
> > > + *   If non-NULL, returns the number of remaining ring entries after the
> > > + *   dequeue has finished.
> > > + * @return
> > > + *   - Number of objects dequeued
> > > + */
> > > +static __rte_always_inline unsigned int
> > > > +rte_st_ring_dequeue_at_head_burst(struct rte_st_ring *r, void **obj_table,
> > > > +         unsigned int n, unsigned int *available) {
> > > + return rte_st_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
> > > +                 n, available);
> > > +}
> > > +
> > > +/**
> > > + * Dequeue one object from a ST ring from the head.
> > > + *
> > > + * This function copies the objects from the head of the ring and
> > > + * moves the head index (backwards).
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_p
> > > + *   A pointer to a void * pointer (object) that will be filled.
> > > + * @return
> > > + *   - 0: Success, objects dequeued.
> > > + *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
> > > + *     dequeued.
> > > + */
> > > +static __rte_always_inline int
> > > +rte_st_ring_at_head_dequeue(struct rte_st_ring *r, void **obj_p) {
> > > + return rte_st_ring_dequeue_elem(r, obj_p, sizeof(void *)); }
> > > +
> > > +/**
> > > + * Flush a ST ring.
> > > + *
> > > > + * This function flushes all the elements in a ST ring.
> > > + *
> > > + * @warning
> > > + * Make sure the ring is not in use while calling this function.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + */
> > > +void
> > > +rte_st_ring_reset(struct rte_st_ring *r);
> > > +
> > > +/**
> > > + * Return the number of entries in a ST ring.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @return
> > > + *   The number of entries in the ring.
> > > + */
> > > +static inline unsigned int
> > > +rte_st_ring_count(const struct rte_st_ring *r) {
> > > + uint32_t count = (r->head - r->tail) & r->mask;
> > > + return count;
> > > +}
> > > +
> > > +/**
> > > + * Return the number of free entries in a ST ring.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @return
> > > + *   The number of free entries in the ring.
> > > + */
> > > +static inline unsigned int
> > > +rte_st_ring_free_count(const struct rte_st_ring *r) {
> > > + return r->capacity - rte_st_ring_count(r); }
> > > +
> > > +/**
> > > + * Test if a ST ring is full.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @return
> > > + *   - 1: The ring is full.
> > > + *   - 0: The ring is not full.
> > > + */
> > > +static inline int
> > > +rte_st_ring_full(const struct rte_st_ring *r) {
> > > + return rte_st_ring_free_count(r) == 0; }
> > > +
> > > +/**
> > > + * Test if a ST ring is empty.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @return
> > > + *   - 1: The ring is empty.
> > > + *   - 0: The ring is not empty.
> > > + */
> > > +static inline int
> > > +rte_st_ring_empty(const struct rte_st_ring *r) {
> > > + return r->tail == r->head;
> > > +}
> > > +
> > > +/**
> > > + * Return the size of the ring.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @return
> > > + *   The size of the data store used by the ring.
> > > > + *   NOTE: this is not the same as the usable space in the ring. To query
> > > > + *   that use ``rte_st_ring_get_capacity()``.
> > > + */
> > > +static inline unsigned int
> > > +rte_st_ring_get_size(const struct rte_st_ring *r) {
> > > + return r->size;
> > > +}
> > > +
> > > +/**
> > > + * Return the number of elements which can be stored in the ring.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @return
> > > + *   The usable size of the ring.
> > > + */
> > > +static inline unsigned int
> > > +rte_st_ring_get_capacity(const struct rte_st_ring *r) {
> > > + return r->capacity;
> > > +}
> > > +
> > > +/**
> > > + * Dump the status of all rings on the console
> > > + *
> > > + * @param f
> > > + *   A pointer to a file for output
> > > + */
> > > +void rte_st_ring_list_dump(FILE *f);
> > > +
> > > +/**
> > > + * Search a ST ring from its name
> > > + *
> > > + * @param name
> > > + *   The name of the ring.
> > > + * @return
> > > + *   The pointer to the ring matching the name, or NULL if not found,
> > > + *   with rte_errno set appropriately. Possible rte_errno values include:
> > > + *    - ENOENT - required entry not available to return.
> > > + */
> > > +struct rte_st_ring *rte_st_ring_lookup(const char *name);
> > > +
> > > +#ifdef __cplusplus
> > > +}
> > > +#endif
> > > +
> > > +#endif /* _RTE_ST_RING_H_ */
> > > --
> > > 2.25.1
> > >
