> > > For improved performance over the current C11-based ring
> > > implementation, the following changes were made.
> > > (1) Replace the tail store with RELEASE semantics in
> > > __rte_ring_update_tail with a RELEASE fence. Replace the loads of the
> > > tail with ACQUIRE semantics in __rte_ring_move_prod_head and
> > > __rte_ring_move_cons_head with ACQUIRE fences.
> > > (2) Remove the ACQUIRE fences between the load of the old_head and the
> > > load of the cons_tail in __rte_ring_move_prod_head and
> > > __rte_ring_move_cons_head.
> > > These two fences are not required for the safety of the ring library.
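To make the comparison concrete: a minimal sketch of the two tail-publication
strategies under discussion, using the GCC __atomic builtins. The helper names
are illustrative, not DPDK symbols.

#include <stdint.h>

/* Before the patch: publish the tail with a store-release; it pairs with
 * a load-acquire of the same tail on the opposing side. */
static void
tail_store_release(uint32_t *tail, uint32_t new_val)
{
    __atomic_store_n(tail, new_val, __ATOMIC_RELEASE);
}

/* After the patch: a standalone release fence orders all earlier ring
 * writes before the subsequent relaxed store of the tail. */
static void
tail_release_fence_then_store(uint32_t *tail, uint32_t new_val)
{
    __atomic_thread_fence(__ATOMIC_RELEASE);
    __atomic_store_n(tail, new_val, __ATOMIC_RELAXED);
}

The patch's claim is that the fence form is faster on the targeted CPUs;
whether dropping the paired acquire on the tail loads is still safe is what
the rest of this thread debates.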
> >
> > Hmm... with these changes, aren't we re-introducing the old bug fixed by
> > this commit:
> 
> The cover letter explains why this barrier does not solve what it intends
> to solve, and why it should not matter.
> https://mails.dpdk.org/archives/dev/2023-June/270874.html

Ok, let's consider a case similar to yours (i), but where r->prod.head has
moved a distance greater than r->capacity.
To be more specific, let's start with the same initial state:
capacity = 32
r->cons.tail = 5
r->cons.head = 5
r->prod.head = 10
r->prod.tail = 10

time 0,  thread1: 
    /* re-ordered load */
     cons_tail = r->cons.tail; // = 5

Now, thread1 was stalled for a bit; meanwhile a few enqueues/dequeues were
done by other threads, so the current state of the ring is:
r->cons.tail = 105
r->cons.head = 105
r->prod.head = 110
r->prod.tail = 110

time 1,  thread1:
old_head = r->prod.head; // 110
*free_entries = (capacity + cons_tail - old_head); // = (uint32_t)(32 + 5 - 110) == (uint32_t)-73 == 4294967223
 
So the free_entries value is way too big, and this comparison:

if (unlikely(n > *free_entries))

might give a wrong result.
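For reference, the wraparound is plain unsigned modulo-2^32 arithmetic and
can be reproduced standalone (illustrative snippet, not ring code):

#include <stdint.h>
#include <stdio.h>

int main(void)
{
    uint32_t capacity = 32;
    uint32_t cons_tail = 5;   /* stale value loaded at time 0 */
    uint32_t old_head = 110;  /* fresh value loaded at time 1 */

    /* 32 + 5 - 110 = -73, which wraps to 4294967223 in uint32_t. */
    uint32_t free_entries = capacity + cons_tail - old_head;
    printf("free_entries = %u\n", free_entries); /* 4294967223 */
    return 0;
}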

So I still think we do need some sort of _read_fence_ between these two loads.
As I said before, this looks exactly like the old bug, fixed a while ago:
http://git.dpdk.org/dpdk/commit/?id=9bc2cbb007c0a3335c5582357ae9f6d37ea0b654
but now re-introduced for the C11 case.
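To be explicit about what I mean, here is the shape of the fix (an
illustrative sketch, not the exact DPDK code): an acquire fence between the
two loads forbids the tail read from being satisfied before the head read.

#include <stdint.h>

static uint32_t
load_head_then_tail(uint32_t *head, uint32_t *tail, uint32_t *old_head)
{
    *old_head = __atomic_load_n(head, __ATOMIC_RELAXED);

    /* Read fence: the tail load below cannot be reordered before the
     * head load above, so the tail value observed can never be staler
     * than the head value. */
    __atomic_thread_fence(__ATOMIC_ACQUIRE);

    return __atomic_load_n(tail, __ATOMIC_RELAXED);
}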

> >
> > commit 9bc2cbb007c0a3335c5582357ae9f6d37ea0b654
> > Author: Jia He <justin...@arm.com>
> > Date:   Fri Nov 10 03:30:42 2017 +0000
> >
> >     ring: guarantee load/load order in enqueue and dequeue
> >
> >     We watched a rte panic of mbuf_autotest in our qualcomm arm64 server
> >     (Amberwing).
> >
> >     Root cause:
> >     In __rte_ring_move_cons_head()
> >     ...
> >             do {
> >                     /* Restore n as it may change every loop */
> >                     n = max;
> >
> >                     *old_head = r->cons.head;                //1st load
> >                     const uint32_t prod_tail = r->prod.tail; //2nd load
> >
> >     In weak memory order architectures (powerpc, arm), the 2nd load might be
> >     reordered before the 1st load, which makes *entries bigger than we wanted.
> >     This nasty reordering messed enqueue/dequeue up.
> >     ....
> > ?
> >
> > >
> > > Signed-off-by: Wathsala Vithanage <wathsala.vithan...@arm.com>
> > > Reviewed-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> > > Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
> > > ---
> > >  .mailmap                    |  1 +
> > >  lib/ring/rte_ring_c11_pvt.h | 35 ++++++++++++++++++++---------------
> > >  2 files changed, 21 insertions(+), 15 deletions(-)
> > >
> > > diff --git a/.mailmap b/.mailmap
> > > index 4018f0fc47..367115d134 100644
> > > --- a/.mailmap
> > > +++ b/.mailmap
> > > @@ -1430,6 +1430,7 @@ Walter Heymans <walter.heym...@corigine.com>
> > >  Wang Sheng-Hui <shh...@gmail.com>
> > >  Wangyu (Eric) <seven.wan...@huawei.com>
> > >  Waterman Cao <waterman....@intel.com>
> > > +Wathsala Vithanage <wathsala.vithan...@arm.com>
> > >  Weichun Chen <weichunx.c...@intel.com>
> > >  Wei Dai <wei....@intel.com>
> > >  Weifeng Li <liweifen...@126.com>
> > > diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
> > > index f895950df4..63fe58ce9e 100644
> > > --- a/lib/ring/rte_ring_c11_pvt.h
> > > +++ b/lib/ring/rte_ring_c11_pvt.h
> > > @@ -16,6 +16,13 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
> > >           uint32_t new_val, uint32_t single, uint32_t enqueue)
> > >  {
> > >   RTE_SET_USED(enqueue);
> > > + /*
> > > +  * Updating of ht->tail cannot happen before elements are added to or
> > > +  * removed from the ring, as it could result in data races between
> > > +  * producer and consumer threads. Therefore we need a release
> > > +  * barrier here.
> > > +  */
> > > + rte_atomic_thread_fence(__ATOMIC_RELEASE);
> > >
> > >   /*
> > >    * If there are other enqueues/dequeues in progress that preceded us,
> > > @@ -24,7 +31,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
> > >   if (!single)
> > >           rte_wait_until_equal_32(&ht->tail, old_val, __ATOMIC_RELAXED);
> > >
> > > - __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
> > > + __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELAXED);
> > >  }
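For readers following the diff: a paraphrased sketch of what
__rte_ring_update_tail does after this hunk, with an open-coded loop
standing in for the rte_wait_until_equal_32 helper. Names are simplified,
not the actual library code.

#include <stdint.h>

static inline void
update_tail_sketch(uint32_t *tail, uint32_t old_val, uint32_t new_val,
                   uint32_t single)
{
    /* Release fence: ring-slot writes must become visible before the
     * tail store below can be observed. */
    __atomic_thread_fence(__ATOMIC_RELEASE);

    /* MP/MC: wait for preceding enqueues/dequeues to publish first. */
    if (!single)
        while (__atomic_load_n(tail, __ATOMIC_RELAXED) != old_val)
            ;

    /* The store itself is now relaxed; the fence above already provides
     * the release ordering. */
    __atomic_store_n(tail, new_val, __ATOMIC_RELAXED);
}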
> > >
> > >  /**
> > > @@ -66,14 +73,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
> > >           /* Reset n to the initial burst count */
> > >           n = max;
> > >
> > > -         /* Ensure the head is read before tail */
> > > -         __atomic_thread_fence(__ATOMIC_ACQUIRE);
> > > -
> > > -         /* load-acquire synchronize with store-release of ht->tail
> > > -          * in update_tail.
> > > -          */
> > >           cons_tail = __atomic_load_n(&r->cons.tail,
> > > -                                 __ATOMIC_ACQUIRE);
> > > +                                 __ATOMIC_RELAXED);
> > >
> > >           /* The subtraction is done between two unsigned 32bits value
> > >            * (the result is always modulo 32 bits even if we have
> > > @@ -100,6 +101,11 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
> > >                                   0, __ATOMIC_RELAXED,
> > >                                   __ATOMIC_RELAXED);
> > >   } while (unlikely(success == 0));
> > > + /*
> > > +  * Ensure that updates to the ring don't rise above the
> > > +  * load of the new_head in SP and MP cases.
> > > +  */
> > > + rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
> > >   return n;
> > >  }
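Putting the hunks together, a self-contained sketch of the patched producer
head move; struct toy_ring and the function name are simplified stand-ins
for the real DPDK types, not actual library code.

#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in for struct rte_ring; illustrative only. */
struct toy_ring {
    uint32_t capacity;
    uint32_t prod_head;
    uint32_t prod_tail;
    uint32_t cons_head;
    uint32_t cons_tail;
};

static uint32_t
toy_move_prod_head(struct toy_ring *r, uint32_t max,
                   uint32_t *old_head, uint32_t *new_head)
{
    uint32_t n, cons_tail, free_entries;
    bool success;

    do {
        n = max;
        *old_head = __atomic_load_n(&r->prod_head, __ATOMIC_RELAXED);

        /* Patch change (2): no fence here, so this load may return a
         * cons_tail value older than the prod_head read above. */
        cons_tail = __atomic_load_n(&r->cons_tail, __ATOMIC_RELAXED);

        /* Unsigned modulo-2^32 subtraction, as in the real ring. */
        free_entries = r->capacity + cons_tail - *old_head;
        if (n > free_entries)
            n = free_entries;
        if (n == 0)
            return 0;

        *new_head = *old_head + n;
        success = __atomic_compare_exchange_n(&r->prod_head, old_head,
                *new_head, 0 /* strong CAS */,
                __ATOMIC_RELAXED, __ATOMIC_RELAXED);
    } while (!success);

    /* Patch addition: accesses to the ring slots may not be hoisted
     * above the successful head update. */
    __atomic_thread_fence(__ATOMIC_ACQUIRE);
    return n;
}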
> > >
> > > @@ -142,14 +148,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> > >           /* Restore n as it may change every loop */
> > >           n = max;
> > >
> > > -         /* Ensure the head is read before tail */
> > > -         __atomic_thread_fence(__ATOMIC_ACQUIRE);
> > > -
> > > -         /* this load-acquire synchronize with store-release of ht->tail
> > > -          * in update_tail.
> > > -          */
> > >           prod_tail = __atomic_load_n(&r->prod.tail,
> > > -                                 __ATOMIC_ACQUIRE);
> > > +                                 __ATOMIC_RELAXED);
> > >
> > >           /* The subtraction is done between two unsigned 32bits value
> > >            * (the result is always modulo 32 bits even if we have
> > > @@ -175,6 +175,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> > >                                                   0, __ATOMIC_RELAXED,
> > >                                                   __ATOMIC_RELAXED);
> > >   } while (unlikely(success == 0));
> > > + /*
> > > +  * Ensure that updates to the ring don't rise above the
> > > +  * load of the new_head in SP and MP cases.
> > > +  */
> > > + rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
> > >   return n;
> > >  }
> > >
> > > --
> > > 2.25.1
> > >
