I think you will want a peek-type operation on the reader side. That more for
convenience, rather than that I think the copies will actually be there in the
object code (such should be eliminated by the compiler, given that the
barriers are gone).
3) More APIs and the rest of the implementation will come in subsequent
versions
lib/st_ring/rte_st_ring.h | 567
++++++++++++++++++++++++++++++++++++++
1 file changed, 567 insertions(+)
create mode 100644 lib/st_ring/rte_st_ring.h
diff --git a/lib/st_ring/rte_st_ring.h b/lib/st_ring/rte_st_ring.h new
file mode 100644 index 0000000000..8cb8832591
--- /dev/null
+++ b/lib/st_ring/rte_st_ring.h
@@ -0,0 +1,567 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Arm Limited
+ */
+
+#ifndef _RTE_ST_RING_H_
+#define _RTE_ST_RING_H_
+
+/**
+ * @file
+ * RTE Signle Thread Ring (ST Ring)
+ *
+ * The ST Ring is a fixed-size queue intended to be accessed
+ * by one thread at a time. It does not provide concurrent access to
+ * multiple threads. If there are multiple threads accessing the ST
+ring,
+ * then the threads have to use locks to protect the ring from
+ * getting corrupted.
You are basically saying the same thing three times here.
+ *
+ * - FIFO (First In First Out)
+ * - Maximum size is fixed; the pointers are stored in a table.
+ * - Consumer and producer part of same thread.
+ * - Multi-thread producers and consumers need locking.
...two more times here. One might get the impression you really don't trust
the reader.
+ * - Single/Bulk/burst dequeue at Tail or Head
+ * - Single/Bulk/burst enqueue at Head or Tail
Does this not sound more like a deque, than a FIFO/circular buffer? Are there
any examples where this functionality (the double-endedness) is needed in
the DPDK code base?
+
+/**
+ * Calculate the memory size needed for a ST ring
+ *
+ * This function returns the number of bytes needed for a ST ring,
+given
+ * the number of elements in it. This value is the sum of the size of
+ * the structure rte_st_ring and the size of the memory needed by the
+ * elements. The value is aligned to a cache line size.
+ *
+ * @param count
+ * The number of elements in the ring (must be a power of 2).
+ * @return
+ * - The memory size needed for the ST ring on success.
+ * - -EINVAL if count is not a power of 2.
+ */
+ssize_t rte_st_ring_get_memsize(unsigned int count);
+
+/**
+ * Initialize a ST ring structure.
+ *
+ * Initialize a ST ring structure in memory pointed by "r". The size
+of the
+ * memory area must be large enough to store the ring structure and
+the
+ * object table. It is advised to use rte_st_ring_get_memsize() to
+get the
+ * appropriate size.
+ *
+ * The ST ring size is set to *count*, which must be a power of two.
+ * The real usable ring size is *count-1* instead of *count* to
+ * differentiate a full ring from an empty ring.
+ *
+ * The ring is not added in RTE_TAILQ_ST_RING global list. Indeed,
+the
+ * memory given by the caller may not be shareable among dpdk
+ * processes.
+ *
+ * @param r
+ * The pointer to the ring structure followed by the elements table.
+ * @param name
+ * The name of the ring.
+ * @param count
+ * The number of elements in the ring (must be a power of 2,
+ * unless RTE_ST_RING_F_EXACT_SZ is set in flags).
+ * @param flags
+ * An OR of the following:
+ * - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold
+ * exactly the requested number of entries, and the requested size
+ * will be rounded up to the next power of two, but the usable space
+ * will be exactly that requested. Worst case, if a power-of-2 size is
+ * requested, half the ring space will be wasted.
+ * Without this flag set, the ring size requested must be a power of 2,
+ * and the usable space will be that size - 1.
+ * @return
+ * 0 on success, or a negative value on error.
+ */
+int rte_st_ring_init(struct rte_st_ring *r, const char *name,
+ unsigned int count, unsigned int flags);
+
+/**
+ * Create a new ST ring named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then
+it
+ * calls rte_st_ring_init() to initialize an empty ring.
+ *
+ * The new ring size is set to *count*, which must be a power of two.
+ * The real usable ring size is *count-1* instead of *count* to
+ * differentiate a full ring from an empty ring.
+ *
+ * The ring is added in RTE_TAILQ_ST_RING list.
+ *
+ * @param name
+ * The name of the ring.
+ * @param count
+ * The size of the ring (must be a power of 2,
+ * unless RTE_ST_RING_F_EXACT_SZ is set in flags).
+ * @param socket_id
+ * The *socket_id* argument is the socket identifier in case of
+ * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ * constraint for the reserved zone.
+ * @param flags
+ * - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold exactly
the
+ * requested number of entries, and the requested size will be rounded
up
+ * to the next power of two, but the usable space will be exactly that
+ * requested. Worst case, if a power-of-2 size is requested, half the
+ * ring space will be wasted.
+ * Without this flag set, the ring size requested must be a power of 2,
+ * and the usable space will be that size - 1.
+ * @return
+ * On success, the pointer to the new allocated ring. NULL on error with
+ * rte_errno set appropriately. Possible errno values include:
+ * - E_RTE_NO_CONFIG - function could not get pointer to rte_config
structure
+ * - EINVAL - count provided is not a power of 2
+ * - ENOSPC - the maximum number of memzones has already been
allocated
+ * - EEXIST - a memzone with the same name already exists
+ * - ENOMEM - no appropriate memory area found in which to create
memzone
+ */
+struct rte_st_ring *rte_st_ring_create(const char *name, unsigned int
count,
+ int socket_id, unsigned int flags);
+
+/**
+ * De-allocate all memory used by the ring.
+ *
+ * @param r
+ * Ring to free.
+ * If NULL then, the function does nothing.
+ */
+void rte_st_ring_free(struct rte_st_ring *r);
+
+/**
+ * Dump the status of the ring to a file.
+ *
+ * @param f
+ * A pointer to a file for output
+ * @param r
+ * A pointer to the ring structure.
+ */
+void rte_st_ring_dump(FILE *f, const struct rte_st_ring *r);
+
+/**
+ * Enqueue fixed number of objects on a ST ring.
+ *
+ * This function copies the objects at the head of the ring and
+ * moves the head index.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_enqueue_bulk(struct rte_st_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space) {
+ return rte_st_ring_enqueue_bulk_elem(r, obj_table, sizeof(void *),
+ n, free_space);
+}
+
+/**
+ * Enqueue upto a maximum number of objects on a ST ring.
+ *
+ * This function copies the objects at the head of the ring and
+ * moves the head index.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_enqueue_burst(struct rte_st_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space) {
+ return rte_st_ring_enqueue_burst_elem(r, obj_table, sizeof(void *),
+ n, free_space);
+}
+
+/**
+ * Enqueue one object on a ST ring.
+ *
+ * This function copies one object at the head of the ring and
+ * moves the head index.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj
+ * A pointer to the object to be added.
+ * @return
+ * - 0: Success; objects enqueued.
+ * - -ENOBUFS: Not enough room in the ring to enqueue; no object is
enqueued.
+ */
+static __rte_always_inline int
+rte_st_ring_enqueue(struct rte_st_ring *r, void *obj) {
+ return rte_st_ring_enqueue_elem(r, &obj, sizeof(void *)); }
+
+/**
+ * Enqueue fixed number of objects on a ST ring at the tail.
+ *
+ * This function copies the objects at the tail of the ring and
+ * moves the tail index (backwards).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_enqueue_at_tail_bulk(struct rte_st_ring *r,
+ void * const *obj_table, unsigned int n,
+ unsigned int *free_space)
+{
+ return rte_st_ring_enqueue_at_tail_bulk_elem(r, obj_table,
+ sizeof(void *), n, free_space);
+}
+
+/**
+ * Enqueue upto a maximum number of objects on a ST ring at the tail.
+ *
+ * This function copies the objects at the tail of the ring and
+ * moves the tail index (backwards).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_enqueue_at_tail_burst(struct rte_st_ring *r,
+ void * const *obj_table, unsigned int n,
+ unsigned int *free_space)
+{
+ return rte_st_ring_enqueue_at_tail_burst_elem(r, obj_table,
+ sizeof(void *), n, free_space);
+}
+
+/**
+ * Enqueue one object on a ST ring at tail.
+ *
+ * This function copies one object at the tail of the ring and
+ * moves the tail index (backwards).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj
+ * A pointer to the object to be added.
+ * @return
+ * - 0: Success; objects enqueued.
+ * - -ENOBUFS: Not enough room in the ring to enqueue; no object is
enqueued.
+ */
+static __rte_always_inline int
+rte_st_ring_enqueue_at_tail(struct rte_st_ring *r, void *obj) {
+ return rte_st_ring_enqueue_at_tail_elem(r, &obj, sizeof(void *)); }
+
+/**
+ * Dequeue a fixed number of objects from a ST ring.
+ *
+ * This function copies the objects from the tail of the ring and
+ * moves the tail index.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_dequeue_bulk(struct rte_st_ring *r, void **obj_table,
unsigned int n,
+ unsigned int *available)
+{
+ return rte_st_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
+ n, available);
+}
+
+/**
+ * Dequeue upto a maximum number of objects from a ST ring.
+ *
+ * This function copies the objects from the tail of the ring and
+ * moves the tail index.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - Number of objects dequeued
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_dequeue_burst(struct rte_st_ring *r, void **obj_table,
+ unsigned int n, unsigned int *available) {
+ return rte_st_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
+ n, available);
+}
+
+/**
+ * Dequeue one object from a ST ring.
+ *
+ * This function copies one object from the tail of the ring and
+ * moves the tail index.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_p
+ * A pointer to a void * pointer (object) that will be filled.
+ * @return
+ * - 0: Success, objects dequeued.
+ * - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ * dequeued.
+ */
+static __rte_always_inline int
+rte_st_ring_dequeue(struct rte_st_ring *r, void **obj_p) {
+ return rte_st_ring_dequeue_elem(r, obj_p, sizeof(void *)); }
+
+/**
+ * Dequeue a fixed number of objects from a ST ring from the head.
+ *
+ * This function copies the objects from the head of the ring and
+ * moves the head index (backwards).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_dequeue_at_head_bulk(struct rte_st_ring *r, void
**obj_table, unsigned int n,
+ unsigned int *available)
+{
+ return rte_st_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
+ n, available);
+}
+
+/**
+ * Dequeue upto a maximum number of objects from a ST ring from the
head.
+ *
+ * This function copies the objects from the head of the ring and
+ * moves the head index (backwards).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ * If non-NULL, returns the number of remaining ring entries after the
+ * dequeue has finished.
+ * @return
+ * - Number of objects dequeued
+ */
+static __rte_always_inline unsigned int
+rte_st_ring_dequeue_at_head_burst(struct rte_st_ring *r, void
**obj_table,
+ unsigned int n, unsigned int *available) {
+ return rte_st_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
+ n, available);
+}
+
+/**
+ * Dequeue one object from a ST ring from the head.
+ *
+ * This function copies the objects from the head of the ring and
+ * moves the head index (backwards).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_p
+ * A pointer to a void * pointer (object) that will be filled.
+ * @return
+ * - 0: Success, objects dequeued.
+ * - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ * dequeued.
+ */
+static __rte_always_inline int
+rte_st_ring_at_head_dequeue(struct rte_st_ring *r, void **obj_p) {
+ return rte_st_ring_dequeue_elem(r, obj_p, sizeof(void *)); }
+
+/**
+ * Flush a ST ring.
+ *
+ * This function flush all the elements in a ST ring
+ *
+ * @warning
+ * Make sure the ring is not in use while calling this function.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ */
+void
+rte_st_ring_reset(struct rte_st_ring *r);
+
+/**
+ * Return the number of entries in a ST ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * The number of entries in the ring.
+ */
+static inline unsigned int
+rte_st_ring_count(const struct rte_st_ring *r) {
+ uint32_t count = (r->head - r->tail) & r->mask;
+ return count;
+}
+
+/**
+ * Return the number of free entries in a ST ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * The number of free entries in the ring.
+ */
+static inline unsigned int
+rte_st_ring_free_count(const struct rte_st_ring *r) {
+ return r->capacity - rte_st_ring_count(r); }
+
+/**
+ * Test if a ST ring is full.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * - 1: The ring is full.
+ * - 0: The ring is not full.
+ */
+static inline int
+rte_st_ring_full(const struct rte_st_ring *r) {
+ return rte_st_ring_free_count(r) == 0; }
+
+/**
+ * Test if a ST ring is empty.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * - 1: The ring is empty.
+ * - 0: The ring is not empty.
+ */
+static inline int
+rte_st_ring_empty(const struct rte_st_ring *r) {
+ return r->tail == r->head;
+}
+
+/**
+ * Return the size of the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * The size of the data store used by the ring.
+ * NOTE: this is not the same as the usable space in the ring. To query that
+ * use ``rte_st_ring_get_capacity()``.
+ */
+static inline unsigned int
+rte_st_ring_get_size(const struct rte_st_ring *r) {
+ return r->size;
+}
+
+/**
+ * Return the number of elements which can be stored in the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * The usable size of the ring.
+ */
+static inline unsigned int
+rte_st_ring_get_capacity(const struct rte_st_ring *r) {
+ return r->capacity;
+}
+
+/**
+ * Dump the status of all rings on the console
+ *
+ * @param f
+ * A pointer to a file for output
+ */
+void rte_st_ring_list_dump(FILE *f);
+
+/**
+ * Search a ST ring from its name
+ *
+ * @param name
+ * The name of the ring.
+ * @return
+ * The pointer to the ring matching the name, or NULL if not found,
+ * with rte_errno set appropriately. Possible rte_errno values include:
+ * - ENOENT - required entry not available to return.
+ */
+struct rte_st_ring *rte_st_ring_lookup(const char *name);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_ST_RING_H_ */