Reduce the code duplicated between rte_ring.h and rte_typed_ring.h by
having rte_ring reuse the enqueue and dequeue code from rte_typed_ring.
Signed-off-by: Bruce Richardson <bruce.richard...@intel.com>
---
 lib/librte_ring/rte_ring.h | 380 +--------------------------------------------
 1 file changed, 8 insertions(+), 372 deletions(-)

diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 4e74efd..25c4849 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -231,366 +231,10 @@ int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
  */
 void rte_ring_dump(FILE *f, const struct rte_ring *r);
 
-/**
- * @internal Enqueue several objects on the ring (multi-producers safe).
- *
- * This function uses a "compare and set" instruction to move the
- * producer index atomically.
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects).
- * @param n
- *   The number of objects to add in the ring from the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-                        unsigned n, enum rte_ring_queue_behavior behavior)
-{
-       uint32_t prod_head, prod_next;
-       uint32_t cons_tail, free_entries;
-       const unsigned max = n;
-       int success;
-       unsigned i, rep = 0;
-       uint32_t mask = r->prod.mask;
-       int ret;
-
-       /* Avoid the unnecessary cmpset operation below, which is also
-        * potentially harmful when n equals 0. */
-       if (n == 0)
-               return 0;
-
-       /* move prod.head atomically */
-       do {
-               /* Reset n to the initial burst count */
-               n = max;
-
-               prod_head = r->prod.head;
-               cons_tail = r->cons.tail;
-               /* The subtraction is done between two unsigned 32bits value
-                * (the result is always modulo 32 bits even if we have
-                * prod_head > cons_tail). So 'free_entries' is always between 0
-                * and size(ring)-1. */
-               free_entries = (mask + cons_tail - prod_head);
-
-               /* check that we have enough room in ring */
-               if (unlikely(n > free_entries)) {
-                       if (behavior == RTE_RING_QUEUE_FIXED) {
-                               __RING_STAT_ADD(r, enq_fail, n);
-                               return -ENOBUFS;
-                       }
-                       else {
-                               /* No free entry available */
-                               if (unlikely(free_entries == 0)) {
-                                       __RING_STAT_ADD(r, enq_fail, n);
-                                       return 0;
-                               }
-
-                               n = free_entries;
-                       }
-               }
-
-               prod_next = prod_head + n;
-               success = rte_atomic32_cmpset(&r->prod.head, prod_head,
-                                             prod_next);
-       } while (unlikely(success == 0));
-
-       /* write entries in ring */
-       ENQUEUE_PTRS();
-       rte_smp_wmb();
-
-       /* if we exceed the watermark */
-       if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
-               ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
-                               (int)(n | RTE_RING_QUOT_EXCEED);
-               __RING_STAT_ADD(r, enq_quota, n);
-       }
-       else {
-               ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
-               __RING_STAT_ADD(r, enq_success, n);
-       }
-
-       /*
-        * If there are other enqueues in progress that preceded us,
-        * we need to wait for them to complete
-        */
-       while (unlikely(r->prod.tail != prod_head)) {
-               rte_pause();
-
-               /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
-                * for other thread finish. It gives pre-empted thread a chance
-                * to proceed and finish with ring dequeue operation. */
-               if (RTE_RING_PAUSE_REP_COUNT &&
-                   ++rep == RTE_RING_PAUSE_REP_COUNT) {
-                       rep = 0;
-                       sched_yield();
-               }
-       }
-       r->prod.tail = prod_next;
-       return ret;
-}
-
-/**
- * @internal Enqueue several objects on a ring (NOT multi-producers safe).
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects).
- * @param n
- *   The number of objects to add in the ring from the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-                        unsigned n, enum rte_ring_queue_behavior behavior)
-{
-       uint32_t prod_head, cons_tail;
-       uint32_t prod_next, free_entries;
-       unsigned i;
-       uint32_t mask = r->prod.mask;
-       int ret;
-
-       prod_head = r->prod.head;
-       cons_tail = r->cons.tail;
-       /* The subtraction is done between two unsigned 32bits value
-        * (the result is always modulo 32 bits even if we have
-        * prod_head > cons_tail). So 'free_entries' is always between 0
-        * and size(ring)-1. */
-       free_entries = mask + cons_tail - prod_head;
-
-       /* check that we have enough room in ring */
-       if (unlikely(n > free_entries)) {
-               if (behavior == RTE_RING_QUEUE_FIXED) {
-                       __RING_STAT_ADD(r, enq_fail, n);
-                       return -ENOBUFS;
-               }
-               else {
-                       /* No free entry available */
-                       if (unlikely(free_entries == 0)) {
-                               __RING_STAT_ADD(r, enq_fail, n);
-                               return 0;
-                       }
-
-                       n = free_entries;
-               }
-       }
-
-       prod_next = prod_head + n;
-       r->prod.head = prod_next;
-
-       /* write entries in ring */
-       ENQUEUE_PTRS();
-       rte_smp_wmb();
-
-       /* if we exceed the watermark */
-       if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
-               ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
-                       (int)(n | RTE_RING_QUOT_EXCEED);
-               __RING_STAT_ADD(r, enq_quota, n);
-       }
-       else {
-               ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
-               __RING_STAT_ADD(r, enq_success, n);
-       }
-
-       r->prod.tail = prod_next;
-       return ret;
-}
-
-/**
- * @internal Dequeue several objects from a ring (multi-consumers safe). When
- * the request objects are more than the available objects, only dequeue the
- * actual number of objects
- *
- * This function uses a "compare and set" instruction to move the
- * consumer index atomically.
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
- * @param n
- *   The number of objects to dequeue from the ring to the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
- */
-
-static inline int __attribute__((always_inline))
-__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
-                unsigned n, enum rte_ring_queue_behavior behavior)
-{
-       uint32_t cons_head, prod_tail;
-       uint32_t cons_next, entries;
-       const unsigned max = n;
-       int success;
-       unsigned i, rep = 0;
-       uint32_t mask = r->prod.mask;
-
-       /* Avoid the unnecessary cmpset operation below, which is also
-        * potentially harmful when n equals 0. */
-       if (n == 0)
-               return 0;
-
-       /* move cons.head atomically */
-       do {
-               /* Restore n as it may change every loop */
-               n = max;
-
-               cons_head = r->cons.head;
-               prod_tail = r->prod.tail;
-               /* The subtraction is done between two unsigned 32bits value
-                * (the result is always modulo 32 bits even if we have
-                * cons_head > prod_tail). So 'entries' is always between 0
-                * and size(ring)-1. */
-               entries = (prod_tail - cons_head);
-
-               /* Set the actual entries for dequeue */
-               if (n > entries) {
-                       if (behavior == RTE_RING_QUEUE_FIXED) {
-                               __RING_STAT_ADD(r, deq_fail, n);
-                               return -ENOENT;
-                       }
-                       else {
-                               if (unlikely(entries == 0)){
-                                       __RING_STAT_ADD(r, deq_fail, n);
-                                       return 0;
-                               }
-
-                               n = entries;
-                       }
-               }
-
-               cons_next = cons_head + n;
-               success = rte_atomic32_cmpset(&r->cons.head, cons_head,
-                                             cons_next);
-       } while (unlikely(success == 0));
-
-       /* copy in table */
-       DEQUEUE_PTRS();
-       rte_smp_rmb();
-
-       /*
-        * If there are other dequeues in progress that preceded us,
-        * we need to wait for them to complete
-        */
-       while (unlikely(r->cons.tail != cons_head)) {
-               rte_pause();
-
-               /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
-                * for other thread finish. It gives pre-empted thread a chance
-                * to proceed and finish with ring dequeue operation. */
-               if (RTE_RING_PAUSE_REP_COUNT &&
-                   ++rep == RTE_RING_PAUSE_REP_COUNT) {
-                       rep = 0;
-                       sched_yield();
-               }
-       }
-       __RING_STAT_ADD(r, deq_success, n);
-       r->cons.tail = cons_next;
-
-       return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
-}
-
-/**
- * @internal Dequeue several objects from a ring (NOT multi-consumers safe).
- * When the request objects are more than the available objects, only dequeue
- * the actual number of objects
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
- * @param n
- *   The number of objects to dequeue from the ring to the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
-                unsigned n, enum rte_ring_queue_behavior behavior)
-{
-       uint32_t cons_head, prod_tail;
-       uint32_t cons_next, entries;
-       unsigned i;
-       uint32_t mask = r->prod.mask;
-
-       cons_head = r->cons.head;
-       prod_tail = r->prod.tail;
-       /* The subtraction is done between two unsigned 32bits value
-        * (the result is always modulo 32 bits even if we have
-        * cons_head > prod_tail). So 'entries' is always between 0
-        * and size(ring)-1. */
-       entries = prod_tail - cons_head;
-
-       if (n > entries) {
-               if (behavior == RTE_RING_QUEUE_FIXED) {
-                       __RING_STAT_ADD(r, deq_fail, n);
-                       return -ENOENT;
-               }
-               else {
-                       if (unlikely(entries == 0)){
-                               __RING_STAT_ADD(r, deq_fail, n);
-                               return 0;
-                       }
-
-                       n = entries;
-               }
-       }
-
-       cons_next = cons_head + n;
-       r->cons.head = cons_next;
-
-       /* copy in table */
-       DEQUEUE_PTRS();
-       rte_smp_rmb();
-
-       __RING_STAT_ADD(r, deq_success, n);
-       r->cons.tail = cons_next;
-       return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
-}
+#define __rte_ring_mp_do_enqueue rte_void___ring_mp_do_enqueue
+#define __rte_ring_sp_do_enqueue rte_void___ring_sp_do_enqueue
+#define __rte_ring_mc_do_dequeue rte_void___ring_mc_do_dequeue
+#define __rte_ring_sc_do_dequeue rte_void___ring_sc_do_dequeue
 
 /**
  * Enqueue several objects on the ring (multi-producers safe).
@@ -882,9 +526,7 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 static inline int
 rte_ring_full(const struct rte_ring *r)
 {
-       uint32_t prod_tail = r->prod.tail;
-       uint32_t cons_tail = r->cons.tail;
-       return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
+       return rte_void_ring_full(r);
 }
 
 /**
@@ -899,9 +541,7 @@ rte_ring_full(const struct rte_ring *r)
 static inline int
 rte_ring_empty(const struct rte_ring *r)
 {
-       uint32_t prod_tail = r->prod.tail;
-       uint32_t cons_tail = r->cons.tail;
-       return !!(cons_tail == prod_tail);
+       return rte_void_ring_empty(r);
 }
 
 /**
@@ -915,9 +555,7 @@ rte_ring_empty(const struct rte_ring *r)
 static inline unsigned
 rte_ring_count(const struct rte_ring *r)
 {
-       uint32_t prod_tail = r->prod.tail;
-       uint32_t cons_tail = r->cons.tail;
-       return (prod_tail - cons_tail) & r->prod.mask;
+       return rte_void_ring_count(r);
 }
 
 /**
@@ -931,9 +569,7 @@ rte_ring_count(const struct rte_ring *r)
 static inline unsigned
 rte_ring_free_count(const struct rte_ring *r)
 {
-       uint32_t prod_tail = r->prod.tail;
-       uint32_t cons_tail = r->cons.tail;
-       return (cons_tail - prod_tail - 1) & r->prod.mask;
+       return rte_void_ring_free_count(r);
 }
 
 /**
-- 
2.9.3

Reply via email to