> For improved performance over the current C11-based ring implementation,
> the following changes were made.
> (1) Replace tail store with RELEASE semantics in __rte_ring_update_tail
> with a RELEASE fence. Replace load of the tail with ACQUIRE semantics
> in __rte_ring_move_prod_head and __rte_ring_move_cons_head with ACQUIRE
> fences.
> (2) Remove ACQUIRE fences between load of the old_head and load of the
> cons_tail in __rte_ring_move_prod_head and __rte_ring_move_cons_head.
> These two fences are not required for the safety of the ring library.

Hmm... with these changes, aren't we re-introducing the old bug fixed by
this commit:

commit 9bc2cbb007c0a3335c5582357ae9f6d37ea0b654
Author: Jia He <justin...@arm.com>
Date:   Fri Nov 10 03:30:42 2017 +0000

    ring: guarantee load/load order in enqueue and dequeue

    We watched a rte panic of mbuf_autotest in our qualcomm arm64 server
    (Amberwing).

    Root cause:
    In __rte_ring_move_cons_head()
    ...
            do {
                    /* Restore n as it may change every loop */
                    n = max;

                    *old_head = r->cons.head;                //1st load
                    const uint32_t prod_tail = r->prod.tail; //2nd load

    In weak memory order architectures (powerpc, arm), the 2nd load might be
    reordered before the 1st load, which makes *entries bigger than we wanted.
    This nasty reordering messed enqueue/dequeue up.
    ....
?

> 
> Signed-off-by: Wathsala Vithanage <wathsala.vithan...@arm.com>
> Reviewed-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
> ---
>  .mailmap                    |  1 +
>  lib/ring/rte_ring_c11_pvt.h | 35 ++++++++++++++++++++---------------
>  2 files changed, 21 insertions(+), 15 deletions(-)
> 
> diff --git a/.mailmap b/.mailmap
> index 4018f0fc47..367115d134 100644
> --- a/.mailmap
> +++ b/.mailmap
> @@ -1430,6 +1430,7 @@ Walter Heymans <walter.heym...@corigine.com>
>  Wang Sheng-Hui <shh...@gmail.com>
>  Wangyu (Eric) <seven.wan...@huawei.com>
>  Waterman Cao <waterman....@intel.com>
> +Wathsala Vithanage <wathsala.vithan...@arm.com>
>  Weichun Chen <weichunx.c...@intel.com>
>  Wei Dai <wei....@intel.com>
>  Weifeng Li <liweifen...@126.com>
> diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
> index f895950df4..63fe58ce9e 100644
> --- a/lib/ring/rte_ring_c11_pvt.h
> +++ b/lib/ring/rte_ring_c11_pvt.h
> @@ -16,6 +16,13 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, 
> uint32_t old_val,
>               uint32_t new_val, uint32_t single, uint32_t enqueue)
>  {
>       RTE_SET_USED(enqueue);
> +     /*
> +      * Updating of ht->tail cannot happen before elements are added to or
> +      * removed from the ring, as it could result in data races between
> +      * producer and consumer threads. Therefore we need a release
> +      * barrier here.
> +      */
> +     rte_atomic_thread_fence(__ATOMIC_RELEASE);
> 
>       /*
>        * If there are other enqueues/dequeues in progress that preceded us,
> @@ -24,7 +31,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, 
> uint32_t old_val,
>       if (!single)
>               rte_wait_until_equal_32(&ht->tail, old_val, __ATOMIC_RELAXED);
> 
> -     __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
> +     __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELAXED);
>  }
> 
>  /**
> @@ -66,14 +73,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int 
> is_sp,
>               /* Reset n to the initial burst count */
>               n = max;
> 
> -             /* Ensure the head is read before tail */
> -             __atomic_thread_fence(__ATOMIC_ACQUIRE);
> -
> -             /* load-acquire synchronize with store-release of ht->tail
> -              * in update_tail.
> -              */
>               cons_tail = __atomic_load_n(&r->cons.tail,
> -                                     __ATOMIC_ACQUIRE);
> +                                     __ATOMIC_RELAXED);
> 
>               /* The subtraction is done between two unsigned 32bits value
>                * (the result is always modulo 32 bits even if we have
> @@ -100,6 +101,11 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned 
> int is_sp,
>                                       0, __ATOMIC_RELAXED,
>                                       __ATOMIC_RELAXED);
>       } while (unlikely(success == 0));
> +     /*
> +      * Ensure that updates to the ring don't rise above
> +      * load of the new_head in SP and MP cases.
> +      */
> +     rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
>       return n;
>  }
> 
> @@ -142,14 +148,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>               /* Restore n as it may change every loop */
>               n = max;
> 
> -             /* Ensure the head is read before tail */
> -             __atomic_thread_fence(__ATOMIC_ACQUIRE);
> -
> -             /* this load-acquire synchronize with store-release of ht->tail
> -              * in update_tail.
> -              */
>               prod_tail = __atomic_load_n(&r->prod.tail,
> -                                     __ATOMIC_ACQUIRE);
> +                                     __ATOMIC_RELAXED);
> 
>               /* The subtraction is done between two unsigned 32bits value
>                * (the result is always modulo 32 bits even if we have
> @@ -175,6 +175,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>                                                       0, __ATOMIC_RELAXED,
>                                                       __ATOMIC_RELAXED);
>       } while (unlikely(success == 0));
> +     /*
> +      * Ensure that updates to the ring don't rise above
> +      * load of the new_head in SP and MP cases.
> +      */
> +     rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
>       return n;
>  }
> 
> --
> 2.25.1
> 

Reply via email to