To improve performance over the current C11-based ring implementation,
the following changes were made.
(1) Replace the store of the tail with RELEASE semantics in
__rte_ring_update_tail with a RELEASE fence and a relaxed store.
Replace the loads of the tail with ACQUIRE semantics in
__rte_ring_move_prod_head and __rte_ring_move_cons_head with relaxed
loads and ACQUIRE fences.
(2) Remove the ACQUIRE fences between the load of old_head and the load
of the opposing tail (cons_tail in __rte_ring_move_prod_head, prod_tail
in __rte_ring_move_cons_head). These two fences are not required for
the safety of the ring library.

Signed-off-by: Wathsala Vithanage <wathsala.vithan...@arm.com>
Reviewed-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
---
 .mailmap                    |  1 +
 lib/ring/rte_ring_c11_pvt.h | 35 ++++++++++++++++++++---------------
 2 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/.mailmap b/.mailmap
index 4018f0fc47..367115d134 100644
--- a/.mailmap
+++ b/.mailmap
@@ -1430,6 +1430,7 @@ Walter Heymans <walter.heym...@corigine.com>
 Wang Sheng-Hui <shh...@gmail.com>
 Wangyu (Eric) <seven.wan...@huawei.com>
 Waterman Cao <waterman....@intel.com>
+Wathsala Vithanage <wathsala.vithan...@arm.com>
 Weichun Chen <weichunx.c...@intel.com>
 Wei Dai <wei....@intel.com>
 Weifeng Li <liweifen...@126.com>
diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index f895950df4..63fe58ce9e 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -16,6 +16,13 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
                uint32_t new_val, uint32_t single, uint32_t enqueue)
 {
        RTE_SET_USED(enqueue);
+       /*
+        * Updating of ht->tail cannot happen before elements are added to or
+        * removed from the ring, as it could result in data races between
+        * producer and consumer threads. Therefore we need a release
+        * barrier here.
+        */
+       rte_atomic_thread_fence(__ATOMIC_RELEASE);
 
        /*
         * If there are other enqueues/dequeues in progress that preceded us,
@@ -24,7 +31,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
        if (!single)
                rte_wait_until_equal_32(&ht->tail, old_val, __ATOMIC_RELAXED);
 
-       __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
+       __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELAXED);
 }
 
 /**
@@ -66,14 +73,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
                /* Reset n to the initial burst count */
                n = max;
 
-               /* Ensure the head is read before tail */
-               __atomic_thread_fence(__ATOMIC_ACQUIRE);
-
-               /* load-acquire synchronize with store-release of ht->tail
-                * in update_tail.
-                */
                cons_tail = __atomic_load_n(&r->cons.tail,
-                                       __ATOMIC_ACQUIRE);
+                                       __ATOMIC_RELAXED);
 
                /* The subtraction is done between two unsigned 32bits value
                 * (the result is always modulo 32 bits even if we have
@@ -100,6 +101,11 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
                                        0, __ATOMIC_RELAXED,
                                        __ATOMIC_RELAXED);
        } while (unlikely(success == 0));
+       /*
+        * Ensure that updates to the ring don't rise above
+        * the load of the new_head in SP and MP cases.
+        */
+       rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
        return n;
 }
 
@@ -142,14 +148,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
                /* Restore n as it may change every loop */
                n = max;
 
-               /* Ensure the head is read before tail */
-               __atomic_thread_fence(__ATOMIC_ACQUIRE);
-
-               /* this load-acquire synchronize with store-release of ht->tail
-                * in update_tail.
-                */
                prod_tail = __atomic_load_n(&r->prod.tail,
-                                       __ATOMIC_ACQUIRE);
+                                       __ATOMIC_RELAXED);
 
                /* The subtraction is done between two unsigned 32bits value
                 * (the result is always modulo 32 bits even if we have
@@ -175,6 +175,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
                                                        0, __ATOMIC_RELAXED,
                                                        __ATOMIC_RELAXED);
        } while (unlikely(success == 0));
+       /*
+        * Ensure that updates to the ring don't rise above
+        * the load of the new_head in SP and MP cases.
+        */
+       rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
        return n;
 }
 
-- 
2.25.1

Reply via email to