This commit adds support for lock-free circular ring enqueue and dequeue
functions. The ring is supported on 32- and 64-bit architectures, however
it uses a 128-bit compare-and-swap instruction when run on a 64-bit
architecture, and thus is currently limited to x86_64.

The algorithm is based on Ola Liljedahl's lfring, modified to fit within
the rte ring API. With no contention, an enqueue of n pointers uses (1 + n)
CAS operations and a dequeue of n pointers uses 1. This algorithm has worse
average-case performance than the regular rte ring (particularly a
highly-contended ring with large bulk accesses), however:
- For applications with preemptible pthreads, the regular rte ring's
  worst-case performance (i.e. one thread being preempted in the
  update_tail() critical section) is much worse than the lock-free ring's.
- Software caching can mitigate the average case performance for ring-based
  algorithms. For example, a lock-free ring based mempool (a likely use
  case for this ring) with per-thread caching.

To avoid the ABA problem, each ring entry contains a modification counter.
On a 64-bit architecture, the chance of ABA occurring are effectively zero;
a 64-bit counter will take many years to wrap at current CPU frequencies.
On a 32-bit architectures, a lock-free ring must be at least 1024-entries
deep; assuming 100 cycles per ring entry access, this guarantees the ring's
modification counters will wrap on the order of days.

The lock-free ring is enabled via a new flag, RING_F_LF. Because the ring's
memsize is now a function of its flags (the lock-free ring requires 128b
for each entry), this commit adds a new argument ('flags') to
rte_ring_get_memsize(). An API deprecation notice will be sent in a
separate commit.

For ease-of-use, existing ring enqueue and dequeue functions work on both
regular and lock-free rings. This introduces an additional branch in the
datapath, but this should be a highly predictable branch.
ring_perf_autotest shows a negligible performance impact; it's hard to
distinguish a real difference versus system noise.

                                  | ring_perf_autotest cycles with branch -
             Test                 |   ring_perf_autotest cycles without
------------------------------------------------------------------
SP/SC single enq/dequeue          | 0.33
MP/MC single enq/dequeue          | -4.00
SP/SC burst enq/dequeue (size 8)  | 0.00
MP/MC burst enq/dequeue (size 8)  | 0.00
SP/SC burst enq/dequeue (size 32) | 0.00
MP/MC burst enq/dequeue (size 32) | 0.00
SC empty dequeue                  | 1.00
MC empty dequeue                  | 0.00

Single lcore:
SP/SC bulk enq/dequeue (size 8)   | 0.49
MP/MC bulk enq/dequeue (size 8)   | 0.08
SP/SC bulk enq/dequeue (size 32)  | 0.07
MP/MC bulk enq/dequeue (size 32)  | 0.09

Two physical cores:
SP/SC bulk enq/dequeue (size 8)   | 0.19
MP/MC bulk enq/dequeue (size 8)   | -0.37
SP/SC bulk enq/dequeue (size 32)  | 0.09
MP/MC bulk enq/dequeue (size 32)  | -0.05

Two NUMA nodes:
SP/SC bulk enq/dequeue (size 8)   | -1.96
MP/MC bulk enq/dequeue (size 8)   | 0.88
SP/SC bulk enq/dequeue (size 32)  | 0.10
MP/MC bulk enq/dequeue (size 32)  | 0.46

Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
running on isolcpus cores with a tickless scheduler. Each test run three
times and the results averaged.

Signed-off-by: Gage Eads <gage.e...@intel.com>
---
 lib/librte_ring/rte_ring.c           |  92 +++++++--
 lib/librte_ring/rte_ring.h           | 308 ++++++++++++++++++++++++++---
 lib/librte_ring/rte_ring_c11_mem.h   | 366 ++++++++++++++++++++++++++++++++++-
 lib/librte_ring/rte_ring_generic.h   | 357 +++++++++++++++++++++++++++++++++-
 lib/librte_ring/rte_ring_version.map |   7 +
 5 files changed, 1082 insertions(+), 48 deletions(-)

diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index d215acecc..d4a176f57 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
 
 /* return the size of memory occupied by a ring */
 ssize_t
-rte_ring_get_memsize(unsigned count)
+rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
 {
-       ssize_t sz;
+       ssize_t sz, elt_sz;
 
        /* count must be a power of 2 */
        if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
@@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
                return -EINVAL;
        }
 
-       sz = sizeof(struct rte_ring) + count * sizeof(void *);
+       elt_sz = (flags & RING_F_LF) ? 2 * sizeof(void *) : sizeof(void *);
+
+       sz = sizeof(struct rte_ring) + count * elt_sz;
        sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
        return sz;
 }
+BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
+MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
+                                              unsigned int flags),
+                 rte_ring_get_memsize_v1905);
+
+ssize_t
+rte_ring_get_memsize_v20(unsigned int count)
+{
+       return rte_ring_get_memsize_v1905(count, 0);
+}
+VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
 
 int
 rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
@@ -75,6 +88,8 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned 
count,
                          RTE_CACHE_LINE_MASK) != 0);
        RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
                          RTE_CACHE_LINE_MASK) != 0);
+       RTE_BUILD_BUG_ON(sizeof(struct rte_ring_lf_entry) !=
+                        2 * sizeof(void *));
 
        /* init the ring structure */
        memset(r, 0, sizeof(*r));
