> > > For improved performance over the current C11 based ring > > > implementation following changes were made. > > > (1) Replace tail store with RELEASE semantics in > > > __rte_ring_update_tail with a RELEASE fence. Replace load of the tail > > > with ACQUIRE semantics in __rte_ring_move_prod_head and > > > __rte_ring_move_cons_head with ACQUIRE fences. > > > (2) Remove ACQUIRE fences between load of the old_head and load of the > > > cons_tail in __rte_ring_move_prod_head and __rte_ring_move_cons_head. > > > These two fences are not required for the safety of the ring library. > > > > Hmm... with these changes, aren't we re-introducing the old bug fixed by > > this > > commit: > > Cover letter explains why this barrier does not solve what it intends to > solve and > Why it should not matter. > https://mails.dpdk.org/archives/dev/2023-June/270874.html
Ok, let's consider the case similar to yours (i), but when r->prod.head was moved for distance greater then r->capacity. To be more specific, let' start with the same initial state: capacity = 32 r->cons.tail = 5 r->cons.head = 5 r->prod.head = 10 r-prod.tail = 10 time 0, thread1: /* re-ordered load */ const_tail = r->cons.tail; //= 5 Now, thread1 was stalled for a bit, meanwhile there were few enqueue/dequeus done by other threads, so current state of the ring: r->cons.tail = 105 r->cons.head = 105 r->prod.head = 110 r-prod.tail = 110 time 1, thread1: old_head = r->prod.head; // 110 *free_entries = (capacity + cons_tail - old_head); // = (uint32_t)(32 + 5 - 110) == (uint32_t)-73 == 4294967223 So, free_entries value is way too big, and that comparison: if (unlikely(n > *free_entries)) might provide wrong result. So I still think we do need some sort of _read_fence_ between these two loads. As I said before, that looks exactly like the old bug, fixed a while ago: http://git.dpdk.org/dpdk/commit/?id=9bc2cbb007c0a3335c5582357ae9f6d37ea0b654 but now re-introduced for C11 case. > > > > commit 9bc2cbb007c0a3335c5582357ae9f6d37ea0b654 > > Author: Jia He <justin...@arm.com> > > Date: Fri Nov 10 03:30:42 2017 +0000 > > > > ring: guarantee load/load order in enqueue and dequeue > > > > We watched a rte panic of mbuf_autotest in our qualcomm arm64 server > > (Amberwing). > > > > Root cause: > > In __rte_ring_move_cons_head() > > ... > > do { > > /* Restore n as it may change every loop */ > > n = max; > > > > *old_head = r->cons.head; //1st load > > const uint32_t prod_tail = r->prod.tail; //2nd load > > > > In weak memory order architectures (powerpc,arm), the 2nd load might be > > reodered before the 1st load, that makes *entries is bigger than we > > wanted. > > This nasty reording messed enque/deque up. > > .... > > ? > > > > > > > > Signed-off-by: Wathsala Vithanage <wathsala.vithan...@arm.com> > > > Reviewed-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com> > > > Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com> > > > --- > > > .mailmap | 1 + > > > lib/ring/rte_ring_c11_pvt.h | 35 ++++++++++++++++++++--------------- > > > 2 files changed, 21 insertions(+), 15 deletions(-) > > > > > > diff --git a/.mailmap b/.mailmap > > > index 4018f0fc47..367115d134 100644 > > > --- a/.mailmap > > > +++ b/.mailmap > > > @@ -1430,6 +1430,7 @@ Walter Heymans > > <walter.heym...@corigine.com> > > > Wang Sheng-Hui <shh...@gmail.com> Wangyu (Eric) > > > <seven.wan...@huawei.com> Waterman Cao <waterman....@intel.com> > > > +Wathsala Vithanage <wathsala.vithan...@arm.com> > > > Weichun Chen <weichunx.c...@intel.com> Wei Dai <wei....@intel.com> > > > Weifeng Li <liweifen...@126.com> diff --git > > > a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h index > > > f895950df4..63fe58ce9e 100644 > > > --- a/lib/ring/rte_ring_c11_pvt.h > > > +++ b/lib/ring/rte_ring_c11_pvt.h > > > @@ -16,6 +16,13 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, > > uint32_t old_val, > > > uint32_t new_val, uint32_t single, uint32_t enqueue) { > > > RTE_SET_USED(enqueue); > > > + /* > > > + * Updating of ht->tail cannot happen before elements are added to or > > > + * removed from the ring, as it could result in data races between > > > + * producer and consumer threads. Therefore we need a release > > > + * barrier here. > > > + */ > > > + rte_atomic_thread_fence(__ATOMIC_RELEASE); > > > > > > /* > > > * If there are other enqueues/dequeues in progress that preceded > > > us, @@ -24,7 +31,7 @@ __rte_ring_update_tail(struct rte_ring_headtail > > *ht, uint32_t old_val, > > > if (!single) > > > rte_wait_until_equal_32(&ht->tail, old_val, > > __ATOMIC_RELAXED); > > > > > > - __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE); > > > + __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELAXED); > > > } > > > > > > /** > > > @@ -66,14 +73,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, > > unsigned int is_sp, > > > /* Reset n to the initial burst count */ > > > n = max; > > > > > > - /* Ensure the head is read before tail */ > > > - __atomic_thread_fence(__ATOMIC_ACQUIRE); > > > - > > > - /* load-acquire synchronize with store-release of ht->tail > > > - * in update_tail. > > > - */ > > > cons_tail = __atomic_load_n(&r->cons.tail, > > > - __ATOMIC_ACQUIRE); > > > + __ATOMIC_RELAXED); > > > > > > /* The subtraction is done between two unsigned 32bits value > > > * (the result is always modulo 32 bits even if we have @@ - > > 100,6 > > > +101,11 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int > > is_sp, > > > 0, __ATOMIC_RELAXED, > > > __ATOMIC_RELAXED); > > > } while (unlikely(success == 0)); > > > + /* > > > + * Ensure that updates to the ring doesn't rise above > > > + * load of the new_head in SP and MP cases. > > > + */ > > > + rte_atomic_thread_fence(__ATOMIC_ACQUIRE); > > > return n; > > > } > > > > > > @@ -142,14 +148,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int > > is_sc, > > > /* Restore n as it may change every loop */ > > > n = max; > > > > > > - /* Ensure the head is read before tail */ > > > - __atomic_thread_fence(__ATOMIC_ACQUIRE); > > > - > > > - /* this load-acquire synchronize with store-release of ht->tail > > > - * in update_tail. > > > - */ > > > prod_tail = __atomic_load_n(&r->prod.tail, > > > - __ATOMIC_ACQUIRE); > > > + __ATOMIC_RELAXED); > > > > > > /* The subtraction is done between two unsigned 32bits value > > > * (the result is always modulo 32 bits even if we have @@ - > > 175,6 > > > +175,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, > > > 0, > > __ATOMIC_RELAXED, > > > __ATOMIC_RELAXED); > > > } while (unlikely(success == 0)); > > > + /* > > > + * Ensure that updates to the ring doesn't rise above > > > + * load of the new_head in SP and MP cases. > > > + */ > > > + rte_atomic_thread_fence(__ATOMIC_ACQUIRE); > > > return n; > > > } > > > > > > -- > > > 2.25.1 > > >