Replace use of the deprecated rte_atomic32 code with GCC builtin atomic operations on x86. The C11 version used on other architectures is unchanged.
Although it would be preferable to use C11 on all architectures, there is a performance loss if we do it that way.

On x86 i9-13900H, two physical cores MP/MC (cycles/elem), ring_perf test with GCC 14.2:

    n      asm    sync     c11
    8    72.86   72.12   89.01
   32    18.74   18.80   24.62
   64    10.07    9.86   12.41
  128     6.99    6.74    9.01
  256     6.38    6.20    7.34

Pure C11 regresses 15-30% due to the failure-writeback semantics of __atomic_compare_exchange_n.

Drop the now-unused enqueue argument.

Signed-off-by: Stephen Hemminger <[email protected]>
Acked-by: Konstantin Ananyev <[email protected]>
--- lib/ring/meson.build | 2 +- lib/ring/rte_ring_c11_pvt.h | 25 ---- lib/ring/rte_ring_elem_pvt.h | 37 +++-- ..._ring_generic_pvt.h => rte_ring_gcc_pvt.h} | 141 ++++++++---------- lib/ring/rte_ring_hts_elem_pvt.h | 8 +- lib/ring/soring.c | 10 +- 6 files changed, 99 insertions(+), 124 deletions(-) rename lib/ring/{rte_ring_generic_pvt.h => rte_ring_gcc_pvt.h} (81%) diff --git a/lib/ring/meson.build b/lib/ring/meson.build index 21f2c12989..2ba160b178 100644 --- a/lib/ring/meson.build +++ b/lib/ring/meson.build @@ -9,7 +9,7 @@ indirect_headers += files ( 'rte_ring_elem.h', 'rte_ring_elem_pvt.h', 'rte_ring_c11_pvt.h', - 'rte_ring_generic_pvt.h', + 'rte_ring_gcc_pvt.h', 'rte_ring_hts.h', 'rte_ring_hts_elem_pvt.h', 'rte_ring_peek.h', diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h index 5afc14dec9..a6c14921d3 100644 --- a/lib/ring/rte_ring_c11_pvt.h +++ b/lib/ring/rte_ring_c11_pvt.h @@ -19,31 +19,6 @@ * For more information please refer to <rte_ring.h>. */ -/** - * @internal This function updates tail values. 
- */ -static __rte_always_inline void -__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, - uint32_t new_val, uint32_t single, uint32_t enqueue) -{ - RTE_SET_USED(enqueue); - - /* - * If there are other enqueues/dequeues in progress that preceded us, - * we need to wait for them to complete - */ - if (!single) - rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val, - rte_memory_order_relaxed); - - /* - * R0: Establishes a synchronizing edge with load-acquire of tail at A1. - * Ensures that memory effects by this thread on ring elements array - * is observed by a different thread of the other type. - */ - rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release); -} - /** * @internal This is a helper function that moves the producer/consumer head * optimized for single threaded case diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h index a0fdec9812..29758d0bb8 100644 --- a/lib/ring/rte_ring_elem_pvt.h +++ b/lib/ring/rte_ring_elem_pvt.h @@ -299,17 +299,36 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head, cons_head & r->mask, esize, num); } -/* Between load and load. there might be cpu reorder in weak model - * (powerpc/arm). - * There are 2 choices for the users - * 1.use rmb() memory barrier - * 2.use one-direction load_acquire/store_release barrier - * It depends on performance test results. +static __rte_always_inline void +__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, + uint32_t new_val, uint32_t single) +{ + /* + * If there are other enqueues/dequeues in progress that preceded us, + * we need to wait for them to complete + */ + if (!single) + rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val, + rte_memory_order_relaxed); + + /* + * R0: Establishes a synchronizing edge with load-acquire of tail at A1. + * Ensures that memory effects by this thread on ring elements array + * is observed by a different thread of the other type. 
+ */ + rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release); +} + +/* + * The function __rte_ring_headtail_move_head_mt,st has two versions + * based on what is most efficient on a given architecture. + * + * The C11 is preferred but on x86 GCC has 10% performance drop. */ #ifdef RTE_USE_C11_MEM_MODEL #include "rte_ring_c11_pvt.h" #else -#include "rte_ring_generic_pvt.h" +#include "rte_ring_gcc_pvt.h" #endif /** @@ -426,7 +445,7 @@ __rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table, __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n); - __rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp, 1); + __rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp); end: if (free_space != NULL) *free_space = free_entries - n; @@ -473,7 +492,7 @@ __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table, __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n); - __rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc, 0); + __rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc); end: if (available != NULL) diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_gcc_pvt.h similarity index 81% rename from lib/ring/rte_ring_generic_pvt.h rename to lib/ring/rte_ring_gcc_pvt.h index c044b0824f..340ece28c7 100644 --- a/lib/ring/rte_ring_generic_pvt.h +++ b/lib/ring/rte_ring_gcc_pvt.h @@ -7,42 +7,21 @@ * Used as BSD-3 Licensed with permission from Kip Macy. */ -#ifndef _RTE_RING_GENERIC_PVT_H_ -#define _RTE_RING_GENERIC_PVT_H_ +#ifndef _RTE_RING_GCC_PVT_H_ +#define _RTE_RING_GCC_PVT_H_ /** - * @file rte_ring_generic_pvt.h + * @file rte_ring_gcc_pvt.h * It is not recommended to include this file directly, * include <rte_ring.h> instead. * Contains internal helper functions for MP/SP and MC/SC ring modes. * For more information please refer to <rte_ring.h>. */ -/** - * @internal This function updates tail values. 
- */ -static __rte_always_inline void -__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, - uint32_t new_val, uint32_t single, uint32_t enqueue) -{ - if (enqueue) - rte_smp_wmb(); - else - rte_smp_rmb(); - /* - * If there are other enqueues/dequeues in progress that preceded us, - * we need to wait for them to complete - */ - if (!single) - rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val, - rte_memory_order_relaxed); - - ht->tail = new_val; -} /** * @internal This is a helper function that moves the producer/consumer head - * for use in multi-thread safe path + * optimized for single threaded case * * @param d * A pointer to the headtail structure with head value to be moved @@ -67,52 +46,43 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only */ static __rte_always_inline unsigned int -__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d, +__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d, const struct rte_ring_headtail *s, uint32_t capacity, - unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int n, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { - unsigned int max = n; - int success; - - do { - /* Reset n to the initial burst count */ - n = max; - *old_head = d->head; + *old_head = d->head; - /* add rmb barrier to avoid load/load reorder in weak - * memory model. It is noop on x86 - */ - rte_smp_rmb(); + /* add rmb barrier to avoid load/load reorder in weak + * memory model. It is noop on x86 + */ + rte_smp_rmb(); - /* - * The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * *old_head > s->tail). So 'entries' is always between 0 - * and capacity (which is < size). 
- */ - *entries = (capacity + s->tail - *old_head); + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > s->tail). So 'entries' is always between 0 + * and capacity (which is < size). + */ + *entries = capacity + s->tail - *old_head; - /* check that we have enough room in ring */ - if (unlikely(n > *entries)) - n = (behavior == RTE_RING_QUEUE_FIXED) ? - 0 : *entries; + /* check that we have enough room in ring */ + if (unlikely(n > *entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; - if (n == 0) - return 0; + if (n == 0) + return 0; - *new_head = *old_head + n; - success = rte_atomic32_cmpset( - (uint32_t *)(uintptr_t)&d->head, - *old_head, *new_head); - } while (unlikely(success == 0)); + *new_head = *old_head + n; + d->head = *new_head; return n; } /** * @internal This is a helper function that moves the producer/consumer head - * optimized for single threaded case + * for use in multi-thread safe path * * @param d * A pointer to the headtail structure with head value to be moved @@ -137,36 +107,49 @@ __rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d, * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only */ static __rte_always_inline unsigned int -__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d, +__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d, const struct rte_ring_headtail *s, uint32_t capacity, - unsigned int n, - enum rte_ring_queue_behavior behavior, + unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { - *old_head = d->head; + unsigned int max = n; + bool success; - /* add rmb barrier to avoid load/load reorder in weak - * memory model. 
It is noop on x86 - */ - rte_smp_rmb(); + do { + /* Reset n to the initial burst count */ + n = max; - /* - * The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * *old_head > s->tail). So 'entries' is always between 0 - * and capacity (which is < size). - */ - *entries = (capacity + s->tail - *old_head); + *old_head = d->head; - /* check that we have enough room in ring */ - if (unlikely(n > *entries)) - n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; + /* add fence to avoid load/load reorder in weak + * memory model. It is noop on x86 + */ + __atomic_thread_fence(__ATOMIC_ACQUIRE); + + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > s->tail). So 'entries' is always between 0 + * and capacity (which is < size). + */ + *entries = (capacity + s->tail - *old_head); + + /* check that we have enough room in ring */ + if (unlikely(n > *entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
+ 0 : *entries; + + if (n == 0) + return 0; - if (likely(n > 0)) { *new_head = *old_head + n; - d->head = *new_head; - } + + success = __sync_bool_compare_and_swap( + (uint32_t *)(uintptr_t)&d->head, + *old_head, *new_head); + } while (unlikely(!success)); + return n; } -#endif /* _RTE_RING_GENERIC_PVT_H_ */ +#endif /* _RTE_RING_GCC_PVT_H_ */ diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h index a01089d15d..97ae240e2e 100644 --- a/lib/ring/rte_ring_hts_elem_pvt.h +++ b/lib/ring/rte_ring_hts_elem_pvt.h @@ -25,12 +25,10 @@ */ static __rte_always_inline void __rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail, - uint32_t num, uint32_t enqueue) + uint32_t num) { uint32_t tail; - RTE_SET_USED(enqueue); - tail = old_tail + num; /* @@ -217,7 +215,7 @@ __rte_ring_do_hts_enqueue_elem(struct rte_ring *r, const void *obj_table, if (n != 0) { __rte_ring_enqueue_elems(r, head, obj_table, esize, n); - __rte_ring_hts_update_tail(&r->hts_prod, head, n, 1); + __rte_ring_hts_update_tail(&r->hts_prod, head, n); } if (free_space != NULL) @@ -258,7 +256,7 @@ __rte_ring_do_hts_dequeue_elem(struct rte_ring *r, void *obj_table, if (n != 0) { __rte_ring_dequeue_elems(r, head, obj_table, esize, n); - __rte_ring_hts_update_tail(&r->hts_cons, head, n, 0); + __rte_ring_hts_update_tail(&r->hts_cons, head, n); } if (available != NULL) diff --git a/lib/ring/soring.c b/lib/ring/soring.c index 22f9c60e9c..45292c0f78 100644 --- a/lib/ring/soring.c +++ b/lib/ring/soring.c @@ -202,21 +202,21 @@ __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num, static __rte_always_inline void __rte_soring_update_tail(struct __rte_ring_headtail *rht, - enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq) + enum rte_ring_sync_type st, uint32_t head, uint32_t next) { uint32_t n; switch (st) { case RTE_RING_SYNC_ST: case RTE_RING_SYNC_MT: - __rte_ring_update_tail(&rht->ht, head, next, st, enq); + 
__rte_ring_update_tail(&rht->ht, head, next, st); break; case RTE_RING_SYNC_MT_RTS: __rte_ring_rts_update_tail(&rht->rts); break; case RTE_RING_SYNC_MT_HTS: n = next - head; - __rte_ring_hts_update_tail(&rht->hts, head, n, enq); + __rte_ring_hts_update_tail(&rht->hts, head, n); break; default: /* unsupported mode, shouldn't be here */ @@ -295,7 +295,7 @@ soring_enqueue(struct rte_soring *r, const void *objs, &prod_head, &prod_next, &nb_free); if (n != 0) { __enqueue_elems(r, objs, meta, prod_head, n); - __rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1); + __rte_soring_update_tail(&r->prod, st, prod_head, prod_next); } if (free_space != NULL) @@ -401,7 +401,7 @@ soring_dequeue(struct rte_soring *r, void *objs, void *meta, /* we have some elems to consume */ if (n != 0) { __dequeue_elems(r, objs, meta, cons_head, n); - __rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0); + __rte_soring_update_tail(&r->cons, st, cons_head, cons_next); } if (available != NULL) -- 2.53.0