@@ -82,8 +97,6 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned 
count,
        if (ret < 0 || ret >= (int)sizeof(r->name))
                return -ENAMETOOLONG;
        r->flags = flags;
-       r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
-       r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
 
        if (flags & RING_F_EXACT_SZ) {
                r->size = rte_align32pow2(count + 1);
@@ -100,12 +113,46 @@ rte_ring_init(struct rte_ring *r, const char *name, 
unsigned count,
                r->mask = count - 1;
                r->capacity = r->mask;
        }
-       r->prod.head = r->cons.head = 0;
-       r->prod.tail = r->cons.tail = 0;
+
+       r->log2_size = rte_log2_u64(r->size);
+
+       if (flags & RING_F_LF) {
+               uint32_t i;
+
+               r->prod_ptr.single =
+                       (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+               r->cons_ptr.single =
+                       (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+               r->prod_ptr.head = r->cons_ptr.head = 0;
+               r->prod_ptr.tail = r->cons_ptr.tail = 0;
+
+               for (i = 0; i < r->size; i++) {
+                       struct rte_ring_lf_entry *ring_ptr, *base;
+
+                       base = (struct rte_ring_lf_entry *)&r->ring;
+
+                       ring_ptr = &base[i & r->mask];
+
+                       ring_ptr->cnt = 0;
+               }
+       } else {
+               r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+               r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+               r->prod.head = r->cons.head = 0;
+               r->prod.tail = r->cons.tail = 0;
+       }
 
        return 0;
 }
 
+/* If a ring entry is written on average every M cycles, then a ring entry is
+ * reused every M*count cycles, and a ring entry's counter repeats every
+ * M*count*2^32 cycles. If M=100 on a 2GHz system, then a 1024-entry ring's
+ * counters would repeat every 2.37 days. The likelihood of ABA occurring is
+ * considered sufficiently low for 1024-entry and larger rings.
+ */
+#define MIN_32_BIT_LF_RING_SIZE 1024
+
 /* create the ring */
 struct rte_ring *
 rte_ring_create(const char *name, unsigned count, int socket_id,
@@ -123,11 +170,25 @@ rte_ring_create(const char *name, unsigned count, int 
socket_id,
 
        ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
 
+#ifdef RTE_ARCH_64
+#if !defined(RTE_ARCH_X86_64)
+       printf("This platform does not support the atomic operation required 
for RING_F_LF\n");
+       rte_errno = EINVAL;
+       return NULL;
+#endif
+#else
+       if ((flags & RING_F_LF) && count < MIN_32_BIT_LF_RING_SIZE) {
+               printf("RING_F_LF is only supported on 32-bit platforms for 
rings with at least 1024 entries.\n");
+               rte_errno = EINVAL;
+               return NULL;
+       }
+#endif
+
        /* for an exact size ring, round up from count to a power of two */
        if (flags & RING_F_EXACT_SZ)
                count = rte_align32pow2(count + 1);
 
-       ring_size = rte_ring_get_memsize(count);
+       ring_size = rte_ring_get_memsize(count, flags);
        if (ring_size < 0) {
                rte_errno = ring_size;
                return NULL;
@@ -227,10 +288,17 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
        fprintf(f, "  flags=%x\n", r->flags);
        fprintf(f, "  size=%"PRIu32"\n", r->size);
        fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
-       fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
-       fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
-       fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
-       fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+       if (r->flags & RING_F_LF) {
+               fprintf(f, "  ct=%"PRIuPTR"\n", r->cons_ptr.tail);
+               fprintf(f, "  ch=%"PRIuPTR"\n", r->cons_ptr.head);
+               fprintf(f, "  pt=%"PRIuPTR"\n", r->prod_ptr.tail);
+               fprintf(f, "  ph=%"PRIuPTR"\n", r->prod_ptr.head);
+       } else {
+               fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
+               fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
+               fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
+               fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+       }
        fprintf(f, "  used=%u\n", rte_ring_count(r));
        fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
 }
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index f16d77b8a..200d7b2a0 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -20,7 +20,7 @@
  *
  * - FIFO (First In First Out)
  * - Maximum size is fixed; the pointers are stored in a table.
- * - Lockless implementation.
+ * - Lockless (and optionally, non-blocking/lock-free) implementation.
  * - Multi- or single-consumer dequeue.
  * - Multi- or single-producer enqueue.
  * - Bulk dequeue.
@@ -98,6 +98,7 @@ struct rte_ring {
        const struct rte_memzone *memzone;
                        /**< Memzone, if any, containing the rte_ring */
        uint32_t size;           /**< Size of ring. */
+       uint32_t log2_size;      /**< log2(size of ring) */
        uint32_t mask;           /**< Mask (size-1) of ring. */
        uint32_t capacity;       /**< Usable size of ring */
 
@@ -133,6 +134,18 @@ struct rte_ring {
  */
 #define RING_F_EXACT_SZ 0x0004
 #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
+/**
+ * The ring uses lock-free enqueue and dequeue functions. These functions
+ * do not have the "non-preemptive" constraint of a regular rte ring, and thus
+ * are suited for applications using preemptible pthreads. However, the
+ * lock-free functions have worse average-case performance than their regular
+ * rte ring counterparts. When used as the handler for a mempool, per-thread
+ * caching can mitigate the performance difference by reducing the number (and
+ * contention) of ring accesses.
+ *
+ * This flag is only supported on 32-bit and x86_64 platforms.
+ */
+#define RING_F_LF 0x0008
 
 /* @internal defines for passing to the enqueue dequeue worker functions */
 #define __IS_SP 1
@@ -150,11 +163,15 @@ struct rte_ring {
  *
  * @param count
  *   The number of elements in the ring (must be a power of 2).
+ * @param flags
+ *   The flags the ring will be created with.
  * @return
  *   - The memory size needed for the ring on success.
  *   - -EINVAL if count is not a power of 2.
  */
-ssize_t rte_ring_get_memsize(unsigned count);
+ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags);
+ssize_t rte_ring_get_memsize_v20(unsigned int count);
+ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags);
 
 /**
  * Initialize a ring structure.
@@ -187,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count);
  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *      is "single-consumer". Otherwise, it is "multi-consumers".
+ *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ *      number, but up to half the ring space may be wasted.
+ *    - RING_F_LF: If this flag is set, the ring uses lock-free variants of the
+ *      dequeue and enqueue functions.
  * @return
  *   0 on success, or a negative value on error.
  */
@@ -222,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const char *name, 
unsigned count,
  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *      is "single-consumer". Otherwise, it is "multi-consumers".
+ *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ *      number, but up to half the ring space may be wasted.
+ *    - RING_F_LF: If this flag is set, the ring uses lock-free variants of the
+ *      dequeue and enqueue functions.
  * @return
  *   On success, the pointer to the new allocated ring. NULL on error with
  *    rte_errno set appropriately. Possible errno values include:
  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config 
structure
  *    - E_RTE_SECONDARY - function was called from a secondary process instance
- *    - EINVAL - count provided is not a power of 2
+ *    - EINVAL - count provided is not a power of 2, or RING_F_LF is used on an
+ *      unsupported platform
  *    - ENOSPC - the maximum number of memzones has already been allocated
  *    - EEXIST - a memzone with the same name already exists
  *    - ENOMEM - no appropriate memory area found in which to create memzone
@@ -283,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
        } \
 } while (0)
 
+/* The actual enqueue of pointers on the lock-free ring, used by the
+ * single-producer lock-free enqueue function.
+ */
+#define ENQUEUE_PTRS_LF(r, base, prod_head, obj_table, n) do { \
+       unsigned int i; \
+       const uint32_t size = (r)->size; \
+       size_t idx = prod_head & (r)->mask; \
+       size_t new_cnt = prod_head + size; \
+       struct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \
+       unsigned int mask = ~0x3; \
+       if (likely(idx + n < size)) { \
+               for (i = 0; i < (n & mask); i += 4, idx += 4) { \
+                       ring[idx].ptr = obj_table[i]; \
+                       ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+                       ring[idx + 1].ptr = obj_table[i + 1]; \
+                       ring[idx + 1].cnt = (new_cnt + i + 1) >> r->log2_size; \
+                       ring[idx + 2].ptr = obj_table[i + 2]; \
+                       ring[idx + 2].cnt = (new_cnt + i + 2) >> r->log2_size; \
+                       ring[idx + 3].ptr = obj_table[i + 3]; \
+                       ring[idx + 3].cnt = (new_cnt + i + 3) >> r->log2_size; \
+               } \
+               switch (n & 0x3) { \
+               case 3: \
+                       ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+                       ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+               case 2: \
+                       ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+                       ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+               case 1: \
+                       ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+                       ring[idx++].ptr = obj_table[i++]; \
+               } \
+       } else { \
+               for (i = 0; idx < size; i++, idx++) { \
+                       ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+                       ring[idx].ptr = obj_table[i]; \
+               } \
+               for (idx = 0; i < n; i++, idx++) {    \
+                       ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+                       ring[idx].ptr = obj_table[i]; \
+               } \
+       } \
+} while (0)
+
 /* the actual copy of pointers on the ring to obj_table.
  * Placed here since identical code needed in both
  * single and multi consumer dequeue functions */
@@ -314,6 +384,43 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
        } \
 } while (0)
 
+/* The actual copy of pointers on the lock-free ring to obj_table. */
+#define DEQUEUE_PTRS_LF(r, base, cons_head, obj_table, n) do { \
+       unsigned int i; \
+       size_t idx = cons_head & (r)->mask; \
+       const uint32_t size = (r)->size; \
+       struct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \
+       unsigned int mask = ~0x3; \
+       if (likely(idx + n < size)) { \
+               for (i = 0; i < (n & mask); i += 4, idx += 4) {\
+                       obj_table[i] = ring[idx].ptr; \
+                       obj_table[i + 1] = ring[idx + 1].ptr; \
+                       obj_table[i + 2] = ring[idx + 2].ptr; \
+                       obj_table[i + 3] = ring[idx + 3].ptr; \
+               } \
+               switch (n & 0x3) { \
+               case 3: \
+                       obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+               case 2: \
+                       obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+               case 1: \
+                       obj_table[i++] = ring[idx++].ptr; \
+               } \
+       } else { \
+               for (i = 0; idx < size; i++, idx++) \
+                       obj_table[i] = ring[idx].ptr; \
+               for (idx = 0; i < n; i++, idx++) \
+                       obj_table[i] = ring[idx].ptr; \
+       } \
+} while (0)
+
+
+/* @internal 128-bit structure used by the lock-free ring */
+struct rte_ring_lf_entry {
+       void *ptr; /**< Data pointer */
+       uintptr_t cnt; /**< Modification counter */
+};
+
 /* Between load and load. there might be cpu reorder in weak model
  * (powerpc/arm).
  * There are 2 choices for the users
@@ -330,6 +437,70 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
 #endif
 
 /**
+ * @internal Enqueue several objects on the lock-free ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue(struct rte_ring *r, void * const *obj_table,
+                        unsigned int n, enum rte_ring_queue_behavior behavior,
+                        unsigned int is_sp, unsigned int *free_space)
+{
+       if (is_sp)
+               return __rte_ring_do_lf_enqueue_sp(r, obj_table, n,
+                                                  behavior, free_space);
+       else
+               return __rte_ring_do_lf_enqueue_mp(r, obj_table, n,
+                                                  behavior, free_space);
+}
+
+/**
+ * @internal Dequeue several objects from the lock-free ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has 
finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue(struct rte_ring *r, void **obj_table,
+                unsigned int n, enum rte_ring_queue_behavior behavior,
+                unsigned int is_sc, unsigned int *available)
+{
+       if (is_sc)
+               return __rte_ring_do_lf_dequeue_sc(r, obj_table, n,
+                                                  behavior, available);
+       else
+               return __rte_ring_do_lf_dequeue_mc(r, obj_table, n,
+                                                  behavior, available);
+}
+
+/**
  * @internal Enqueue several objects on the ring
  *
   * @param r
@@ -436,8 +607,14 @@ static __rte_always_inline unsigned int
 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                         unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-                       __IS_MP, free_space);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_enqueue(r, obj_table, n,
+                                               RTE_RING_QUEUE_FIXED, __IS_MP,
+                                               free_space);
+       else
+               return __rte_ring_do_enqueue(r, obj_table, n,
+                                            RTE_RING_QUEUE_FIXED, __IS_MP,
+                                            free_space);
 }
 
 /**
@@ -459,8 +636,14 @@ static __rte_always_inline unsigned int
 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                         unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-                       __IS_SP, free_space);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_enqueue(r, obj_table, n,
+                                               RTE_RING_QUEUE_FIXED, __IS_SP,
+                                               free_space);
+       else
+               return __rte_ring_do_enqueue(r, obj_table, n,
+                                            RTE_RING_QUEUE_FIXED, __IS_SP,
+                                            free_space);
 }
 
 /**
@@ -486,8 +669,14 @@ static __rte_always_inline unsigned int
 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                      unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-                       r->prod.single, free_space);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_enqueue(r, obj_table, n,
+                                               RTE_RING_QUEUE_FIXED,
+                                               r->prod_ptr.single, free_space);
+       else
+               return __rte_ring_do_enqueue(r, obj_table, n,
+                                            RTE_RING_QUEUE_FIXED,
+                                            r->prod.single, free_space);
 }
 
 /**
@@ -570,8 +759,14 @@ static __rte_always_inline unsigned int
 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
                unsigned int n, unsigned int *available)
 {
-       return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-                       __IS_MC, available);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_dequeue(r, obj_table, n,
+                                               RTE_RING_QUEUE_FIXED, __IS_MC,
+                                               available);
+       else
+               return __rte_ring_do_dequeue(r, obj_table, n,
+                                            RTE_RING_QUEUE_FIXED, __IS_MC,
+                                            available);
 }
 
 /**
@@ -594,8 +789,14 @@ static __rte_always_inline unsigned int
 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
                unsigned int n, unsigned int *available)
 {
-       return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-                       __IS_SC, available);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_dequeue(r, obj_table, n,
+                                               RTE_RING_QUEUE_FIXED, __IS_SC,
+                                               available);
+       else
+               return __rte_ring_do_dequeue(r, obj_table, n,
+                                            RTE_RING_QUEUE_FIXED, __IS_SC,
+                                            available);
 }
 
 /**
@@ -621,8 +822,14 @@ static __rte_always_inline unsigned int
 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
                unsigned int *available)
 {
-       return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-                               r->cons.single, available);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_dequeue(r, obj_table, n,
+                                               RTE_RING_QUEUE_FIXED,
+                                               r->cons_ptr.single, available);
+       else
+               return __rte_ring_do_dequeue(r, obj_table, n,
+                                            RTE_RING_QUEUE_FIXED,
+                                            r->cons.single, available);
 }
 
 /**
@@ -697,9 +904,13 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 static inline unsigned
 rte_ring_count(const struct rte_ring *r)
 {
-       uint32_t prod_tail = r->prod.tail;
-       uint32_t cons_tail = r->cons.tail;
-       uint32_t count = (prod_tail - cons_tail) & r->mask;
+       uint32_t count;
+
+       if (r->flags & RING_F_LF)
+               count = (r->prod_ptr.tail - r->cons_ptr.tail) & r->mask;
+       else
+               count = (r->prod.tail - r->cons.tail) & r->mask;
+
        return (count > r->capacity) ? r->capacity : count;
 }
 
@@ -819,8 +1030,14 @@ static __rte_always_inline unsigned
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
                         unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_do_enqueue(r, obj_table, n,
-                       RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_enqueue(r, obj_table, n,
+                                               RTE_RING_QUEUE_VARIABLE,
+                                               __IS_MP, free_space);
+       else
+               return __rte_ring_do_enqueue(r, obj_table, n,
+                                            RTE_RING_QUEUE_VARIABLE,
+                                            __IS_MP, free_space);
 }
 
 /**
@@ -842,8 +1059,14 @@ static __rte_always_inline unsigned
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
                         unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_do_enqueue(r, obj_table, n,
-                       RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_enqueue(r, obj_table, n,
+                                               RTE_RING_QUEUE_VARIABLE,
+                                               __IS_SP, free_space);
+       else
+               return __rte_ring_do_enqueue(r, obj_table, n,
+                                            RTE_RING_QUEUE_VARIABLE,
+                                            __IS_SP, free_space);
 }
 
 /**
@@ -869,8 +1092,14 @@ static __rte_always_inline unsigned
 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
                      unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
-                       r->prod.single, free_space);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_enqueue(r, obj_table, n,
+                                               RTE_RING_QUEUE_VARIABLE,
+                                               r->prod_ptr.single, free_space);
+       else
+               return __rte_ring_do_enqueue(r, obj_table, n,
+                                            RTE_RING_QUEUE_VARIABLE,
+                                            r->prod.single, free_space);
 }
 
 /**
@@ -897,8 +1126,14 @@ static __rte_always_inline unsigned
 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
                unsigned int n, unsigned int *available)
 {
-       return __rte_ring_do_dequeue(r, obj_table, n,
-                       RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_dequeue(r, obj_table, n,
+                                               RTE_RING_QUEUE_VARIABLE,
+                                               __IS_MC, available);
+       else
+               return __rte_ring_do_dequeue(r, obj_table, n,
+                                            RTE_RING_QUEUE_VARIABLE,
+                                            __IS_MC, available);
 }
 
 /**
@@ -922,8 +1157,14 @@ static __rte_always_inline unsigned
 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
                unsigned int n, unsigned int *available)
 {
-       return __rte_ring_do_dequeue(r, obj_table, n,
-                       RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_dequeue(r, obj_table, n,
+                                               RTE_RING_QUEUE_VARIABLE,
+                                               __IS_SC, available);
+       else
+               return __rte_ring_do_dequeue(r, obj_table, n,
+                                            RTE_RING_QUEUE_VARIABLE,
+                                            __IS_SC, available);
 }
 
 /**
@@ -949,9 +1190,14 @@ static __rte_always_inline unsigned
 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
                unsigned int n, unsigned int *available)
 {
-       return __rte_ring_do_dequeue(r, obj_table, n,
-                               RTE_RING_QUEUE_VARIABLE,
-                               r->cons.single, available);
+       if (r->flags & RING_F_LF)
+               return __rte_ring_do_lf_dequeue(r, obj_table, n,
+                                               RTE_RING_QUEUE_VARIABLE,
+                                               r->cons_ptr.single, available);
+       else
+               return __rte_ring_do_dequeue(r, obj_table, n,
+                                            RTE_RING_QUEUE_VARIABLE,
+                                            r->cons.single, available);
 }
 
 #ifdef __cplusplus
diff --git a/lib/librte_ring/rte_ring_c11_mem.h 
b/lib/librte_ring/rte_ring_c11_mem.h
index 545caf257..3e94ac2c4 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -1,5 +1,7 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  *
+ * Copyright (c) 2018-2019 Intel Corporation
+ * Copyright (c) 2018-2019 Arm Limited
  * Copyright (c) 2017,2018 HXT-semitech Corporation.
  * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org
  * All rights reserved.
@@ -221,8 +223,8 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned 
int is_sp,
                /* Ensure the head is read before tail */
                __atomic_thread_fence(__ATOMIC_ACQUIRE);
 
-               /* load-acquire synchronize with store-release of ht->tail
-                * in update_tail.
+               /* load-acquire synchronize with store-release of tail in
+                * __rte_ring_do_lf_dequeue_{sc, mc}.
                 */
                cons_tail = __atomic_load_n(&r->cons_ptr.tail,
                                        __ATOMIC_ACQUIRE);
@@ -247,6 +249,7 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned 
int is_sp,
                                        0, __ATOMIC_RELAXED,
                                        __ATOMIC_RELAXED);
        } while (unlikely(success == 0));
