> -----Original Message----- > From: Mattias Rönnblom <hof...@lysator.liu.se> > Sent: Monday, August 21, 2023 4:14 PM > To: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>; > jack...@nvidia.com; konstantin.v.anan...@yandex.ru > Cc: dev@dpdk.org; Ruifeng Wang <ruifeng.w...@arm.com>; Aditya > Ambadipudi <aditya.ambadip...@arm.com>; Wathsala Wathawana > Vithanage <wathsala.vithan...@arm.com>; nd <n...@arm.com> > Subject: Re: [RFC] lib/st_ring: add single thread ring > > On 2023-08-21 08:04, Honnappa Nagarahalli wrote: > > Add a single thread safe and multi-thread unsafe ring data structure. > > One must have set the bar very low, if one needs to specify that an API is > single-thread safe. > > > This library provides an simple and efficient alternative to > > multi-thread safe ring when multi-thread safety is not required. > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com> > > --- > > v1: > > 1) The code is very prelimnary and is not even compiled > > 2) This is intended to show the APIs and some thoughts on > > implementation > > If you haven't done it already, maybe it might be worth looking around in the > code base for already-existing, more-or-less open-coded fifo/circular buffer > type data structures. Just to make sure those can be eliminated if this makes > it into DPDK. > > There's one in rte_event_eth_rx_adapter.c, and I think one in the SW > eventdev as well. Seems to be one in cmdline_cirbuf.h as well. I'm sure there > are many more. I knew there are some, but have not looked at them yet. I will look at them.
> > You could pick some other name for it, instead of the slightly awkward > "st_ring" (e.g., "fifo", "cbuf", "cbuffer", "circ_buffer"). That would also > leave > you with more freedom to stray from the MT safe ring API without surprising > the user, if needed (and I think it is needed). The thought was to make it clear that this is for single-thread use (i.e.not even producer and consumer on different threads), may be I do not need to try hard. "fifo" might not be good option given that dequeue/enqueue at both ends of the ring are required/allowed. Wikipedia [1] and others [2], [3] indicates that this data structure should be called 'deque' (pronounced as deck). I would prefer to go with this (assuming this will be outside of 'rte_ring') [1] https://en.wikipedia.org/wiki/Double-ended_queue [2] https://www.geeksforgeeks.org/deque-set-1-introduction-applications/ [3] https://stackoverflow.com/questions/3880254/why-do-we-need-deque-data-structures-in-the-real-world#:~:text=A%20Deque%20is%20a%20double,thing%20on%20front%20of%20queue. > > Hopefully you can reduce API complexity compared to the MT-safe version. > Having a name for these kinds of data structures doesn't make a lot of sense, > for example. Skip the dump function. Relax from always_inline to just regular > inline. Yes, plan is to reduce complexity (compared to rte_ring) and some APIs can be skipped until there is a need. > > I'm not sure you need bulk/burst type operations. Without any memory > fences, an optimizing compiler should do a pretty good job of unrolling > multiple-element access type operations, assuming you leave the ST ring > code in the header files (otherwise LTO is needed). IMO, bulk/burst APIs are about the functionality rather than loop unrolling. APIs to work with single objects can be skipped (use bulk APIs with n=1). > > I think you will want a peek-type operation on the reader side. That more for > convenience, rather than that I think the copies will actually be there in the > object code (such should be eliminated by the compiler, given that the > barriers are gone). > > > 3) More APIs and the rest of the implementation will come in subsequent > > versions > > > > lib/st_ring/rte_st_ring.h | 567 > ++++++++++++++++++++++++++++++++++++++ > > 1 file changed, 567 insertions(+) > > create mode 100644 lib/st_ring/rte_st_ring.h > > > > diff --git a/lib/st_ring/rte_st_ring.h b/lib/st_ring/rte_st_ring.h new > > file mode 100644 index 0000000000..8cb8832591 > > --- /dev/null > > +++ b/lib/st_ring/rte_st_ring.h > > @@ -0,0 +1,567 @@ > > +/* SPDX-License-Identifier: BSD-3-Clause > > + * Copyright(c) 2023 Arm Limited > > + */ > > + > > +#ifndef _RTE_ST_RING_H_ > > +#define _RTE_ST_RING_H_ > > + > > +/** > > + * @file > > + * RTE Signle Thread Ring (ST Ring) > > + * > > + * The ST Ring is a fixed-size queue intended to be accessed > > + * by one thread at a time. It does not provide concurrent access to > > + * multiple threads. If there are multiple threads accessing the ST > > +ring, > > + * then the threads have to use locks to protect the ring from > > + * getting corrupted. > > You are basically saying the same thing three times here. > > > + * > > + * - FIFO (First In First Out) > > + * - Maximum size is fixed; the pointers are stored in a table. > > + * - Consumer and producer part of same thread. > > + * - Multi-thread producers and consumers need locking. > > ...two more times here. One might get the impression you really don't trust > the reader. > > > + * - Single/Bulk/burst dequeue at Tail or Head > > + * - Single/Bulk/burst enqueue at Head or Tail > > Does this not sound more like a deque, than a FIFO/circular buffer? Are there > any examples where this functionality (the double-endedness) is needed in > the DPDK code base? I see, you are calling it 'deque' as well. Basically, this patch originated due to a requirement in MLX PMD [1] [1] https://github.com/DPDK/dpdk/blob/main/drivers/net/mlx5/mlx5_hws_cnt.h#L381 > > > + * > > + */ > > + > > +#ifdef __cplusplus > > +extern "C" { > > +#endif > > + > > +#include <rte_st_ring_core.h> > > +#include <rte_st_ring_elem.h> > > Is the intention to provide a ring with compile-time variable element size? In > other words, where the elements of a particular ring instance has the same > element size, but different rings may have different element sizes. > > Seems like a good idea to me, in that case. Although often you will have > pointers, it would be useful to store larger things like small structs, and > maybe smaller elements as well. Yes, the idea is to make the element size flexible and also compile-time constant. > > > + > > +/** > > + * Calculate the memory size needed for a ST ring > > + * > > + * This function returns the number of bytes needed for a ST ring, > > +given > > + * the number of elements in it. This value is the sum of the size of > > + * the structure rte_st_ring and the size of the memory needed by the > > + * elements. The value is aligned to a cache line size. > > + * > > + * @param count > > + * The number of elements in the ring (must be a power of 2). > > + * @return > > + * - The memory size needed for the ST ring on success. > > + * - -EINVAL if count is not a power of 2. > > + */ > > +ssize_t rte_st_ring_get_memsize(unsigned int count); > > + > > +/** > > + * Initialize a ST ring structure. > > + * > > + * Initialize a ST ring structure in memory pointed by "r". The size > > +of the > > + * memory area must be large enough to store the ring structure and > > +the > > + * object table. It is advised to use rte_st_ring_get_memsize() to > > +get the > > + * appropriate size. > > + * > > + * The ST ring size is set to *count*, which must be a power of two. > > + * The real usable ring size is *count-1* instead of *count* to > > + * differentiate a full ring from an empty ring. > > + * > > + * The ring is not added in RTE_TAILQ_ST_RING global list. Indeed, > > +the > > + * memory given by the caller may not be shareable among dpdk > > + * processes. > > + * > > + * @param r > > + * The pointer to the ring structure followed by the elements table. > > + * @param name > > + * The name of the ring. > > + * @param count > > + * The number of elements in the ring (must be a power of 2, > > + * unless RTE_ST_RING_F_EXACT_SZ is set in flags). > > + * @param flags > > + * An OR of the following: > > + * - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold > > + * exactly the requested number of entries, and the requested size > > + * will be rounded up to the next power of two, but the usable space > > + * will be exactly that requested. Worst case, if a power-of-2 size is > > + * requested, half the ring space will be wasted. > > + * Without this flag set, the ring size requested must be a power of 2, > > + * and the usable space will be that size - 1. > > + * @return > > + * 0 on success, or a negative value on error. > > + */ > > +int rte_st_ring_init(struct rte_st_ring *r, const char *name, > > + unsigned int count, unsigned int flags); > > + > > +/** > > + * Create a new ST ring named *name* in memory. > > + * > > + * This function uses ``memzone_reserve()`` to allocate memory. Then > > +it > > + * calls rte_st_ring_init() to initialize an empty ring. > > + * > > + * The new ring size is set to *count*, which must be a power of two. > > + * The real usable ring size is *count-1* instead of *count* to > > + * differentiate a full ring from an empty ring. > > + * > > + * The ring is added in RTE_TAILQ_ST_RING list. > > + * > > + * @param name > > + * The name of the ring. > > + * @param count > > + * The size of the ring (must be a power of 2, > > + * unless RTE_ST_RING_F_EXACT_SZ is set in flags). > > + * @param socket_id > > + * The *socket_id* argument is the socket identifier in case of > > + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA > > + * constraint for the reserved zone. > > + * @param flags > > + * - RTE_ST_RING_F_EXACT_SZ: If this flag is set, the ring will hold > > exactly > the > > + * requested number of entries, and the requested size will be rounded > up > > + * to the next power of two, but the usable space will be exactly that > > + * requested. Worst case, if a power-of-2 size is requested, half the > > + * ring space will be wasted. > > + * Without this flag set, the ring size requested must be a power of 2, > > + * and the usable space will be that size - 1. > > + * @return > > + * On success, the pointer to the new allocated ring. NULL on error with > > + * rte_errno set appropriately. Possible errno values include: > > + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config > structure > > + * - EINVAL - count provided is not a power of 2 > > + * - ENOSPC - the maximum number of memzones has already been > allocated > > + * - EEXIST - a memzone with the same name already exists > > + * - ENOMEM - no appropriate memory area found in which to create > memzone > > + */ > > +struct rte_st_ring *rte_st_ring_create(const char *name, unsigned int > count, > > + int socket_id, unsigned int flags); > > + > > +/** > > + * De-allocate all memory used by the ring. > > + * > > + * @param r > > + * Ring to free. > > + * If NULL then, the function does nothing. > > + */ > > +void rte_st_ring_free(struct rte_st_ring *r); > > + > > +/** > > + * Dump the status of the ring to a file. > > + * > > + * @param f > > + * A pointer to a file for output > > + * @param r > > + * A pointer to the ring structure. > > + */ > > +void rte_st_ring_dump(FILE *f, const struct rte_st_ring *r); > > + > > +/** > > + * Enqueue fixed number of objects on a ST ring. > > + * > > + * This function copies the objects at the head of the ring and > > + * moves the head index. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj_table > > + * A pointer to a table of void * pointers (objects). > > + * @param n > > + * The number of objects to add in the ring from the obj_table. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * enqueue operation has finished. > > + * @return > > + * The number of objects enqueued, either 0 or n > > + */ > > +static __rte_always_inline unsigned int > > +rte_st_ring_enqueue_bulk(struct rte_st_ring *r, void * const *obj_table, > > + unsigned int n, unsigned int *free_space) { > > + return rte_st_ring_enqueue_bulk_elem(r, obj_table, sizeof(void *), > > + n, free_space); > > +} > > + > > +/** > > + * Enqueue upto a maximum number of objects on a ST ring. > > + * > > + * This function copies the objects at the head of the ring and > > + * moves the head index. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj_table > > + * A pointer to a table of void * pointers (objects). > > + * @param n > > + * The number of objects to add in the ring from the obj_table. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * enqueue operation has finished. > > + * @return > > + * - n: Actual number of objects enqueued. > > + */ > > +static __rte_always_inline unsigned int > > +rte_st_ring_enqueue_burst(struct rte_st_ring *r, void * const *obj_table, > > + unsigned int n, unsigned int *free_space) { > > + return rte_st_ring_enqueue_burst_elem(r, obj_table, sizeof(void *), > > + n, free_space); > > +} > > + > > +/** > > + * Enqueue one object on a ST ring. > > + * > > + * This function copies one object at the head of the ring and > > + * moves the head index. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj > > + * A pointer to the object to be added. > > + * @return > > + * - 0: Success; objects enqueued. > > + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is > enqueued. > > + */ > > +static __rte_always_inline int > > +rte_st_ring_enqueue(struct rte_st_ring *r, void *obj) { > > + return rte_st_ring_enqueue_elem(r, &obj, sizeof(void *)); } > > + > > +/** > > + * Enqueue fixed number of objects on a ST ring at the tail. > > + * > > + * This function copies the objects at the tail of the ring and > > + * moves the tail index (backwards). > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj_table > > + * A pointer to a table of void * pointers (objects). > > + * @param n > > + * The number of objects to add in the ring from the obj_table. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * enqueue operation has finished. > > + * @return > > + * The number of objects enqueued, either 0 or n > > + */ > > +static __rte_always_inline unsigned int > > +rte_st_ring_enqueue_at_tail_bulk(struct rte_st_ring *r, > > + void * const *obj_table, unsigned int n, > > + unsigned int *free_space) > > +{ > > + return rte_st_ring_enqueue_at_tail_bulk_elem(r, obj_table, > > + sizeof(void *), n, free_space); > > +} > > + > > +/** > > + * Enqueue upto a maximum number of objects on a ST ring at the tail. > > + * > > + * This function copies the objects at the tail of the ring and > > + * moves the tail index (backwards). > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj_table > > + * A pointer to a table of void * pointers (objects). > > + * @param n > > + * The number of objects to add in the ring from the obj_table. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * enqueue operation has finished. > > + * @return > > + * - n: Actual number of objects enqueued. > > + */ > > +static __rte_always_inline unsigned int > > +rte_st_ring_enqueue_at_tail_burst(struct rte_st_ring *r, > > + void * const *obj_table, unsigned int n, > > + unsigned int *free_space) > > +{ > > + return rte_st_ring_enqueue_at_tail_burst_elem(r, obj_table, > > + sizeof(void *), n, free_space); > > +} > > + > > +/** > > + * Enqueue one object on a ST ring at tail. > > + * > > + * This function copies one object at the tail of the ring and > > + * moves the tail index (backwards). > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj > > + * A pointer to the object to be added. > > + * @return > > + * - 0: Success; objects enqueued. > > + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is > enqueued. > > + */ > > +static __rte_always_inline int > > +rte_st_ring_enqueue_at_tail(struct rte_st_ring *r, void *obj) { > > + return rte_st_ring_enqueue_at_tail_elem(r, &obj, sizeof(void *)); } > > + > > +/** > > + * Dequeue a fixed number of objects from a ST ring. > > + * > > + * This function copies the objects from the tail of the ring and > > + * moves the tail index. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj_table > > + * A pointer to a table of void * pointers (objects) that will be filled. > > + * @param n > > + * The number of objects to dequeue from the ring to the obj_table. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after the > > + * dequeue has finished. > > + * @return > > + * The number of objects dequeued, either 0 or n > > + */ > > +static __rte_always_inline unsigned int > > +rte_st_ring_dequeue_bulk(struct rte_st_ring *r, void **obj_table, > unsigned int n, > > + unsigned int *available) > > +{ > > + return rte_st_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *), > > + n, available); > > +} > > + > > +/** > > + * Dequeue upto a maximum number of objects from a ST ring. > > + * > > + * This function copies the objects from the tail of the ring and > > + * moves the tail index. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj_table > > + * A pointer to a table of void * pointers (objects) that will be filled. > > + * @param n > > + * The number of objects to dequeue from the ring to the obj_table. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after the > > + * dequeue has finished. > > + * @return > > + * - Number of objects dequeued > > + */ > > +static __rte_always_inline unsigned int > > +rte_st_ring_dequeue_burst(struct rte_st_ring *r, void **obj_table, > > + unsigned int n, unsigned int *available) { > > + return rte_st_ring_dequeue_burst_elem(r, obj_table, sizeof(void *), > > + n, available); > > +} > > + > > +/** > > + * Dequeue one object from a ST ring. > > + * > > + * This function copies one object from the tail of the ring and > > + * moves the tail index. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj_p > > + * A pointer to a void * pointer (object) that will be filled. > > + * @return > > + * - 0: Success, objects dequeued. > > + * - -ENOENT: Not enough entries in the ring to dequeue, no object is > > + * dequeued. > > + */ > > +static __rte_always_inline int > > +rte_st_ring_dequeue(struct rte_st_ring *r, void **obj_p) { > > + return rte_st_ring_dequeue_elem(r, obj_p, sizeof(void *)); } > > + > > +/** > > + * Dequeue a fixed number of objects from a ST ring from the head. > > + * > > + * This function copies the objects from the head of the ring and > > + * moves the head index (backwards). > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj_table > > + * A pointer to a table of void * pointers (objects) that will be filled. > > + * @param n > > + * The number of objects to dequeue from the ring to the obj_table. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after the > > + * dequeue has finished. > > + * @return > > + * The number of objects dequeued, either 0 or n > > + */ > > +static __rte_always_inline unsigned int > > +rte_st_ring_dequeue_at_head_bulk(struct rte_st_ring *r, void > **obj_table, unsigned int n, > > + unsigned int *available) > > +{ > > + return rte_st_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *), > > + n, available); > > +} > > + > > +/** > > + * Dequeue upto a maximum number of objects from a ST ring from the > head. > > + * > > + * This function copies the objects from the head of the ring and > > + * moves the head index (backwards). > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj_table > > + * A pointer to a table of void * pointers (objects) that will be filled. > > + * @param n > > + * The number of objects to dequeue from the ring to the obj_table. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after the > > + * dequeue has finished. > > + * @return > > + * - Number of objects dequeued > > + */ > > +static __rte_always_inline unsigned int > > +rte_st_ring_dequeue_at_head_burst(struct rte_st_ring *r, void > **obj_table, > > + unsigned int n, unsigned int *available) { > > + return rte_st_ring_dequeue_burst_elem(r, obj_table, sizeof(void *), > > + n, available); > > +} > > + > > +/** > > + * Dequeue one object from a ST ring from the head. > > + * > > + * This function copies the objects from the head of the ring and > > + * moves the head index (backwards). > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param obj_p > > + * A pointer to a void * pointer (object) that will be filled. > > + * @return > > + * - 0: Success, objects dequeued. > > + * - -ENOENT: Not enough entries in the ring to dequeue, no object is > > + * dequeued. > > + */ > > +static __rte_always_inline int > > +rte_st_ring_at_head_dequeue(struct rte_st_ring *r, void **obj_p) { > > + return rte_st_ring_dequeue_elem(r, obj_p, sizeof(void *)); } > > + > > +/** > > + * Flush a ST ring. > > + * > > + * This function flush all the elements in a ST ring > > + * > > + * @warning > > + * Make sure the ring is not in use while calling this function. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + */ > > +void > > +rte_st_ring_reset(struct rte_st_ring *r); > > + > > +/** > > + * Return the number of entries in a ST ring. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @return > > + * The number of entries in the ring. > > + */ > > +static inline unsigned int > > +rte_st_ring_count(const struct rte_st_ring *r) { > > + uint32_t count = (r->head - r->tail) & r->mask; > > + return count; > > +} > > + > > +/** > > + * Return the number of free entries in a ST ring. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @return > > + * The number of free entries in the ring. > > + */ > > +static inline unsigned int > > +rte_st_ring_free_count(const struct rte_st_ring *r) { > > + return r->capacity - rte_st_ring_count(r); } > > + > > +/** > > + * Test if a ST ring is full. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @return > > + * - 1: The ring is full. > > + * - 0: The ring is not full. > > + */ > > +static inline int > > +rte_st_ring_full(const struct rte_st_ring *r) { > > + return rte_st_ring_free_count(r) == 0; } > > + > > +/** > > + * Test if a ST ring is empty. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @return > > + * - 1: The ring is empty. > > + * - 0: The ring is not empty. > > + */ > > +static inline int > > +rte_st_ring_empty(const struct rte_st_ring *r) { > > + return r->tail == r->head; > > +} > > + > > +/** > > + * Return the size of the ring. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @return > > + * The size of the data store used by the ring. > > + * NOTE: this is not the same as the usable space in the ring. To query > > that > > + * use ``rte_st_ring_get_capacity()``. > > + */ > > +static inline unsigned int > > +rte_st_ring_get_size(const struct rte_st_ring *r) { > > + return r->size; > > +} > > + > > +/** > > + * Return the number of elements which can be stored in the ring. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @return > > + * The usable size of the ring. > > + */ > > +static inline unsigned int > > +rte_st_ring_get_capacity(const struct rte_st_ring *r) { > > + return r->capacity; > > +} > > + > > +/** > > + * Dump the status of all rings on the console > > + * > > + * @param f > > + * A pointer to a file for output > > + */ > > +void rte_st_ring_list_dump(FILE *f); > > + > > +/** > > + * Search a ST ring from its name > > + * > > + * @param name > > + * The name of the ring. > > + * @return > > + * The pointer to the ring matching the name, or NULL if not found, > > + * with rte_errno set appropriately. Possible rte_errno values include: > > + * - ENOENT - required entry not available to return. > > + */ > > +struct rte_st_ring *rte_st_ring_lookup(const char *name); > > + > > +#ifdef __cplusplus > > +} > > +#endif > > + > > +#endif /* _RTE_ST_RING_H_ */