> Add a single thread safe and multi-thread unsafe ring data structure.
> This library provides an simple and efficient alternative to multi-thread
> safe ring when multi-thread safety is not required.
Just a thought: do we really need whole new library for that?
>From what I understand all we need right now just one extra function:
Sorry for ugly name :)
To dequeue N elems from prod.tail.
Or you think there would be some extra advantages in ST version of the ring:
extra usages, better performance, etc.?
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> ---
> v1:
> 1) The code is very prelimnary and is not even compiled
> 2) This is intended to show the APIs and some thoughts on implementation
> 3) More APIs and the rest of the implementation will come in subsequent
> versions
> lib/st_ring/rte_st_ring.h | 567 ++++++++++++++++++++++++++++++++++++++
> 1 file changed, 567 insertions(+)
> create mode 100644 lib/st_ring/rte_st_ring.h
> diff --git a/lib/st_ring/rte_st_ring.h b/lib/st_ring/rte_st_ring.h
> new file mode 100644
> index 0000000000..8cb8832591
> --- /dev/null
> +++ b/lib/st_ring/rte_st_ring.h
> @@ -0,0 +1,567 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2023 Arm Limited
> + */
> +
> +#ifndef _RTE_ST_RING_H_
> +#define _RTE_ST_RING_H_
> +
> +/**
> + * @file
> + * RTE Signle Thread Ring (ST Ring)
> + *
> + * The ST Ring is a fixed-size queue intended to be accessed
> + * by one thread at a time. It does not provide concurrent access to
> + * multiple threads. If there are multiple threads accessing the ST ring,
> + * then the threads have to use locks to protect the ring from
> + * getting corrupted.
> + *
> + * - FIFO (First In First Out)
> + * - Maximum size is fixed; the pointers are stored in a table.
> + * - Consumer and producer part of same thread.
> + * - Multi-thread producers and consumers need locking.
> + * - Single/Bulk/burst dequeue at Tail or Head
> + * - Single/Bulk/burst enqueue at Head or Tail
> + *
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_st_ring_core.h>
> +#include <rte_st_ring_elem.h>
> +
> +/**
> + * Calculate the memory size needed for a ST ring
> + *
> + * This function returns the number of bytes needed for a ST ring, given
> + * the number of elements in it. This value is the sum of the size of
> + * the structure rte_st_ring and the size of the memory needed by the
> + * elements. The value is aligned to a cache line size.
> + *
> + * @param count
> + * The number of elements in the ring (must be a power of 2).
> + * @return
> + * - The memory size needed for the ST ring on success.
> + * - -EINVAL if count is not a power of 2.
> + */
> +ssize_t rte_st_ring_get_memsize(unsigned int count);
> +
> +/**
> + * Initialize a ST ring structure.
> + *
> + * Initialize a ST ring structure in memory pointed by "r". The size of the
> + * memory area must be large enough to store the ring structure and the
> + * object table. It is advised to use rte_st_ring_get_memsize() to get the
> + * appropriate size.
> + *
> + * The ST ring size is set to *count*, which must be a power of two.
> + * The real usable ring size is *count-1* instead of *count* to
> + * differentiate a full ring from an empty ring.
> + *
> + * The ring is not added in RTE_TAILQ_ST_RING global list. Indeed, the
> + * memory given by the caller may not be shareable among dpdk
> + * processes.
> + *
> + * @param r
> + * The pointer to the ring structure followed by the elements table.
> + * @param name
> + * The name of the ring.
> + * @param count
> + * The number of elements in the ring (must be a power of 2,
> + * unless RTE_ST_RING_F_EXACT_SZ is set in flags).
> + * @param flags
> + * An OR of the following:
> + * - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold
> + * exactly the requested number of entries, and the requested size
> + * will be rounded up to the next power of two, but the usable space
> + * will be exactly that requested. Worst case, if a power-of-2 size is
> + * requested, half the ring space will be wasted.
> + * Without this flag set, the ring size requested must be a power of 2,
> + * and the usable space will be that size - 1.
> + * @return
> + * 0 on success, or a negative value on error.
> + */
> +int rte_st_ring_init(struct rte_st_ring *r, const char *name,
> + unsigned int count, unsigned int flags);
> +
> +/**
> + * Create a new ST ring named *name* in memory.
> + *
> + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> + * calls rte_st_ring_init() to initialize an empty ring.
> + *
> + * The new ring size is set to *count*, which must be a power of two.
> + * The real usable ring size is *count-1* instead of *count* to
> + * differentiate a full ring from an empty ring.
> + *
> + * The ring is added in RTE_TAILQ_ST_RING list.
> + *
> + * @param name
> + * The name of the ring.
> + * @param count
> + * The size of the ring (must be a power of 2,
> + * unless RTE_ST_RING_F_EXACT_SZ is set in flags).
> + * @param socket_id
> + * The *socket_id* argument is the socket identifier in case of
> + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + * constraint for the reserved zone.
> + * @param flags
> + * - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold
> exactly the
> + * requested number of entries, and the requested size will be rounded up
> + * to the next power of two, but the usable space will be exactly that
> + * requested. Worst case, if a power-of-2 size is requested, half the
> + * ring space will be wasted.
> + * Without this flag set, the ring size requested must be a power of 2,
> + * and the usable space will be that size - 1.
> + * @return
> + * On success, the pointer to the new allocated ring. NULL on error with
> + * rte_errno set appropriately. Possible errno values include:
> + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> structure
> + * - EINVAL - count provided is not a power of 2
> + * - ENOSPC - the maximum number of memzones has already been allocated
> + * - EEXIST - a memzone with the same name already exists
> + * - ENOMEM - no appropriate memory area found in which to create memzone
> + */
> +struct rte_st_ring *rte_st_ring_create(const char *name, unsigned int count,
> + int socket_id, unsigned int flags);
> +
> +/**
> + * De-allocate all memory used by the ring.
> + *
> + * @param r
> + * Ring to free.
> + * If NULL then, the function does nothing.
> + */
> +void rte_st_ring_free(struct rte_st_ring *r);
> +
> +/**
> + * Dump the status of the ring to a file.
> + *
> + * @param f
> + * A pointer to a file for output
> + * @param r
> + * A pointer to the ring structure.
> + */
> +void rte_st_ring_dump(FILE *f, const struct rte_st_ring *r);
> +
> +/**
> + * Enqueue fixed number of objects on a ST ring.
> + *
> + * This function copies the objects at the head of the ring and
> + * moves the head index.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + * if non-NULL, returns the amount of space in the ring after the
> + * enqueue operation has finished.
> + * @return
> + * The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_st_ring_enqueue_bulk(struct rte_st_ring *r, void * const *obj_table,
> + unsigned int n, unsigned int *free_space)
> +{
> + return rte_st_ring_enqueue_bulk_elem(r, obj_table, sizeof(void *),
> + n, free_space);
> +}
> +
> +/**
> + * Enqueue upto a maximum number of objects on a ST ring.
> + *
> + * This function copies the objects at the head of the ring and
> + * moves the head index.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + * if non-NULL, returns the amount of space in the ring after the
> + * enqueue operation has finished.
> + * @return
> + * - n: Actual number of objects enqueued.
> + */
> +static __rte_always_inline unsigned int
> +rte_st_ring_enqueue_burst(struct rte_st_ring *r, void * const *obj_table,
> + unsigned int n, unsigned int *free_space)
> +{
> + return rte_st_ring_enqueue_burst_elem(r, obj_table, sizeof(void *),
> + n, free_space);
> +}
> +
> +/**
> + * Enqueue one object on a ST ring.
> + *
> + * This function copies one object at the head of the ring and
> + * moves the head index.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj
> + * A pointer to the object to be added.
> + * @return
> + * - 0: Success; objects enqueued.
> + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is
> enqueued.
> + */
> +static __rte_always_inline int
> +rte_st_ring_enqueue(struct rte_st_ring *r, void *obj)
> +{
> + return rte_st_ring_enqueue_elem(r, &obj, sizeof(void *));
> +}
> +
> +/**
> + * Enqueue fixed number of objects on a ST ring at the tail.
> + *
> + * This function copies the objects at the tail of the ring and
> + * moves the tail index (backwards).
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + * if non-NULL, returns the amount of space in the ring after the
> + * enqueue operation has finished.
> + * @return
> + * The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_st_ring_enqueue_at_tail_bulk(struct rte_st_ring *r,
> + void * const *obj_table, unsigned int n,
> + unsigned int *free_space)
> +{
> + return rte_st_ring_enqueue_at_tail_bulk_elem(r, obj_table,
> + sizeof(void *), n, free_space);
> +}
> +
> +/**
> + * Enqueue upto a maximum number of objects on a ST ring at the tail.
> + *
> + * This function copies the objects at the tail of the ring and
> + * moves the tail index (backwards).
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + * if non-NULL, returns the amount of space in the ring after the
> + * enqueue operation has finished.
> + * @return
> + * - n: Actual number of objects enqueued.
> + */
> +static __rte_always_inline unsigned int
> +rte_st_ring_enqueue_at_tail_burst(struct rte_st_ring *r,
> + void * const *obj_table, unsigned int n,
> + unsigned int *free_space)
> +{
> + return rte_st_ring_enqueue_at_tail_burst_elem(r, obj_table,
> + sizeof(void *), n, free_space);
> +}
> +
> +/**
> + * Enqueue one object on a ST ring at tail.
> + *
> + * This function copies one object at the tail of the ring and
> + * moves the tail index (backwards).
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj
> + * A pointer to the object to be added.
> + * @return
> + * - 0: Success; objects enqueued.
> + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is
> enqueued.
> + */
> +static __rte_always_inline int
> +rte_st_ring_enqueue_at_tail(struct rte_st_ring *r, void *obj)
> +{
> + return rte_st_ring_enqueue_at_tail_elem(r, &obj, sizeof(void *));
> +}
> +
> +/**
> + * Dequeue a fixed number of objects from a ST ring.
> + *
> + * This function copies the objects from the tail of the ring and
> + * moves the tail index.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + * The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + * If non-NULL, returns the number of remaining ring entries after the
> + * dequeue has finished.
> + * @return
> + * The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_st_ring_dequeue_bulk(struct rte_st_ring *r, void **obj_table, unsigned
> int n,
> + unsigned int *available)
> +{
> + return rte_st_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
> + n, available);
> +}
> +
> +/**
> + * Dequeue upto a maximum number of objects from a ST ring.
> + *
> + * This function copies the objects from the tail of the ring and
> + * moves the tail index.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + * The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + * If non-NULL, returns the number of remaining ring entries after the
> + * dequeue has finished.
> + * @return
> + * - Number of objects dequeued
> + */
> +static __rte_always_inline unsigned int
> +rte_st_ring_dequeue_burst(struct rte_st_ring *r, void **obj_table,
> + unsigned int n, unsigned int *available)
> +{
> + return rte_st_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
> + n, available);
> +}
> +
> +/**
> + * Dequeue one object from a ST ring.
> + *
> + * This function copies one object from the tail of the ring and
> + * moves the tail index.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_p
> + * A pointer to a void * pointer (object) that will be filled.
> + * @return
> + * - 0: Success, objects dequeued.
> + * - -ENOENT: Not enough entries in the ring to dequeue, no object is
> + * dequeued.
> + */
> +static __rte_always_inline int
> +rte_st_ring_dequeue(struct rte_st_ring *r, void **obj_p)
> +{
> + return rte_st_ring_dequeue_elem(r, obj_p, sizeof(void *));
> +}
> +
> +/**
> + * Dequeue a fixed number of objects from a ST ring from the head.
> + *
> + * This function copies the objects from the head of the ring and
> + * moves the head index (backwards).
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + * The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + * If non-NULL, returns the number of remaining ring entries after the
> + * dequeue has finished.
> + * @return
> + * The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_st_ring_dequeue_at_head_bulk(struct rte_st_ring *r, void **obj_table,
> unsigned int n,
> + unsigned int *available)
> +{
> + return rte_st_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
> + n, available);
> +}
> +
> +/**
> + * Dequeue upto a maximum number of objects from a ST ring from the head.
> + *
> + * This function copies the objects from the head of the ring and
> + * moves the head index (backwards).
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + * The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + * If non-NULL, returns the number of remaining ring entries after the
> + * dequeue has finished.
> + * @return
> + * - Number of objects dequeued
> + */
> +static __rte_always_inline unsigned int
> +rte_st_ring_dequeue_at_head_burst(struct rte_st_ring *r, void **obj_table,
> + unsigned int n, unsigned int *available)
> +{
> + return rte_st_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
> + n, available);
> +}
> +
> +/**
> + * Dequeue one object from a ST ring from the head.
> + *
> + * This function copies the objects from the head of the ring and
> + * moves the head index (backwards).
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_p
> + * A pointer to a void * pointer (object) that will be filled.
> + * @return
> + * - 0: Success, objects dequeued.
> + * - -ENOENT: Not enough entries in the ring to dequeue, no object is
> + * dequeued.
> + */
> +static __rte_always_inline int
> +rte_st_ring_at_head_dequeue(struct rte_st_ring *r, void **obj_p)
> +{
> + return rte_st_ring_dequeue_elem(r, obj_p, sizeof(void *));
> +}
> +
> +/**
> + * Flush a ST ring.
> + *
> + * This function flush all the elements in a ST ring
> + *
> + * @warning
> + * Make sure the ring is not in use while calling this function.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + */
> +void
> +rte_st_ring_reset(struct rte_st_ring *r);
> +
> +/**
> + * Return the number of entries in a ST ring.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @return
> + * The number of entries in the ring.
> + */
> +static inline unsigned int
> +rte_st_ring_count(const struct rte_st_ring *r)
> +{
> + uint32_t count = (r->head - r->tail) & r->mask;
> + return count;
> +}
> +
> +/**
> + * Return the number of free entries in a ST ring.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @return
> + * The number of free entries in the ring.
> + */
> +static inline unsigned int
> +rte_st_ring_free_count(const struct rte_st_ring *r)
> +{
> + return r->capacity - rte_st_ring_count(r);
> +}
> +
> +/**
> + * Test if a ST ring is full.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @return
> + * - 1: The ring is full.
> + * - 0: The ring is not full.
> + */
> +static inline int
> +rte_st_ring_full(const struct rte_st_ring *r)
> +{
> + return rte_st_ring_free_count(r) == 0;
> +}
> +
> +/**
> + * Test if a ST ring is empty.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @return
> + * - 1: The ring is empty.
> + * - 0: The ring is not empty.
> + */
> +static inline int
> +rte_st_ring_empty(const struct rte_st_ring *r)
> +{
> + return r->tail == r->head;
> +}
> +
> +/**
> + * Return the size of the ring.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @return
> + * The size of the data store used by the ring.
> + * NOTE: this is not the same as the usable space in the ring. To query
> that
> + * use ``rte_st_ring_get_capacity()``.
> + */
> +static inline unsigned int
> +rte_st_ring_get_size(const struct rte_st_ring *r)
> +{
> + return r->size;
> +}
> +
> +/**
> + * Return the number of elements which can be stored in the ring.
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @return
> + * The usable size of the ring.
> + */
> +static inline unsigned int
> +rte_st_ring_get_capacity(const struct rte_st_ring *r)
> +{
> + return r->capacity;
> +}
> +
> +/**
> + * Dump the status of all rings on the console
> + *
> + * @param f
> + * A pointer to a file for output
> + */
> +void rte_st_ring_list_dump(FILE *f);
> +
> +/**
> + * Search a ST ring from its name
> + *
> + * @param name
> + * The name of the ring.
> + * @return
> + * The pointer to the ring matching the name, or NULL if not found,
> + * with rte_errno set appropriately. Possible rte_errno values include:
> + * - ENOENT - required entry not available to return.
> + */
> +struct rte_st_ring *rte_st_ring_lookup(const char *name);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_ST_RING_H_ */
> --
> 2.25.1