+
        return n;
 }
 
@@ -293,8 +296,8 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned 
int is_sc,
                /* Ensure the head is read before tail */
                __atomic_thread_fence(__ATOMIC_ACQUIRE);
 
-               /* this load-acquire synchronize with store-release of ht->tail
-                * in update_tail.
+               /* load-acquire synchronize with store-release of tail in
+                * __rte_ring_do_lf_enqueue_{sp, mp}.
                 */
                prod_tail = __atomic_load_n(&r->prod_ptr.tail,
                                        __ATOMIC_ACQUIRE);
@@ -318,6 +321,361 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, 
unsigned int is_sc,
                                                        0, __ATOMIC_RELAXED,
                                                        __ATOMIC_RELAXED);
        } while (unlikely(success == 0));
+
+       return n;
+}
+
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (single-producer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+                           unsigned int n,
+                           enum rte_ring_queue_behavior behavior,
+                           unsigned int *free_space)
+{
+       uint32_t free_entries;
+       uintptr_t head, next;
+
+       n = __rte_ring_move_prod_head_ptr(r, 1, n, behavior,
+                                         &head, &next, &free_entries);
+       if (n == 0)
+               goto end;
+
+       ENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n);
+
+       __atomic_store_n(&r->prod_ptr.tail,
+                        r->prod_ptr.tail + n,
+                        __ATOMIC_RELEASE);
+end:
+       if (free_space != NULL)
+               *free_space = free_entries - n;
+       return n;
+}
+
+/* This macro defines the number of times an enqueueing thread can fail to find
+ * a free ring slot before reloading its producer tail index.
+ */
+#define ENQ_RETRY_LIMIT 32
+
+/**
+ * @internal
+ *   Get the next producer tail index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the ring's tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_lf_load_tail(struct rte_ring *r, uintptr_t idx)
+{
+       uintptr_t fresh = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED);
+
+       if ((intptr_t)(idx - fresh) < 0)
+               idx = fresh; /* fresh is after idx, use it instead */
+       else
+               idx++; /* Continue with next slot */
+
+       return idx;
+}
+
+/**
+ * @internal
+ *   Update the ring's producer tail index. If another thread already updated
+ *   the index beyond the caller's tail value, do nothing.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the shared tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline void
+__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val)
+{
+       volatile uintptr_t *loc = &r->prod_ptr.tail;
+       uintptr_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
+
+       do {
+               /* Check if the tail has already been updated. */
+               if ((intptr_t)(val - old) < 0)
+                       return;
+
+               /* Else val >= old, need to update *loc */
+       } while (!__atomic_compare_exchange_n(loc, &old, val,
+                                             1, __ATOMIC_RELEASE,
+                                             __ATOMIC_RELAXED));
+}
+
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (multi-producer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+                           unsigned int n,
+                           enum rte_ring_queue_behavior behavior,
+                           unsigned int *free_space)
+{
+#if !defined(ALLOW_EXPERIMENTAL_API)
+       RTE_SET_USED(r);
+       RTE_SET_USED(obj_table);
+       RTE_SET_USED(n);
+       RTE_SET_USED(behavior);
+       RTE_SET_USED(free_space);
+       printf("[%s()] RING_F_LF requires an experimental API."
+              " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
+              , __func__);
+       return 0;
+#else
+       struct rte_ring_lf_entry *base;
+       uintptr_t head, next, tail;
+       unsigned int i;
+       uint32_t avail;
+
+       /* Atomically update the prod head to reserve n slots. The prod tail
+        * is modified at the end of the function.
+        */
+       n = __rte_ring_move_prod_head_ptr(r, 0, n, behavior,
+                                         &head, &next, &avail);
+
+       tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED);
+       head = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_ACQUIRE);
+
+       if (unlikely(n == 0))
+               goto end;
+
+       base = (struct rte_ring_lf_entry *)&r->ring;
+
+       for (i = 0; i < n; i++) {
+               unsigned int retries = 0;
+               int success = 0;
+
+               /* Enqueue to the tail entry. If another thread wins the race,
+                * retry with the new tail.
+                */
+               do {
+                       struct rte_ring_lf_entry old_value, new_value;
+                       struct rte_ring_lf_entry *ring_ptr;
+
+                       ring_ptr = &base[tail & r->mask];
+
+                       old_value = *ring_ptr;
+
+                       if (old_value.cnt != (tail >> r->log2_size)) {
+                               /* This slot has already been used. Depending
+                                * on how far behind this thread is, either go
+                                * to the next slot or reload the tail.
+                                */
+                               uintptr_t next_tail;
+
+                               next_tail = (tail + r->size) >> r->log2_size;
+
+                               if (old_value.cnt != next_tail ||
+                                   ++retries == ENQ_RETRY_LIMIT) {
+                                       /* This thread either fell 2+ laps
+                                        * behind or hit the retry limit, so
+                                        * reload the tail index.
+                                        */
+                                       tail = __rte_ring_lf_load_tail(r, tail);
+                                       retries = 0;
+                               } else {
+                                       /* Slot already used, try the next. */
+                                       tail++;
+
+                               }
+
+                               continue;
+                       }
+
+                       /* Found a free slot, try to enqueue next element. */
+                       new_value.ptr = obj_table[i];
+                       new_value.cnt = (tail + r->size) >> r->log2_size;
+
+#ifdef RTE_ARCH_64
+                       success = rte_atomic128_cmp_exchange(
+                                       (rte_int128_t *)ring_ptr,
+                                       (rte_int128_t *)&old_value,
+                                       (rte_int128_t *)&new_value,
+                                       1, __ATOMIC_RELEASE,
+                                       __ATOMIC_RELAXED);
+#else
+                       success = __atomic_compare_exchange(
+                                       (uint64_t *)ring_ptr,
+                                       &old_value,
+                                       &new_value,
+                                       1, __ATOMIC_RELEASE,
+                                       __ATOMIC_RELAXED);
+#endif
+               } while (success == 0);
+
+               /* Only increment tail if the CAS succeeds, since it can
+                * spuriously fail on some architectures.
+                */
+               tail++;
+       }
+
+end:
+       __rte_ring_lf_update_tail(r, tail);
+
+       if (free_space != NULL)
+               *free_space = avail - n;
+       return n;
+#endif
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (single-consumer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has 
finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table,
+                           unsigned int n,
+                           enum rte_ring_queue_behavior behavior,
+                           unsigned int *available)
+{
+       uintptr_t cons_tail, prod_tail, avail;
+
+       cons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED);
+       prod_tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_ACQUIRE);
+
+       avail = prod_tail - cons_tail;
+
+       /* Set the actual entries for dequeue */
+       if (unlikely(avail < n))
+               n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+       if (unlikely(n == 0))
+               goto end;
+
+       DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+       /* Use a read barrier and store-relaxed so we don't unnecessarily order
+        * writes.
+        */
+       rte_smp_rmb();
+
+       __atomic_store_n(&r->cons_ptr.tail, cons_tail + n, __ATOMIC_RELAXED);
+end:
+       if (available != NULL)
+               *available = avail - n;
+
+       return n;
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (multi-consumer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has 
finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table,
+                           unsigned int n,
+                           enum rte_ring_queue_behavior behavior,
+                           unsigned int *available)
+{
+       uintptr_t cons_tail, prod_tail, avail;
+
+       cons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED);
+
+       do {
+               /* Load tail on every iteration to avoid spurious queue empty
+                * situations.
+                */
+               prod_tail = __atomic_load_n(&r->prod_ptr.tail,
+                                           __ATOMIC_ACQUIRE);
+
+               avail = prod_tail - cons_tail;
+
+               /* Set the actual entries for dequeue */
+               if (unlikely(avail < n))
+                       n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+               if (unlikely(n == 0))
+                       goto end;
+
+               DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+               /* Use a read barrier and store-relaxed so we don't
+                * unnecessarily order writes.
+                */
+               rte_smp_rmb();
+
+       } while (!__atomic_compare_exchange_n(&r->cons_ptr.tail,
+                                             &cons_tail, cons_tail + n,
+                                             0, __ATOMIC_RELAXED,
+                                             __ATOMIC_RELAXED));
+
+end:
+       if (available != NULL)
+               *available = avail - n;
+
        return n;
 }
 
