Hi,
> The timing of the update of the RTS enqueues/dequeues tail is > limited to the last enqueues/dequeues, which reduces concurrency, > so the RTS interface of the V2 version is added, which makes the tail > of the enqueues/dequeues not limited to the last enqueues/dequeues > and thus enables timely updates to increase concurrency. That's description is way too cryptic to me and really just creates more confusion instead of explain things: I have to go and read through the code to understand what you are up to. In fact, I don't think the approach you used will work properly dues to race Conditions (see below for more details). But for future reference, when you are introducing a new sync mechanism for the ring please do: 1. explain clearly what particular problem(s) with existing one(s) you are trying to address. 2. clearly explain new sync mechanism you are going to introduce and why/when you believe It would behave better than existing ones 3. In case of performance improvement claims - provide some reproducible numbers either with ring_stress_test or some dpdk packet processing sample app or both. > Add some corresponding test cases. > > Signed-off-by: Huichao Cai <chcch...@163.com> > --- ... > diff --git a/lib/ring/rte_ring.c b/lib/ring/rte_ring.c > index aebb6d6728..ada1ae88fa 100644 > --- a/lib/ring/rte_ring.c > +++ b/lib/ring/rte_ring.c > @@ -43,7 +43,8 @@ EAL_REGISTER_TAILQ(rte_ring_tailq) > /* mask of all valid flag values to ring_create() */ > #define RING_F_MASK (RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ | \ > RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ | \ > - RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ) > + RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ | \ > + RING_F_MP_RTS_V2_ENQ | RING_F_MC_RTS_V2_DEQ) > > /* true if x is a power of 2 */ > #define POWEROF2(x) ((((x)-1) & (x)) == 0) > @@ -106,6 +107,7 @@ reset_headtail(void *p) > ht->tail = 0; > break; > case RTE_RING_SYNC_MT_RTS: > + case RTE_RING_SYNC_MT_RTS_V2: > ht_rts->head.raw = 0; > ht_rts->tail.raw = 0; > break; > @@ -135,9 +137,11 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type > *prod_st, > enum rte_ring_sync_type *cons_st) > { > static const uint32_t prod_st_flags = > - (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ); > + (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ | > + RING_F_MP_RTS_V2_ENQ); > static const uint32_t cons_st_flags = > - (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ); > + (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ | > + RING_F_MC_RTS_V2_DEQ); > > switch (flags & prod_st_flags) { > case 0: > @@ -152,6 +156,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type > *prod_st, > case RING_F_MP_HTS_ENQ: > *prod_st = RTE_RING_SYNC_MT_HTS; > break; > + case RING_F_MP_RTS_V2_ENQ: > + *prod_st = RTE_RING_SYNC_MT_RTS_V2; > + break; > default: > return -EINVAL; > } > @@ -169,6 +176,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type > *prod_st, > case RING_F_MC_HTS_DEQ: > *cons_st = RTE_RING_SYNC_MT_HTS; > break; > + case RING_F_MC_RTS_V2_DEQ: > + *cons_st = RTE_RING_SYNC_MT_RTS_V2; > + break; > default: > return -EINVAL; > } > @@ -239,6 +249,28 @@ rte_ring_init(struct rte_ring *r, const char *name, > unsigned int count, > if (flags & RING_F_MC_RTS_DEQ) > rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF); > > + /* set default values for head-tail distance and allocate memory to > cache */ > + if (flags & RING_F_MP_RTS_V2_ENQ) { > + rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF); > + r->rts_prod.rts_cache = (struct rte_ring_rts_cache > *)rte_zmalloc( > + "RTS_PROD_CACHE", sizeof(struct rte_ring_rts_cache) * > r->size, 0); That doesn't look right at all - rte_ring_init() not supposed to allocate extra memory. It is a caller responsibility to provide buffer large enough to hold whole ring (including actual data and meta-data). Ideally, if your sync mechanism needs extra space it should be reported by rte_ring_get_memsize_elem(). Though right now it takes only elem size and count... One approach might be to introduce new function for that, another introduce new high-level struct, instead of rte_ring. Though I suppose first thing that needs to be done fix race-conditions and prove that such addition is really worth from performance perspective. > + if (r->rts_prod.rts_cache == NULL) { > + RING_LOG(ERR, "Cannot reserve memory for rts prod > cache"); > + return -ENOMEM; > + } > + } > + if (flags & RING_F_MC_RTS_V2_DEQ) { > + rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF); > + r->rts_cons.rts_cache = (struct rte_ring_rts_cache > *)rte_zmalloc( > + "RTS_CONS_CACHE", sizeof(struct rte_ring_rts_cache) * > r->size, 0); > + if (r->rts_cons.rts_cache == NULL) { > + if (flags & RING_F_MP_RTS_V2_ENQ) > + rte_free(r->rts_prod.rts_cache); > + RING_LOG(ERR, "Cannot reserve memory for rts cons > cache"); > + return -ENOMEM; > + } > + } > + > return 0; > } > .... > diff --git a/lib/ring/rte_ring_rts_elem_pvt.h > b/lib/ring/rte_ring_rts_elem_pvt.h > index 122650346b..4ce22a93ed 100644 > --- a/lib/ring/rte_ring_rts_elem_pvt.h > +++ b/lib/ring/rte_ring_rts_elem_pvt.h > @@ -46,6 +46,92 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail > *ht) > rte_memory_order_release, rte_memory_order_acquire) == > 0); > } > > +/** > + * @file rte_ring_rts_elem_pvt.h > + * It is not recommended to include this file directly, > + * include <rte_ring.h> instead. > + * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode. > + * For more information please refer to <rte_ring_rts.h>. > + */ > + > +/** > + * @internal This function updates tail values. > + */ > +static __rte_always_inline void > +__rte_ring_rts_v2_update_tail(struct rte_ring_rts_headtail *ht, > + uint32_t old_tail, uint32_t num, uint32_t mask) > +{ > + union __rte_ring_rts_poscnt ot, nt; > + > + ot.val.cnt = nt.val.cnt = 0; > + ot.val.pos = old_tail; > + nt.val.pos = old_tail + num; > + > + /* > + * If the tail is equal to the current enqueues/dequeues, update > + * the tail with new value and then continue to try to update the > + * tail until the num of the cache is 0, otherwise write the num of > + * the current enqueues/dequeues to the cache. > + */ > + > + if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw, > + (uint64_t *)(uintptr_t)&ot.raw, nt.raw, > + rte_memory_order_release, > rte_memory_order_acquire) == 0) { > + ot.val.pos = old_tail; > + > + /* > + * Write the num of the current enqueues/dequeues to the > + * corresponding cache. > + */ > + rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num, > + num, rte_memory_order_release); > + > + /* > + * There may be competition with another enqueues/dequeues > + * for the update tail. The winner continues to try to update > + * the tail, and the loser exits. > + */ > + if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw, > + (uint64_t *)(uintptr_t)&ot.raw, nt.raw, > + rte_memory_order_release, > rte_memory_order_acquire) == 0) > + return; > + > + /* > + * Set the corresponding cache to 0 for next use. > + */ I think there is race condition between CAS above and the store below: After you updated the tail, other threads are free to re-use these ring elements. So if some thread get stalled/preempted here while other threads will continue to do enqueue/dequeue to the ring, this rts_cache[] element can be already re-used by other thread when given thread will wake-up and overwrite it with zero. In fact such race-condition can be easily reproduced on my box with: echo ring_stress_autotest | dpdk-test --lcores=(2-16)@(3-5) -n 8 --no-pci --no-huge all workers ends up in the infinite loop: (gdb) bt #0 _mm_pause () at /usr/lib64/gcc/x86_64-suse-linux/12/include/xmmintrin.h:1335 #1 rte_pause () at ../lib/eal/x86/include/rte_pause.h:18 #2 0x00000000010d008b in __rte_ring_rts_head_wait (h=0x7ff90c3ffca0, ht=0x103ed2d40) at ../lib/ring/rte_ring_rts_elem_pvt.h:148 #3 __rte_ring_rts_move_prod_head (free_entries=0x7ff90c3ffcc8, old_head=0x7ff90c3ffcc4, behavior=RTE_RING_QUEUE_FIXED, num=33, r=0x103ed2cc0) at ../lib/ring/rte_ring_rts_elem_pvt.h:177 #4 __rte_ring_do_rts_v2_enqueue_elem (free_space=0x0, behavior=RTE_RING_QUEUE_FIXED, n=33, esize=8, obj_table=0x7ff90c3ffec0, r=0x103ed2cc0) at ../lib/ring/rte_ring_rts_elem_pvt.h:336 #5 rte_ring_mp_rts_v2_enqueue_bulk_elem (free_space=0x0, n=33, esize=8, obj_table=0x7ff90c3ffec0, r=0x103ed2cc0) at ../lib/ring/rte_ring_rts.h:110 #6 rte_ring_mp_rts_v2_enqueue_bulk (free_space=0x0, n=33, obj_table=0x7ff90c3ffec0, r=0x103ed2cc0) at ../lib/ring/rte_ring_rts.h:345 #7 _st_ring_enqueue_bulk (r=0x103ed2cc0, obj=0x7ff90c3ffec0, n=33, free=0x0) at ../app/test/test_ring_rts_v2_stress.c:18 .... (gdb) print/x r->rts_prod.rts_cache[r->rts_prod.tail.val.pos & r->mask] $11 = {num = 0x0} (gdb) print r->rts_prod $13 = {tail = {raw = 127384228873633792, val = {cnt = 0, pos = 29658952}}, sync_type = RTE_RING_SYNC_MT_RTS_V2, htd_max = 2047, head = { raw = 127393072212139551, val = {cnt = 843295, pos = 29661011}}, rts_cache = 0x103ec2c40} (gdb) print 29661011-29658952 $14 = 2059 All in all - I don't think this approach is going to work. You need some extra stuff to synchronize between cache[] and tail. If interested, in SORING we solved similar thing by updating state[] before updating tail, plus making sure that only one thread at a time will update tail value: https://patchwork.dpdk.org/project/dpdk/patch/20241206183600.34758-6-konstantin.anan...@huawei.com/ Another minor thing - why do you re-use RTS head/tail struct? >From what I can read you don't .cnt part at all. > + rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num, > + 0, rte_memory_order_release); > + } > + > + /* > + * Try to update the tail until the num of the corresponding cache is 0. > + * Getting here means that the current enqueues/dequeues is trying to > update > + * the tail of another enqueues/dequeues. > + */ > + while (1) { > + num = rte_atomic_load_explicit(&ht->rts_cache[nt.val.pos & > mask].num, > + rte_memory_order_acquire); > + if (num == 0) > + break; > + > + ot.val.pos = nt.val.pos; > + nt.val.pos += num; > + > + /* > + * There may be competition with another enqueues/dequeues > + * for the update tail. The winner continues to try to update > + * the tail, and the loser exits. > + */ > + if (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw, > + (uint64_t *)(uintptr_t)&ot.raw, nt.raw, > + rte_memory_order_release, > rte_memory_order_acquire) == 0) > + return; > + > + rte_atomic_store_explicit(&ht->rts_cache[ot.val.pos & mask].num, > + 0, rte_memory_order_release); > + }; > +} > +