> > > > For rings with producer/consumer in RTE_RING_SYNC_ST, > > RTE_RING_SYNC_MT_HTS mode, provide an ability to split enqueue/dequeue > > operation into two phases: > > - enqueue/dequeue start > > - enqueue/dequeue finish > > That allows user to inspect objects in the ring without removing them from > > it > > (aka MT safe peek). > > > > Signed-off-by: Konstantin Ananyev <konstantin.anan...@intel.com> > > --- > > lib/librte_ring/Makefile | 1 + > > lib/librte_ring/meson.build | 1 + > > lib/librte_ring/rte_ring_c11_mem.h | 44 +++ > > lib/librte_ring/rte_ring_elem.h | 4 + > > lib/librte_ring/rte_ring_generic.h | 48 ++++ > > lib/librte_ring/rte_ring_hts_generic.h | 47 ++- > > lib/librte_ring/rte_ring_peek.h | 379 +++++++++++++++++++++++++ > > 7 files changed, 519 insertions(+), 5 deletions(-) create mode 100644 > > lib/librte_ring/rte_ring_peek.h > > > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index > > 6fe500f0d..5f8662737 100644 > > --- a/lib/librte_ring/Makefile > > +++ b/lib/librte_ring/Makefile > > @@ -22,6 +22,7 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := > > rte_ring.h \ > > rte_ring_hts.h \ > > rte_ring_hts_elem.h \ > > rte_ring_hts_generic.h \ > > + rte_ring_peek.h \ > > rte_ring_rts.h \ > > rte_ring_rts_elem.h \ > > rte_ring_rts_generic.h > > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index > > 8e86e037a..f5f84dc6e 100644 > > --- a/lib/librte_ring/meson.build > > +++ b/lib/librte_ring/meson.build > > @@ -9,6 +9,7 @@ headers = files('rte_ring.h', > > 'rte_ring_hts.h', > > 'rte_ring_hts_elem.h', > > 'rte_ring_hts_generic.h', > > + 'rte_ring_peek.h', > > 'rte_ring_rts.h', > > 'rte_ring_rts_elem.h', > > 'rte_ring_rts_generic.h') > > diff --git a/lib/librte_ring/rte_ring_c11_mem.h > > b/lib/librte_ring/rte_ring_c11_mem.h > > index 0fb73a337..bb3096721 100644 > > --- a/lib/librte_ring/rte_ring_c11_mem.h > > +++ b/lib/librte_ring/rte_ring_c11_mem.h > > @@ -10,6 +10,50 @@ > > #ifndef _RTE_RING_C11_MEM_H_ > > #define _RTE_RING_C11_MEM_H_ > > > > +/** > > + * @internal get current tail value. > > + * This function should be used only for single thread producer/consumer. > > + * Check that user didn't request to move tail above the head. > Do we need this check? This could be a data path function, we could document > a warning and leave it to the users to provide the correct > value.
I don't think this extra check will cause any extra slowdown. >From other side, it seems useful to have it - might help people to debug/root-cause an issue. > > + * In that situation: > > + * - return zero, that will cause abort any pending changes and > > + * return head to its previous position. > > + * - throw an assert in debug mode. > > + */ > > +static __rte_always_inline uint32_t > > +__rte_ring_st_get_tail(struct rte_ring_headtail *ht, uint32_t *tail, > > + uint32_t num) > > +{ > > + uint32_t h, n, t; > > + > > + h = ht->head; > > + t = ht->tail; > > + n = h - t; > > + > > + RTE_ASSERT(n >= num); > > + num = (n >= num) ? num : 0; > > + > > + *tail = h; > > + return num; > > +} > > + > > +/** > > + * @internal set new values for head and tail. > > + * This function should be used only for single thread producer/consumer. > > + * Should be used only in conjunction with __rte_ring_st_get_tail. > > + */ > > +static __rte_always_inline void > > +__rte_ring_st_set_head_tail(struct rte_ring_headtail *ht, uint32_t tail, > > + uint32_t num, uint32_t enqueue) > > +{ > > + uint32_t pos; > > + > > + RTE_SET_USED(enqueue); > > + > > + pos = tail + num; > > + ht->head = pos; > > + __atomic_store_n(&ht->tail, pos, __ATOMIC_RELEASE); } > > + > > static __rte_always_inline void > > update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t > > new_val, > > uint32_t single, uint32_t enqueue) > > diff --git a/lib/librte_ring/rte_ring_elem.h > > b/lib/librte_ring/rte_ring_elem.h > > index 010a564c1..5bf7c1c1b 100644 > > --- a/lib/librte_ring/rte_ring_elem.h > > +++ b/lib/librte_ring/rte_ring_elem.h > > @@ -1083,6 +1083,10 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, > > void *obj_table, > > return 0; > > } > > > > +#ifdef ALLOW_EXPERIMENTAL_API > > +#include <rte_ring_peek.h> > > +#endif > > + > > #ifdef __cplusplus > > } > > #endif > > diff --git a/lib/librte_ring/rte_ring_generic.h > > b/lib/librte_ring/rte_ring_generic.h > > index 953cdbbd5..9f5fdf13b 100644 > > --- a/lib/librte_ring/rte_ring_generic.h > > +++ b/lib/librte_ring/rte_ring_generic.h > > @@ -10,6 +10,54 @@ > > #ifndef _RTE_RING_GENERIC_H_ > > #define _RTE_RING_GENERIC_H_ > > > > +/** > > + * @internal get current tail value. > > + * This function should be used only for single thread producer/consumer. > > + * Check that user didn't request to move tail above the head. > > + * In that situation: > > + * - return zero, that will cause abort any pending changes and > > + * return head to its previous position. > > + * - throw an assert in debug mode. > > + */ > > +static __rte_always_inline uint32_t > > +__rte_ring_st_get_tail(struct rte_ring_headtail *ht, uint32_t *tail, > > + uint32_t num) > > +{ > > + uint32_t h, n, t; > > + > > + h = ht->head; > > + t = ht->tail; > > + n = h - t; > > + > > + RTE_ASSERT(n >= num); > > + num = (n >= num) ? num : 0; > > + > > + *tail = h; > > + return num; > > +} > > + > > +/** > > + * @internal set new values for head and tail. > > + * This function should be used only for single thread producer/consumer. > > + * Should be used only in conjunction with __rte_ring_st_get_tail. > > + */ > > +static __rte_always_inline void > > +__rte_ring_st_set_head_tail(struct rte_ring_headtail *ht, uint32_t tail, > > + uint32_t num, uint32_t enqueue) > > +{ > > + uint32_t pos; > > + > > + pos = tail + num; > > + > > + if (enqueue) > > + rte_smp_wmb(); > > + else > > + rte_smp_rmb(); > > + > > + ht->head = pos; > > + ht->tail = pos; > > +} > > + > > static __rte_always_inline void > > update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t > > new_val, > > uint32_t single, uint32_t enqueue) > > diff --git a/lib/librte_ring/rte_ring_hts_generic.h > > b/lib/librte_ring/rte_ring_hts_generic.h > > index da08f1d94..8e699c006 100644 > > --- a/lib/librte_ring/rte_ring_hts_generic.h > > +++ b/lib/librte_ring/rte_ring_hts_generic.h > > @@ -18,9 +18,38 @@ > > * For more information please refer to <rte_ring_hts.h>. > > */ > > > > +/** > > + * @internal get current tail value. > > + * Check that user didn't request to move tail above the head. > > + * In that situation: > > + * - return zero, that will cause abort any pending changes and > > + * return head to its previous position. > > + * - throw an assert in debug mode. > > + */ > > +static __rte_always_inline uint32_t > > +__rte_ring_hts_get_tail(struct rte_ring_hts_headtail *ht, uint32_t *tail, > > + uint32_t num) > > +{ > > + uint32_t n; > > + union rte_ring_ht_pos p; > > + > > + p.raw = rte_atomic64_read((rte_atomic64_t *)(uintptr_t)&ht- > > >ht.raw); > > + n = p.pos.head - p.pos.tail; > > + > > + RTE_ASSERT(n >= num); > > + num = (n >= num) ? num : 0; > > + > > + *tail = p.pos.tail; > > + return num; > > +} > > + > > +/** > > + * @internal set new values for head and tail as one atomic 64 bit > > operation. > > + * Should be used only in conjunction with __rte_ring_hts_get_tail. > > + */ > > static __rte_always_inline void > > -__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num, > > - uint32_t enqueue) > > +__rte_ring_hts_set_head_tail(struct rte_ring_hts_headtail *ht, uint32_t > > tail, > > + uint32_t num, uint32_t enqueue) > > { > > union rte_ring_ht_pos p; > > > > @@ -29,14 +58,22 @@ __rte_ring_hts_update_tail(struct > > rte_ring_hts_headtail *ht, uint32_t num, > > else > > rte_smp_rmb(); > > > > - p.raw = rte_atomic64_read((rte_atomic64_t *)(uintptr_t)&ht- > > >ht.raw); > > - > > - p.pos.head = p.pos.tail + num; > > + p.pos.head = tail + num; > > p.pos.tail = p.pos.head; > > > > rte_atomic64_set((rte_atomic64_t *)(uintptr_t)&ht->ht.raw, p.raw); } > > > > +static __rte_always_inline void > > +__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num, > > + uint32_t enqueue) > > +{ > > + uint32_t tail; > > + > > + num = __rte_ring_hts_get_tail(ht, &tail, num); > > + __rte_ring_hts_set_head_tail(ht, tail, num, enqueue); } > > + > > /** > > * @internal waits till tail will become equal to head. > > * Means no writer/reader is active for that ring. > > diff --git a/lib/librte_ring/rte_ring_peek.h > > b/lib/librte_ring/rte_ring_peek.h > > new file mode 100644 index 000000000..baefd2f7b > > --- /dev/null > > +++ b/lib/librte_ring/rte_ring_peek.h > > @@ -0,0 +1,379 @@ > > +/* SPDX-License-Identifier: BSD-3-Clause > > + * > > + * Copyright (c) 2010-2017 Intel Corporation > > + * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org > > + * All rights reserved. > > + * Derived from FreeBSD's bufring.h > > + * Used as BSD-3 Licensed with permission from Kip Macy. > > + */ > > + > > +#ifndef _RTE_RING_PEEK_H_ > > +#define _RTE_RING_PEEK_H_ > > + > > +/** > > + * @file > > + * @b EXPERIMENTAL: this API may change without prior notice > > + * It is not recommended to include this file directly. > > + * Please include <rte_ring_elem.h> instead. > > + * > > + * Ring Peek AP > ^^^ API > > + * Introduction of rte_ring with serialized producer/consumer (HTS sync > > +mode) > > + * makes possible to split public enqueue/dequeue API into two phases: > > + * - enqueue/dequeue start > > + * - enqueue/dequeue finish > > + * That allows user to inspect objects in the ring without removing > > +them > > + * from it (aka MT safe peek). > > + * Note that right now this new API is avaialble only for two sync modes: > > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > > + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). > > + * It is a user responsibility to create/init ring with appropriate > > +sync > > + * modes selected. > > + * As an example: > > + * // read 1 elem from the ring: > > + * n = rte_ring_hts_dequeue_bulk_start(ring, &obj, 1, NULL); > > + * if (n != 0) { > > + * //examine object > > + * if (object_examine(obj) == KEEP) > > + * //decided to keep it in the ring. > > + * rte_ring_hts_dequeue_finish(ring, 0); > > + * else > > + * //decided to remove it from the ring. > > + * rte_ring_hts_dequeue_finish(ring, n); > > + * } > > + * Note that between _start_ and _finish_ the ring is sort of locked - > > ^^^^^^^^^^^^^^^^^^^^ - 'locked' can mean different to different people, > may be remove this, > the next sentence anyway has the description > > + * none other thread can proceed with enqueue(/dequeue) operation till > ^^^^ no > > + * _finish_ will complete. > ^^^^^^^^^^^ completes > > + */ > > + > > +#ifdef __cplusplus > > +extern "C" { > > +#endif > > + > > <snip> > > > + > > +/** > > + * Start to enqueue several objects on the ring. > > + * Note that no actual objects are put in the queue by this function, > > + * it just reserves for user such ability. > > + * User has to call appropriate enqueue_finish() to copy objects into > > +the > > + * queue and complete given enqueue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to add in the ring from the obj_table. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * enqueue operation has finished. > > + * @return > > + * The number of objects that can be enqueued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_enqueue_bulk_start(struct rte_ring *r, unsigned int n, > > + unsigned int *free_space) > If one wants to use _elem_ APIs for ring peek, a combination of legacy API > (format) and a _elem_ API is required. > For ex: > rte_ring_enqueue_bulk_start > rte_ring_enqueue_elem_finish > > I understand why you have done this. I think this is getting somewhat too > inconsistent. > Agree, there could be some confusion. Don't know what would be a better approach here.... 2 similar functions with exactly same parameter list (one wrapper for another): rte_ring_enqueue_elem_bulk_start() and rte_ring_enqueue_elem_bulk_start(). ? > > +{ > > + return __rte_ring_do_enqueue_start(r, n, RTE_RING_QUEUE_FIXED, > > + free_space); > > +} > > + >