diff --git a/lib/librte_ring/rte_ring_generic.h 
b/lib/librte_ring/rte_ring_generic.h
index 6a0e1bbfb..322d003ad 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -1,6 +1,7 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2010-2019 Intel Corporation
+ * Copyright (c) 2018-2019 Arm Limited
  * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org
  * All rights reserved.
  * Derived from FreeBSD's bufring.h
@@ -297,4 +298,358 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, 
unsigned int is_sc,
        return n;
 }
 
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (single-producer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+                           unsigned int n,
+                           enum rte_ring_queue_behavior behavior,
+                           unsigned int *free_space)
+{
+       uint32_t free_entries;
+       uintptr_t head, next;
+
+       n = __rte_ring_move_prod_head_ptr(r, 1, n, behavior,
+                                         &head, &next, &free_entries);
+       if (n == 0)
+               goto end;
+
+       ENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n);
+
+       rte_smp_wmb();
+
+       r->prod_ptr.tail += n;
+end:
+       if (free_space != NULL)
+               *free_space = free_entries - n;
+       return n;
+}
+
+/* This macro defines the number of times an enqueueing thread can fail to find
+ * a free ring slot before reloading its producer tail index.
+ */
+#define ENQ_RETRY_LIMIT 32
+
+/**
+ * @internal
+ *   Get the next producer tail index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the ring's tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_lf_load_tail(struct rte_ring *r, uintptr_t idx)
+{
+       uintptr_t fresh = r->prod_ptr.tail;
+
+       if ((intptr_t)(idx - fresh) < 0)
+               /* fresh is after idx, use it instead */
+               idx = fresh;
+       else
+               /* Continue with next slot */
+               idx++;
+
+       return idx;
+}
+
+/**
+ * @internal
+ *   Update the ring's producer tail index. If another thread already updated
+ *   the index beyond the caller's tail value, do nothing.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the shared tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline void
+__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val)
+{
+       volatile uintptr_t *loc = &r->prod_ptr.tail;
+       uintptr_t old;
+
+       do {
+               old = *loc;
+
+               /* Check if the tail has already been updated. */
+               if ((intptr_t)(val - old) < 0)
+                       return;
+
+               /* Else val >= old, need to update *loc */
+       } while (!__sync_bool_compare_and_swap(loc, old, val));
+}
+
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (multi-producer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+                           unsigned int n,
+                           enum rte_ring_queue_behavior behavior,
+                           unsigned int *free_space)
+{
+#if !defined(ALLOW_EXPERIMENTAL_API)
+       RTE_SET_USED(r);
+       RTE_SET_USED(obj_table);
+       RTE_SET_USED(n);
+       RTE_SET_USED(behavior);
+       RTE_SET_USED(free_space);
+       printf("[%s()] RING_F_LF requires an experimental API."
+              " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
+              , __func__);
+       return 0;
+#else
+       struct rte_ring_lf_entry *base;
+       uintptr_t head, next, tail;
+       unsigned int i;
+       uint32_t avail;
+
+       /* Atomically update the prod head to reserve n slots. The prod tail
+        * is modified at the end of the function.
+        */
+       n = __rte_ring_move_prod_head_ptr(r, 0, n, behavior,
+                                         &head, &next, &avail);
+
+       tail = r->prod_ptr.tail;
+
+       rte_smp_rmb();
+
+       head = r->cons_ptr.tail;
+
+       if (unlikely(n == 0))
+               goto end;
+
+       base = (struct rte_ring_lf_entry *)&r->ring;
+
+       for (i = 0; i < n; i++) {
+               unsigned int retries = 0;
+               int success = 0;
+
+               /* Enqueue to the tail entry. If another thread wins the race,
+                * retry with the new tail.
+                */
+               do {
+                       struct rte_ring_lf_entry old_value, new_value;
+                       struct rte_ring_lf_entry *ring_ptr;
+
+                       ring_ptr = &base[tail & r->mask];
+
+                       old_value = *ring_ptr;
+
+                       if (old_value.cnt != (tail >> r->log2_size)) {
+                               /* This slot has already been used. Depending
+                                * on how far behind this thread is, either go
+                                * to the next slot or reload the tail.
+                                */
+                               uintptr_t next_tail;
+
+                               next_tail = (tail + r->size) >> r->log2_size;
+
+                               if (old_value.cnt != next_tail ||
+                                   ++retries == ENQ_RETRY_LIMIT) {
+                                       /* This thread either fell 2+ laps
+                                        * behind or hit the retry limit, so
+                                        * reload the tail index.
+                                        */
+                                       tail = __rte_ring_lf_load_tail(r, tail);
+                                       retries = 0;
+                               } else {
+                                       /* Slot already used, try the next. */
+                                       tail++;
+
+                               }
+
+                               continue;
+                       }
+
+                       /* Found a free slot, try to enqueue next element. */
+                       new_value.ptr = obj_table[i];
+                       new_value.cnt = (tail + r->size) >> r->log2_size;
+
+#ifdef RTE_ARCH_64
+                       success = rte_atomic128_cmp_exchange(
+                                       (rte_int128_t *)ring_ptr,
+                                       (rte_int128_t *)&old_value,
+                                       (rte_int128_t *)&new_value,
+                                       1, __ATOMIC_RELEASE,
+                                       __ATOMIC_RELAXED);
+#else
+                       uint64_t *old_ptr = (uint64_t *)&old_value;
+                       uint64_t *new_ptr = (uint64_t *)&new_value;
+
+                       success = rte_atomic64_cmpset(
+                                       (volatile uint64_t *)ring_ptr,
+                                       *old_ptr, *new_ptr);
+#endif
+               } while (success == 0);
+
+               /* Only increment tail if the CAS succeeds, since it can
+                * spuriously fail on some architectures.
+                */
+               tail++;
+       }
+
+end:
+
+       __rte_ring_lf_update_tail(r, tail);
+
+       if (free_space != NULL)
+               *free_space = avail - n;
+       return n;
+#endif
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (single-consumer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has 
finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table,
+                           unsigned int n,
+                           enum rte_ring_queue_behavior behavior,
+                           unsigned int *available)
+{
+       uintptr_t cons_tail, prod_tail, avail;
+
+       cons_tail = r->cons_ptr.tail;
+
+       rte_smp_rmb();
+
+       prod_tail = r->prod_ptr.tail;
+
+       avail = prod_tail - cons_tail;
+
+       /* Set the actual entries for dequeue */
+       if (unlikely(avail < n))
+               n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+       if (unlikely(n == 0))
+               goto end;
+
+       DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+       rte_smp_rmb();
+
+       r->cons_ptr.tail += n;
+end:
+       if (available != NULL)
+               *available = avail - n;
+
+       return n;
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (multi-consumer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has 
finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table,
+                           unsigned int n,
+                           enum rte_ring_queue_behavior behavior,
+                           unsigned int *available)
+{
+       uintptr_t cons_tail, prod_tail, avail;
+
+       do {
+               cons_tail = r->cons_ptr.tail;
+
+               rte_smp_rmb();
+
+               /* Load tail on every iteration to avoid spurious queue empty
+                * situations.
+                */
+               prod_tail = r->prod_ptr.tail;
+
+               avail = prod_tail - cons_tail;
+
+               /* Set the actual entries for dequeue */
+               if (unlikely(avail < n))
+                       n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+               if (unlikely(n == 0))
+                       goto end;
+
+               DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+       } while (!__sync_bool_compare_and_swap(&r->cons_ptr.tail,
+                                              cons_tail, cons_tail + n));
+
+end:
+       if (available != NULL)
+               *available = avail - n;
+
+       return n;
+}
+
 #endif /* _RTE_RING_GENERIC_H_ */
diff --git a/lib/librte_ring/rte_ring_version.map 
b/lib/librte_ring/rte_ring_version.map
index d935efd0d..8969467af 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -17,3 +17,10 @@ DPDK_2.2 {
        rte_ring_free;
 
 } DPDK_2.0;
+
+DPDK_19.05 {
+       global:
+
+       rte_ring_get_memsize;
+
+} DPDK_2.2;
-- 
2.13.6

Reply via email to