From: Konstantin Ananyev <konstantin.anan...@huawei.com>

Note upfront: this change doesn't introduce any functional or
performance changes.
It is just a code reordering for:
 - code deduplication
 - the ability to re-use the same code in the future to introduce new
   functionality

For each sync mode, the corresponding move_prod_head() and
move_cons_head() are nearly identical to each other;
the only differences are:
 - whether @capacity is needed to calculate the number of entries.
 - which head (prod/cons) needs to be updated and which one serves as
   the read-only counterpart.
So instead of keeping two copies of nearly identical functions,
introduce a new common one that can be used by both:
move_prod_head() and move_cons_head().

As another benefit, the new common sub-function no longer needs to
reference the whole rte_ring structure.
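
To illustrate, the common helper ends up with the following shape
(taken from the rte_ring_elem_pvt.h hunk below), and both head-move
functions become thin wrappers around it:

  static __rte_always_inline unsigned int
  __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
                  const struct rte_ring_headtail *s, uint32_t capacity,
                  unsigned int is_st, unsigned int n,
                  enum rte_ring_queue_behavior behavior,
                  uint32_t *old_head, uint32_t *new_head, uint32_t *entries);

  /* producer: update prod head, read cons tail, use the real capacity */
  __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
                  is_sp, n, behavior, old_head, new_head, free_entries);

  /* consumer: update cons head, read prod tail; passing 0 as capacity
   * makes the common "capacity + s->tail - *old_head" computation
   * yield the number of ready entries instead of the free space.
   */
  __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
                  is_sc, n, behavior, old_head, new_head, entries);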

Signed-off-by: Konstantin Ananyev <konstantin.anan...@huawei.com>
---
 lib/ring/rte_ring_c11_pvt.h      | 134 +++++--------------------------
 lib/ring/rte_ring_elem_pvt.h     |  66 +++++++++++++++
 lib/ring/rte_ring_generic_pvt.h  | 121 ++++------------------------
 lib/ring/rte_ring_hts_elem_pvt.h |  85 ++++++--------------
 lib/ring/rte_ring_rts_elem_pvt.h |  85 ++++++--------------
 5 files changed, 149 insertions(+), 342 deletions(-)

diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 629b2d9288..048933ddc6 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -28,41 +28,19 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
        rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
 }
 
-/**
- * @internal This function updates the producer head for enqueue
- *
- * @param r
- *   A pointer to the ring structure
- * @param is_sp
- *   Indicates whether multi-producer path is needed or not
- * @param n
- *   The number of elements we will want to enqueue, i.e. how far should the
- *   head be moved
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
- * @param old_head
- *   Returns head value as it was before the move, i.e. where enqueue starts
- * @param new_head
- *   Returns the current/new head value i.e. where enqueue finishes
- * @param free_entries
- *   Returns the amount of free space in the ring BEFORE head was moved
- * @return
- *   Actual number of objects enqueued.
- *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
 static __rte_always_inline unsigned int
-__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
-               unsigned int n, enum rte_ring_queue_behavior behavior,
-               uint32_t *old_head, uint32_t *new_head,
-               uint32_t *free_entries)
+__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+               const struct rte_ring_headtail *s, uint32_t capacity,
+               unsigned int is_st, unsigned int n,
+               enum rte_ring_queue_behavior behavior,
+               uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
 {
-       const uint32_t capacity = r->capacity;
-       uint32_t cons_tail;
-       unsigned int max = n;
+       uint32_t stail;
        int success;
+       unsigned int max = n;
 
-       *old_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_relaxed);
+       *old_head = rte_atomic_load_explicit(&d->head,
+                       rte_memory_order_relaxed);
        do {
                /* Reset n to the initial burst count */
                n = max;
@@ -73,112 +51,36 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
                /* load-acquire synchronize with store-release of ht->tail
                 * in update_tail.
                 */
-               cons_tail = rte_atomic_load_explicit(&r->cons.tail,
+               stail = rte_atomic_load_explicit(&s->tail,
                                        rte_memory_order_acquire);
 
                /* The subtraction is done between two unsigned 32bits value
                 * (the result is always modulo 32 bits even if we have
-                * *old_head > cons_tail). So 'free_entries' is always between 0
+                * *old_head > s->tail). So 'free_entries' is always between 0
                 * and capacity (which is < size).
                 */
-               *free_entries = (capacity + cons_tail - *old_head);
+               *entries = (capacity + stail - *old_head);
 
                /* check that we have enough room in ring */
-               if (unlikely(n > *free_entries))
+               if (unlikely(n > *entries))
                        n = (behavior == RTE_RING_QUEUE_FIXED) ?
-                                       0 : *free_entries;
+                                       0 : *entries;
 
                if (n == 0)
                        return 0;
 
                *new_head = *old_head + n;
-               if (is_sp) {
-                       r->prod.head = *new_head;
+               if (is_st) {
+                       d->head = *new_head;
                        success = 1;
                } else
                        /* on failure, *old_head is updated */
-                       success = rte_atomic_compare_exchange_strong_explicit(&r->prod.head,
-                                       old_head, *new_head,
+                       success = rte_atomic_compare_exchange_strong_explicit(
+                                       &d->head, old_head, *new_head,
                                        rte_memory_order_relaxed,
                                        rte_memory_order_relaxed);
        } while (unlikely(success == 0));
        return n;
 }
 
-/**
- * @internal This function updates the consumer head for dequeue
- *
- * @param r
- *   A pointer to the ring structure
- * @param is_sc
- *   Indicates whether multi-consumer path is needed or not
- * @param n
- *   The number of elements we will want to dequeue, i.e. how far should the
- *   head be moved
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
- * @param old_head
- *   Returns head value as it was before the move, i.e. where dequeue starts
- * @param new_head
- *   Returns the current/new head value i.e. where dequeue finishes
- * @param entries
- *   Returns the number of entries in the ring BEFORE head was moved
- * @return
- *   - Actual number of objects dequeued.
- *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
-               unsigned int n, enum rte_ring_queue_behavior behavior,
-               uint32_t *old_head, uint32_t *new_head,
-               uint32_t *entries)
-{
-       unsigned int max = n;
-       uint32_t prod_tail;
-       int success;
-
-       /* move cons.head atomically */
-       *old_head = rte_atomic_load_explicit(&r->cons.head, rte_memory_order_relaxed);
-       do {
-               /* Restore n as it may change every loop */
-               n = max;
-
-               /* Ensure the head is read before tail */
-               rte_atomic_thread_fence(rte_memory_order_acquire);
-
-               /* this load-acquire synchronize with store-release of ht->tail
-                * in update_tail.
-                */
-               prod_tail = rte_atomic_load_explicit(&r->prod.tail,
-                                       rte_memory_order_acquire);
-
-               /* The subtraction is done between two unsigned 32bits value
-                * (the result is always modulo 32 bits even if we have
-                * cons_head > prod_tail). So 'entries' is always between 0
-                * and size(ring)-1.
-                */
-               *entries = (prod_tail - *old_head);
-
-               /* Set the actual entries for dequeue */
-               if (n > *entries)
-                       n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
-               if (unlikely(n == 0))
-                       return 0;
-
-               *new_head = *old_head + n;
-               if (is_sc) {
-                       r->cons.head = *new_head;
-                       success = 1;
-               } else
-                       /* on failure, *old_head will be updated */
-               success = rte_atomic_compare_exchange_strong_explicit(&r->cons.head,
-                                                       old_head, *new_head,
-                                                       rte_memory_order_relaxed,
-                                                       rte_memory_order_relaxed);
-       } while (unlikely(success == 0));
-       return n;
-}
-
 #endif /* _RTE_RING_C11_PVT_H_ */
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 4b80f58980..3a83668a08 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -293,6 +293,72 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
 #include "rte_ring_generic_pvt.h"
 #endif
 
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
+               unsigned int n, enum rte_ring_queue_behavior behavior,
+               uint32_t *old_head, uint32_t *new_head,
+               uint32_t *free_entries)
+{
+       return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
+                       is_sp, n, behavior, old_head, new_head, free_entries);
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to dequeue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
+               unsigned int n, enum rte_ring_queue_behavior behavior,
+               uint32_t *old_head, uint32_t *new_head,
+               uint32_t *entries)
+{
+       return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
+                       is_sc, n, behavior, old_head, new_head, entries);
+}
+
 /**
  * @internal Enqueue several objects on the ring
  *
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index 457f41dab3..12f3595926 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -29,36 +29,13 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
        ht->tail = new_val;
 }
 
-/**
- * @internal This function updates the producer head for enqueue
- *
- * @param r
- *   A pointer to the ring structure
- * @param is_sp
- *   Indicates whether multi-producer path is needed or not
- * @param n
- *   The number of elements we will want to enqueue, i.e. how far should the
- *   head be moved
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
- * @param old_head
- *   Returns head value as it was before the move, i.e. where enqueue starts
- * @param new_head
- *   Returns the current/new head value i.e. where enqueue finishes
- * @param free_entries
- *   Returns the amount of free space in the ring BEFORE head was moved
- * @return
- *   Actual number of objects enqueued.
- *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
 static __rte_always_inline unsigned int
-__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
-               unsigned int n, enum rte_ring_queue_behavior behavior,
-               uint32_t *old_head, uint32_t *new_head,
-               uint32_t *free_entries)
+__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+               const struct rte_ring_headtail *s, uint32_t capacity,
+               unsigned int is_st, unsigned int n,
+               enum rte_ring_queue_behavior behavior,
+               uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
 {
-       const uint32_t capacity = r->capacity;
        unsigned int max = n;
        int success;
 
@@ -66,7 +43,7 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
                /* Reset n to the initial burst count */
                n = max;
 
-               *old_head = r->prod.head;
+               *old_head = d->head;
 
                /* add rmb barrier to avoid load/load reorder in weak
                 * memory model. It is noop on x86
@@ -76,97 +53,27 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
                /*
                 *  The subtraction is done between two unsigned 32bits value
                 * (the result is always modulo 32 bits even if we have
-                * *old_head > cons_tail). So 'free_entries' is always between 0
+                * *old_head > s->tail). So 'free_entries' is always between 0
                 * and capacity (which is < size).
                 */
-               *free_entries = (capacity + r->cons.tail - *old_head);
+               *entries = (capacity + s->tail - *old_head);
 
                /* check that we have enough room in ring */
-               if (unlikely(n > *free_entries))
+               if (unlikely(n > *entries))
                        n = (behavior == RTE_RING_QUEUE_FIXED) ?
-                                       0 : *free_entries;
+                                       0 : *entries;
 
                if (n == 0)
                        return 0;
 
                *new_head = *old_head + n;
-               if (is_sp) {
-                       r->prod.head = *new_head;
+               if (is_st) {
+                       d->head = *new_head;
                        success = 1;
                } else
-                       success = rte_atomic32_cmpset((uint32_t *)(uintptr_t)&r->prod.head,
-                                       *old_head, *new_head);
-       } while (unlikely(success == 0));
-       return n;
-}
-
-/**
- * @internal This function updates the consumer head for dequeue
- *
- * @param r
- *   A pointer to the ring structure
- * @param is_sc
- *   Indicates whether multi-consumer path is needed or not
- * @param n
- *   The number of elements we will want to dequeue, i.e. how far should the
- *   head be moved
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
- * @param old_head
- *   Returns head value as it was before the move, i.e. where dequeue starts
- * @param new_head
- *   Returns the current/new head value i.e. where dequeue finishes
- * @param entries
- *   Returns the number of entries in the ring BEFORE head was moved
- * @return
- *   - Actual number of objects dequeued.
- *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
- */
-static __rte_always_inline unsigned int
-__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
-               unsigned int n, enum rte_ring_queue_behavior behavior,
-               uint32_t *old_head, uint32_t *new_head,
-               uint32_t *entries)
-{
-       unsigned int max = n;
-       int success;
-
-       /* move cons.head atomically */
-       do {
-               /* Restore n as it may change every loop */
-               n = max;
-
-               *old_head = r->cons.head;
-
-               /* add rmb barrier to avoid load/load reorder in weak
-                * memory model. It is noop on x86
-                */
-               rte_smp_rmb();
-
-               /* The subtraction is done between two unsigned 32bits value
-                * (the result is always modulo 32 bits even if we have
-                * cons_head > prod_tail). So 'entries' is always between 0
-                * and size(ring)-1.
-                */
-               *entries = (r->prod.tail - *old_head);
-
-               /* Set the actual entries for dequeue */
-               if (n > *entries)
-                       n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
-               if (unlikely(n == 0))
-                       return 0;
-
-               *new_head = *old_head + n;
-               if (is_sc) {
-                       r->cons.head = *new_head;
-                       rte_smp_rmb();
-                       success = 1;
-               } else {
-               success = rte_atomic32_cmpset((uint32_t *)(uintptr_t)&r->cons.head,
+                       success = rte_atomic32_cmpset(
+                                       (uint32_t *)(uintptr_t)&d->head,
                                        *old_head, *new_head);
-               }
        } while (unlikely(success == 0));
        return n;
 }
diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h
index 91f5eeccb9..ed5f16879f 100644
--- a/lib/ring/rte_ring_hts_elem_pvt.h
+++ b/lib/ring/rte_ring_hts_elem_pvt.h
@@ -50,20 +50,16 @@ __rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
        }
 }
 
-/**
- * @internal This function updates the producer head for enqueue
- */
-static __rte_always_inline unsigned int
-__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+static __rte_always_inline uint32_t
+__rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
+       const struct rte_ring_headtail *s, uint32_t capacity, unsigned int num,
        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
-       uint32_t *free_entries)
+       uint32_t *entries)
 {
        uint32_t n;
        union __rte_ring_hts_pos np, op;
 
-       const uint32_t capacity = r->capacity;
-
-       op.raw = rte_atomic_load_explicit(&r->hts_prod.ht.raw, rte_memory_order_acquire);
+       op.raw = rte_atomic_load_explicit(&d->ht.raw, rte_memory_order_acquire);
 
        do {
                /* Reset n to the initial burst count */
@@ -74,7 +70,7 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
                 * make sure that we read prod head/tail *before*
                 * reading cons tail.
                 */
-               __rte_ring_hts_head_wait(&r->hts_prod, &op);
+               __rte_ring_hts_head_wait(d, &op);
 
                /*
                 *  The subtraction is done between two unsigned 32bits value
@@ -82,12 +78,12 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
                 * *old_head > cons_tail). So 'free_entries' is always between 0
                 * and capacity (which is < size).
                 */
-               *free_entries = capacity + r->cons.tail - op.pos.head;
+               *entries = capacity + s->tail - op.pos.head;
 
                /* check that we have enough room in ring */
-               if (unlikely(n > *free_entries))
+               if (unlikely(n > *entries))
                        n = (behavior == RTE_RING_QUEUE_FIXED) ?
-                                       0 : *free_entries;
+                                       0 : *entries;
 
                if (n == 0)
                        break;
@@ -100,13 +96,25 @@ __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
         *  - OOO reads of cons tail value
         *  - OOO copy of elems from the ring
         */
-       } while (rte_atomic_compare_exchange_strong_explicit(&r->hts_prod.ht.raw,
+       } while (rte_atomic_compare_exchange_strong_explicit(&d->ht.raw,
                        (uint64_t *)(uintptr_t)&op.raw, np.raw,
-                       rte_memory_order_acquire, rte_memory_order_acquire) == 0);
+                       rte_memory_order_acquire,
+                       rte_memory_order_acquire) == 0);
 
        *old_head = op.pos.head;
        return n;
 }
+/**
+ * @internal This function updates the producer head for enqueue
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+       enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+       uint32_t *free_entries)
+{
+       return __rte_ring_hts_move_head(&r->hts_prod, &r->cons,
+                       r->capacity, num, behavior, old_head, free_entries);
+}
 
 /**
  * @internal This function updates the consumer head for dequeue
@@ -116,51 +124,8 @@ __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
        uint32_t *entries)
 {
-       uint32_t n;
-       union __rte_ring_hts_pos np, op;
-
-       op.raw = rte_atomic_load_explicit(&r->hts_cons.ht.raw, rte_memory_order_acquire);
-
-       /* move cons.head atomically */
-       do {
-               /* Restore n as it may change every loop */
-               n = num;
-
-               /*
-                * wait for tail to be equal to head,
-                * make sure that we read cons head/tail *before*
-                * reading prod tail.
-                */
-               __rte_ring_hts_head_wait(&r->hts_cons, &op);
-
-               /* The subtraction is done between two unsigned 32bits value
-                * (the result is always modulo 32 bits even if we have
-                * cons_head > prod_tail). So 'entries' is always between 0
-                * and size(ring)-1.
-                */
-               *entries = r->prod.tail - op.pos.head;
-
-               /* Set the actual entries for dequeue */
-               if (n > *entries)
-                       n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
-               if (unlikely(n == 0))
-                       break;
-
-               np.pos.tail = op.pos.tail;
-               np.pos.head = op.pos.head + n;
-
-       /*
-        * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-        *  - OOO reads of prod tail value
-        *  - OOO copy of elems from the ring
-        */
-       } while (rte_atomic_compare_exchange_strong_explicit(&r->hts_cons.ht.raw,
-                       (uint64_t *)(uintptr_t)&op.raw, np.raw,
-                       rte_memory_order_acquire, rte_memory_order_acquire) == 0);
-
-       *old_head = op.pos.head;
-       return n;
+       return __rte_ring_hts_move_head(&r->hts_cons, &r->prod,
+                       0, num, behavior, old_head, entries);
 }
 
 /**
diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h
index 122650346b..027409a3fa 100644
--- a/lib/ring/rte_ring_rts_elem_pvt.h
+++ b/lib/ring/rte_ring_rts_elem_pvt.h
@@ -64,20 +64,17 @@ __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
        }
 }
 
-/**
- * @internal This function updates the producer head for enqueue.
- */
 static __rte_always_inline uint32_t
-__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
+__rte_ring_rts_move_head(struct rte_ring_rts_headtail *d,
+       const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
-       uint32_t *free_entries)
+       uint32_t *entries)
 {
        uint32_t n;
        union __rte_ring_rts_poscnt nh, oh;
 
-       const uint32_t capacity = r->capacity;
-
-       oh.raw = rte_atomic_load_explicit(&r->rts_prod.head.raw, rte_memory_order_acquire);
+       oh.raw = rte_atomic_load_explicit(&d->head.raw,
+                       rte_memory_order_acquire);
 
        do {
                /* Reset n to the initial burst count */
@@ -88,7 +85,7 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
                 * make sure that we read prod head *before*
                 * reading cons tail.
                 */
-               __rte_ring_rts_head_wait(&r->rts_prod, &oh);
+               __rte_ring_rts_head_wait(d, &oh);
 
                /*
                 *  The subtraction is done between two unsigned 32bits value
@@ -96,12 +93,12 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
                 * *old_head > cons_tail). So 'free_entries' is always between 0
                 * and capacity (which is < size).
                 */
-               *free_entries = capacity + r->cons.tail - oh.val.pos;
+               *entries = capacity + s->tail - oh.val.pos;
 
                /* check that we have enough room in ring */
-               if (unlikely(n > *free_entries))
+               if (unlikely(n > *entries))
                        n = (behavior == RTE_RING_QUEUE_FIXED) ?
-                                       0 : *free_entries;
+                                       0 : *entries;
 
                if (n == 0)
                        break;
@@ -114,14 +111,27 @@ __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
         *  - OOO reads of cons tail value
         *  - OOO copy of elems to the ring
         */
-       } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_prod.head.raw,
+       } while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
                        (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
-                       rte_memory_order_acquire, rte_memory_order_acquire) == 0);
+                       rte_memory_order_acquire,
+                       rte_memory_order_acquire) == 0);
 
        *old_head = oh.val.pos;
        return n;
 }
 
+/**
+ * @internal This function updates the producer head for enqueue.
+ */
+static __rte_always_inline uint32_t
+__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
+       enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+       uint32_t *free_entries)
+{
+       return __rte_ring_rts_move_head(&r->rts_prod, &r->cons,
+                       r->capacity, num, behavior, old_head, free_entries);
+}
+
 /**
  * @internal This function updates the consumer head for dequeue
  */
@@ -130,51 +140,8 @@ __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
        uint32_t *entries)
 {
-       uint32_t n;
-       union __rte_ring_rts_poscnt nh, oh;
-
-       oh.raw = rte_atomic_load_explicit(&r->rts_cons.head.raw, rte_memory_order_acquire);
-
-       /* move cons.head atomically */
-       do {
-               /* Restore n as it may change every loop */
-               n = num;
-
-               /*
-                * wait for cons head/tail distance,
-                * make sure that we read cons head *before*
-                * reading prod tail.
-                */
-               __rte_ring_rts_head_wait(&r->rts_cons, &oh);
-
-               /* The subtraction is done between two unsigned 32bits value
-                * (the result is always modulo 32 bits even if we have
-                * cons_head > prod_tail). So 'entries' is always between 0
-                * and size(ring)-1.
-                */
-               *entries = r->prod.tail - oh.val.pos;
-
-               /* Set the actual entries for dequeue */
-               if (n > *entries)
-                       n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
-               if (unlikely(n == 0))
-                       break;
-
-               nh.val.pos = oh.val.pos + n;
-               nh.val.cnt = oh.val.cnt + 1;
-
-       /*
-        * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-        *  - OOO reads of prod tail value
-        *  - OOO copy of elems from the ring
-        */
-       } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_cons.head.raw,
-                       (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
-                       rte_memory_order_acquire, rte_memory_order_acquire) == 0);
-
-       *old_head = oh.val.pos;
-       return n;
+       return __rte_ring_rts_move_head(&r->rts_cons, &r->prod,
+                       0, num, behavior, old_head, entries);
 }
 
 /**
-- 
2.35.3
